diff --git a/CHANGELOG b/CHANGELOG index 2d4ebe2c4d76..5186d72b033b 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,6 +1,9 @@ devel ----- +* Prevent some possible deadlocks under high load regarding transactions and + document operations, and also improve performance slightly. + * Hide help text fragment about VST connection strings in client tools that do not support VST. diff --git a/arangod/Cluster/ClusterMethods.cpp b/arangod/Cluster/ClusterMethods.cpp index 10b177f67af6..8e05a55ecd4e 100644 --- a/arangod/Cluster/ClusterMethods.cpp +++ b/arangod/Cluster/ClusterMethods.cpp @@ -137,7 +137,8 @@ T addFigures(VPackSlice const& v1, VPackSlice const& v2, template Future beginTransactionOnSomeLeaders(TransactionState& state, LogicalCollection const& coll, - ShardDocsMap const& shards) { + ShardDocsMap const& shards, + transaction::MethodsApi api) { TRI_ASSERT(state.isCoordinator()); TRI_ASSERT(!state.hasHint(transaction::Hints::Hint::SINGLE_OPERATION)); @@ -155,11 +156,12 @@ Future beginTransactionOnSomeLeaders(TransactionState& state, leaders.emplace(leader); } } - return ClusterTrxMethods::beginTransactionOnLeaders(state, leaders); + return ClusterTrxMethods::beginTransactionOnLeaders(state, leaders, api); } // begin transaction on shard leaders -Future beginTransactionOnAllLeaders(transaction::Methods& trx, ShardMap const& shards) { +Future beginTransactionOnAllLeaders(transaction::Methods& trx, ShardMap const& shards, + transaction::MethodsApi api) { TRI_ASSERT(trx.state()->isCoordinator()); TRI_ASSERT(trx.state()->hasHint(transaction::Hints::Hint::GLOBAL_MANAGED)); ClusterTrxMethods::SortedServersSet leaders{}; @@ -169,7 +171,7 @@ Future beginTransactionOnAllLeaders(transaction::Methods& trx, ShardMap leaders.emplace(srv); } } - return ClusterTrxMethods::beginTransactionOnLeaders(*trx.state(), leaders); + return ClusterTrxMethods::beginTransactionOnLeaders(*trx.state(), leaders, api); } /// @brief add the correct header for the shard @@ -1206,7 +1208,8 @@ futures::Future figuresOnCoordinator(ClusterFeature& feature, futures::Future countOnCoordinator(transaction::Methods& trx, std::string const& cname, - OperationOptions const& options) { + OperationOptions const& options, + transaction::MethodsApi api) { std::vector> result; // Set a few variables needed for our work: @@ -1224,7 +1227,8 @@ futures::Future countOnCoordinator(transaction::Methods& trx, std::shared_ptr shardIds = collinfo->shardIds(); const bool isManaged = trx.state()->hasHint(transaction::Hints::Hint::GLOBAL_MANAGED); if (isManaged) { - Result res = ::beginTransactionOnAllLeaders(trx, *shardIds).get(); + Result res = ::beginTransactionOnAllLeaders(trx, *shardIds, transaction::MethodsApi::Synchronous) + .get(); if (res.fail()) { return futures::makeFuture(OperationResult(res, options)); } @@ -1233,6 +1237,7 @@ futures::Future countOnCoordinator(transaction::Methods& trx, network::RequestOptions reqOpts; reqOpts.database = dbname; reqOpts.retryNotFound = true; + reqOpts.skipScheduler = api == transaction::MethodsApi::Synchronous; if (NameValidator::isSystemName(cname)) { // system collection (e.g. _apps, _jobs, _graphs... @@ -1415,10 +1420,9 @@ Result selectivityEstimatesOnCoordinator(ClusterFeature& feature, std::string co /// for their documents. //////////////////////////////////////////////////////////////////////////////// -Future createDocumentOnCoordinator(transaction::Methods const& trx, - LogicalCollection& coll, - VPackSlice const slice, - arangodb::OperationOptions const& options) { +futures::Future createDocumentOnCoordinator( + transaction::Methods const& trx, LogicalCollection& coll, VPackSlice const slice, + OperationOptions const& options, transaction::MethodsApi api) { const std::string collid = std::to_string(coll.id().id()); // create vars used in this function @@ -1444,7 +1448,7 @@ Future createDocumentOnCoordinator(transaction::Methods const& Future f = makeFuture(Result()); const bool isManaged = trx.state()->hasHint(transaction::Hints::Hint::GLOBAL_MANAGED); if (isManaged && opCtx.shardMap.size() > 1) { // lazily begin transactions on leaders - f = beginTransactionOnSomeLeaders(*trx.state(), coll, opCtx.shardMap); + f = beginTransactionOnSomeLeaders(*trx.state(), coll, opCtx.shardMap, api); } return std::move(f).thenValue([=, &trx, &coll, opCtx(std::move(opCtx))] @@ -1459,6 +1463,7 @@ Future createDocumentOnCoordinator(transaction::Methods const& reqOpts.database = trx.vocbase().name(); reqOpts.timeout = network::Timeout(CL_DEFAULT_LONG_TIMEOUT); reqOpts.retryNotFound = true; + reqOpts.skipScheduler = api == transaction::MethodsApi::Synchronous; reqOpts.param(StaticStrings::WaitForSyncString, (options.waitForSync ? "true" : "false")) .param(StaticStrings::ReturnNewString, (options.returnNew ? "true" : "false")) .param(StaticStrings::ReturnOldString, (options.returnOld ? "true" : "false")) @@ -1541,9 +1546,9 @@ Future createDocumentOnCoordinator(transaction::Methods const& /// @brief remove a document in a coordinator //////////////////////////////////////////////////////////////////////////////// -Future removeDocumentOnCoordinator(arangodb::transaction::Methods& trx, - LogicalCollection& coll, VPackSlice const slice, - arangodb::OperationOptions const& options) { +futures::Future removeDocumentOnCoordinator( + transaction::Methods& trx, LogicalCollection& coll, VPackSlice const slice, + OperationOptions const& options, transaction::MethodsApi api) { // Set a few variables needed for our work: // First determine the collection ID from the name: @@ -1574,6 +1579,7 @@ Future removeDocumentOnCoordinator(arangodb::transaction::Metho reqOpts.database = trx.vocbase().name(); reqOpts.timeout = network::Timeout(CL_DEFAULT_LONG_TIMEOUT); reqOpts.retryNotFound = true; + reqOpts.skipScheduler = api == transaction::MethodsApi::Synchronous; reqOpts.param(StaticStrings::WaitForSyncString, (options.waitForSync ? "true" : "false")) .param(StaticStrings::ReturnOldString, (options.returnOld ? "true" : "false")) .param(StaticStrings::IgnoreRevsString, (options.ignoreRevs ? "true" : "false")); @@ -1587,7 +1593,7 @@ Future removeDocumentOnCoordinator(arangodb::transaction::Metho // lazily begin transactions on leaders Future f = makeFuture(Result()); if (isManaged && opCtx.shardMap.size() > 1) { - f = beginTransactionOnSomeLeaders(*trx.state(), coll, opCtx.shardMap); + f = beginTransactionOnSomeLeaders(*trx.state(), coll, opCtx.shardMap, api); } return std::move(f).thenValue([=, &trx, opCtx(std::move(opCtx))] @@ -1650,7 +1656,7 @@ Future removeDocumentOnCoordinator(arangodb::transaction::Metho // lazily begin transactions on leaders Future f = makeFuture(Result()); if (isManaged && shardIds->size() > 1) { - f = ::beginTransactionOnAllLeaders(trx, *shardIds); + f = ::beginTransactionOnAllLeaders(trx, *shardIds, api); } return std::move(f).thenValue([=, &trx](Result&& r) mutable -> Future { @@ -1698,7 +1704,8 @@ Future removeDocumentOnCoordinator(arangodb::transaction::Metho //////////////////////////////////////////////////////////////////////////////// futures::Future truncateCollectionOnCoordinator( - transaction::Methods& trx, std::string const& collname, OperationOptions const& options) { + transaction::Methods& trx, std::string const& collname, + OperationOptions const& options, transaction::MethodsApi api) { Result res; // Set a few variables needed for our work: ClusterInfo& ci = trx.vocbase().server().getFeature().clusterInfo(); @@ -1717,7 +1724,8 @@ futures::Future truncateCollectionOnCoordinator( // lazily begin transactions on all leader shards if (trx.state()->hasHint(transaction::Hints::Hint::GLOBAL_MANAGED)) { - res = ::beginTransactionOnAllLeaders(trx, *shardIds).get(); + res = ::beginTransactionOnAllLeaders(trx, *shardIds, transaction::MethodsApi::Synchronous) + .get(); if (res.fail()) { return futures::makeFuture(OperationResult(res, options)); } @@ -1727,6 +1735,7 @@ futures::Future truncateCollectionOnCoordinator( reqOpts.database = trx.vocbase().name(); reqOpts.timeout = network::Timeout(600.0); reqOpts.retryNotFound = true; + reqOpts.skipScheduler = api == transaction::MethodsApi::Synchronous; reqOpts.param(StaticStrings::Compact, (options.truncateCompact ? "true" : "false")); std::vector> futures; @@ -1767,7 +1776,8 @@ futures::Future truncateCollectionOnCoordinator( Future getDocumentOnCoordinator(transaction::Methods& trx, LogicalCollection& coll, VPackSlice slice, - OperationOptions const& options) { + OperationOptions const& options, + transaction::MethodsApi api) { // Set a few variables needed for our work: const std::string collid = std::to_string(coll.id().id()); @@ -1807,6 +1817,7 @@ Future getDocumentOnCoordinator(transaction::Methods& trx, network::RequestOptions reqOpts; reqOpts.database = trx.vocbase().name(); reqOpts.retryNotFound = true; + reqOpts.skipScheduler = api == transaction::MethodsApi::Synchronous; reqOpts.param(StaticStrings::IgnoreRevsString, (options.ignoreRevs ? "true" : "false")); fuerte::RestVerb restVerb; @@ -1826,7 +1837,7 @@ Future getDocumentOnCoordinator(transaction::Methods& trx, Future f = makeFuture(Result()); if (isManaged && opCtx.shardMap.size() > 1) { // lazily begin the transaction - f = beginTransactionOnSomeLeaders(*trx.state(), coll, opCtx.shardMap); + f = beginTransactionOnSomeLeaders(*trx.state(), coll, opCtx.shardMap, api); } return std::move(f).thenValue([=, &trx, opCtx(std::move(opCtx))] @@ -1904,7 +1915,8 @@ Future getDocumentOnCoordinator(transaction::Methods& trx, // We contact all shards with the complete body and ignore NOT_FOUND if (isManaged) { // lazily begin the transaction - Result res = ::beginTransactionOnAllLeaders(trx, *shardIds).get(); + Result res = ::beginTransactionOnAllLeaders(trx, *shardIds, transaction::MethodsApi::Synchronous) + .get(); if (res.fail()) { return makeFuture(OperationResult(res, options)); } @@ -2263,9 +2275,10 @@ void fetchVerticesFromEngines( /// @brief modify a document in a coordinator //////////////////////////////////////////////////////////////////////////////// -Future modifyDocumentOnCoordinator( - transaction::Methods& trx, LogicalCollection& coll, VPackSlice const& slice, - arangodb::OperationOptions const& options, bool const isPatch) { +futures::Future modifyDocumentOnCoordinator( + transaction::Methods& trx, LogicalCollection& coll, + arangodb::velocypack::Slice const& slice, OperationOptions const& options, + bool isPatch, transaction::MethodsApi api) { // Set a few variables needed for our work: // First determine the collection ID from the name: @@ -2327,6 +2340,7 @@ Future modifyDocumentOnCoordinator( reqOpts.database = trx.vocbase().name(); reqOpts.timeout = network::Timeout(CL_DEFAULT_LONG_TIMEOUT); reqOpts.retryNotFound = true; + reqOpts.skipScheduler = api == transaction::MethodsApi::Synchronous; reqOpts.param(StaticStrings::WaitForSyncString, (options.waitForSync ? "true" : "false")) .param(StaticStrings::IgnoreRevsString, (options.ignoreRevs ? "true" : "false")) .param(StaticStrings::SkipDocumentValidation, (options.validate ? "false" : "true")) @@ -2357,7 +2371,7 @@ Future modifyDocumentOnCoordinator( Future f = makeFuture(Result()); if (isManaged && opCtx.shardMap.size() > 1) { // lazily begin transactions on leaders - f = beginTransactionOnSomeLeaders(*trx.state(), coll, opCtx.shardMap); + f = beginTransactionOnSomeLeaders(*trx.state(), coll, opCtx.shardMap, api); } return std::move(f).thenValue([=, &trx, opCtx(std::move(opCtx))](Result&& r) mutable -> Future { @@ -2429,7 +2443,7 @@ Future modifyDocumentOnCoordinator( Future f = makeFuture(Result()); if (isManaged && shardIds->size() > 1) { // lazily begin the transaction - f = ::beginTransactionOnAllLeaders(trx, *shardIds); + f = ::beginTransactionOnAllLeaders(trx, *shardIds, api); } return std::move(f).thenValue([=, &trx](Result&&) mutable -> Future { @@ -4357,7 +4371,7 @@ arangodb::Result getEngineStatsFromDBServers(ClusterFeature& feature, auto* pool = feature.server().getFeature().pool(); network::RequestOptions reqOpts; - reqOpts.skipScheduler = false; + reqOpts.skipScheduler = true; std::vector> futures; futures.reserve(DBservers.size()); diff --git a/arangod/Cluster/ClusterMethods.h b/arangod/Cluster/ClusterMethods.h index 7157f1239a4f..cce4516f8e8c 100644 --- a/arangod/Cluster/ClusterMethods.h +++ b/arangod/Cluster/ClusterMethods.h @@ -33,6 +33,7 @@ #include "Network/types.h" #include "Rest/CommonDefines.h" #include "Rest/GeneralResponse.h" +#include "Transaction/MethodsApi.h" #include "Utils/OperationResult.h" #include "VocBase/LogicalCollection.h" #include "VocBase/voc-types.h" @@ -119,7 +120,8 @@ futures::Future figuresOnCoordinator(ClusterFeature&, futures::Future countOnCoordinator(transaction::Methods& trx, std::string const& collname, - OperationOptions const& options); + OperationOptions const& options, + arangodb::transaction::MethodsApi api); //////////////////////////////////////////////////////////////////////////////// /// @brief gets the selectivity estimates from DBservers @@ -135,17 +137,16 @@ Result selectivityEstimatesOnCoordinator(ClusterFeature&, std::string const& dbn //////////////////////////////////////////////////////////////////////////////// futures::Future createDocumentOnCoordinator( - transaction::Methods const& trx, LogicalCollection&, VPackSlice const slice, - OperationOptions const& options); + transaction::Methods const& trx, LogicalCollection& coll, VPackSlice slice, + OperationOptions const& options, transaction::MethodsApi api); //////////////////////////////////////////////////////////////////////////////// /// @brief remove a document in a coordinator //////////////////////////////////////////////////////////////////////////////// -futures::Future removeDocumentOnCoordinator(transaction::Methods& trx, - LogicalCollection&, - VPackSlice const slice, - OperationOptions const& options); +futures::Future removeDocumentOnCoordinator( + transaction::Methods& trx, LogicalCollection& coll, VPackSlice slice, + OperationOptions const& options, transaction::MethodsApi api); //////////////////////////////////////////////////////////////////////////////// /// @brief get a document in a coordinator @@ -153,7 +154,8 @@ futures::Future removeDocumentOnCoordinator(transaction::Method futures::Future getDocumentOnCoordinator(transaction::Methods& trx, LogicalCollection&, VPackSlice slice, - OperationOptions const& options); + OperationOptions const& options, + transaction::MethodsApi api); /// @brief fetch edges from TraverserEngines /// Contacts all TraverserEngines placed @@ -210,14 +212,16 @@ void fetchVerticesFromEngines( futures::Future modifyDocumentOnCoordinator( transaction::Methods& trx, LogicalCollection& coll, - arangodb::velocypack::Slice const& slice, OperationOptions const& options, bool isPatch); + arangodb::velocypack::Slice const& slice, OperationOptions const& options, + bool isPatch, transaction::MethodsApi api); //////////////////////////////////////////////////////////////////////////////// /// @brief truncate a cluster collection on a coordinator //////////////////////////////////////////////////////////////////////////////// futures::Future truncateCollectionOnCoordinator( - transaction::Methods& trx, std::string const& collname, OperationOptions const& options); + transaction::Methods& trx, std::string const& collname, + OperationOptions const& options, transaction::MethodsApi api); //////////////////////////////////////////////////////////////////////////////// /// @brief flush Wal on all DBservers diff --git a/arangod/Cluster/ClusterTrxMethods.cpp b/arangod/Cluster/ClusterTrxMethods.cpp index 82e846899e2f..4743e43bb88f 100644 --- a/arangod/Cluster/ClusterTrxMethods.cpp +++ b/arangod/Cluster/ClusterTrxMethods.cpp @@ -38,6 +38,7 @@ #include "Transaction/Context.h" #include "Transaction/Helpers.h" #include "Transaction/Methods.h" +#include "Transaction/MethodsApi.h" #include "VocBase/LogicalCollection.h" #include @@ -132,7 +133,8 @@ void buildTransactionBody(TransactionState& state, ServerID const& server, /// @brief lazily begin a transaction on subordinate servers Future beginTransactionRequest(TransactionState& state, - ServerID const& server) { + ServerID const& server, + transaction::MethodsApi api) { TransactionId tid = state.id().child(); TRI_ASSERT(!tid.isLegacyTransactionId()); @@ -150,6 +152,7 @@ Future beginTransactionRequest(TransactionState& state, // responses that are close to the timeout value have a chance of getting // back to us (note: the 5 is arbitrary here, could as well be 3.0 or 10.0) reqOpts.timeout = network::Timeout(lockTimeout + 5.0); + reqOpts.skipScheduler = api == transaction::MethodsApi::Synchronous; auto* pool = state.vocbase().server().getFeature().pool(); network::Headers headers; @@ -210,7 +213,8 @@ Result checkTransactionResult(TransactionId desiredTid, transaction::Status desS } Future commitAbortTransaction(arangodb::TransactionState* state, - transaction::Status status) { + transaction::Status status, + transaction::MethodsApi api) { TRI_ASSERT(state->isRunning()); if (state->knownServers().empty()) { @@ -226,6 +230,7 @@ Future commitAbortTransaction(arangodb::TransactionState* state, network::RequestOptions reqOpts; reqOpts.database = state->vocbase().name(); + reqOpts.skipScheduler = api == transaction::MethodsApi::Synchronous; TransactionId tidPlus = state->id().child(); std::string const path = "/_api/transaction/" + std::to_string(tidPlus.id()); @@ -331,10 +336,11 @@ Future commitAbortTransaction(arangodb::TransactionState* state, }); } -Future commitAbortTransaction(transaction::Methods& trx, transaction::Status status) { +Future commitAbortTransaction(transaction::Methods& trx, transaction::Status status, + transaction::MethodsApi api) { arangodb::TransactionState* state = trx.state(); TRI_ASSERT(trx.isMainTransaction()); - return commitAbortTransaction(state, status); + return commitAbortTransaction(state, status, api); } } // namespace @@ -348,8 +354,9 @@ bool IsServerIdLessThan::operator()(ServerID const& lhs, ServerID const& rhs) co } /// @brief begin a transaction on all leaders -Future beginTransactionOnLeaders(TransactionState& state, - ClusterTrxMethods::SortedServersSet const& leaders) { +Future beginTransactionOnLeaders(TransactionState& state, ClusterTrxMethods::SortedServersSet const& leaders, + // everything in this function is done synchronously, so the `api` parameter is currently unused. + [[maybe_unused]] transaction::MethodsApi api) { TRI_ASSERT(state.isCoordinator()); TRI_ASSERT(!state.hasHint(transaction::Hints::Hint::SINGLE_OPERATION)); Result res; @@ -377,9 +384,10 @@ Future beginTransactionOnLeaders(TransactionState& state, if (state.knowsServer(leader)) { continue; // already sent a begin transaction there } - requests.emplace_back(::beginTransactionRequest(state, leader)); + requests.emplace_back( + ::beginTransactionRequest(state, leader, transaction::MethodsApi::Synchronous)); } - + // use original lock timeout here state.options().lockTimeout = oldLockTimeout; @@ -429,8 +437,9 @@ Future beginTransactionOnLeaders(TransactionState& state, // abortTransaction on knownServers() and wait for them if (!state.knownServers().empty()) { - Result resetRes = - commitAbortTransaction(&state, transaction::Status::ABORTED).get(); + Result resetRes = commitAbortTransaction(&state, transaction::Status::ABORTED, + transaction::MethodsApi::Synchronous) + .get(); if (resetRes.fail()) { // return here if cleanup failed - this needs to be a success return resetRes; @@ -454,7 +463,7 @@ Future beginTransactionOnLeaders(TransactionState& state, serverBefore = leader; #endif - auto resp = ::beginTransactionRequest(state, leader); + auto resp = ::beginTransactionRequest(state, leader, transaction::MethodsApi::Synchronous); auto const& resolvedResponse = resp.get(); if (resolvedResponse.fail()) { return resolvedResponse.combinedResult(); @@ -468,13 +477,13 @@ Future beginTransactionOnLeaders(TransactionState& state, } /// @brief commit a transaction on a subordinate -Future commitTransaction(transaction::Methods& trx) { - return commitAbortTransaction(trx, transaction::Status::COMMITTED); +Future commitTransaction(transaction::Methods& trx, transaction::MethodsApi api) { + return commitAbortTransaction(trx, transaction::Status::COMMITTED, api); } /// @brief commit a transaction on a subordinate -Future abortTransaction(transaction::Methods& trx) { - return commitAbortTransaction(trx, transaction::Status::ABORTED); +Future abortTransaction(transaction::Methods& trx, transaction::MethodsApi api) { + return commitAbortTransaction(trx, transaction::Status::ABORTED, api); } /// @brief set the transaction ID header diff --git a/arangod/Cluster/ClusterTrxMethods.h b/arangod/Cluster/ClusterTrxMethods.h index feeb8c7d4c86..d765647412d2 100644 --- a/arangod/Cluster/ClusterTrxMethods.h +++ b/arangod/Cluster/ClusterTrxMethods.h @@ -25,6 +25,7 @@ #include "Basics/Common.h" #include "Futures/Future.h" +#include "Transaction/MethodsApi.h" #include "VocBase/LogicalCollection.h" #include "VocBase/voc-types.h" @@ -46,14 +47,17 @@ struct IsServerIdLessThan { using SortedServersSet = std::set; /// @brief begin a transaction on all followers -Future beginTransactionOnLeaders(TransactionState&, - SortedServersSet const& leaders); +Future beginTransactionOnLeaders(TransactionState&, + ClusterTrxMethods::SortedServersSet const& leaders, + transaction::MethodsApi api); /// @brief commit a transaction on a subordinate -Future commitTransaction(transaction::Methods& trx); +Future commitTransaction(transaction::Methods& trx, + transaction::MethodsApi api); /// @brief commit a transaction on a subordinate -Future abortTransaction(transaction::Methods& trx); +Future abortTransaction(transaction::Methods& trx, + transaction::MethodsApi api); /// @brief add the transaction ID header for servers template diff --git a/arangod/ClusterEngine/ClusterTransactionState.cpp b/arangod/ClusterEngine/ClusterTransactionState.cpp index 44cd73685442..504abf8786be 100644 --- a/arangod/ClusterEngine/ClusterTransactionState.cpp +++ b/arangod/ClusterEngine/ClusterTransactionState.cpp @@ -102,7 +102,9 @@ Result ClusterTransactionState::beginTransaction(transaction::Hints hints) { // if there is only one server we may defer the lazy locking // until the first actual operation (should save one request) if (leaders.size() > 1) { - res = ClusterTrxMethods::beginTransactionOnLeaders(*this, leaders).get(); + res = ClusterTrxMethods::beginTransactionOnLeaders(*this, leaders, + transaction::MethodsApi::Synchronous) + .get(); if (res.fail()) { // something is wrong return res; } diff --git a/arangod/Transaction/Methods.cpp b/arangod/Transaction/Methods.cpp index 31172125d5c9..42ca99b7c64e 100644 --- a/arangod/Transaction/Methods.cpp +++ b/arangod/Transaction/Methods.cpp @@ -508,94 +508,31 @@ Result transaction::Methods::begin() { return Result(); } +Result Methods::commit() { + return commitInternal(MethodsApi::Synchronous).get(); +} + /// @brief commit / finish the transaction Future transaction::Methods::commitAsync() { - TRI_IF_FAILURE("TransactionCommitFail") { return Result(TRI_ERROR_DEBUG); } - - if (_state == nullptr || _state->status() != transaction::Status::RUNNING) { - // transaction not created or not running - return Result(TRI_ERROR_TRANSACTION_INTERNAL, - "transaction not running on commit"); - } - - if (!_state->isReadOnlyTransaction()) { - auto const& exec = ExecContext::current(); - bool cancelRW = ServerState::readOnly() && !exec.isSuperuser(); - if (exec.isCanceled() || cancelRW) { - return Result(TRI_ERROR_ARANGO_READ_ONLY, "server is in read-only mode"); - } - } - - if (!_mainTransaction) { - return futures::makeFuture(Result()); - } - - auto f = futures::makeFuture(Result()); - if (_state->isRunningInCluster()) { - // first commit transaction on subordinate servers - f = ClusterTrxMethods::commitTransaction(*this); - } - - return std::move(f).thenValue([this](Result res) -> Result { - if (res.fail()) { // do not commit locally - LOG_TOPIC("5743a", WARN, Logger::TRANSACTIONS) - << "failed to commit on subordinates: '" << res.errorMessage() << "'"; - return res; - } - - res = _state->commitTransaction(this); - if (res.ok()) { - applyStatusChangeCallbacks(*this, Status::COMMITTED); - } + return commitInternal(MethodsApi::Asynchronous); +} - return res; - }); +Result Methods::abort() { + return abortInternal(MethodsApi::Synchronous).get(); } /// @brief abort the transaction Future transaction::Methods::abortAsync() { - if (_state == nullptr || _state->status() != transaction::Status::RUNNING) { - // transaction not created or not running - return Result(TRI_ERROR_TRANSACTION_INTERNAL, - "transaction not running on abort"); - } - - if (!_mainTransaction) { - return futures::makeFuture(Result()); - } - - auto f = futures::makeFuture(Result()); - if (_state->isRunningInCluster()) { - // first commit transaction on subordinate servers - f = ClusterTrxMethods::abortTransaction(*this); - } - - return std::move(f).thenValue([this](Result res) -> Result { - if (res.fail()) { // do not commit locally - LOG_TOPIC("d89a8", WARN, Logger::TRANSACTIONS) - << "failed to abort on subordinates: " << res.errorMessage(); - } // abort locally anyway - - res = _state->abortTransaction(this); - if (res.ok()) { - applyStatusChangeCallbacks(*this, Status::ABORTED); - } + return abortInternal(MethodsApi::Asynchronous); +} - return res; - }); +Result Methods::finish(Result const& res) { + return finishInternal(res, MethodsApi::Synchronous).get(); } /// @brief finish a transaction (commit or abort), based on the previous state Future transaction::Methods::finishAsync(Result const& res) { - if (res.ok()) { - // there was no previous error, so we'll commit - return this->commitAsync(); - } - - // there was a previous error, so we'll abort - return this->abortAsync().thenValue([res](Result ignore) { - return res; // return original error - }); + return finishInternal(res, MethodsApi::Asynchronous); } /// @brief return the transaction id @@ -758,7 +695,9 @@ Result transaction::Methods::documentFastPath(std::string const& collectionName, } if (_state->isCoordinator()) { - OperationResult opRes = documentCoordinator(collectionName, value, options).get(); + OperationResult opRes = + documentCoordinator(collectionName, value, options, MethodsApi::Synchronous) + .get(); if (!opRes.fail()) { result.add(opRes.slice()); } @@ -833,33 +772,23 @@ Future addTracking(Future&& f, F&& func) { } } +OperationResult Methods::document(std::string const& collectionName, + VPackSlice value, + OperationOptions const& options) { + return documentInternal(collectionName, value, options, MethodsApi::Synchronous).get(); +} + /// @brief return one or multiple documents from a collection Future transaction::Methods::documentAsync(std::string const& cname, VPackSlice value, - OperationOptions& options) { - TRI_ASSERT(_state->status() == transaction::Status::RUNNING); - - if (!value.isObject() && !value.isArray()) { - // must provide a document object or an array of documents - events::ReadDocument(vocbase().name(), cname, value, options, - TRI_ERROR_ARANGO_DOCUMENT_TYPE_INVALID); - THROW_ARANGO_EXCEPTION(TRI_ERROR_ARANGO_DOCUMENT_TYPE_INVALID); - } - - if (_state->isCoordinator()) { - return addTracking(documentCoordinator(cname, value, options), - [=](OperationResult&& opRes) { - events::ReadDocument(vocbase().name(), cname, value, opRes.options, opRes.errorNumber()); - return std::move(opRes); - }); - } - return documentLocal(cname, value, options); + OperationOptions const& options) { + return documentInternal(cname, value, options, MethodsApi::Asynchronous); } /// @brief read one or multiple documents in a collection, coordinator #ifndef USE_ENTERPRISE Future transaction::Methods::documentCoordinator( - std::string const& collectionName, VPackSlice value, OperationOptions const& options) { + std::string const& collectionName, VPackSlice value, OperationOptions const& options, MethodsApi api) { if (!value.isArray()) { arangodb::velocypack::StringRef key(transaction::helpers::extractKeyPart(value)); if (key.empty()) { @@ -872,7 +801,7 @@ Future transaction::Methods::documentCoordinator( return futures::makeFuture(OperationResult(TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND, options)); } - return arangodb::getDocumentOnCoordinator(*this, *colptr, value, options); + return arangodb::getDocumentOnCoordinator(*this, *colptr, value, options, api); } #endif @@ -956,39 +885,19 @@ Future transaction::Methods::documentLocal(std::string const& c options, countErrorCodes)); } + +OperationResult Methods::insert(std::string const& cname, VPackSlice value, + OperationOptions const& options) { + return insertInternal(cname, value, options, MethodsApi::Synchronous).get(); +} + /// @brief create one or multiple documents in a collection /// the single-document variant of this operation will either succeed or, /// if it fails, clean up after itself Future transaction::Methods::insertAsync(std::string const& cname, VPackSlice value, OperationOptions const& options) { - TRI_ASSERT(_state->status() == transaction::Status::RUNNING); - - if (!value.isObject() && !value.isArray()) { - // must provide a document object or an array of documents - events::CreateDocument(vocbase().name(), cname, value, options, - TRI_ERROR_ARANGO_DOCUMENT_TYPE_INVALID); - THROW_ARANGO_EXCEPTION(TRI_ERROR_ARANGO_DOCUMENT_TYPE_INVALID); - } - if (value.isArray() && value.length() == 0) { - events::CreateDocument(vocbase().name(), cname, value, options, TRI_ERROR_NO_ERROR); - return emptyResult(options); - } - - auto f = Future::makeEmpty(); - if (_state->isCoordinator()) { - f = insertCoordinator(cname, value, options); - } else { - OperationOptions optionsCopy = options; - f = insertLocal(cname, value, optionsCopy); - } - - return addTracking(std::move(f), [=](OperationResult&& opRes) { - events::CreateDocument(vocbase().name(), cname, - (opRes.ok() && opRes.options.returnNew) ? opRes.slice() : value, - opRes.options, opRes.errorNumber()); - return std::move(opRes); - }); + return insertInternal(cname, value, options, MethodsApi::Asynchronous); } /// @brief create one or multiple documents in a collection, coordinator @@ -997,12 +906,13 @@ Future transaction::Methods::insertAsync(std::string const& cna #ifndef USE_ENTERPRISE Future transaction::Methods::insertCoordinator(std::string const& collectionName, VPackSlice value, - OperationOptions const& options) { + OperationOptions const& options, + MethodsApi api) { auto colptr = resolver()->getCollectionStructCluster(collectionName); if (colptr == nullptr) { return futures::makeFuture(OperationResult(TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND, options)); } - return arangodb::createDocumentOnCoordinator(*this, *colptr, value, options); + return arangodb::createDocumentOnCoordinator(*this, *colptr, value, options, api); } #endif @@ -1281,38 +1191,18 @@ Future transaction::Methods::insertLocal(std::string const& cna options, std::move(errorCounter))); } +OperationResult Methods::update(std::string const& cname, VPackSlice updateValue, + OperationOptions const& options) { + return updateInternal(cname, updateValue, options, MethodsApi::Synchronous).get(); +} + /// @brief update/patch one or multiple documents in a collection /// the single-document variant of this operation will either succeed or, /// if it fails, clean up after itself Future transaction::Methods::updateAsync(std::string const& cname, VPackSlice newValue, OperationOptions const& options) { - TRI_ASSERT(_state->status() == transaction::Status::RUNNING); - - if (!newValue.isObject() && !newValue.isArray()) { - // must provide a document object or an array of documents - events::ModifyDocument(vocbase().name(), cname, newValue, options, - TRI_ERROR_ARANGO_DOCUMENT_TYPE_INVALID); - THROW_ARANGO_EXCEPTION(TRI_ERROR_ARANGO_DOCUMENT_TYPE_INVALID); - } - if (newValue.isArray() && newValue.length() == 0) { - events::ModifyDocument(vocbase().name(), cname, newValue, options, - TRI_ERROR_NO_ERROR); - return emptyResult(options); - } - - auto f = Future::makeEmpty(); - if (_state->isCoordinator()) { - f = modifyCoordinator(cname, newValue, options, TRI_VOC_DOCUMENT_OPERATION_UPDATE); - } else { - OperationOptions optionsCopy = options; - f = modifyLocal(cname, newValue, optionsCopy, TRI_VOC_DOCUMENT_OPERATION_UPDATE); - } - return addTracking(std::move(f), [=](OperationResult&& opRes) { - events::ModifyDocument(vocbase().name(), cname, newValue, opRes.options, - opRes.errorNumber()); - return std::move(opRes); - }); + return updateInternal(cname, newValue, options, MethodsApi::Asynchronous); } /// @brief update one or multiple documents in a collection, coordinator @@ -1320,8 +1210,8 @@ Future transaction::Methods::updateAsync(std::string const& cna /// if it fails, clean up after itself #ifndef USE_ENTERPRISE Future transaction::Methods::modifyCoordinator( - std::string const& cname, VPackSlice newValue, - OperationOptions const& options, TRI_voc_document_operation_e operation) { + std::string const& cname, VPackSlice newValue, OperationOptions const& options, + TRI_voc_document_operation_e operation, MethodsApi api) { if (!newValue.isArray()) { arangodb::velocypack::StringRef key(transaction::helpers::extractKeyPart(newValue)); if (key.empty()) { @@ -1335,42 +1225,22 @@ Future transaction::Methods::modifyCoordinator( } const bool isPatch = (TRI_VOC_DOCUMENT_OPERATION_UPDATE == operation); - return arangodb::modifyDocumentOnCoordinator(*this, *colptr, newValue, options, isPatch); + return arangodb::modifyDocumentOnCoordinator(*this, *colptr, newValue, options, isPatch, api); } #endif +OperationResult Methods::replace(std::string const& cname, VPackSlice replaceValue, + OperationOptions const& options) { + return replaceInternal(cname, replaceValue, options, MethodsApi::Synchronous).get(); +} + /// @brief replace one or multiple documents in a collection /// the single-document variant of this operation will either succeed or, /// if it fails, clean up after itself Future transaction::Methods::replaceAsync(std::string const& cname, VPackSlice newValue, OperationOptions const& options) { - TRI_ASSERT(_state->status() == transaction::Status::RUNNING); - - if (!newValue.isObject() && !newValue.isArray()) { - // must provide a document object or an array of documents - events::ReplaceDocument(vocbase().name(), cname, newValue, options, - TRI_ERROR_ARANGO_DOCUMENT_TYPE_INVALID); - THROW_ARANGO_EXCEPTION(TRI_ERROR_ARANGO_DOCUMENT_TYPE_INVALID); - } - if (newValue.isArray() && newValue.length() == 0) { - events::ReplaceDocument(vocbase().name(), cname, newValue, options, - TRI_ERROR_NO_ERROR); - return futures::makeFuture(emptyResult(options)); - } - - auto f = Future::makeEmpty(); - if (_state->isCoordinator()) { - f = modifyCoordinator(cname, newValue, options, TRI_VOC_DOCUMENT_OPERATION_REPLACE); - } else { - OperationOptions optionsCopy = options; - f = modifyLocal(cname, newValue, optionsCopy, TRI_VOC_DOCUMENT_OPERATION_REPLACE); - } - return addTracking(std::move(f), [=](OperationResult&& opRes) { - events::ReplaceDocument(vocbase().name(), cname, newValue, opRes.options, - opRes.errorNumber()); - return std::move(opRes); - }); + return replaceInternal(cname, newValue, options, MethodsApi::Asynchronous); } /// @brief replace one or multiple documents in a collection, local @@ -1557,37 +1427,18 @@ Future transaction::Methods::modifyLocal(std::string const& col std::move(options), std::move(errorCounter)); } +OperationResult Methods::remove(std::string const& collectionName, + VPackSlice value, OperationOptions const& options) { + return removeInternal(collectionName, value, options, MethodsApi::Synchronous).get(); +} + /// @brief remove one or multiple documents in a collection /// the single-document variant of this operation will either succeed or, /// if it fails, clean up after itself Future transaction::Methods::removeAsync(std::string const& cname, VPackSlice value, OperationOptions const& options) { - TRI_ASSERT(_state->status() == transaction::Status::RUNNING); - - if (!value.isObject() && !value.isArray() && !value.isString()) { - // must provide a document object or an array of documents - events::DeleteDocument(vocbase().name(), cname, value, options, - TRI_ERROR_ARANGO_DOCUMENT_TYPE_INVALID); - THROW_ARANGO_EXCEPTION(TRI_ERROR_ARANGO_DOCUMENT_TYPE_INVALID); - } - if (value.isArray() && value.length() == 0) { - events::DeleteDocument(vocbase().name(), cname, value, options, TRI_ERROR_NO_ERROR); - return emptyResult(options); - } - - auto f = Future::makeEmpty(); - if (_state->isCoordinator()) { - f = removeCoordinator(cname, value, options); - } else { - OperationOptions optionsCopy = options; - f = removeLocal(cname, value, optionsCopy); - } - return addTracking(std::move(f), [=](OperationResult&& opRes) { - events::DeleteDocument(vocbase().name(), cname, value, opRes.options, - opRes.errorNumber()); - return std::move(opRes); - }); + return removeInternal(cname, value, options, MethodsApi::Asynchronous); } /// @brief remove one or multiple documents in a collection, coordinator @@ -1596,12 +1447,13 @@ Future transaction::Methods::removeAsync(std::string const& cna #ifndef USE_ENTERPRISE Future transaction::Methods::removeCoordinator(std::string const& cname, VPackSlice value, - OperationOptions const& options) { + OperationOptions const& options, + MethodsApi api) { auto colptr = resolver()->getCollectionStructCluster(cname); if (colptr == nullptr) { return futures::makeFuture(OperationResult(TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND, options)); } - return arangodb::removeDocumentOnCoordinator(*this, *colptr, value, options); + return arangodb::removeDocumentOnCoordinator(*this, *colptr, value, options, api); } #endif @@ -1827,28 +1679,23 @@ OperationResult transaction::Methods::allLocal(std::string const& collectionName return OperationResult(Result(), resultBuilder.steal(), options); } +OperationResult Methods::truncate(std::string const& collectionName, + OperationOptions const& options) { + return truncateInternal(collectionName, options, MethodsApi::Synchronous).get(); +} + /// @brief remove all documents in a collection Future transaction::Methods::truncateAsync(std::string const& collectionName, OperationOptions const& options) { - TRI_ASSERT(_state->status() == transaction::Status::RUNNING); - - OperationOptions optionsCopy = options; - auto cb = [this, collectionName](OperationResult res) { - events::TruncateCollection(vocbase().name(), collectionName, res); - return res; - }; - - if (_state->isCoordinator()) { - return truncateCoordinator(collectionName, optionsCopy).thenValue(cb); - } - return truncateLocal(collectionName, optionsCopy).thenValue(cb); + return truncateInternal(collectionName, options, MethodsApi::Asynchronous); } /// @brief remove all documents in a collection, coordinator #ifndef USE_ENTERPRISE Future transaction::Methods::truncateCoordinator(std::string const& collectionName, - OperationOptions& options) { - return arangodb::truncateCollectionOnCoordinator(*this, collectionName, options); + OperationOptions& options, + MethodsApi api) { + return arangodb::truncateCollectionOnCoordinator(*this, collectionName, options, api); } #endif @@ -2022,44 +1869,37 @@ Future transaction::Methods::truncateLocal(std::string const& c return futures::makeFuture(OperationResult(res, options)); } +OperationResult Methods::count(std::string const& collectionName, + CountType type, OperationOptions const& options) { + return countInternal(collectionName, type, options, MethodsApi::Synchronous).get(); +} + /// @brief count the number of documents in a collection futures::Future transaction::Methods::countAsync( std::string const& collectionName, transaction::CountType type, OperationOptions const& options) { - TRI_ASSERT(_state->status() == transaction::Status::RUNNING); - - if (_state->isCoordinator()) { - return countCoordinator(collectionName, type, options); - } - - if (type == CountType::Detailed) { - // we are a single-server... we cannot provide detailed per-shard counts, - // so just downgrade the request to a normal request - type = CountType::Normal; - } - - return futures::makeFuture(countLocal(collectionName, type, options)); + return countInternal(collectionName, type, options, MethodsApi::Asynchronous); } #ifndef USE_ENTERPRISE /// @brief count the number of documents in a collection futures::Future transaction::Methods::countCoordinator( std::string const& collectionName, transaction::CountType type, - OperationOptions const& options) { + OperationOptions const& options, MethodsApi api) { // First determine the collection ID from the name: auto colptr = resolver()->getCollectionStructCluster(collectionName); if (colptr == nullptr) { return futures::makeFuture(OperationResult(TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND, options)); } - return countCoordinatorHelper(colptr, collectionName, type, options); + return countCoordinatorHelper(colptr, collectionName, type, options, api); } #endif futures::Future transaction::Methods::countCoordinatorHelper( std::shared_ptr const& collinfo, std::string const& collectionName, - transaction::CountType type, OperationOptions const& options) { + transaction::CountType type, OperationOptions const& options, MethodsApi api) { TRI_ASSERT(collinfo != nullptr); auto& cache = collinfo->countCache(); @@ -2073,7 +1913,7 @@ futures::Future transaction::Methods::countCoordinatorHelper( if (documents == CountCache::NotPopulated) { // no cache hit, or detailed results requested - return arangodb::countOnCoordinator(*this, collectionName, options) + return arangodb::countOnCoordinator(*this, collectionName, options, api) .thenValue([&cache, type, options](OperationResult&& res) -> OperationResult { if (res.fail()) { return std::move(res); @@ -2648,6 +2488,268 @@ Future Methods::replicateOperations( return futures::collectAll(std::move(futures)).thenValue(std::move(cb)); } +Future Methods::commitInternal(MethodsApi api) { + TRI_IF_FAILURE("TransactionCommitFail") { return Result(TRI_ERROR_DEBUG); } + + if (_state == nullptr || _state->status() != transaction::Status::RUNNING) { + // transaction not created or not running + return Result(TRI_ERROR_TRANSACTION_INTERNAL, + "transaction not running on commit"); + } + + if (!_state->isReadOnlyTransaction()) { + auto const& exec = ExecContext::current(); + bool cancelRW = ServerState::readOnly() && !exec.isSuperuser(); + if (exec.isCanceled() || cancelRW) { + return Result(TRI_ERROR_ARANGO_READ_ONLY, "server is in read-only mode"); + } + } + + if (!_mainTransaction) { + return futures::makeFuture(Result()); + } + + auto f = futures::makeFuture(Result()); + if (_state->isRunningInCluster()) { + // first commit transaction on subordinate servers + f = ClusterTrxMethods::commitTransaction(*this, api); + } + + return std::move(f).thenValue([this](Result res) -> Result { + if (res.fail()) { // do not commit locally + LOG_TOPIC("5743a", WARN, Logger::TRANSACTIONS) + << "failed to commit on subordinates: '" << res.errorMessage() << "'"; + return res; + } + + res = _state->commitTransaction(this); + if (res.ok()) { + applyStatusChangeCallbacks(*this, Status::COMMITTED); + } + + return res; + }); +} + +Future Methods::abortInternal(MethodsApi api) { + if (_state == nullptr || _state->status() != transaction::Status::RUNNING) { + // transaction not created or not running + return Result(TRI_ERROR_TRANSACTION_INTERNAL, + "transaction not running on abort"); + } + + if (!_mainTransaction) { + return futures::makeFuture(Result()); + } + + auto f = futures::makeFuture(Result()); + if (_state->isRunningInCluster()) { + // first commit transaction on subordinate servers + f = ClusterTrxMethods::abortTransaction(*this, api); + } + + return std::move(f).thenValue([this](Result res) -> Result { + if (res.fail()) { // do not commit locally + LOG_TOPIC("d89a8", WARN, Logger::TRANSACTIONS) + << "failed to abort on subordinates: " << res.errorMessage(); + } // abort locally anyway + + res = _state->abortTransaction(this); + if (res.ok()) { + applyStatusChangeCallbacks(*this, Status::ABORTED); + } + + return res; + }); +} + +Future Methods::finishInternal(Result const& res, MethodsApi api) { + if (res.ok()) { + // there was no previous error, so we'll commit + return this->commitInternal(api); + } + + // there was a previous error, so we'll abort + return this->abortInternal(api).thenValue([res](Result const& ignore) { + return res; // return original error + }); +} + +Future Methods::documentInternal(std::string const& cname, VPackSlice value, + OperationOptions const& options, MethodsApi api) { + TRI_ASSERT(_state->status() == transaction::Status::RUNNING); + + if (!value.isObject() && !value.isArray()) { + // must provide a document object or an array of documents + events::ReadDocument(vocbase().name(), cname, value, options, + TRI_ERROR_ARANGO_DOCUMENT_TYPE_INVALID); + THROW_ARANGO_EXCEPTION(TRI_ERROR_ARANGO_DOCUMENT_TYPE_INVALID); + } + + if (_state->isCoordinator()) { + return addTracking(documentCoordinator(cname, value, options, api), + [=](OperationResult&& opRes) { + events::ReadDocument(vocbase().name(), cname, value, + opRes.options, opRes.errorNumber()); + return std::move(opRes); + }); + } + return documentLocal(cname, value, options); +} + +Future Methods::insertInternal(std::string const& cname, VPackSlice value, + OperationOptions const& options, MethodsApi api) { + TRI_ASSERT(_state->status() == transaction::Status::RUNNING); + + if (!value.isObject() && !value.isArray()) { + // must provide a document object or an array of documents + events::CreateDocument(vocbase().name(), cname, value, options, + TRI_ERROR_ARANGO_DOCUMENT_TYPE_INVALID); + THROW_ARANGO_EXCEPTION(TRI_ERROR_ARANGO_DOCUMENT_TYPE_INVALID); + } + if (value.isArray() && value.length() == 0) { + events::CreateDocument(vocbase().name(), cname, value, options, TRI_ERROR_NO_ERROR); + return emptyResult(options); + } + + auto f = Future::makeEmpty(); + if (_state->isCoordinator()) { + f = insertCoordinator(cname, value, options, api); + } else { + OperationOptions optionsCopy = options; + f = insertLocal(cname, value, optionsCopy); + } + + return addTracking(std::move(f), [=](OperationResult&& opRes) { + events::CreateDocument(vocbase().name(), cname, + (opRes.ok() && opRes.options.returnNew) ? opRes.slice() : value, + opRes.options, opRes.errorNumber()); + return std::move(opRes); + }); +} + +Future Methods::updateInternal(std::string const& cname, VPackSlice newValue, + OperationOptions const& options, MethodsApi api) { + TRI_ASSERT(_state->status() == transaction::Status::RUNNING); + + if (!newValue.isObject() && !newValue.isArray()) { + // must provide a document object or an array of documents + events::ModifyDocument(vocbase().name(), cname, newValue, options, + TRI_ERROR_ARANGO_DOCUMENT_TYPE_INVALID); + THROW_ARANGO_EXCEPTION(TRI_ERROR_ARANGO_DOCUMENT_TYPE_INVALID); + } + if (newValue.isArray() && newValue.length() == 0) { + events::ModifyDocument(vocbase().name(), cname, newValue, options, TRI_ERROR_NO_ERROR); + return emptyResult(options); + } + + auto f = Future::makeEmpty(); + if (_state->isCoordinator()) { + f = modifyCoordinator(cname, newValue, options, TRI_VOC_DOCUMENT_OPERATION_UPDATE, api); + } else { + OperationOptions optionsCopy = options; + f = modifyLocal(cname, newValue, optionsCopy, TRI_VOC_DOCUMENT_OPERATION_UPDATE); + } + return addTracking(std::move(f), [=](OperationResult&& opRes) { + events::ModifyDocument(vocbase().name(), cname, newValue, opRes.options, + opRes.errorNumber()); + return std::move(opRes); + }); +} + +Future Methods::replaceInternal(std::string const& cname, VPackSlice newValue, + OperationOptions const& options, MethodsApi api) { + TRI_ASSERT(_state->status() == transaction::Status::RUNNING); + + if (!newValue.isObject() && !newValue.isArray()) { + // must provide a document object or an array of documents + events::ReplaceDocument(vocbase().name(), cname, newValue, options, + TRI_ERROR_ARANGO_DOCUMENT_TYPE_INVALID); + THROW_ARANGO_EXCEPTION(TRI_ERROR_ARANGO_DOCUMENT_TYPE_INVALID); + } + if (newValue.isArray() && newValue.length() == 0) { + events::ReplaceDocument(vocbase().name(), cname, newValue, options, TRI_ERROR_NO_ERROR); + return futures::makeFuture(emptyResult(options)); + } + + auto f = Future::makeEmpty(); + if (_state->isCoordinator()) { + f = modifyCoordinator(cname, newValue, options, TRI_VOC_DOCUMENT_OPERATION_REPLACE, api); + } else { + OperationOptions optionsCopy = options; + f = modifyLocal(cname, newValue, optionsCopy, TRI_VOC_DOCUMENT_OPERATION_REPLACE); + } + return addTracking(std::move(f), [=](OperationResult&& opRes) { + events::ReplaceDocument(vocbase().name(), cname, newValue, opRes.options, + opRes.errorNumber()); + return std::move(opRes); + }); +} + +Future Methods::removeInternal(std::string const& cname, VPackSlice value, + OperationOptions const& options, MethodsApi api) { + TRI_ASSERT(_state->status() == transaction::Status::RUNNING); + + if (!value.isObject() && !value.isArray() && !value.isString()) { + // must provide a document object or an array of documents + events::DeleteDocument(vocbase().name(), cname, value, options, + TRI_ERROR_ARANGO_DOCUMENT_TYPE_INVALID); + THROW_ARANGO_EXCEPTION(TRI_ERROR_ARANGO_DOCUMENT_TYPE_INVALID); + } + if (value.isArray() && value.length() == 0) { + events::DeleteDocument(vocbase().name(), cname, value, options, TRI_ERROR_NO_ERROR); + return emptyResult(options); + } + + auto f = Future::makeEmpty(); + if (_state->isCoordinator()) { + f = removeCoordinator(cname, value, options, api); + } else { + OperationOptions optionsCopy = options; + f = removeLocal(cname, value, optionsCopy); + } + return addTracking(std::move(f), [=](OperationResult&& opRes) { + events::DeleteDocument(vocbase().name(), cname, value, opRes.options, + opRes.errorNumber()); + return std::move(opRes); + }); +} + +Future Methods::truncateInternal(std::string const& collectionName, + OperationOptions const& options, MethodsApi api) { + TRI_ASSERT(_state->status() == transaction::Status::RUNNING); + + OperationOptions optionsCopy = options; + auto cb = [this, collectionName](OperationResult res) { + events::TruncateCollection(vocbase().name(), collectionName, res); + return res; + }; + + if (_state->isCoordinator()) { + return truncateCoordinator(collectionName, optionsCopy, api).thenValue(cb); + } + return truncateLocal(collectionName, optionsCopy).thenValue(cb); +} + +futures::Future Methods::countInternal(std::string const& collectionName, + CountType type, + OperationOptions const& options, + MethodsApi api) { + TRI_ASSERT(_state->status() == transaction::Status::RUNNING); + + if (_state->isCoordinator()) { + return countCoordinator(collectionName, type, options, api); + } + + if (type == CountType::Detailed) { + // we are a single-server... we cannot provide detailed per-shard counts, + // so just downgrade the request to a normal request + type = CountType::Normal; + } + + return futures::makeFuture(countLocal(collectionName, type, options)); +} + #ifndef USE_ENTERPRISE ErrorCode Methods::validateSmartJoinAttribute(LogicalCollection const&, arangodb::velocypack::Slice) { diff --git a/arangod/Transaction/Methods.h b/arangod/Transaction/Methods.h index e3a1dcbbbf72..6560b898ae46 100644 --- a/arangod/Transaction/Methods.h +++ b/arangod/Transaction/Methods.h @@ -33,6 +33,7 @@ #include "Rest/CommonDefines.h" #include "Transaction/CountCache.h" #include "Transaction/Hints.h" +#include "Transaction/MethodsApi.h" #include "Transaction/Options.h" #include "Transaction/Status.h" #include "Utils/OperationResult.h" @@ -88,21 +89,15 @@ namespace transaction { class Methods { public: - template using Future = futures::Future; using IndexHandle = std::shared_ptr; // legacy using VPackSlice = arangodb::velocypack::Slice; - /// @brief transaction::Methods - private: Methods() = delete; Methods(Methods const&) = delete; - Methods& operator=(Methods const&) = delete; - public: - /// @brief create the transaction explicit Methods(std::shared_ptr const& ctx, transaction::Options const& options = transaction::Options()); @@ -203,17 +198,17 @@ class Methods { Result begin(); /// @deprecated use async variant - Result commit() { return commitAsync().get(); } + Result commit(); /// @brief commit / finish the transaction Future commitAsync(); /// @deprecated use async variant - Result abort() { return abortAsync().get(); } + Result abort(); /// @brief abort the transaction Future abortAsync(); /// @deprecated use async variant - Result finish(Result const& res) { return finishAsync(res).get(); } + Result finish(Result const& res); /// @brief finish a transaction (commit or abort), based on the previous state Future finishAsync(Result const& res); @@ -272,47 +267,37 @@ class Methods { /// @brief return one or multiple documents from a collection /// @deprecated use async variant - ENTERPRISE_VIRT OperationResult document(std::string const& collectionName, - VPackSlice value, - OperationOptions& options) { - return documentAsync(collectionName, value, options).get(); - } + [[deprecated]] OperationResult document(std::string const& collectionName, VPackSlice value, + OperationOptions const& options); /// @brief return one or multiple documents from a collection - Future documentAsync(std::string const& collectionName, - VPackSlice value, OperationOptions& options); + Future documentAsync(std::string const& cname, + VPackSlice value, + OperationOptions const& options); /// @deprecated use async variant - OperationResult insert(std::string const& cname, - VPackSlice value, - OperationOptions const& options) { - return this->insertAsync(cname, value, options).get(); - } + [[deprecated]] OperationResult insert(std::string const& cname, VPackSlice value, + OperationOptions const& options); /// @brief create one or multiple documents in a collection /// The single-document variant of this operation will either succeed or, /// if it fails, clean up after itself - Future insertAsync(std::string const& collectionName, - VPackSlice value, + Future insertAsync(std::string const& collectionName, VPackSlice value, OperationOptions const& options); - + /// @deprecated use async variant - OperationResult update(std::string const& cname, VPackSlice updateValue, - OperationOptions const& options) { - return this->updateAsync(cname, updateValue, options).get(); - } + [[deprecated]] OperationResult update(std::string const& cname, VPackSlice updateValue, + OperationOptions const& options); /// @brief update/patch one or multiple documents in a collection. /// The single-document variant of this operation will either succeed or, /// if it fails, clean up after itself Future updateAsync(std::string const& collectionName, VPackSlice updateValue, OperationOptions const& options); - + /// @deprecated use async variant - OperationResult replace(std::string const& cname, VPackSlice replaceValue, - OperationOptions const& options) { - return this->replaceAsync(cname, replaceValue, options).get(); - } + [[deprecated]] OperationResult replace(std::string const& cname, VPackSlice replaceValue, + OperationOptions const& options); /// @brief replace one or multiple documents in a collection. /// The single-document variant of this operation will either succeed or, @@ -321,40 +306,34 @@ class Methods { OperationOptions const& options); /// @deprecated use async variant - OperationResult remove(std::string const& collectionName, - VPackSlice value, OperationOptions const& options) { - return removeAsync(collectionName, value, options).get(); - } + [[deprecated]] OperationResult remove(std::string const& collectionName, VPackSlice value, + OperationOptions const& options); /// @brief remove one or multiple documents in a collection /// the single-document variant of this operation will either succeed or, /// if it fails, clean up after itself - Future removeAsync(std::string const& collectionName, - VPackSlice value, OperationOptions const& options); + Future removeAsync(std::string const& collectionName, VPackSlice value, + OperationOptions const& options); /// @brief fetches all documents in a collection ENTERPRISE_VIRT OperationResult all(std::string const& collectionName, uint64_t skip, uint64_t limit, OperationOptions const& options); /// @brief deprecated use async variant - OperationResult truncate(std::string const& collectionName, OperationOptions const& options) { - return this->truncateAsync(collectionName, options).get(); - } + [[deprecated]] OperationResult truncate(std::string const& collectionName, + OperationOptions const& options); /// @brief remove all documents in a collection Future truncateAsync(std::string const& collectionName, OperationOptions const& options); /// deprecated, use async variant - virtual OperationResult count(std::string const& collectionName, - CountType type, OperationOptions const& options) { - return countAsync(collectionName, type, options).get(); - } + [[deprecated]] OperationResult count(std::string const& collectionName, CountType type, + OperationOptions const& options); /// @brief count the number of documents in a collection - virtual futures::Future countAsync(std::string const& collectionName, - CountType type, - OperationOptions const& options); + futures::Future countAsync(std::string const& collectionName, CountType type, + OperationOptions const& options); /// @brief factory for IndexIterator objects from AQL /// note: the caller must have read-locked the underlying collection when @@ -411,16 +390,17 @@ class Methods { RevisionId oldRid, ManagedDocumentResult const* oldDoc, ManagedDocumentResult const* newDoc); - Future documentCoordinator(std::string const& collectionName, - VPackSlice value, - OperationOptions const& options); + futures::Future documentCoordinator(std::string const& collectionName, + VPackSlice const value, + OperationOptions const& options, + MethodsApi api); Future documentLocal(std::string const& collectionName, VPackSlice value, OperationOptions const& options); Future insertCoordinator(std::string const& collectionName, - VPackSlice value, - OperationOptions const& options); + VPackSlice value, OperationOptions const& options, + MethodsApi api); Future insertLocal(std::string const& collectionName, VPackSlice value, OperationOptions& options); @@ -428,7 +408,8 @@ class Methods { Future modifyCoordinator(std::string const& collectionName, VPackSlice newValue, OperationOptions const& options, - TRI_voc_document_operation_e operation); + TRI_voc_document_operation_e operation, + MethodsApi api); Future modifyLocal(std::string const& collectionName, VPackSlice newValue, @@ -436,8 +417,8 @@ class Methods { TRI_voc_document_operation_e operation); Future removeCoordinator(std::string const& collectionName, - VPackSlice value, - OperationOptions const& options); + VPackSlice value, OperationOptions const& options, + transaction::MethodsApi api); Future removeLocal(std::string const& collectionName, VPackSlice value, @@ -455,12 +436,38 @@ class Methods { OperationResult anyLocal(std::string const& collectionName, OperationOptions const& options); Future truncateCoordinator(std::string const& collectionName, - OperationOptions& options); + OperationOptions& options, MethodsApi api); Future truncateLocal(std::string const& collectionName, OperationOptions& options); protected: + + // The internal methods distinguish between the synchronous and asynchronous + // APIs via an additional parameter, so `skipScheduler` can be set for network + // requests. + auto commitInternal(MethodsApi api) -> Future; + auto abortInternal(MethodsApi api) -> Future; + auto finishInternal(Result const& res, MethodsApi api) -> Future; + // is virtual for IgnoreNoAccessMethods + ENTERPRISE_VIRT auto documentInternal(std::string const& cname, VPackSlice value, + OperationOptions const& options, MethodsApi api) + -> Future; + auto insertInternal(std::string const& collectionName, VPackSlice value, + OperationOptions const& options, MethodsApi api) -> Future; + auto updateInternal(std::string const& collectionName, VPackSlice updateValue, + OperationOptions const& options, MethodsApi api) -> Future; + auto replaceInternal(std::string const& collectionName, VPackSlice replaceValue, + OperationOptions const& options, MethodsApi api) -> Future; + auto removeInternal(std::string const& collectionName, VPackSlice value, + OperationOptions const& options, MethodsApi api) -> Future; + auto truncateInternal(std::string const& collectionName, OperationOptions const& options, + MethodsApi api) -> Future; + // is virtual for IgnoreNoAccessMethods + ENTERPRISE_VIRT auto countInternal(std::string const& collectionName, CountType type, + OperationOptions const& options, MethodsApi api) + -> futures::Future; + /// @brief return the transaction collection for a document collection TransactionCollection* trxCollection(DataSourceId cid, AccessMode::Type type = AccessMode::Type::READ) const; @@ -470,11 +477,12 @@ class Methods { futures::Future countCoordinator(std::string const& collectionName, CountType type, - OperationOptions const& options); + OperationOptions const& options, + MethodsApi api); futures::Future countCoordinatorHelper( std::shared_ptr const& collinfo, std::string const& collectionName, - CountType type, OperationOptions const& options); + CountType type, OperationOptions const& options, MethodsApi api); OperationResult countLocal(std::string const& collectionName, CountType type, OperationOptions const& options); @@ -502,7 +510,6 @@ class Methods { std::unordered_set const& excludePositions, FollowerInfo& followerInfo); - private: /// @brief transaction hints transaction::Hints _localHints; @@ -515,4 +522,3 @@ class Methods { } // namespace transaction } // namespace arangodb - diff --git a/arangod/Transaction/MethodsApi.h b/arangod/Transaction/MethodsApi.h new file mode 100644 index 000000000000..9c5ba158aeee --- /dev/null +++ b/arangod/Transaction/MethodsApi.h @@ -0,0 +1,30 @@ +//////////////////////////////////////////////////////////////////////////////// +/// DISCLAIMER +/// +/// Copyright 2021-2021 ArangoDB GmbH, Cologne, Germany +/// +/// Licensed under the Apache License, Version 2.0 (the "License"); +/// you may not use this file except in compliance with the License. +/// You may obtain a copy of the License at +/// +/// http://www.apache.org/licenses/LICENSE-2.0 +/// +/// Unless required by applicable law or agreed to in writing, software +/// distributed under the License is distributed on an "AS IS" BASIS, +/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +/// See the License for the specific language governing permissions and +/// limitations under the License. +/// +/// Copyright holder is ArangoDB GmbH, Cologne, Germany +/// +/// @author Tobias Gödderz +//////////////////////////////////////////////////////////////////////////////// + +#pragma once + +namespace arangodb::transaction { +enum class MethodsApi { + Asynchronous, + Synchronous +}; +}