8000 Make all synchronous transaction methods set skipScheduler for network requests by goedderz · Pull Request #14810 · arangodb/arangodb · GitHub
[go: up one dir, main page]

Skip to content

Make all synchronous transaction methods set skipScheduler for network requests #14810

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 19 commits into from
Sep 23, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
devel
-----

* Prevent some possible deadlocks under high load regarding transactions and
document operations, and also improve performance slightly.

* Hide help text fragment about VST connection strings in client tools that
do not support VST.

Expand Down
68 changes: 41 additions & 27 deletions arangod/Cluster/ClusterMethods.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,8 @@ T addFigures(VPackSlice const& v1, VPackSlice const& v2,
template <typename ShardDocsMap>
Future<Result> beginTransactionOnSomeLeaders(TransactionState& state,
LogicalCollection const& coll,
ShardDocsMap const& shards) {
ShardDocsMap const& shards,
transaction::MethodsApi api) {
TRI_ASSERT(state.isCoordinator());
TRI_ASSERT(!state.hasHint(transaction::Hints::Hint::SINGLE_OPERATION));

Expand All @@ -155,11 +156,12 @@ Future<Result> beginTransactionOnSomeLeaders(TransactionState& state,
leaders.emplace(leader);
}
}
return ClusterTrxMethods::beginTransactionOnLeaders(state, leaders);
return ClusterTrxMethods::beginTransactionOnLeaders(state, leaders, api);
}

// begin transaction on shard leaders
Future<Result> beginTransactionOnAllLeaders(transaction::Methods& trx, ShardMap const& shards) {
Future<Result> beginTransactionOnAllLeaders(transaction::Methods& trx, ShardMap const& shards,
transaction::MethodsApi api) {
TRI_ASSERT(trx.state()->isCoordinator());
TRI_ASSERT(trx.state()->hasHint(transaction::Hints::Hint::GLOBAL_MANAGED));
ClusterTrxMethods::SortedServersSet leaders{};
Expand All @@ -169,7 +171,7 @@ Future<Result> beginTransactionOnAllLeaders(transaction::Methods& trx, ShardMap
leaders.emplace(srv);
}
}
return ClusterTrxMethods::beginTransactionOnLeaders(*trx.state(), leaders);
return ClusterTrxMethods::beginTransactionOnLeaders(*trx.state(), leaders, api);
}

