8000 Make all synchronous transaction methods set skipScheduler for network requests by goedderz · Pull Request #14810 · arangodb/arangodb · GitHub
[go: up one dir, main page]

Skip to content

Make all synchronous transaction methods set skipScheduler for network requests #14810

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 19 commits into from
Sep 23, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Set skipScheduler in countInternal
  • Loading branch information
goedderz committed Sep 21, 2021
commit 23514f86fdc5fe3dd84b1d54ff85e942d38a45f4
40 changes: 24 additions & 16 deletions arangod/Cluster/ClusterMethods.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,8 @@ T addFigures(VPackSlice const& v1, VPackSlice const& v2,
template <typename ShardDocsMap>
Future<Result> beginTransactionOnSomeLeaders(TransactionState& state,
LogicalCollection const& coll,
ShardDocsMap const& shards) {
ShardDocsMap const& shards,
transaction::MethodsApi api) {
TRI_ASSERT(state.isCoordinator());
TRI_ASSERT(!state.hasHint(transaction::Hints::Hint::SINGLE_OPERATION));

Expand All @@ -154,12 +155,12 @@ Future<Result> beginTransactionOnSomeLeaders(TransactionState& state,
leaders.emplace(leader);
}
}
return ClusterTrxMethods::beginTransactionOnLeaders(state, leaders,
transaction::MethodsApi::Asynchronous);
return ClusterTrxMethods::beginTransactionOnLeaders(state, leaders, api);
}

// begin transaction on shard leaders
Future<Result> beginTransactionOnAllLeaders(transaction::Methods& trx, ShardMap const& shards) {
Future<Result> beginTransactionOnAllLeaders(transaction::Methods& trx, ShardMap const& shards,
transaction::MethodsApi api) {
TRI_ASSERT(trx.state()->isCoordinator());
TRI_ASSERT(trx.state()->hasHint(transaction::Hints::Hint::GLOBAL_MANAGED));
ClusterTrxMethods::SortedServersSet leaders{};
Expand All @@ -169,8 +170,7 @@ Future<Result> beginTransactionOnAllLeaders(transaction::Methods& trx, ShardMap
leaders.emplace(srv);
}
}
return ClusterTrxMethods::beginTransactionOnLeaders(*trx.state(), leaders,
transaction::MethodsApi::Asynchronous);
return ClusterTrxMethods::beginTransactionOnLeaders(*trx.state(), leaders, api);
}

