8000 Make all synchronous transaction methods set skipScheduler for network requests by goedderz · Pull Request #14810 · arangodb/arangodb · GitHub
[go: up one dir, main page]

Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
devel
-----

* Prevent some possible deadlocks under high load regarding transactions and
document operations, and also improve performance slightly.

* Hide help text fragment about VST connection strings in client tools that
do not support VST.

Expand Down
68 changes: 41 additions & 27 deletions arangod/Cluster/ClusterMethods.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,8 @@ T addFigures(VPackSlice const& v1, VPackSlice const& v2,
template <typename ShardDocsMap>
Future<Result> beginTransactionOnSomeLeaders(TransactionState& state,
LogicalCollection const& coll,
ShardDocsMap const& shards) {
ShardDocsMap const& shards,
transaction::MethodsApi api) {
TRI_ASSERT(state.isCoordinator());
TRI_ASSERT(!state.hasHint(transaction::Hints::Hint::SINGLE_OPERATION));

Expand All @@ -155,11 +156,12 @@ Future<Result> beginTransactionOnSomeLeaders(TransactionState& state,
leaders.emplace(leader);
}
}
return ClusterTrxMethods::beginTransactionOnLeaders(state, leaders);
return ClusterTrxMethods::beginTransactionOnLeaders(state, leaders, api);
}

// begin transaction on shard leaders
Future<Result> beginTransactionOnAllLeaders(transaction::Methods& trx, ShardMap const& shards) {
Future<Result> beginTransactionOnAllLeaders(transaction::Methods& trx, ShardMap const& shards,
transaction::MethodsApi api) {
TRI_ASSERT(trx.state()->isCoordinator());
TRI_ASSERT(trx.state()->hasHint(transaction::Hints::Hint::GLOBAL_MANAGED));
ClusterTrxMethods::SortedServersSet leaders{};
Expand All @@ -169,7 +171,7 @@ Future<Result> beginTransactionOnAllLeaders(transaction::Methods& trx, ShardMap
leaders.emplace(srv);
}
}
return ClusterTrxMethods::beginTransactionOnLeaders(*trx.state(), leaders);
return ClusterTrxMethods::beginTransactionOnLeaders(*trx.state(), leaders, api);
}