/// @brief add the correct header for the shard
Expand Down Expand Up @@ -1206,7 +1208,8 @@ futures::Future<OperationResult> figuresOnCoordinator(ClusterFeature& feature,

futures::Future<OperationResult> countOnCoordinator(transaction::Methods& trx,
std::string const& cname,
OperationOptions const& options) {
OperationOptions const& options,
transaction::MethodsApi api) {
std::vector<std::pair<std::string, uint64_t>> result;

// Set a few variables needed for our work:
Expand All @@ -1224,7 +1227,8 @@ futures::Future<OperationResult> countOnCoordinator(transaction::Methods& trx,
std::shared_ptr<ShardMap> shardIds = collinfo->shardIds();
const bool isManaged = trx.state()->hasHint(transaction::Hints::Hint::GLOBAL_MANAGED);
if (isManaged) {
Result res = ::beginTransactionOnAllLeaders(trx, *shardIds).get();
Result res = ::beginTransactionOnAllLeaders(trx, *shardIds, transaction::MethodsApi::Synchronous)
.get();
if (res.fail()) {
return futures::makeFuture(OperationResult(res, options));
}
Expand All @@ -1233,6 +1237,7 @@ futures::Future<OperationResult> countOnCoordinator(transaction::Methods& trx,
network::RequestOptions reqOpts;
reqOpts.database = dbname;
reqOpts.retryNotFound = true;
reqOpts.skipScheduler = api == transaction::MethodsApi::Synchronous;

if (NameValidator::isSystemName(cname)) {
// system collection (e.g. _apps, _jobs, _graphs...
Expand Down Expand Up @@ -1415,10 +1420,9 @@ Result selectivityEstimatesOnCoordinator(ClusterFeature& feature, std::string co
/// for their documents.
////////////////////////////////////////////////////////////////////////////////

Future<OperationResult> createDocumentOnCoordinator(transaction::Methods const& trx,
LogicalCollection& coll,
VPackSlice const slice,
arangodb::OperationOptions const& options) {
futures::Future<OperationResult> createDocumentOnCoordinator(
transaction::Methods const& trx, LogicalCollection& coll, VPackSlice const slice,
OperationOptions const& options, transaction::MethodsApi api) {
const std::string collid = std::to_string(coll.id().id());

// create vars used in this function
Expand All @@ -1444,7 +1448,7 @@ Future<OperationResult> createDocumentOnCoordinator(transaction::Methods const&
Future<Result> f = makeFuture(Result());
const bool isManaged = trx.state()->hasHint(transaction::Hints::Hint::GLOBAL_MANAGED);
if (isManaged && opCtx.shardMap.size() > 1) { // lazily begin transactions on leaders
f = beginTransactionOnSomeLeaders(*trx.state(), coll, opCtx.shardMap);
f = beginTransactionOnSomeLeaders(*trx.state(), coll, opCtx.shardMap, api);
}

return std::move(f).thenValue([=, &trx, &coll, opCtx(std::move(opCtx))]
Expand All @@ -1459,6 +1463,7 @@ Future<OperationResult> createDocumentOnCoordinator(transaction::Methods const&
reqOpts.database = trx.vocbase().name();
reqOpts.timeout = network::Timeout(CL_DEFAULT_LONG_TIMEOUT);
reqOpts.retryNotFound = true;
reqOpts.skipScheduler = api == transaction::MethodsApi::Synchronous;
reqOpts.param(StaticStrings::WaitForSyncString, (options.waitForSync ? "true" : "false"))
.param(StaticStrings::ReturnNewString, (options.returnNew ? "true" : "false"))
.param(StaticStrings::ReturnOldString, (options.returnOld ? "true" : "false"))
Expand Down Expand Up @@ -1541,9 +1546,9 @@ Future<OperationResult> createDocumentOnCoordinator(transaction::Methods const&
/// @brief remove a document in a coordinator
////////////////////////////////////////////////////////////////////////////////

Future<OperationResult> removeDocumentOnCoordinator(arangodb::transaction::Methods& trx,
LogicalCollection& coll, VPackSlice const slice,
arangodb::OperationOptions const& options) {
futures::Future<OperationResult> removeDocumentOnCoordinator(
transaction::Methods& trx, LogicalCollection& coll, VPackSlice const slice,
OperationOptions const& options, transaction::MethodsApi api) {
// Set a few variables needed for our work:

// First determine the collection ID from the name:
Expand Down Expand Up @@ -1574,6 +1579,7 @@ Future<OperationResult> removeDocumentOnCoordinator(arangodb::transaction::Metho
reqOpts.database = trx.vocbase().name();
reqOpts.timeout = network::Timeout(CL_DEFAULT_LONG_TIMEOUT);
reqOpts.retryNotFound = true;
reqOpts.skipScheduler = api == transaction::MethodsApi::Synchronous;
reqOpts.param(StaticStrings::WaitForSyncString, (options.waitForSync ? "true" : "false"))
.param(StaticStrings::ReturnOldString, (options.returnOld ? "true" : "false"))
.param(StaticStrings::IgnoreRevsString, (options.ignoreRevs ? "true" : "false"));
Expand All @@ -1587,7 +1593,7 @@ Future<OperationResult> removeDocumentOnCoordinator(arangodb::transaction::Metho
// lazily begin transactions on leaders
Future<Result> f = makeFuture(Result());
if (isManaged && opCtx.shardMap.size() > 1) {
f = beginTransactionOnSomeLeaders(*trx.state(), coll, opCtx.shardMap);
f = beginTransactionOnSomeLeaders(*trx.state(), coll, opCtx.shardMap, api);
}

return std::move(f).thenValue([=, &trx, opCtx(std::move(opCtx))]
Expand Down Expand Up @@ -1650,7 +1656,7 @@ Future<OperationResult> removeDocumentOnCoordinator(arangodb::transaction::Metho
// lazily begin transactions on leaders
Future<Result> f = makeFuture(Result());
if (isManaged && shardIds->size() > 1) {
f = ::beginTransactionOnAllLeaders(trx, *shardIds);
f = ::beginTransactionOnAllLeaders(trx, *shardIds, api);
}

return std::move(f).thenValue([=, &trx](Result&& r) mutable -> Future<OperationResult> {
Expand Down Expand Up @@ -1698,7 +1704,8 @@ Future<OperationResult> removeDocumentOnCoordinator(arangodb::transaction::Metho
////////////////////////////////////////////////////////////////////////////////

futures::Future<OperationResult> truncateCollectionOnCoordinator(
transaction::Methods& trx, std::string const& collname, OperationOptions const& options) {
transaction::Methods& trx, std::string const& collname,
OperationOptions const& options, transaction::MethodsApi api) {
Result res;
// Set a few variables needed for our work:
ClusterInfo& ci = trx.vocbase().server().getFeature<ClusterFeature>().clusterInfo();
Expand All @@ -1717,7 +1724,8 @@ futures::Future<OperationResult> truncateCollectionOnCoordinator(

// lazily begin transactions on all leader shards
if (trx.state()->hasHint(transaction::Hints::Hint::GLOBAL_MANAGED)) {
res = ::beginTransactionOnAllLeaders(trx, *shardIds).get();
res = ::beginTransactionOnAllLeaders(trx, *shardIds, transaction::MethodsApi::Synchronous)
.get();
if (res.fail()) {
return futures::makeFuture(OperationResult(res, options));
}
Expand All @@ -1727,6 +1735,7 @@ futures::Future<OperationResult> truncateCollectionOnCoordinator(
reqOpts.database = trx.vocbase().name();
reqOpts.timeout = network::Timeout(600.0);
reqOpts.retryNotFound = true;
reqOpts.skipScheduler = api == transaction::MethodsApi::Synchronous;
reqOpts.param(StaticStrings::Compact, (options.truncateCompact ? "true" : "false"));

std::vector<Future<network::Response>> futures;
Expand Down Expand Up @@ -1767,7 +1776,8 @@ futures::Future<OperationResult> truncateCollectionOnCoordinator(

Future<OperationResult> getDocumentOnCoordinator(transaction::Methods& trx,
LogicalCollection& coll, VPackSlice slice,
OperationOptions const& options) {
OperationOptions const& options,
transaction::MethodsApi api) {
// Set a few variables needed for our work:

const std::string collid = std::to_string(coll.id().id());
Expand Down Expand Up @@ -1807,6 +1817,7 @@ Future<OperationResult> getDocumentOnCoordinator(transaction::Methods& trx,
network::RequestOptions reqOpts;
reqOpts.database = trx.vocbase().name();
reqOpts.retryNotFound = true;
reqOpts.skipScheduler = api == transaction::MethodsApi::Synchronous;
reqOpts.param(StaticStrings::IgnoreRevsString, (options.ignoreRevs ? "true" : "false"));

fuerte::RestVerb restVerb;
Expand All @@ -1826,7 +1837,7 @@ Future<OperationResult> getDocumentOnCoordinator(transaction::Methods& trx,

Future<Result> f = makeFuture(Result());
if (isManaged && opCtx.shardMap.size() > 1) { // lazily begin the transaction
f = beginTransactionOnSomeLeaders(*trx.state(), coll, opCtx.shardMap);
f = beginTransactionOnSomeLeaders(*trx.state(), coll, opCtx.shardMap, api);
}

return std::move(f).thenValue([=, &trx, opCtx(std::move(opCtx))]
Expand Down Expand Up @@ -1904,7 +1915,8 @@ Future<OperationResult> getDocumentOnCoordinator(transaction::Methods& trx,
// We contact all shards with the complete body and ignore NOT_FOUND

if (isManaged) { // lazily begin the transaction
Result res = ::beginTransactionOnAllLeaders(trx, *shardIds).get();
Result res = ::beginTransactionOnAllLeaders(trx, *shardIds, transaction::MethodsApi::Synchronous)
.get();
if (res.fail()) {
return makeFuture(OperationResult(res, options));
}
Expand Down Expand Up @@ -2263,9 +2275,10 @@ void fetchVerticesFromEngines(
/// @brief modify a document in a coordinator
////////////////////////////////////////////////////////////////////////////////

Future<OperationResult> modifyDocumentOnCoordinator(
transaction::Methods& trx, LogicalCollection& coll, VPackSlice const& slice,
arangodb::OperationOptions const& options, bool const isPatch) {
futures::Future<OperationResult> modifyDocumentOnCoordinator(
transaction::Methods& trx, LogicalCollection& coll,
arangodb::velocypack::Slice const& slice, OperationOptions const& options,
bool isPatch, transaction::MethodsApi api) {
// Set a few variables needed for our work:

// First determine the collection ID from the name:
Expand Down Expand Up @@ -2327,6 +2340,7 @@ Future<OperationResult> modifyDocumentOnCoordinator(
reqOpts.database = trx.vocbase().name();
reqOpts.timeout = network::Timeout(CL_DEFAULT_LONG_TIMEOUT);
reqOpts.retryNotFound = true;
reqOpts.skipScheduler = api == transaction::MethodsApi::Synchronous;
reqOpts.param(StaticStrings::WaitForSyncString, (options.waitForSync ? "true" : "false"))
.param(StaticStrings::IgnoreRevsString, (options.ignoreRevs ? "true" : "false"))
.param(StaticStrings::SkipDocumentValidation, (options.validate ? "false" : "true"))
Expand Down Expand Up @@ -2357,7 +2371,7 @@ Future<OperationResult> modifyDocumentOnCoordinator(

Future<Result> f = makeFuture(Result());
if (isManaged && opCtx.shardMap.size() > 1) { // lazily begin transactions on leaders
f = beginTransactionOnSomeLeaders(*trx.state(), coll, opCtx.shardMap);
f = beginTransactionOnSomeLeaders(*trx.state(), coll, opCtx.shardMap, api);
}

return std::move(f).thenValue([=, &trx, opCtx(std::move(opCtx))](Result&& r) mutable -> Future<OperationResult> {
Expand Down Expand Up @@ -2429,7 +2443,7 @@ Future<OperationResult> modifyDocumentOnCoordinator(

Future<Result> f = makeFuture(Result());
if (isManaged && shardIds->size() > 1) { // lazily begin the transaction
f = ::beginTransactionOnAllLeaders(trx, *shardIds);
f = ::beginTransactionOnAllLeaders(trx, *shardIds, api);
}

return std::move(f).thenValue([=, &trx](Result&&) mutable -> Future<OperationResult> {
Expand Down Expand Up @@ -4357,7 +4371,7 @@ arangodb::Result getEngineStatsFromDBServers(ClusterFeature& feature,
auto* pool = feature.server().getFeature<NetworkFeature>().pool();

network::RequestOptions reqOpts;
reqOpts.skipScheduler = false;
reqOpts.skipScheduler = true;
std::vector<Future<network::Response>> futures;
futures.reserve(DBservers.size());

Expand Down
24 changes: 14 additions & 10 deletions arangod/Cluster/ClusterMethods.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include "Network/types.h"
#include "Rest/CommonDefines.h"
#include "Rest/GeneralResponse.h"
#include "Transaction/MethodsApi.h"
#include "Utils/OperationResult.h"
#include "VocBase/LogicalCollection.h"
#include "VocBase/voc-types.h"
Expand Down Expand Up @@ -119,7 +120,8 @@ futures::Future<OperationResult> figuresOnCoordinator(ClusterFeature&,

futures::Future<OperationResult> countOnCoordinator(transaction::Methods& trx,
std::string const& collname,
OperationOptions const& options);
OperationOptions const& options,
arangodb::transaction::MethodsApi api);

////////////////////////////////////////////////////////////////////////////////
/// @brief gets the selectivity estimates from DBservers
Expand All @@ -135,25 +137,25 @@ Result selectivityEstimatesOnCoordinator(ClusterFeature&, std::string const& dbn
////////////////////////////////////////////////////////////////////////////////

futures::Future<OperationResult> createDocumentOnCoordinator(
transaction::Methods const& trx, LogicalCollection&, VPackSlice const slice,
OperationOptions const& options);
transaction::Methods const& trx, LogicalCollection& coll, VPackSlice slice,
OperationOptions const& options, transaction::MethodsApi api);

////////////////////////////////////////////////////////////////////////////////
/// @brief remove a document in a coordinator
////////////////////////////////////////////////////////////////////////////////

futures::Future<OperationResult> removeDocumentOnCoordinator(transaction::Methods& trx,
LogicalCollection&,
VPackSlice const slice,
OperationOptions const& options);
futures::Future<OperationResult> removeDocumentOnCoordinator(
transaction::Methods& trx, LogicalCollection& coll, VPackSlice slice,
OperationOptions const& options, transaction::MethodsApi api);

////////////////////////////////////////////////////////////////////////////////
/// @brief get a document in a coordinator
////////////////////////////////////////////////////////////////////////////////

futures::Future<OperationResult> getDocumentOnCoordinator(transaction::Methods& trx,
LogicalCollection&, VPackSlice slice,
OperationOptions const& options);
OperationOptions const& options,
transaction::MethodsApi api);

/// @brief fetch edges from TraverserEngines
/// Contacts all TraverserEngines placed
Expand Down Expand Up @@ -210,14 +212,16 @@ void fetchVerticesFromEngines(

futures::Future<OperationResult> modifyDocumentOnCoordinator(
transaction::Methods& trx, LogicalCollection& coll,
arangodb::velocypack::Slice const& slice, OperationOptions const& options, bool isPatch);
arangodb::velocypack::Slice const& slice, OperationOptions const& options,
bool isPatch, transaction::MethodsApi api);

////////////////////////////////////////////////////////////////////////////////
/// @brief truncate a cluster collection on a coordinator
////////////////////////////////////////////////////////////////////////////////

futures::Future<OperationResult> truncateCollectionOnCoordinator(
transaction::Methods& trx, std::string const& collname, OperationOptions const& options);
transaction::Methods& trx, std::string const& collname,
OperationOptions const& options, transaction::MethodsApi api);

////////////////////////////////////////////////////////////////////////////////
/// @brief flush Wal on all DBservers
Expand Down
Loading
0