/// @brief add the correct header for the shard
10000 Expand Down Expand Up @@ -1207,7 +1207,8 @@ futures::Future<OperationResult> figuresOnCoordinator(ClusterFeature& feature,

futures::Future<OperationResult> countOnCoordinator(transaction::Methods& trx,
std::string const& cname,
OperationOptions const& options) {
OperationOptions const& options,
transaction::MethodsApi api) {
std::vector<std::pair<std::string, uint64_t>> result;

// Set a few variables needed for our work:
Expand All @@ -1225,7 +1226,7 @@ futures::Future<OperationResult> countOnCoordinator(transaction::Methods& trx,
std::shared_ptr<ShardMap> shardIds = collinfo->shardIds();
const bool isManaged = trx.state()->hasHint(transaction::Hints::Hint::GLOBAL_MANAGED);
if (isManaged) {
Result res = ::beginTransactionOnAllLeaders(trx, *shardIds).get();
Result res = ::beginTransactionOnAllLeaders(trx, *shardIds, api).get();
if (res.fail()) {
return futures::makeFuture(OperationResult(res, options));
}
Expand All @@ -1234,6 +1235,7 @@ futures::Future<OperationResult> countOnCoordinator(transaction::Methods& trx,
network::RequestOptions reqOpts;
reqOpts.database = dbname;
reqOpts.retryNotFound = true;
reqOpts.skipScheduler = api == transaction::MethodsApi::Synchronous;

if (TRI_vocbase_t::IsSystemName(cname)) {
// system collection (e.g. _apps, _jobs, _graphs...
Expand Down Expand Up @@ -1445,7 +1447,8 @@ Future<OperationResult> createDocumentOnCoordinator(transaction::Methods const&
Future<Result> f = makeFuture(Result());
const bool isManaged = trx.state()->hasHint(transaction::Hints::Hint::GLOBAL_MANAGED);
if (isManaged && opCtx.shardMap.size() > 1) { // lazily begin transactions on leaders
f = beginTransactionOnSomeLeaders(*trx.state(), coll, opCtx.shardMap);
f = beginTransactionOnSomeLeaders(*trx.state(), coll, opCtx.shardMap,
transaction::MethodsApi::Asynchronous);
}

return std::move(f).thenValue([=, &trx, &coll, opCtx(std::move(opCtx))]
Expand Down Expand Up @@ -1588,7 +1591,8 @@ Future<OperationResult> removeDocumentOnCoordinator(arangodb::transaction::Metho
// lazily begin transactions on leaders
Future<Result> f = makeFuture(Result());
if (isManaged && opCtx.shardMap.size() > 1) {
f = beginTransactionOnSomeLeaders(*trx.state(), coll, opCtx.shardMap);
f = beginTransactionOnSomeLeaders(*trx.state(), coll, opCtx.shardMap,
transaction::MethodsApi::Asynchronous);
}

return std::move(f).thenValue([=, &trx, opCtx(std::move(opCtx))]
Expand Down Expand Up @@ -1651,7 +1655,7 @@ Future<OperationResult> removeDocumentOnCoordinator(arangodb::transaction::Metho
// lazily begin transactions on leaders
Future<Result> f = makeFuture(Result());
if (isManaged && shardIds->size() > 1) {
f = ::beginTransactionOnAllLeaders(trx, *shardIds);
f = ::beginTransactionOnAllLeaders(trx, *shardIds, transaction::MethodsApi::Asynchronous);
}

return std::move(f).thenValue([=, &trx](Result&& r) mutable -> Future<OperationResult> {
Expand Down Expand Up @@ -1718,7 +1722,8 @@ futures::Future<OperationResult> truncateCollectionOnCoordinator(

// lazily begin transactions on all leader shards
if (trx.state()->hasHint(transaction::Hints::Hint::GLOBAL_MANAGED)) {
res = ::beginTransactionOnAllLeaders(trx, *shardIds).get();
res = ::beginTransactionOnAllLeaders(trx, *shardIds, transaction::MethodsApi::Asynchronous)
.get();
if (res.fail()) {
return futures::makeFuture(OperationResult(res, options));
}
Expand Down Expand Up @@ -1827,7 +1832,8 @@ Future<OperationResult> getDocumentOnCoordinator(transaction::Methods& trx,

Future<Result> f = makeFuture(Result());
if (isManaged && opCtx.shardMap.size() > 1) { // lazily begin the transaction
f = beginTransactionOnSomeLeaders(*trx.state(), coll, opCtx.shardMap);
f = beginTransactionOnSomeLeaders(*trx.state(), coll, opCtx.shardMap,
transaction::MethodsApi::Asynchronous);
}

return std::move(f).thenValue([=, &trx, opCtx(std::move(opCtx))]
Expand Down Expand Up @@ -1905,7 +1911,8 @@ Future<OperationResult> getDocumentOnCoordinator(transaction::Methods& trx,
// We contact all shards with the complete body and ignore NOT_FOUND

if (isManaged) { // lazily begin the transaction
Result res = ::beginTransactionOnAllLeaders(trx, *shardIds).get();
Result res = ::beginTransactionOnAllLeaders(trx, *shardIds, transaction::MethodsApi::Asynchronous)
.get();
if (res.fail()) {
return makeFuture(OperationResult(res, options));
}
Expand Down Expand Up @@ -2358,7 +2365,8 @@ Future<OperationResult> modifyDocumentOnCoordinator(

Future<Result> f = makeFuture(Result());
if (isManaged && opCtx.shardMap.size() > 1) { // lazily begin transactions on leaders
f = beginTransactionOnSomeLeaders(*trx.state(), coll, opCtx.shardMap);
f = beginTransactionOnSomeLeaders(*trx.state(), coll, opCtx.shardMap,
transaction::MethodsApi::Asynchronous);
}

return std::move(f).thenValue([=, &trx, opCtx(std::move(opCtx))](Result&& r) mutable -> Future<OperationResult> {
Expand Down Expand Up @@ -2430,7 +2438,7 @@ Future<OperationResult> modifyDocumentOnCoordinator(

Future<Result> f = makeFuture(Result());
if (isManaged && shardIds->size() > 1) { // lazily begin the transaction
f = ::beginTransactionOnAllLeaders(trx, *shardIds);
f = ::beginTransactionOnAllLeaders(trx, *shardIds, transaction::MethodsApi::Asynchronous);
}

return std::move(f).thenValue([=, &trx](Result&&) mutable -> Future<OperationResult> {
Expand Down
4 changes: 3 additions & 1 deletion arangod/Cluster/ClusterMethods.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include "Network/types.h"
#include "Rest/CommonDefines.h"
#include "Rest/GeneralResponse.h"
#include "Transaction/MethodsApi.h"
#include "Utils/OperationResult.h"
#include "VocBase/LogicalCollection.h"
#include "VocBase/voc-types.h"
Expand Down Expand Up @@ -119,7 +120,8 @@ futures::Future<OperationResult> figuresOnCoordinator(ClusterFeature&,

futures::Future<OperationResult> countOnCoordinator(transaction::Methods& trx,
std::string const& collname,
OperationOptions const& options);
OperationOptions const& options,
arangodb::transaction::MethodsApi api);

////////////////////////////////////////////////////////////////////////////////
/// @brief gets the selectivity estimates from DBservers
Expand Down
10 changes: 5 additions & 5 deletions arangod/Transaction/Methods.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1865,21 +1865,21 @@ futures::Future<OperationResult> transaction::Methods::countAsync(
/// @brief count the number of documents in a collection
futures::Future<OperationResult> transaction::Methods::countCoordinator(
std::string const& collectionName, transaction::CountType type,
OperationOptions const& options) {
OperationOptions const& options, MethodsApi api) {
// First determine the collection ID from the name:
auto colptr = resolver()->getCollectionStructCluster(collectionName);
if (colptr == nullptr) {
return futures::makeFuture(OperationResult(TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND, options));
}

return countCoordinatorHelper(colptr, collectionName, type, options);
return countCoordinatorHelper(colptr, collectionName, type, options, api);
}

#endif

futures::Future<OperationResult> transaction::Methods::countCoordinatorHelper(
std::shared_ptr<LogicalCollection> const& collinfo, std::string const& collectionName,
transaction::CountType type, OperationOptions const& options) {
transaction::CountType type, OperationOptions const& options, MethodsApi api) {
TRI_ASSERT(collinfo != nullptr);
auto& cache = collinfo->countCache();

Expand All @@ -1893,7 +1893,7 @@ futures::Future<OperationResult> transaction::Methods::countCoordinatorHelper(

if (documents == CountCache::NotPopulated) {
// no cache hit, or detailed results requested
return arangodb::countOnCoordinator(*this, collectionName, options)
return arangodb::countOnCoordinator(*this, collectionName, options, api)
.thenValue([&cache, type, options](OperationResult&& res) -> OperationResult {
if (res.fail()) {
return std::move(res);
Expand Down Expand Up @@ -2718,7 +2718,7 @@ futures::Future<OperationResult> Methods::countInternal(std::string const& colle
TRI_ASSERT(_state->status() == transaction::Status::RUNNING);

if (_state->isCoordinator()) {
return countCoordinator(collectionName, type, options);
return countCoordinator(collectionName, type, options, api);
}

if (type == CountType::Detailed) {
Expand Down
6 changes: 3 additions & 3 deletions arangod/Transaction/Methods.h
Original file line number Diff line number Diff line change
Expand Up @@ -467,7 +467,6 @@ class Methods {
auto truncateInternal(std::string const& collectionName, OperationOptions const& options,
MethodsApi api) -> Future<OperationResult>;
// is virtual for IgnoreNoAccessMethods
// TODO set skipScheduler: true for network requests
ENTERPRISE_VIRT auto countInternal(std::string const& collectionName, CountType type,
OperationOptions const& options, MethodsApi api)
-> futures::Future<OperationResult>;
Expand All @@ -481,11 +480,12 @@ class Methods {

futures::Future<OperationResult> countCoordinator(std::string const& collectionName,
CountType type,
OperationOptions const& options);
OperationOptions const& options,
MethodsApi api);

futures::Future<OperationResult> countCoordinatorHelper(
std::shared_ptr<LogicalCollection> const& collinfo, std::string const& collectionName,
CountType type, OperationOptions const& options);
CountType type, OperationOptions const& options, MethodsApi api);

OperationResult countLocal(std::string const& collectionName, CountType type,
OperationOptions const& options);
Expand Down
0