/// @brief add the correct header for the shard
Expand Down Expand Up @@ -1206,7 +1208,8 @@ futures::Future<OperationResult> figuresOnCoordinator(ClusterFeature& feature,

futures::Future<OperationResult> countOnCoordinator(transaction::Methods& trx,
std::string const& cname,
OperationOptions const& options) {
OperationOptions const& options,
transaction::MethodsApi api) {
std::vector<std::pair<std::string, uint64_t>> result;

// Set a few variables needed for our work:
Expand All @@ -1224,7 +1227,8 @@ futures::Future<OperationResult> countOnCoordinator(transaction::Methods& trx,
std::shared_ptr<ShardMap> shardIds = collinfo->shardIds();
const bool isManaged = trx.state()->hasHint(transaction::Hints::Hint::GLOBAL_MANAGED);
if (isManaged) {
Result res = ::beginTransactionOnAllLeaders(trx, *shardIds).get();
Result res = ::beginTransactionOnAllLeaders(trx, *shardIds, transaction::MethodsApi::Synchronous)
.get();
if (res.fail()) {
return futures::makeFuture(OperationResult(res, options));
}
Expand All @@ -1233,6 +1237,7 @@ futures::Future<OperationResult> countOnCoordinator(transaction::Methods& trx,
network::RequestOptions reqOpts;
reqOpts.database = dbname;
reqOpts.retryNotFound = true;
reqOpts.skipScheduler = api == transaction::MethodsApi::Synchronous;

if (NameValidator::isSystemName(cname)) {
// system collection (e.g. _apps, _jobs, _graphs...
Expand Down Expand Up @@ -1415,10 +1420,9 @@ Result selectivityEstimatesOnCoordinator(ClusterFeature& feature, std::string co
/// for their documents.
////////////////////////////////////////////////////////////////////////////////

Future<OperationResult> createDocumentOnCoordinator(transaction::Methods const& trx,
LogicalCollection& coll,
VPackSlice const slice,
arangodb::OperationOptions const& options) {
futures::Future<OperationResult> createDocumentOnCoordinator(
transaction::Methods const& trx, LogicalCollection& coll, VPackSlice const slice,
OperationOptions const& options, transaction::MethodsApi api) {
const std::string collid = std::to_string(coll.id().id());

// create vars used in this function
Expand All @@ -1444,7 +1448,7 @@ Future<OperationResult> createDocumentOnCoordinator(transaction::Methods const&
Future<Result> f = makeFuture(Result());
const bool isManaged = trx.state()->hasHint(transaction::Hints::Hint::GLOBAL_MANAGED);
if (isManaged && opCtx.shardMap.size() > 1) { // lazily begin transactions on leaders
f = beginTransactionOnSomeLeaders(*trx.state(), coll, opCtx.shardMap);
f = beginTransactionOnSomeLeaders(*trx.state(), coll, opCtx.shardMap, api);
}

return std::move(f).thenValue([=, &trx, &coll, opCtx(std::move(opCtx))]
Expand All @@ -1459,6 +1463,7 @@ Future<OperationResult> createDocumentOnCoordinator(transaction::Methods const&
reqOpts.database = trx.vocbase().name();
reqOpts.timeout = network::Timeout(CL_DEFAULT_LONG_TIMEOUT);
reqOpts.retryNotFound = true;
reqOpts.skipScheduler = api == transaction::MethodsApi::Synchronous;
reqOpts.param(StaticStrings::WaitForSyncString, (options.waitForSync ? "true" : "false"))
.param(StaticStrings::ReturnNewString, (options.returnNew ? "true" : "false"))
.param(StaticStrings::ReturnOldString, (options.returnOld ? "true" : "false"))
Expand Down Expand Up @@ -1541,9 +1546,9 @@ Future<OperationResult> createDocumentOnCoordinator(transaction::Methods const&
/// @brief remove a document in a coordinator
////////////////////////////////////////////////////////////////////////////////

Future<OperationResult> removeDocumentOnCoordinator(arangodb::transaction::Methods& trx,
LogicalCollection& coll, VPackSlice const slice,
arangodb::OperationOptions const& options) {
futures::Future<OperationResult> removeDocumentOnCoordinator(
transaction::Methods& trx, LogicalCollection& coll, VPackSlice const slice,
OperationOptions const& options, transaction::MethodsApi api) {
// Set a few variables needed for our work:

// First determine the collection ID from the name:
Expand Down Expand Up @@ -1574,6 +1579,7 @@ Future<OperationResult> removeDocumentOnCoordinator(arangodb::transaction::Metho
reqOpts.database = trx.vocbase().name();
reqOpts.timeout = network::Timeout(CL_DEFAULT_LONG_TIMEOUT);
reqOpts.retryNotFound = true;
reqOpts.skipScheduler = api == transaction::MethodsApi::Synchronous;
reqOpts.param(StaticStrings::WaitForSyncString, (options.waitForSync ? "true" : "false"))
.param(StaticStrings::ReturnOldString, (options.returnOld ? "true" : "false"))
.param(StaticStrings::IgnoreRevsString, (options.ignoreRevs ? "true" : "false"));
Expand All @@ -1587,7 +1593,7 @@ Future<OperationResult> removeDocumentOnCoordinator(arangodb::transaction::Metho
// lazily begin transactions on leaders
Future<Result> f = makeFuture(Result());
if (isManaged && opCtx.shardMap.size() > 1) {
f = beginTransactionOnSomeLeaders(*trx.state(), coll, opCtx.shardMap);
f = beginTransactionOnSomeLeaders(*trx.state(), coll, opCtx.shardMap, api);
}

return std::move(f).thenValue([=, &trx, opCtx(std::move(opCtx))]
Expand Down Expand Up @@ -1650,7 +1656,7 @@ Future<OperationResult> removeDocumentOnCoordinator(arangodb::transaction::Metho
// lazily begin transactions on leaders
Future<Result> f = makeFuture(Result());
if (isManaged && shardIds->size() > 1) {
f = ::beginTransactionOnAllLeaders(trx, *shardIds);
f = ::beginTransactionOnAllLeaders(trx, *shardIds, api);
}

return std::move(f).thenValue([=, &trx](Result&& r) mutable -> Future<OperationResult> {
Expand Down Expand Up @@ -1698,7 +1704,8 @@ Future<OperationResult> removeDocumentOnCoordinator(arangodb::transaction::Metho
////////////////////////////////////////////////////////////////////////////////

futures::Future<OperationResult> truncateCollectionOnCoordinator(
transaction::Methods& trx, std::string const& collname, OperationOptions const& options) {
transaction::Methods& trx, std::string const& collname,
OperationOptions const& options, transaction::MethodsApi api) {
Result res;
// Set a few variables needed for our work:
ClusterInfo& ci = trx.vocbase().server().getFeature<ClusterFeature>().clusterInfo();
Expand All @@ -1717,7 +1724,8 @@ futures::Future<OperationResult> truncateCollectionOnCoordinator(

// lazily begin transactions on all leader shards
if (trx.state()->hasHint(transaction::Hints::Hint::GLOBAL_MANAGED)) {
res = ::beginTransactionOnAllLeaders(trx, *shardIds).get();
res = ::beginTransactionOnAllLeaders(trx, *shardIds, transaction::MethodsApi::Synchronous)
.get();
if (res.fail()) {
return futures::makeFuture(OperationResult(res, options));
}
Expand All @@ -1727,6 +1735,7 @@ futures::Future<OperationResult> truncateCollectionOnCoordinator(
reqOpts.database = trx.vocbase().name();
reqOpts.timeout = network::Timeout(600.0);
reqOpts.retryNotFound = true;
reqOpts.skipScheduler = api == transaction::MethodsApi::Synchronous;
reqOpts.param(StaticStrings::Compact, (options.truncateCompact ? "true" : "false"));

std::vector<Future<network::Response>> futures;
Expand Down Expand Up @@ -1767,7 +1776,8 @@ futures::Future<OperationResult> truncateCollectionOnCoordinator(

Future<OperationResult> getDocumentOnCoordinator(transaction::Methods& trx,
LogicalCollection& coll, VPackSlice slice,
OperationOptions const& options) {
OperationOptions const& options,
transaction::MethodsApi api) {
// Set a few variables needed for our work:

const std::string collid = std::to_string(coll.id().id());
Expand Down < BB4C tool-tip id="tooltip-c8ea1812-5faf-4c55-9f21-ce295ca00a66" for="expand-up-link-170-diff-89b0f119bc016df828d5f7a4fc62c8efa16287dd386784fb5a386f879d1b3d85" popover="manual" data-direction="ne" data-type="label" data-view-component="true" class="sr-only position-absolute">Expand Up @@ -1807,6 +1817,7 @@ Future<OperationResult> getDocumentOnCoordinator(transaction::Methods& trx,
network::RequestOptions reqOpts;
reqOpts.database = trx.vocbase().name();
reqOpts.retryNotFound = true;
reqOpts.skipScheduler = api == transaction::MethodsApi::Synchronous;
reqOpts.param(StaticStrings::IgnoreRevsString, (options.ignoreRevs ? "true" : "false"));

fuerte::RestVerb restVerb;
Expand All @@ -1826,7 +1837,7 @@ Future<OperationResult> getDocumentOnCoordinator(transaction::Methods& trx,

Future<Result> f = makeFuture(Result());
if (isManaged && opCtx.shardMap.size() > 1) { // lazily begin the transaction
f = beginTransactionOnSomeLeaders(*trx.state(), coll, opCtx.shardMap);
f = beginTransactionOnSomeLeaders(*trx.state(), coll, opCtx.shardMap, api);
}

return std::move(f).thenValue([=, &trx, opCtx(std::move(opCtx))]
Expand Down Expand Up @@ -1904,7 +1915,8 @@ Future<OperationResult> getDocumentOnCoordinator(transaction::Methods& trx,
// We contact all shards with the complete body and ignore NOT_FOUND

if (isManaged) { // lazily begin the transaction
Result res = ::beginTransactionOnAllLeaders(trx, *shardIds).get();
Result res = ::beginTransactionOnAllLeaders(trx, *shardIds, transaction::MethodsApi::Synchronous)
.get();
if (res.fail()) {
return makeFuture(OperationResult(res, options));
}
Expand Down Expand Up @@ -2263,9 +2275,10 @@ void fetchVerticesFromEngines(
/// @brief modify a document in a coordinator
////////////////////////////////////////////////////////////////////////////////

Future<OperationResult> modifyDocumentOnCoordinator(
transaction::Methods& trx, LogicalCollection& coll, VPackSlice const& slice,
arangodb::OperationOptions const& options, bool const isPatch) {
futures::Future<OperationResult> modifyDocumentOnCoordinator(
transaction::Methods& trx, LogicalCollection& coll,
arangodb::velocypack::Slice const& slice, OperationOptions const& options,
bool isPatch, transaction::MethodsApi api) {
// Set a few variables needed for our work:

// First determine the collection ID from the name:
Expand Down Expand Up @@ -2327,6 +2340,7 @@ Future<OperationResult> modifyDocumentOnCoordinator(
reqOpts.database = trx.vocbase().name();
reqOpts.timeout = network::Timeout(CL_DEFAULT_LONG_TIMEOUT);
reqOpts.retryNotFound = true;
reqOpts.skipScheduler = api == transaction::MethodsApi::Synchronous;
reqOpts.param(StaticStrings::WaitForSyncString, (options.waitForSync ? "true" : "false"))
.param(StaticStrings::IgnoreRevsString, (options.ignoreRevs ? "true" : "false"))
.param(StaticStrings::SkipDocumentValidation, (options.validate ? "false" : "true"))
Expand Down Expand Up @@ -2357,7 +2371,7 @@ Future<OperationResult> modifyDocumentOnCoordinator(

Future<Result> f = makeFuture(Result());
if (isManaged && opCtx.shardMap.size() > 1) { // lazily begin transactions on leaders
f = beginTransactionOnSomeLeaders(*trx.state(), coll, opCtx.shardMap);
f = beginTransactionOnSomeLeaders(*trx.state(), coll, opCtx.shardMap, api);
}

return std::move(f).thenValue([=, &trx, opCtx(std::move(opCtx))](Result&& r) mutable -> Future<OperationResult> {
Expand Down Expand Up @@ -2429,7 +2443,7 @@ Future<OperationResult> modifyDocumentOnCoordinator(

Future<Result> f = makeFuture(Result());
if (isManaged && shardIds->size() > 1) { // lazily begin the transaction
f = ::beginTransactionOnAllLeaders(trx, *shardIds);
f = ::beginTransactionOnAllLeaders(trx, *shardIds, api);
}

return std::move(f).thenValue([=, &trx](Result&&) mutable -> Future<OperationResult> {
Expand Down Expand Up @@ -4357,7 +4371,7 @@ arangodb::Result getEngineStatsFromDBServers(ClusterFeature& feature,
auto* pool = feature.server().getFeature<NetworkFeature>().pool();

network::RequestOptions reqOpts;
reqOpts.skipScheduler = false;
reqOpts.skipScheduler = true;
std::vector<Future<network::Response>> futures;
futures.reserve(DBservers.size());

Expand Down
24 changes: 14 additions & 10 deletions arangod/Cluster/ClusterMethods.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include "Network/types.h"
#include "Rest/CommonDefines.h"
#include "Rest/GeneralResponse.h"
#include "Transaction/MethodsApi.h"
#include "Utils/OperationResult.h"
#include "VocBase/LogicalCollection.h"
#include "VocBase/voc-types.h"
Expand Down Expand Up @@ -119,7 +120,8 @@ futures::Future<OperationResult> figuresOnCoordinator(ClusterFeature&,

futures::Future<OperationResult> countOnCoordinator(transaction::Methods& trx,
std::string const& collname,
OperationOptions const& options);
OperationOptions const& options,
arangodb::transaction::MethodsApi api);

////////////////////////////////////////////////////////////////////////////////
/// @brief gets the selectivity estimates from DBservers
Expand All @@ -135,25 +137,25 @@ Result selectivityEstimatesOnCoordinator(ClusterFeature&, std::string const& dbn
////////////////////////////////////////////////////////////////////////////////

futures::Future<OperationResult> createDocumentOnCoordinator(
transaction::Methods const& trx, LogicalCollection&, VPackSlice const slice,
OperationOptions const& options);
transaction::Methods const& trx, LogicalCollection& coll, VPackSlice slice,
OperationOptions const& options, transaction::MethodsApi api);

////////////////////////////////////////////////////////////////////////////////
/// @brief remove a document in a coordinator
////////////////////////////////////////////////////////////////////////////////

futures::Future<OperationResult> removeDocumentOnCoordinator(transaction::Methods& trx,
LogicalCollection&,
VPackSlice const slice,
OperationOptions const& options);
futures::Future<OperationResult> removeDocumentOnCoordinator(
transaction::Methods& trx, LogicalCollection& coll, VPackSlice slice,
OperationOptions const& options, transaction::MethodsApi api);

////////////////////////////////////////////////////////////////////////////////
/// @brief get a document in a coordinator
////////////////////////////////////////////////////////////////////////////////

futures::Future<OperationResult> getDocumentOnCoordinator(transaction::Methods& trx,
LogicalCollection&, VPackSlice slice,
OperationOptions const& options);
OperationOptions const& options,
transaction::MethodsApi api);

/// @brief fetch edges from TraverserEngines
/// Contacts all TraverserEngines placed
Expand Down Expand Up @@ -210,14 +212,16 @@ void fetchVerticesFromEngines(

futures::Future<OperationResult> modifyDocumentOnCoordinator(
transaction::Methods& trx, LogicalCollection& coll,
arangodb::velocypack::Slice const& slice, OperationOptions const& options, bool isPatch);
arangodb::velocypack::Slice const& slice, OperationOptions const& options,
bool isPatch, transaction::MethodsApi api);

////////////////////////////////////////////////////////////////////////////////
/// @brief truncate a cluster collection on a coordinator
////////////////////////////////////////////////////////////////////////////////

futures::Future<OperationResult> truncateCollectionOnCoordinator(
transaction::Methods& trx, std::string const& collname, OperationOptions const& options);
5675 transaction::Methods& trx, std::string const& collname,
OperationOptions const& options, transaction::MethodsApi api);

////////////////////////////////////////////////////////////////////////////////
/// @brief flush Wal on all DBservers
Expand Down
Loading
0