diff --git a/CHANGELOG b/CHANGELOG
index 34ea04e4d2ef..9faf9cd6fb66 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
 devel
 -----
 
+* Fix locking of AQL write queries on DB servers.
+
 * APM-112: invalid use of OPTIONS in AQL queries will now raise a warning in
   the query. The feature is useful to detect misspelled attribute names in
   OPTIONS, e.g.
diff --git a/arangod/Aql/ClusterQuery.cpp b/arangod/Aql/ClusterQuery.cpp
index 71c82ad560ab..6c3b8865ca78 100644
--- a/arangod/Aql/ClusterQuery.cpp
+++ b/arangod/Aql/ClusterQuery.cpp
@@ -30,6 +30,7 @@
 #include "Aql/QueryRegistry.h"
 #include "Aql/QueryProfile.h"
 #include "Cluster/ServerState.h"
+#include "Random/RandomGenerator.h"
 #include "StorageEngine/TransactionState.h"
 #include "Transaction/Context.h"
 #include "RestServer/QueryRegistryFeature.h"
@@ -102,6 +103,13 @@ void ClusterQuery::prepareClusterQuery(VPackSlice querySlice,
   if (_trx->state()->isDBServer()) {
     _trx->state()->acceptAnalyzersRevision(analyzersRevision);
   }
+
+  TRI_IF_FAILURE("Query::setupLockTimeout") {
+    if (RandomGenerator::interval(uint32_t(100)) >= 95) {
+      THROW_ARANGO_EXCEPTION(TRI_ERROR_LOCK_TIMEOUT);
+    }
+  }
+
   Result res = _trx->begin();
   if (!res.ok()) {
     THROW_ARANGO_EXCEPTION(res);
diff --git a/arangod/Aql/EngineInfoContainerDBServerServerBased.cpp b/arangod/Aql/EngineInfoContainerDBServerServerBased.cpp
index 289c9511bbcc..595dde01c260 100644
--- a/arangod/Aql/EngineInfoContainerDBServerServerBased.cpp
+++ b/arangod/Aql/EngineInfoContainerDBServerServerBased.cpp
@@ -402,8 +402,20 @@ Result EngineInfoContainerDBServerServerBased::buildEngines(
   ErrorCode cleanupReason = TRI_ERROR_CLUSTER_TIMEOUT;
 
   auto cleanupGuard = scopeGuard([this, &serverToQueryId, &cleanupReason]() {
-    // Fire and forget
-    std::ignore = cleanupEngines(cleanupReason, _query.vocbase().name(), serverToQueryId);
+    try {
+      transaction::Methods& trx = _query.trxForOptimization();
+      auto requests = cleanupEngines(cleanupReason, _query.vocbase().name(), serverToQueryId);
+      if (!trx.isMainTransaction()) {
+        // for AQL queries in streaming transactions, we will wait for the
+        // complete shutdown to have finished before we return to the caller.
+        // this is done so that there will be no 2 AQL queries in the same
+        // streaming transaction at the same time
+        futures::collectAll(requests).wait();
+      }
+    } catch (std::exception const& ex) {
+      LOG_TOPIC("2a9fe", WARN, Logger::AQL)
+          << "unable to clean up query snippets: " << ex.what();
+    }
   });
 
   NetworkFeature const& nf = _query.vocbase().server().getFeature<NetworkFeature>();
@@ -412,6 +424,9 @@ Result EngineInfoContainerDBServerServerBased::buildEngines(
     // nullptr only happens on controlled shutdown
     return {TRI_ERROR_SHUTTING_DOWN};
   }
+
+  // remember which servers we add during our setup request
+  ::arangodb::containers::HashSet serversAdded;
 
   transaction::Methods& trx = _query.trxForOptimization();
   std::vector> networkCalls{};
@@ -427,9 +442,15 @@ Result EngineInfoContainerDBServerServerBased::buildEngines(
   }
 
   TRI_IF_FAILURE("Query::setupTimeoutFailSequence") {
-    options.timeout = network::Timeout(0.5);
+    double t = 0.5;
+    TRI_IF_FAILURE("Query::setupTimeoutFailSequenceRandom") {
+      if (RandomGenerator::interval(uint32_t(100)) >= 95) {
+        t = 3.0;
+      }
+    }
+    options.timeout = network::Timeout(t);
   }
-
+
   /// cluster global query id, under which the query will be registered
   /// on DB servers from 3.8 onwards.
QueryId clusterQueryId = _query.vocbase().server().getFeature().clusterInfo().uniqid(); @@ -450,6 +471,13 @@ Result EngineInfoContainerDBServerServerBased::buildEngines( continue; } + if (!trx.state()->knownServers().contains(server)) { + // we are about to add this server to the transaction. + // remember it, so we can roll the addition back for + // the second setup request if we need to + serversAdded.emplace(server); + } + networkCalls.emplace_back( buildSetupRequest(trx, server, infoSlice, didCreateEngine, snippetIds, serverToQueryId, serverToQueryIdLock, pool, options)); @@ -463,7 +491,7 @@ Result EngineInfoContainerDBServerServerBased::buildEngines( // We can directly report a non TRI_ERROR_LOCK_TIMEOUT // error as we need to abort after. // Otherwise we need to report - Result res{TRI_ERROR_NO_ERROR}; + Result res; for (auto const& tryRes : responses) { auto response = tryRes.get(); if (response.fail()) { @@ -478,7 +506,7 @@ Result EngineInfoContainerDBServerServerBased::buildEngines( } // Return what we have, this will be ok() if and only // if none of the requests failed. - // If will be LOCK_TIMEOUT if and only if the only error + // It will be LOCK_TIMEOUT if and only if the only error // we see was LOCK_TIMEOUT. return res; }); @@ -490,26 +518,59 @@ Result EngineInfoContainerDBServerServerBased::buildEngines( return fastPathResult.get(); } + // we got a lock timeout response for the fast path locking... { // in case of fast path failure, we need to cleanup engines auto requests = cleanupEngines(fastPathResult.get().errorNumber(), _query.vocbase().name(), serverToQueryId); - // Wait for all requests to complete. + // Wait for all cleanup requests to complete. // So we know that all Transactions are aborted. - // We do NOT care for the actual result. - futures::collectAll(requests).wait(); - snippetIds.clear(); + Result res; + for (auto& tryRes : requests) { + network::Response const& response = tryRes.get(); + if (response.fail()) { + // note first error, but continue iterating over all results + LOG_TOPIC("2d319", DEBUG, Logger::AQL) + << "received error from server " << response.destination + << " during query cleanup: " << response.combinedResult().errorMessage(); + res.reset(response.combinedResult()); + } + } + if (res.fail()) { + // unable to do a proper cleanup. + // it is not safe to go on here. + cleanupGuard.cancel(); + cleanupReason = res.errorNumber(); + return res; + } } - + + // fast path locking rolled back successfully! + snippetIds.clear(); + + // revert the addition of servers by us + for (auto const& s : serversAdded) { + trx.state()->removeKnownServer(s); + } + // we must generate a new query id, because the fast path setup has failed clusterQueryId = _query.vocbase().server().getFeature().clusterInfo().uniqid(); + if (trx.isMainTransaction() && !trx.state()->isReadOnlyTransaction()) { + // when we are not in a streaming transaction, it is ok to roll a new trx id. + // it is not ok to change the trx id inside a streaming transaction, + // because then the caller would not be able to "talk" to the transaction + // any further. + // note: read-only transactions do not need to reroll their id, as there will + // be no locks taken. + trx.state()->coordinatorRerollTransactionId(); + } + // set back to default lock timeout for slow path fallback _query.setLockTimeout(oldLockTimeout); LOG_TOPIC("f5022", DEBUG, Logger::AQL) << "Potential deadlock detected, using slow path for locking. 
This " "is expected if exclusive locks are used."; - trx.state()->coordinatorRerollTransactionId(); // Make sure we always use the same ordering on servers std::sort(engineInformation.begin(), engineInformation.end(), @@ -566,7 +627,7 @@ Result EngineInfoContainerDBServerServerBased::parseResponse( QueryId& globalQueryId) const { if (!response.isObject() || !response.get("result").isObject()) { LOG_TOPIC("0c3f2", WARN, Logger::AQL) << "Received error information from " - << server << " : " << response.toJson(); + << server << ": " << response.toJson(); if (response.hasKey(StaticStrings::ErrorNum) && response.hasKey(StaticStrings::ErrorMessage)) { return network::resultFromBody(response, TRI_ERROR_CLUSTER_AQL_COMMUNICATION) @@ -680,7 +741,7 @@ std::vector EngineInfoContainerDBServerServerBased VPackBuffer body; VPackBuilder builder(body); builder.openObject(); - builder.add("code", VPackValue(to_string(errorCode))); + builder.add("code", VPackValue(errorCode)); builder.close(); requests.reserve(serverQueryIds.size()); for (auto const& [server, queryId] : serverQueryIds) { diff --git a/arangod/Aql/ExecutionEngine.cpp b/arangod/Aql/ExecutionEngine.cpp index ea30d8680bfd..3ed18ac1ade8 100644 --- a/arangod/Aql/ExecutionEngine.cpp +++ b/arangod/Aql/ExecutionEngine.cpp @@ -178,7 +178,7 @@ Result ExecutionEngine::createBlocks(std::vector const& nodes, // put it into our cache: cache.try_emplace(en, eb); } - return {TRI_ERROR_NO_ERROR}; + return {}; } /// @brief create the engine diff --git a/arangod/Aql/ExecutionEngine.h b/arangod/Aql/ExecutionEngine.h index 32c4da8de8af..0dfac1c5ec2c 100644 --- a/arangod/Aql/ExecutionEngine.h +++ b/arangod/Aql/ExecutionEngine.h @@ -84,8 +84,7 @@ class ExecutionEngine { EngineId engineId() const { return _engineId; } - - + /// @brief get the root block TEST_VIRTUAL ExecutionBlock* root() const; diff --git a/arangod/Aql/Functions.cpp b/arangod/Aql/Functions.cpp index e0b67dc2c4f0..9a12ab1b0d1f 100644 --- a/arangod/Aql/Functions.cpp +++ b/arangod/Aql/Functions.cpp @@ -845,7 +845,8 @@ void unsetOrKeep(transaction::Methods* trx, VPackSlice const& value, /// @brief Helper function to get a document by it's identifier /// Lazy Locks the collection if necessary. 
-void getDocumentByIdentifier(transaction::Methods* trx, std::string& collectionName, +void getDocumentByIdentifier(transaction::Methods* trx, OperationOptions const& options, + std::string& collectionName, std::string const& identifier, bool ignoreError, VPackBuilder& result) { transaction::BuilderLeaser searchBuilder(trx); @@ -874,7 +875,7 @@ void getDocumentByIdentifier(transaction::Methods* trx, std::string& collectionN Result res; try { - res = trx->documentFastPath(collectionName, searchBuilder->slice(), result); + res = trx->documentFastPath(collectionName, searchBuilder->slice(), options, result); } catch (arangodb::basics::Exception const& ex) { res.reset(ex.code()); } @@ -6604,6 +6605,9 @@ AqlValue Functions::Document(ExpressionContext* expressionContext, AstNode const // cppcheck-suppress variableScope static char const* AFN = "DOCUMENT"; + OperationOptions options; + options.documentCallFromAql = true; + transaction::Methods* trx = &expressionContext->trx(); auto* vopts = &trx->vpackOptions(); if (parameters.size() == 1) { @@ -6612,7 +6616,7 @@ AqlValue Functions::Document(ExpressionContext* expressionContext, AstNode const if (id.isString()) { std::string identifier(id.slice().copyString()); std::string colName; - ::getDocumentByIdentifier(trx, colName, identifier, true, *builder.get()); + ::getDocumentByIdentifier(trx, options, colName, identifier, true, *builder.get()); if (builder->isEmpty()) { // not found return AqlValue(AqlValueHintNull()); @@ -6627,7 +6631,7 @@ AqlValue Functions::Document(ExpressionContext* expressionContext, AstNode const if (next.isString()) { std::string identifier = next.copyString(); std::string colName; - ::getDocumentByIdentifier(trx, colName, identifier, true, *builder.get()); + ::getDocumentByIdentifier(trx, options, colName, identifier, true, *builder.get()); } } builder->close(); @@ -6647,7 +6651,7 @@ AqlValue Functions::Document(ExpressionContext* expressionContext, AstNode const if (id.isString()) { transaction::BuilderLeaser builder(trx); std::string identifier(id.slice().copyString()); - ::getDocumentByIdentifier(trx, collectionName, identifier, true, *builder.get()); + ::getDocumentByIdentifier(trx, options, collectionName, identifier, true, *builder.get()); if (builder->isEmpty()) { return AqlValue(AqlValueHintNull()); } @@ -6663,7 +6667,7 @@ AqlValue Functions::Document(ExpressionContext* expressionContext, AstNode const for (auto const& next : VPackArrayIterator(idSlice)) { if (next.isString()) { std::string identifier(next.copyString()); - ::getDocumentByIdentifier(trx, collectionName, identifier, true, + ::getDocumentByIdentifier(trx, options, collectionName, identifier, true, *builder.get()); } } diff --git a/arangod/Aql/Query.cpp b/arangod/Aql/Query.cpp index e179faf63d56..763a604fdf4d 100644 --- a/arangod/Aql/Query.cpp +++ b/arangod/Aql/Query.cpp @@ -1177,6 +1177,7 @@ ExecutionState Query::cleanupPlanAndEngine(ErrorCode errorCode, bool sync) { _sharedState->waitForAsyncWakeup(); state = cleanupTrxAndEngines(errorCode); } + return state; } return cleanupTrxAndEngines(errorCode); diff --git a/arangod/Aql/Query.h b/arangod/Aql/Query.h index 34a42ef69c50..5cc844c1030e 100644 --- a/arangod/Aql/Query.h +++ b/arangod/Aql/Query.h @@ -206,7 +206,6 @@ class Query : public QueryContext { aql::ServerQueryIdList& serverQueryIds() { return _serverQueryIds; } aql::ExecutionStats& executionStats() { return _execStats; } - // Debug method to kill a query at a specific position // during execution. 
It internally asserts that the query // is actually visible through other APIS (e.g. current queries) diff --git a/arangod/Aql/QueryRegistry.cpp b/arangod/Aql/QueryRegistry.cpp index 28f1260ba63b..8dcce66e9a3c 100644 --- a/arangod/Aql/QueryRegistry.cpp +++ b/arangod/Aql/QueryRegistry.cpp @@ -483,6 +483,7 @@ void QueryRegistry::registerSnippets(SnippetList const& snippets) { void QueryRegistry::unregisterSnippets(SnippetList const& snippets) noexcept { TRI_ASSERT(ServerState::instance()->isCoordinator()); + int iterations = 0; while (true) { WRITE_LOCKER(guard, _lock); @@ -494,8 +495,6 @@ void QueryRegistry::unregisterSnippets(SnippetList const& snippets) noexcept { continue; } if (it->second._isOpen) { // engine still in use - LOG_TOPIC("33cfb", WARN, arangodb::Logger::AQL) - << "engine snippet '" << it->first << "' is still in use"; continue; } _engines.erase(it); @@ -505,6 +504,15 @@ void QueryRegistry::unregisterSnippets(SnippetList const& snippets) noexcept { if (remain == 0) { break; } + if (iterations == 0) { + LOG_TOPIC("33cfb", DEBUG, arangodb::Logger::AQL) + << remain << " engine snippet(s) still in use on query shutdown"; + } else if (iterations == 100) { + LOG_TOPIC("df7c7", WARN, arangodb::Logger::AQL) + << remain << " engine snippet(s) still in use on query shutdown"; + } + ++iterations; + std::this_thread::yield(); } } diff --git a/arangod/Aql/RestAqlHandler.cpp b/arangod/Aql/RestAqlHandler.cpp index e66892d62328..677779889f0c 100644 --- a/arangod/Aql/RestAqlHandler.cpp +++ b/arangod/Aql/RestAqlHandler.cpp @@ -52,6 +52,7 @@ #include "Transaction/Methods.h" #include +#include #include using namespace arangodb; @@ -60,13 +61,17 @@ using namespace arangodb::aql; using VelocyPackHelper = arangodb::basics::VelocyPackHelper; +namespace { +VPackStringRef const writeKey("write"); +VPackStringRef const exclusiveKey("exclusive"); +} // namespace + RestAqlHandler::RestAqlHandler(application_features::ApplicationServer& server, GeneralRequest* request, GeneralResponse* response, QueryRegistry* qr) : RestVocbaseBaseHandler(server, request, response), _queryRegistry(qr), - _engine(nullptr), - _qId(0) { + _engine(nullptr) { TRI_ASSERT(_queryRegistry != nullptr); } @@ -112,9 +117,15 @@ void RestAqlHandler::setupClusterQuery() { TRI_IF_FAILURE("Query::setupTimeoutFailSequence") { // simulate lock timeout during query setup - std::this_thread::sleep_for(std::chrono::milliseconds(3000)); + uint32_t r = 100; + TRI_IF_FAILURE("Query::setupTimeoutFailSequenceRandom") { + r = RandomGenerator::interval(uint32_t(100)); + } + if (r >= 96) { + std::this_thread::sleep_for(std::chrono::milliseconds(3000)); + } } - + bool success = false; VPackSlice querySlice = this->parseVPackBody(success); if (!success) { @@ -231,6 +242,8 @@ void RestAqlHandler::setupClusterQuery() { options.ttl = _queryRegistry->defaultTTL(); } + AccessMode::Type access = AccessMode::Type::READ; + // TODO: technically we could change the code in prepareClusterQuery to parse // the collection info directly // Build the collection information @@ -238,7 +251,7 @@ void RestAqlHandler::setupClusterQuery() { collectionBuilder.openArray(); for (auto const& lockInf : VPackObjectIterator(lockInfoSlice)) { if (!lockInf.value.isArray()) { - LOG_TOPIC("1dc00", ERR, arangodb::Logger::AQL) + LOG_TOPIC("1dc00", WARN, arangodb::Logger::AQL) << "Invalid VelocyPack: \"lockInfo." 
<< lockInf.key.copyString() << "\" is required but not an array."; generateError(rest::ResponseCode::BAD, TRI_ERROR_INTERNAL, @@ -249,7 +262,7 @@ void RestAqlHandler::setupClusterQuery() { } for (VPackSlice col : VPackArrayIterator(lockInf.value)) { if (!col.isString()) { - LOG_TOPIC("9e29f", ERR, arangodb::Logger::AQL) + LOG_TOPIC("9e29f", WARN, arangodb::Logger::AQL) << "Invalid VelocyPack: \"lockInfo." << lockInf.key.copyString() << "\" is required but not an array."; generateError(rest::ResponseCode::BAD, TRI_ERROR_INTERNAL, @@ -262,18 +275,23 @@ void RestAqlHandler::setupClusterQuery() { collectionBuilder.add("name", col); collectionBuilder.add("type", lockInf.key); collectionBuilder.close(); + + if (!AccessMode::isWriteOrExclusive(access) && lockInf.key.isEqualString(::writeKey)) { + access = AccessMode::Type::WRITE; + } else if (!AccessMode::isExclusive(access) && lockInf.key.isEqualString(::exclusiveKey)) { + access = AccessMode::Type::EXCLUSIVE; + } } } collectionBuilder.close(); - // simon: making this write breaks queries where DOCUMENT function - // is used in a coordinator-snippet above a DBServer-snippet - AccessMode::Type access = AccessMode::Type::READ; - const double ttl = options.ttl; + double const ttl = options.ttl; // creates a StandaloneContext or a leased context auto q = std::make_unique(clusterQueryId, createTransactionContext(access), std::move(options)); + + TRI_ASSERT(clusterQueryId == 0 || clusterQueryId == q->id()); VPackBufferUInt8 buffer; VPackBuilder answerBuilder(buffer); @@ -288,6 +306,7 @@ void RestAqlHandler::setupClusterQuery() { // coordinators answerBuilder.add("queryId", VPackValue(q->id())); } + QueryAnalyzerRevisions analyzersRevision; auto revisionRes = analyzersRevision.fromVelocyPack(querySlice); if(ADB_UNLIKELY(revisionRes.fail())) { @@ -327,6 +346,7 @@ void RestAqlHandler::setupClusterQuery() { } _queryRegistry->insertQuery(std::move(q), ttl, std::move(rGuard)); + generateResult(rest::ResponseCode::OK, std::move(buffer)); } @@ -351,12 +371,12 @@ RestStatus RestAqlHandler::useQuery(std::string const& operation, std::string co [self = shared_from_this()] { return self->wakeupHandler(); }); } - TRI_ASSERT(_qId > 0); TRI_ASSERT(_engine != nullptr); + TRI_ASSERT(std::to_string(_engine->engineId()) == idString); if (_engine->getQuery().queryOptions().profile >= ProfileLevel::TraceOne) { LOG_TOPIC("1bf67", INFO, Logger::QUERIES) - << "[query#" << _qId << "] remote request received: " << operation + << "[query#" << idString << "] remote request received: " << operation << " registryId=" << idString; } @@ -469,14 +489,12 @@ RestStatus RestAqlHandler::continueExecute() { void RestAqlHandler::shutdownExecute(bool isFinalized) noexcept { try { - if (isFinalized) { - if (_engine) { - _engine->sharedState()->resetWakeupHandler(); - } - if (_qId != 0) { - _engine = nullptr; - _queryRegistry->closeEngine(_qId); - } + if (isFinalized && _engine != nullptr) { + auto qId = _engine->engineId(); + _engine->sharedState()->resetWakeupHandler(); + _engine = nullptr; + + _queryRegistry->closeEngine(qId); } } catch (arangodb::basics::Exception const& ex) { LOG_TOPIC("f73b8", INFO, Logger::FIXME) @@ -494,8 +512,7 @@ void RestAqlHandler::shutdownExecute(bool isFinalized) noexcept { // dig out the query from ID, handle errors ExecutionEngine* RestAqlHandler::findEngine(std::string const& idString) { TRI_ASSERT(_engine == nullptr); - TRI_ASSERT(_qId == 0); - _qId = arangodb::basics::StringUtils::uint64(idString); + uint64_t qId = 
arangodb::basics::StringUtils::uint64(idString); // sleep for 10ms each time, wait for at most 30 seconds... static int64_t const SingleWaitPeriod = 10 * 1000; @@ -513,7 +530,7 @@ ExecutionEngine* RestAqlHandler::findEngine(std::string const& idString) { break; } try { - q = _queryRegistry->openExecutionEngine(_qId); + q = _queryRegistry->openExecutionEngine(qId); // we got the query (or it was not found - at least no one else // can now have access to the same query) break; @@ -526,13 +543,12 @@ ExecutionEngine* RestAqlHandler::findEngine(std::string const& idString) { if (q == nullptr) { LOG_TOPIC_IF("baef6", ERR, Logger::AQL, iterations == MaxIterations) - << "Timeout waiting for query " << _qId; - _qId = 0; + << "Timeout waiting for query " << qId; generateError(rest::ResponseCode::NOT_FOUND, TRI_ERROR_QUERY_NOT_FOUND, "query ID " + idString + " not found"); } - TRI_ASSERT(q == nullptr || _qId > 0); + TRI_ASSERT(q == nullptr || q->engineId() == qId); return q; } diff --git a/arangod/Aql/RestAqlHandler.h b/arangod/Aql/RestAqlHandler.h index fcd4cf7ef459..c61f1818eec9 100644 --- a/arangod/Aql/RestAqlHandler.h +++ b/arangod/Aql/RestAqlHandler.h @@ -33,7 +33,6 @@ namespace arangodb { namespace aql { class Query; class QueryRegistry; -enum class SerializationFormat; /// @brief shard control request handler class RestAqlHandler : public RestVocbaseBaseHandler { @@ -138,9 +137,6 @@ class RestAqlHandler : public RestVocbaseBaseHandler { QueryRegistry* _queryRegistry; aql::ExecutionEngine* _engine; - - // id of current query - QueryId _qId; }; } // namespace aql } // namespace arangodb diff --git a/arangod/Cluster/ClusterMethods.cpp b/arangod/Cluster/ClusterMethods.cpp index 51a657fbf6bf..653bbb1ad9e5 100644 --- a/arangod/Cluster/ClusterMethods.cpp +++ b/arangod/Cluster/ClusterMethods.cpp @@ -1799,7 +1799,7 @@ Future getDocumentOnCoordinator(transaction::Methods& trx, } // lazily begin transactions on leaders - const bool isManaged = trx.state()->hasHint(transaction::Hints::Hint::GLOBAL_MANAGED); + bool const isManaged = trx.state()->hasHint(transaction::Hints::Hint::GLOBAL_MANAGED); // Some stuff to prepare cluster-internal requests: @@ -1842,6 +1842,9 @@ Future getDocumentOnCoordinator(transaction::Methods& trx, for (auto const& it : opCtx.shardMap) { network::Headers headers; addTransactionHeaderForShard(trx, *shardIds, /*shard*/ it.first, headers); + if (options.documentCallFromAql) { + headers.try_emplace(StaticStrings::AqlDocumentCall, "true"); + } std::string url; VPackBuffer buffer; @@ -1924,6 +1927,9 @@ Future getDocumentOnCoordinator(transaction::Methods& trx, if (addMatch) { headers.try_emplace("if-match", slice.get(StaticStrings::RevString).copyString()); } + if (options.documentCallFromAql) { + headers.try_emplace(StaticStrings::AqlDocumentCall, "true"); + } futures.emplace_back(network::sendRequestRetry( pool, "shard:" + shard, restVerb, diff --git a/arangod/RestHandler/RestReplicationHandler.cpp b/arangod/RestHandler/RestReplicationHandler.cpp index 5cd4e97fd060..7ca0a4e87169 100644 --- a/arangod/RestHandler/RestReplicationHandler.cpp +++ b/arangod/RestHandler/RestReplicationHandler.cpp @@ -3432,7 +3432,7 @@ Result RestReplicationHandler::createBlockingTransaction( auto rGuard = std::make_unique( ci.rebootTracker().callMeOnChange(RebootTracker::PeerState(serverId, rebootId), std::move(f), std::move(comment))); - auto ctx = mgr->leaseManagedTrx(id, AccessMode::Type::WRITE); + auto ctx = mgr->leaseManagedTrx(id, AccessMode::Type::WRITE, /*isSideUser*/ false); if (!ctx) { // Trx 
does not exist. So we assume it got cancelled. @@ -3512,7 +3512,7 @@ ResultT RestReplicationHandler::computeCollectionChecksum( } try { - auto ctx = mgr->leaseManagedTrx(id, AccessMode::Type::READ); + auto ctx = mgr->leaseManagedTrx(id, AccessMode::Type::READ, /*isSideUser*/ false); if (!ctx) { // Trx does not exist. So we assume it got cancelled. return ResultT::error(TRI_ERROR_TRANSACTION_INTERNAL, diff --git a/arangod/RestHandler/RestVocbaseBaseHandler.cpp b/arangod/RestHandler/RestVocbaseBaseHandler.cpp index 9bcfcfb64fb5..481058d69b83 100644 --- a/arangod/RestHandler/RestVocbaseBaseHandler.cpp +++ b/arangod/RestHandler/RestVocbaseBaseHandler.cpp @@ -588,16 +588,30 @@ std::unique_ptr RestVocbaseBaseHandler::createTransaction( } } } - - auto ctx = mgr->leaseManagedTrx(tid, type); + + // if we have a read operation and the x-arango-aql-document-call header is set, + // it means this is a request by the DOCUMENT function inside an AQL query. in + // this case, we cannot be sure to lease the transaction context successfully, + // because the AQL query may have already acquired the write lock on the context + // for the entire duration of the query. if this is the case, then the query + // already has the lock, and it is ok if we lease the context here without + // acquiring it again. + bool const isSideUser = + (ServerState::instance()->isDBServer() && + AccessMode::isRead(type) && + !_request->header(StaticStrings::AqlDocumentCall).empty()); + + std::shared_ptr ctx = mgr->leaseManagedTrx(tid, type, isSideUser); + if (!ctx) { - LOG_TOPIC("e94ea", DEBUG, Logger::TRANSACTIONS) << "Transaction with id '" << tid << "' not found"; - THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_TRANSACTION_NOT_FOUND, std::string("transaction '") + std::to_string(tid.id()) + "' not found"); + LOG_TOPIC("e94ea", DEBUG, Logger::TRANSACTIONS) << "Transaction with id " << tid << " not found"; + THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_TRANSACTION_NOT_FOUND, std::string("transaction ") + std::to_string(tid.id()) + " not found"); } std::unique_ptr trx; if (ServerState::instance()->isDBServer() && !opOptions.isSynchronousReplicationFrom.empty()) { TRI_ASSERT(AccessMode::isWriteOrExclusive(type)); + // inject at least the required collection name trx = std::make_unique(std::move(ctx), collectionName, type); } else { trx = std::make_unique(std::move(ctx)); @@ -605,7 +619,7 @@ std::unique_ptr RestVocbaseBaseHandler::createTransaction( return trx; } -/// @brief create proper transaction context, inclusing the proper IDs +/// @brief create proper transaction context, including the proper IDs std::shared_ptr RestVocbaseBaseHandler::createTransactionContext(AccessMode::Type mode) const { bool found = false; std::string const& value = _request->header(StaticStrings::TransactionId, found); @@ -646,7 +660,7 @@ std::shared_ptr RestVocbaseBaseHandler::createTransactionC } } - auto ctx = mgr->leaseManagedTrx(tid, mode); + auto ctx = mgr->leaseManagedTrx(tid, mode, /*isSideUser*/ false); if (!ctx) { LOG_TOPIC("2cfed", DEBUG, Logger::TRANSACTIONS) << "Transaction with id '" << tid << "' not found"; THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_TRANSACTION_NOT_FOUND, diff --git a/arangod/RocksDBEngine/RocksDBCollection.cpp b/arangod/RocksDBEngine/RocksDBCollection.cpp index 25f16a22e422..3901867d6d52 100644 --- a/arangod/RocksDBEngine/RocksDBCollection.cpp +++ b/arangod/RocksDBEngine/RocksDBCollection.cpp @@ -987,6 +987,8 @@ Result RocksDBCollection::insert(arangodb::transaction::Methods* trx, arangodb::velocypack::Slice const slice, 
arangodb::ManagedDocumentResult& resultMdr, OperationOptions& options) { + RocksDBTransactionStateGuard transactionStateGuard(RocksDBTransactionState::toState(trx)); + TRI_ASSERT(!RocksDBTransactionState::toState(trx)->isReadOnlyTransaction()); ::WriteTimeTracker timeTracker(_statistics._rocksdb_insert_sec, _statistics, options); @@ -1053,6 +1055,8 @@ Result RocksDBCollection::update(transaction::Methods* trx, velocypack::Slice newSlice, ManagedDocumentResult& resultMdr, OperationOptions& options, ManagedDocumentResult& previousMdr) { + RocksDBTransactionStateGuard transactionStateGuard(RocksDBTransactionState::toState(trx)); + TRI_ASSERT(!RocksDBTransactionState::toState(trx)->isReadOnlyTransaction()); ::WriteTimeTracker timeTracker(_statistics._rocksdb_update_sec, _statistics, options); @@ -1164,6 +1168,8 @@ Result RocksDBCollection::replace(transaction::Methods* trx, velocypack::Slice newSlice, ManagedDocumentResult& resultMdr, OperationOptions& options, ManagedDocumentResult& previousMdr) { + RocksDBTransactionStateGuard transactionStateGuard(RocksDBTransactionState::toState(trx)); + TRI_ASSERT(!RocksDBTransactionState::toState(trx)->isReadOnlyTransaction()); ::WriteTimeTracker timeTracker(_statistics._rocksdb_replace_sec, _statistics, options); @@ -1274,6 +1280,8 @@ Result RocksDBCollection::replace(transaction::Methods* trx, Result RocksDBCollection::remove(transaction::Methods& trx, velocypack::Slice slice, ManagedDocumentResult& previousMdr, OperationOptions& options) { + RocksDBTransactionStateGuard transactionStateGuard(RocksDBTransactionState::toState(&trx)); + TRI_ASSERT(!RocksDBTransactionState::toState(&trx)->isReadOnlyTransaction()); ::WriteTimeTracker timeTracker(_statistics._rocksdb_remove_sec, _statistics, options); diff --git a/arangod/RocksDBEngine/RocksDBTransactionState.cpp b/arangod/RocksDBEngine/RocksDBTransactionState.cpp index c749e7130241..072cd9e64f82 100644 --- a/arangod/RocksDBEngine/RocksDBTransactionState.cpp +++ b/arangod/RocksDBEngine/RocksDBTransactionState.cpp @@ -76,6 +76,9 @@ RocksDBTransactionState::RocksDBTransactionState(TRI_vocbase_t& vocbase, Transac _numUpdates(0), _numRemoves(0), _numRollbacks(0), +#ifdef ARANGODB_ENABLE_MAINTAINER_MODE + _users(0), +#endif _parallel(false) {} /// @brief free a transaction container @@ -84,6 +87,16 @@ RocksDBTransactionState::~RocksDBTransactionState() { _status = transaction::Status::ABORTED; } +#ifdef ARANGODB_ENABLE_MAINTAINER_MODE +void RocksDBTransactionState::use() noexcept { + TRI_ASSERT(_users.fetch_add(1, std::memory_order_relaxed) == 0); +} + +void RocksDBTransactionState::unuse() noexcept { + TRI_ASSERT(_users.fetch_sub(1, std::memory_order_relaxed) == 1); +} +#endif + /// @brief start a transaction Result RocksDBTransactionState::beginTransaction(transaction::Hints hints) { LOG_TRX("0c057", TRACE, this) @@ -751,7 +764,18 @@ rocksdb::SequenceNumber RocksDBTransactionState::beginSeq() const { rocksdb::TransactionDB* db = engine.db(); return db->GetLatestSequenceNumber(); } - + +#ifdef ARANGODB_ENABLE_MAINTAINER_MODE +RocksDBTransactionStateGuard::RocksDBTransactionStateGuard(RocksDBTransactionState* state) noexcept + : _state(state) { + _state->use(); +} + +RocksDBTransactionStateGuard::~RocksDBTransactionStateGuard() { + _state->unuse(); +} +#endif + /// @brief constructor, leases a builder RocksDBKeyLeaser::RocksDBKeyLeaser(transaction::Methods* trx) : _ctx(trx->transactionContextPtr()), diff --git a/arangod/RocksDBEngine/RocksDBTransactionState.h b/arangod/RocksDBEngine/RocksDBTransactionState.h 
index e0f40a0ef05d..fc6e21115b27 100644 --- a/arangod/RocksDBEngine/RocksDBTransactionState.h +++ b/arangod/RocksDBEngine/RocksDBTransactionState.h @@ -38,6 +38,10 @@ #include "VocBase/Identifiers/IndexId.h" #include "VocBase/voc-types.h" +#ifdef ARANGODB_ENABLE_MAINTAINER_MODE +#include +#endif + struct TRI_vocbase_t; namespace rocksdb { @@ -169,6 +173,12 @@ class RocksDBTransactionState final : public TransactionState { rocksdb::SequenceNumber beginSeq() const; +#ifdef ARANGODB_ENABLE_MAINTAINER_MODE + /// @brief only needed for RocksDBTransactionStateGuard + void use() noexcept; + void unuse() noexcept; +#endif + private: /// @brief create a new rocksdb transaction void createTransaction(); @@ -223,10 +233,29 @@ class RocksDBTransactionState final : public TransactionState { /// resetted on intermediate commit uint64_t _numRollbacks; +#ifdef ARANGODB_ENABLE_MAINTAINER_MODE + std::atomic _users; +#endif + /// @brief if true there key buffers will no longer be shared bool _parallel; }; +/// @brief a struct that makes sure that the same RocksDBTransactionState +/// is not used by different write operations in parallel. will only do +/// something in maintainer mode, and do nothing in release mode! +struct RocksDBTransactionStateGuard { +#ifdef ARANGODB_ENABLE_MAINTAINER_MODE + explicit RocksDBTransactionStateGuard(RocksDBTransactionState* state) noexcept; + ~RocksDBTransactionStateGuard(); + + RocksDBTransactionState* _state; +#else + explicit RocksDBTransactionStateGuard(RocksDBTransactionState* /*state*/) noexcept {} + ~RocksDBTransactionStateGuard() = default; +#endif +}; + class RocksDBKeyLeaser { public: explicit RocksDBKeyLeaser(transaction::Methods*); diff --git a/arangod/Transaction/ClusterUtils.cpp b/arangod/Transaction/ClusterUtils.cpp index 6c8d78cae86d..5a1b259e028e 100644 --- a/arangod/Transaction/ClusterUtils.cpp +++ b/arangod/Transaction/ClusterUtils.cpp @@ -43,10 +43,10 @@ void abortTransactions(LogicalCollection& coll) { } bool didWork = mgr->abortManagedTrx( - [&coll](TransactionState const& state, std::string const & /*user*/) -> bool { + [&coll](TransactionState const& state, std::string const& /*user*/) -> bool { TransactionCollection* tcoll = state.collection(coll.id(), AccessMode::Type::NONE); - return tcoll != nullptr; - }); + return tcoll != nullptr; + }); LOG_TOPIC_IF("7eda2", INFO, Logger::TRANSACTIONS, didWork) << "aborted leader transactions on shard " << coll.id() << "'"; @@ -58,7 +58,7 @@ void abortLeaderTransactionsOnShard(DataSourceId cid) { TRI_ASSERT(mgr != nullptr); bool didWork = mgr->abortManagedTrx( - [cid](TransactionState const& state, std::string const & /*user*/) -> bool { + [cid](TransactionState const& state, std::string const& /*user*/) -> bool { if (state.id().isLeaderTransactionId()) { TransactionCollection* tcoll = state.collection(cid, AccessMode::Type::NONE); return tcoll != nullptr; @@ -76,7 +76,7 @@ void abortFollowerTransactionsOnShard(DataSourceId cid) { TRI_ASSERT(mgr != nullptr); bool didWork = mgr->abortManagedTrx( - [cid](TransactionState const& state, std::string const & /*user*/) -> bool { + [cid](TransactionState const& state, std::string const& /*user*/) -> bool { if (state.id().isFollowerTransactionId()) { TransactionCollection* tcoll = state.collection(cid, AccessMode::Type::NONE); return tcoll != nullptr; @@ -120,7 +120,7 @@ void abortTransactionsWithFailedServers(ClusterInfo& ci) { // abort all transaction started by a certain coordinator didWork = mgr->abortManagedTrx( - [&](TransactionState const& state, std::string const 
& /*user*/) -> bool { + [&](TransactionState const& state, std::string const& /*user*/) -> bool { uint32_t serverId = state.id().serverId(); if (serverId != 0) { ServerID coordId = ci.getCoordinatorByShortID(serverId); diff --git a/arangod/Transaction/Manager.cpp b/arangod/Transaction/Manager.cpp index f693322e9de3..23cd88c7fd34 100644 --- a/arangod/Transaction/Manager.cpp +++ b/arangod/Transaction/Manager.cpp @@ -70,6 +70,7 @@ bool authorized(std::string const& user) { } std::string currentUser() { return arangodb::ExecContext::current().user(); } + } // namespace namespace arangodb { @@ -143,6 +144,8 @@ uint64_t Manager::getActiveTransactionCount() { auto role = ServerState::instance()->getRole(); if ((ServerState::isSingleServer(role) || ServerState::isCoordinator(role))) { + TRI_IF_FAILURE("lowStreamingIdleTimeout") { return 5.0; } + return feature.streamingIdleTimeout(); } return idleTTLDBServer; @@ -154,6 +157,7 @@ Manager::ManagedTrx::ManagedTrx(ManagerFeature const& feature, MetaType t, doubl : type(t), intermediateCommits(false), wasExpired(false), + sideUsers(0), finalStatus(Status::UNDEFINED), timeToLive(ttl), expiryTime(TRI_microtime() + Manager::ttlForType(feature, t)), @@ -186,7 +190,7 @@ Manager::ManagedTrx::~ManagedTrx() { try { transaction::Options opts; transaction::ManagedContext ctx(TransactionId{2}, state, - /*responsibleForCommit*/ true); + /*responsibleForCommit*/ true, /*cloned*/ false); MGMethods trx( std::shared_ptr( std::shared_ptr(), &ctx), opts); // own state now @@ -700,8 +704,10 @@ Result Manager::ensureManagedTrx(TRI_vocbase_t& vocbase, TransactionId tid, /// @brief lease the transaction, increases nesting std::shared_ptr Manager::leaseManagedTrx(TransactionId tid, - AccessMode::Type mode) { + AccessMode::Type mode, + bool isSideUser) { TRI_ASSERT(mode != AccessMode::Type::NONE); + if (_disallowInserts.load(std::memory_order_acquire)) { return nullptr; } @@ -720,9 +726,10 @@ std::shared_ptr Manager::leaseManagedTrx(TransactionId tid mode = AccessMode::Type::WRITE; } + TRI_ASSERT(!isSideUser || AccessMode::isRead(mode)); + size_t const bucket = getBucket(tid); int i = 0; - std::shared_ptr state; do { READ_LOCKER(locker, _transactions[bucket]._lock); @@ -743,19 +750,41 @@ std::shared_ptr Manager::leaseManagedTrx(TransactionId tid "not allowed to write lock an AQL transaction"); } if (mtrx.rwlock.tryLockWrite()) { - state = mtrx.state; - break; + return buildManagedContextUnderLock(tid, mtrx); } + // continue the loop after a small pause } else { + TRI_ASSERT(mode == AccessMode::Type::READ); + // even for side user leases, first try acquiring the read lock if (mtrx.rwlock.tryLockRead()) { - TRI_ASSERT(mode == AccessMode::Type::READ); - state = mtrx.state; - break; + return buildManagedContextUnderLock(tid, mtrx); + } + if (isSideUser) { + // number of side users is atomically increased under the bucket's read lock. + // due to us holding the bucket's read lock here, there can be no other threads + // concurrently aborting/commiting the transaction (these operations acquire + // the write lock on the transaction's bucket). + mtrx.sideUsers.fetch_add(1, std::memory_order_relaxed); + // note: we are intentionally _not_ acquiring the lock on the transaction here, + // as we expect another operation to have acquired it already! + try { + std::shared_ptr state = mtrx.state; + TRI_ASSERT(state != nullptr); + return std::make_shared(tid, std::move(state), TransactionContextSideUser{}); + } catch (...) 
{ + // roll back our increase of the number of side users + auto previous = mtrx.sideUsers.fetch_sub(1, std::memory_order_relaxed); + TRI_ASSERT(previous > 0); + throw; + } } + THROW_ARANGO_EXCEPTION_MESSAGE( TRI_ERROR_LOCKED, std::string("cannot read-lock, transaction ") + std::to_string(tid.id()) + " is already in use"); } + + locker.unlock(); // failure; // simon: never allow concurrent use of transactions // either busy loop until we get the lock or throw an error @@ -763,8 +792,6 @@ std::shared_ptr Manager::leaseManagedTrx(TransactionId tid LOG_TOPIC("abd72", TRACE, Logger::TRANSACTIONS) << "transaction " << tid << " is already in use (RO)"; - locker.unlock(); // failure; - // simon: Two allowed scenarios: // 1. User sends concurrent write (CRUD) requests, (which was never intended to be possible) // but now we do have to kind of support it otherwise shitty apps break @@ -787,18 +814,10 @@ std::shared_ptr Manager::leaseManagedTrx(TransactionId tid } std::this_thread::sleep_for(std::chrono::milliseconds(10)); - } while (true); - - if (state) { - return std::make_shared(tid, std::move(state), - /*responsibleForCommit*/ false); - } - TRI_ASSERT(false); // should be unreachable - return nullptr; } -void Manager::returnManagedTrx(TransactionId tid) noexcept { +void Manager::returnManagedTrx(TransactionId tid, bool isSideUser) noexcept { bool isSoftAborted = false; { @@ -815,13 +834,24 @@ void Manager::returnManagedTrx(TransactionId tid) noexcept { TRI_ASSERT(it->second.state != nullptr); - // garbageCollection might soft abort used transactions - isSoftAborted = it->second.expiryTime == 0; - if (!isSoftAborted) { - it->second.updateExpiry(); - } + if (isSideUser) { + // number of side users is atomically decreased under the bucket's read lock. + // due to us holding the bucket's read lock here, there can be no other threads + // concurrently aborting/commiting the transaction (these operations acquire + // the write lock on the transaction's bucket). + auto previous = it->second.sideUsers.fetch_sub(1, std::memory_order_relaxed); + TRI_ASSERT(previous > 0); + // note: we are intentionally _not_ releasing the lock on the transaction here, + // because we have not acquired it before! 
+ } else { + // garbageCollection might soft abort used transactions + isSoftAborted = it->second.expiryTime == 0; + if (!isSoftAborted) { + it->second.updateExpiry(); + } - it->second.rwlock.unlock(); + it->second.rwlock.unlock(); + } } // it is important that we release the write lock for the bucket here, @@ -832,6 +862,7 @@ void Manager::returnManagedTrx(TransactionId tid) noexcept { TRI_IF_FAILURE("returnManagedTrxForceSoftAbort") { isSoftAborted = true; } if (isSoftAborted) { + TRI_ASSERT(!isSideUser); abortManagedTrx(tid, "" /* any database */); } } @@ -862,7 +893,7 @@ transaction::Status Manager::getManagedTrxStatus(TransactionId tid, Result Manager::statusChangeWithTimeout(TransactionId tid, std::string const& database, transaction::Status status) { double startTime = 0.0; - constexpr double maxWaitTime = 2.0; + constexpr double maxWaitTime = 3.0; Result res; while (true) { res = updateTransaction(tid, status, false, database); @@ -920,8 +951,10 @@ Result Manager::updateTransaction(TransactionId tid, transaction::Status status, } return res.reset(TRI_ERROR_TRANSACTION_NOT_FOUND, std::move(msg)); } - if (!::authorized(it->second.user) || - (!database.empty() && it->second.db != database)) { + + ManagedTrx& mtrx = it->second; + if (!::authorized(mtrx.user) || + (!database.empty() && mtrx.db != database)) { std::string msg = "transaction " + std::to_string(tid.id()); if (it == buck._managed.end()) { msg += " not found"; @@ -937,14 +970,22 @@ Result Manager::updateTransaction(TransactionId tid, transaction::Status status, return res.reset(TRI_ERROR_TRANSACTION_NOT_FOUND, std::move(msg)); } - ManagedTrx& mtrx = it->second; + + // in order to modify the transaction's status, we need the write lock here, + // plus we must ensure that the number of sideUsers is 0. TRY_WRITE_LOCKER(tryGuard, mtrx.rwlock); - if (!tryGuard.isLocked()) { - LOG_TOPIC("dfc30", DEBUG, Logger::TRANSACTIONS) << "transaction " << tid << " is in use"; - return res.reset(TRI_ERROR_LOCKED, - std::string("read lock failed, transaction ") + - std::to_string(tid.id()) + " is in use"); + bool canAccessTrx = tryGuard.isLocked(); + if (canAccessTrx) { + canAccessTrx &= (mtrx.sideUsers.load(std::memory_order_relaxed) == 0); } + if (!canAccessTrx) { + std::string msg("updating transaction status failed. transaction "); + msg.append(std::to_string(tid.id())); + msg.append(" is in use"); + LOG_TOPIC("dfc30", DEBUG, Logger::TRANSACTIONS) << msg; + return res.reset(TRI_ERROR_LOCKED, std::move(msg)); + } + TRI_ASSERT(tryGuard.isLocked()); if (mtrx.type == MetaType::StandaloneAQL) { @@ -984,6 +1025,7 @@ Result Manager::updateTransaction(TransactionId tid, transaction::Status status, std::swap(state, mtrx.state); TRI_ASSERT(mtrx.state == nullptr); + // type is changed under the transaction's write lock and the bucket's write lock mtrx.type = MetaType::Tombstone; if (state->numCommits() > 0) { // note that we have performed a commit or an intermediate commit. 
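The side-user mechanism in the two hunks above behaves like a reader that never touches the transaction's own lock: a side user only bumps an atomic counter under the bucket's read lock, while commit/abort must both win the try-write-lock and observe a zero counter. A minimal standalone illustration (invented names, with std::mutex standing in for the per-transaction read-write spin lock):

#include <atomic>
#include <mutex>

class TrxSlot {
 public:
  // a side user (e.g. a read performed on behalf of an AQL DOCUMENT() call)
  // does not acquire the lock at all -- the surrounding query already holds
  // it -- it only registers itself so the transaction cannot be committed or
  // aborted underneath it
  void beginSideUse() noexcept { _sideUsers.fetch_add(1, std::memory_order_relaxed); }
  void endSideUse() noexcept { _sideUsers.fetch_sub(1, std::memory_order_relaxed); }

  // commit/abort path: must win the lock and see no active side users
  bool tryFinalize() {
    std::unique_lock<std::mutex> guard(_lock, std::try_to_lock);
    if (!guard.owns_lock() || _sideUsers.load(std::memory_order_relaxed) != 0) {
      return false;  // reported to the caller as "transaction is in use"
    }
    // ... perform the actual commit or abort here ...
    return true;
  }

 private:
  std::mutex _lock;                    // stands in for the per-transaction rwlock
  std::atomic<unsigned> _sideUsers{0}; // readers leased without taking the lock
};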
@@ -1020,7 +1062,7 @@ Result Manager::updateTransaction(TransactionId tid, transaction::Status status, transaction::Options trxOpts; MGMethods trx(std::make_shared(tid, std::move(state), - /*responsibleForCommit*/ true), + /*responsibleForCommit*/ true, /*cloned*/ false), trxOpts); TRI_ASSERT(trx.state()->isRunning()); TRI_ASSERT(trx.isMainTransaction()); @@ -1194,9 +1236,8 @@ bool Manager::abortManagedTrx(std::function Manager::buildManagedContextUnderLock(TransactionId tid, Manager::ManagedTrx& mtrx) { + try { + std::shared_ptr state = mtrx.state; + // the make_shared can throw, and in this case it is important that we + // release the lock we have + return std::make_shared(tid, std::move(state), /*responsibleForCommit*/ false, /*cloned*/ false); + } catch (...) { + // release lock in case something went wrong + mtrx.rwlock.unlock(); + throw; + } +} + } // namespace transaction } // namespace arangodb diff --git a/arangod/Transaction/Manager.h b/arangod/Transaction/Manager.h index b9523d40c951..4bd9647ba992 100644 --- a/arangod/Transaction/Manager.h +++ b/arangod/Transaction/Manager.h @@ -30,6 +30,7 @@ #include "Basics/ResultT.h" #include "Cluster/CallbackGuard.h" #include "Logger/LogMacros.h" +#include "Transaction/SmartContext.h" #include "Transaction/Status.h" #include "VocBase/AccessMode.h" #include "VocBase/Identifiers/TransactionId.h" @@ -85,6 +86,13 @@ class Manager final { bool intermediateCommits; /// @brief whether or not the transaction did expire at least once bool wasExpired; + + /// @brief number of (reading) side users of the transaction. this number + /// is currently only increased on DB servers when they handle incoming + /// requests by the AQL document function. while this number is > 0, there + /// are still read requests ongoing, and the transaction status cannot be + /// changed to committed/aborted. 
+ std::atomic sideUsers; /// @brief final TRX state that is valid if this is a tombstone /// necessary to avoid getting error on a 'diamond' commit or accidentally /// repeated commit / abort messages @@ -93,8 +101,8 @@ class Manager final { double expiryTime; // time this expires std::shared_ptr state; /// Transaction, may be nullptr arangodb::cluster::CallbackGuard rGuard; - std::string user; /// user owning the transaction - std::string db; /// database in which the transaction operates + std::string const user; /// user owning the transaction + std::string const db; /// database in which the transaction operates /// cheap usage lock for _state mutable basics::ReadWriteSpinLock rwlock; }; @@ -151,10 +159,9 @@ class Manager final { Result beginTransaction(transaction::Hints hints, std::shared_ptr& state); /// @brief lease the transaction, increases nesting - std::shared_ptr leaseManagedTrx(TransactionId tid, - AccessMode::Type mode); - void returnManagedTrx(TransactionId) noexcept; - + std::shared_ptr leaseManagedTrx(TransactionId tid, AccessMode::Type mode, bool isSideUser); + void returnManagedTrx(TransactionId, bool isSideUser) noexcept; + /// @brief get the meta transasction state transaction::Status getManagedTrxStatus(TransactionId, std::string const& database) const; @@ -235,6 +242,8 @@ class Manager final { return std::hash()(tid) % numBuckets; } + std::shared_ptr buildManagedContextUnderLock(TransactionId tid, ManagedTrx& mtrx); + Result updateTransaction(TransactionId tid, transaction::Status status, bool clearServers, std::string const& database = "" /* leave empty to operate across all databases */); diff --git a/arangod/Transaction/ManagerFeature.cpp b/arangod/Transaction/ManagerFeature.cpp index 996225764209..e647c690094d 100644 --- a/arangod/Transaction/ManagerFeature.cpp +++ b/arangod/Transaction/ManagerFeature.cpp @@ -41,34 +41,6 @@ using namespace arangodb::application_features; using namespace arangodb::basics; using namespace arangodb::options; -namespace { -void queueGarbageCollection(std::mutex& mutex, arangodb::Scheduler::WorkHandle& workItem, - std::function& gcfunc) { - bool queued = false; - { - std::lock_guard guard(mutex); - std::tie(queued, workItem) = - arangodb::basics::function_utils::retryUntilTimeout( - [&gcfunc]() -> std::pair { - auto off = std::chrono::seconds(2); - // The RequestLane needs to be something which is `HIGH` priority, otherwise - // all threads executing this might be blocking, waiting for a lock to be - // released. 
- return arangodb::SchedulerFeature::SCHEDULER->queueDelay(arangodb::RequestLane::CLUSTER_INTERNAL, - off, gcfunc); - }, - arangodb::Logger::TRANSACTIONS, - "queue transaction garbage collection"); - } - if (!queued) { - LOG_TOPIC("f8b3d", FATAL, arangodb::Logger::TRANSACTIONS) - << "Failed to queue transaction garbage collection, for 5 minutes, " - "exiting."; - FATAL_ERROR_EXIT(); - } -} -} // namespace - namespace arangodb { namespace transaction { @@ -78,8 +50,6 @@ std::unique_ptr ManagerFeature::MANAGER; ManagerFeature::ManagerFeature(application_features::ApplicationServer& server) : ApplicationFeature(server, "TransactionManager"), - _workItem(nullptr), - _gcfunc(), _streamingLockTimeout(8.0), _streamingIdleTimeout(defaultStreamingIdleTimeout), _numExpiredTransactions( @@ -99,7 +69,7 @@ ManagerFeature::ManagerFeature(application_features::ApplicationServer& server) MANAGER->garbageCollect(/*abortAll*/false); if (!this->server().isStopping()) { - ::queueGarbageCollection(_workItemMutex, _workItem, _gcfunc); + queueGarbageCollection(); } }; } @@ -140,7 +110,7 @@ void ManagerFeature::prepare() { void ManagerFeature::start() { Scheduler* scheduler = SchedulerFeature::SCHEDULER; if (scheduler != nullptr) { // is nullptr in catch tests - ::queueGarbageCollection(_workItemMutex, _workItem, _gcfunc); + queueGarbageCollection(); } } @@ -191,6 +161,31 @@ void ManagerFeature::unprepare() { MANAGER.reset(); } +void ManagerFeature::queueGarbageCollection() { + auto [queued, workItem] = + arangodb::basics::function_utils::retryUntilTimeout( + [this]() -> std::pair { + auto off = std::chrono::seconds(2); + // The RequestLane needs to be something which is `HIGH` priority, otherwise + // all threads executing this might be blocking, waiting for a lock to be + // released. + return arangodb::SchedulerFeature::SCHEDULER->queueDelay(arangodb::RequestLane::CLUSTER_INTERNAL, + off, _gcfunc); + }, + arangodb::Logger::TRANSACTIONS, + "queue transaction garbage collection"); + + if (queued) { + std::lock_guard guard(_workItemMutex); + _workItem = std::move(workItem); + } else { + LOG_TOPIC("f8b3d", FATAL, arangodb::Logger::TRANSACTIONS) + << "Failed to queue transaction garbage collection, for 5 minutes, " + "exiting."; + FATAL_ERROR_EXIT(); + } +} + void ManagerFeature::trackExpired(uint64_t numExpired) { if (numExpired > 0) { _numExpiredTransactions.count(numExpired); diff --git a/arangod/Transaction/ManagerFeature.h b/arangod/Transaction/ManagerFeature.h index 0e07f99d930a..5c7466374293 100644 --- a/arangod/Transaction/ManagerFeature.h +++ b/arangod/Transaction/ManagerFeature.h @@ -60,6 +60,8 @@ class ManagerFeature final : public application_features::ApplicationFeature { void trackExpired(uint64_t numExpired); private: + void queueGarbageCollection(); + static constexpr double defaultStreamingIdleTimeout = 60.0; static constexpr double maxStreamingIdleTimeout = 120.0; diff --git a/arangod/Transaction/Methods.cpp b/arangod/Transaction/Methods.cpp index f593e067cbe4..4b95464ccb14 100644 --- a/arangod/Transaction/Methods.cpp +++ b/arangod/Transaction/Methods.cpp @@ -749,6 +749,7 @@ TRI_col_type_e transaction::Methods::getCollectionType(std::string const& collec /// Does not care for revision handling! 
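queueGarbageCollection() above follows a common self-rescheduling pattern: each garbage-collection round queues the next round with a fixed delay, and the handle of the pending round is published under a mutex only after queueing succeeded, so a concurrent shutdown never sees a half-initialized work item. A rough standalone sketch with an invented scheduler callback:

#include <chrono>
#include <functional>
#include <memory>
#include <mutex>

struct WorkItem {};  // placeholder for the scheduler's handle type

class PeriodicGc {
 public:
  using QueueFn =
      std::function<std::shared_ptr<WorkItem>(std::chrono::seconds, std::function<void()>)>;

  explicit PeriodicGc(QueueFn queueDelay) : _queueDelay(std::move(queueDelay)) {}

  // queue the next GC round; returns false if the scheduler refused the job,
  // leaving it to the caller to retry for a while or abort the process
  bool queueNextRound() {
    auto item = _queueDelay(std::chrono::seconds(2), [this] {
      // ... garbage-collect expired transactions here ...
      queueNextRound();  // re-arm for the next round
    });
    if (item == nullptr) {
      return false;
    }
    std::lock_guard<std::mutex> guard(_mutex);
    _workItem = std::move(item);  // publish only the successfully queued item
    return true;
  }

 private:
  QueueFn _queueDelay;
  std::mutex _mutex;
  std::shared_ptr<WorkItem> _workItem;  // kept so shutdown can drop or cancel it
};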
Result transaction::Methods::documentFastPath(std::string const& collectionName, VPackSlice value, + OperationOptions const& options, VPackBuilder& result) { TRI_ASSERT(_state->status() == transaction::Status::RUNNING); if (!value.isObject() && !value.isString()) { @@ -757,7 +758,6 @@ Result transaction::Methods::documentFastPath(std::string const& collectionName, } if (_state->isCoordinator()) { - OperationOptions options; OperationResult opRes = documentCoordinator(collectionName, value, options).get(); if (!opRes.fail()) { result.add(opRes.slice()); @@ -857,7 +857,7 @@ Future transaction::Methods::documentAsync(std::string const& c /// @brief read one or multiple documents in a collection, coordinator #ifndef USE_ENTERPRISE Future transaction::Methods::documentCoordinator( - std::string const& collectionName, VPackSlice value, OperationOptions& options) { + std::string const& collectionName, VPackSlice value, OperationOptions const& options) { if (!value.isArray()) { arangodb::velocypack::StringRef key(transaction::helpers::extractKeyPart(value)); if (key.empty()) { @@ -877,7 +877,7 @@ Future transaction::Methods::documentCoordinator( /// @brief read one or multiple documents in a collection, local Future transaction::Methods::documentLocal(std::string const& collectionName, VPackSlice value, - OperationOptions& options) { + OperationOptions const& options) { DataSourceId cid = addCollectionAtRuntime(collectionName, AccessMode::Type::READ); std::shared_ptr const& collection = trxCollection(cid)->collection(); diff --git a/arangod/Transaction/Methods.h b/arangod/Transaction/Methods.h index 32d207a319c7..6078e4bd1406 100644 --- a/arangod/Transaction/Methods.h +++ b/arangod/Transaction/Methods.h @@ -257,6 +257,7 @@ class Methods { /// it is already locked!) 
ENTERPRISE_VIRT Result documentFastPath(std::string const& collectionName, arangodb::velocypack::Slice value, + OperationOptions const& options, arangodb::velocypack::Builder& result); /// @brief return one document from a collection, fast path @@ -411,10 +412,10 @@ class Methods { Future documentCoordinator(std::string const& collectionName, VPackSlice value, - OperationOptions& options); + OperationOptions const& options); Future documentLocal(std::string const& collectionName, - VPackSlice value, OperationOptions& options); + VPackSlice value, OperationOptions const& options); Future insertCoordinator(std::string const& collectionName, VPackSlice value, diff --git a/arangod/Transaction/SmartContext.cpp b/arangod/Transaction/SmartContext.cpp index aeb000861760..145f77d4272c 100644 --- a/arangod/Transaction/SmartContext.cpp +++ b/arangod/Transaction/SmartContext.cpp @@ -43,15 +43,7 @@ SmartContext::SmartContext(TRI_vocbase_t& vocbase, TRI_ASSERT(_globalId.isSet()); } -SmartContext::~SmartContext() { -// if (_state) { -// if (_state->isTopLevelTransaction()) { -// std::this_thread::sleep_for(std::chrono::seconds(60)); -// TRI_ASSERT(false); // probably should not happen -// delete _state; -// } -// } -} +SmartContext::~SmartContext() = default; /// @brief order a custom type handler for the collection arangodb::velocypack::CustomTypeHandler* transaction::SmartContext::orderCustomTypeHandler() { @@ -84,16 +76,36 @@ ManagedContext::ManagedContext(TransactionId globalId, std::shared_ptr state, bool responsibleForCommit, bool cloned) : SmartContext(state->vocbase(), globalId, state), - _responsibleForCommit(responsibleForCommit), _cloned(cloned) {} + _responsibleForCommit(responsibleForCommit), + _cloned(cloned), + _isSideUser(false) {} + +ManagedContext::ManagedContext(TransactionId globalId, + std::shared_ptr state, + TransactionContextSideUser /*sideUser*/) + : SmartContext(state->vocbase(), globalId, state), + _responsibleForCommit(false), + _cloned(true), + _isSideUser(true) {} ManagedContext::~ManagedContext() { + bool doReturn = false; + if (_state != nullptr && !_cloned) { TRI_ASSERT(!_responsibleForCommit); - + TRI_ASSERT(!_isSideUser); + doReturn = true; + } else if (_isSideUser) { + TRI_ASSERT(!_responsibleForCommit); + TRI_ASSERT(_cloned); + doReturn = true; + } + + if (doReturn) { + // we are responsible for returning the lease for the managed transaction transaction::Manager* mgr = transaction::ManagerFeature::manager(); TRI_ASSERT(mgr != nullptr); - mgr->returnManagedTrx(_globalId); - _state = nullptr; + mgr->returnManagedTrx(_globalId, _isSideUser); } } @@ -115,7 +127,7 @@ void ManagedContext::unregisterTransaction() noexcept { std::shared_ptr ManagedContext::clone() const { // cloned transactions may never be responsible for commits auto clone = std::make_shared(_globalId, _state, - /*responsibleForCommit*/false, /*cloned*/true); + /*responsibleForCommit*/ false, /*cloned*/ true); clone->_state = _state; return clone; } diff --git a/arangod/Transaction/SmartContext.h b/arangod/Transaction/SmartContext.h index 9386cb491bc2..d166cfd3f59d 100644 --- a/arangod/Transaction/SmartContext.h +++ b/arangod/Transaction/SmartContext.h @@ -76,12 +76,17 @@ class SmartContext : public Context { TransactionId const _globalId; std::shared_ptr _state; }; + +struct TransactionContextSideUser {}; /// @brief Acquire a transaction from the Manager struct ManagedContext final : public SmartContext { ManagedContext(TransactionId globalId, std::shared_ptr state, - bool responsibleForCommit, bool 
cloned = false);
+                 bool responsibleForCommit, bool cloned);
+
+  ManagedContext(TransactionId globalId, std::shared_ptr state,
+                 TransactionContextSideUser /*sideUser*/);
 
   ~ManagedContext();
 
@@ -94,9 +99,10 @@ struct ManagedContext final : public SmartContext {
 
   std::shared_ptr clone() const override;
 
-private:
-  const bool _responsibleForCommit;
-  const bool _cloned;
+ private:
+  bool const _responsibleForCommit;
+  bool const _cloned;
+  bool const _isSideUser;
 };
 
 /// Used for a standalone AQL query. Always creates the state first.
diff --git a/arangod/Utils/OperationOptions.cpp b/arangod/Utils/OperationOptions.cpp
index 8a69a8baac7f..47ebe88f88d3 100644
--- a/arangod/Utils/OperationOptions.cpp
+++ b/arangod/Utils/OperationOptions.cpp
@@ -42,6 +42,7 @@ OperationOptions::OperationOptions()
       isRestore(false),
       checkUniqueConstraintsInPreflight(false),
       truncateCompact(true),
+      documentCallFromAql(false),
       _context(nullptr) {}
 
 OperationOptions::OperationOptions(ExecContext const& context)
diff --git a/arangod/Utils/OperationOptions.h b/arangod/Utils/OperationOptions.h
index 019db97e3f08..1097732b4b4a 100644
--- a/arangod/Utils/OperationOptions.h
+++ b/arangod/Utils/OperationOptions.h
@@ -135,6 +135,11 @@ struct OperationOptions {
   // defaults to true.
   bool truncateCompact;
 
+  // whether or not this request is a DOCUMENT() call from inside AQL. only set
+  // for exactly this case on a coordinator, in order to make it set a special
+  // header when putting together the requests for DB servers
+  bool documentCallFromAql;
+
   // get associated execution context
   ExecContext const& context() const;
 
diff --git a/lib/Basics/StaticStrings.cpp b/lib/Basics/StaticStrings.cpp
index 3925922ed5bb..82a37ae911ba 100644
--- a/lib/Basics/StaticStrings.cpp
+++ b/lib/Basics/StaticStrings.cpp
@@ -360,6 +360,7 @@ std::string const StaticStrings::BackupSearchToDeleteName(
 
 // aql api strings
 std::string const StaticStrings::SerializationFormat("serializationFormat");
+std::string const StaticStrings::AqlDocumentCall("x-arango-aql-document-call");
 std::string const StaticStrings::AqlRemoteExecute("execute");
 std::string const StaticStrings::AqlRemoteCallStack("callStack");
 std::string const StaticStrings::AqlRemoteLimit("limit");
diff --git a/lib/Basics/StaticStrings.h b/lib/Basics/StaticStrings.h
index 69950603065c..076a815ee865 100644
--- a/lib/Basics/StaticStrings.h
+++ b/lib/Basics/StaticStrings.h
@@ -332,6 +332,7 @@ class StaticStrings {
 
   // aql api strings
   static std::string const SerializationFormat;
+  static std::string const AqlDocumentCall;
   static std::string const AqlRemoteExecute;
   static std::string const AqlRemoteCallStack;
   static std::string const AqlRemoteLimit;
diff --git a/tests/Transaction/Manager-test.cpp b/tests/Transaction/Manager-test.cpp
index 2e1fcdca7a9b..8d0d4d59beac 100644
--- a/tests/Transaction/Manager-test.cpp
+++ b/tests/Transaction/Manager-test.cpp
@@ -154,7 +154,7 @@ TEST_F(TransactionManagerTest, simple_transaction_and_abort) {
   auto doc = arangodb::velocypack::Parser::fromJson("{ \"_key\": \"1\"}");
 
   {
-    auto ctx = mgr->leaseManagedTrx(tid, AccessMode::Type::WRITE);
+    auto ctx = mgr->leaseManagedTrx(tid, AccessMode::Type::WRITE, false);
     ASSERT_NE(ctx.get(), nullptr);
 
     SingleCollectionTransaction trx(ctx, "testCollection", AccessMode::Type::WRITE);
@@ -169,7 +169,7 @@ TEST_F(TransactionManagerTest, simple_transaction_and_abort) {
   ASSERT_EQ(mgr->getManagedTrxStatus(tid, vocbase.name()), transaction::Status::RUNNING);
 
   {  // lease again
-
diff --git a/tests/Transaction/Manager-test.cpp b/tests/Transaction/Manager-test.cpp
index 2e1fcdca7a9b..8d0d4d59beac 100644
--- a/tests/Transaction/Manager-test.cpp
+++ b/tests/Transaction/Manager-test.cpp
@@ -154,7 +154,7 @@ TEST_F(TransactionManagerTest, simple_transaction_and_abort) {
   auto doc = arangodb::velocypack::Parser::fromJson("{ \"_key\": \"1\"}");
 
   {
-    auto ctx = mgr->leaseManagedTrx(tid, AccessMode::Type::WRITE);
+    auto ctx = mgr->leaseManagedTrx(tid, AccessMode::Type::WRITE, false);
     ASSERT_NE(ctx.get(), nullptr);
 
     SingleCollectionTransaction trx(ctx, "testCollection", AccessMode::Type::WRITE);
@@ -169,7 +169,7 @@ TEST_F(TransactionManagerTest, simple_transaction_and_abort) {
   ASSERT_EQ(mgr->getManagedTrxStatus(tid, vocbase.name()), transaction::Status::RUNNING);
 
   {  // lease again
-    auto ctx = mgr->leaseManagedTrx(tid, AccessMode::Type::WRITE);
+    auto ctx = mgr->leaseManagedTrx(tid, AccessMode::Type::WRITE, false);
     ASSERT_NE(ctx.get(), nullptr);
 
     SingleCollectionTransaction trx(ctx, "testCollection", AccessMode::Type::READ);
@@ -205,7 +205,7 @@ TEST_F(TransactionManagerTest, simple_transaction_and_commit) {
   ASSERT_TRUE(res.ok());
 
   {
-    auto ctx = mgr->leaseManagedTrx(tid, AccessMode::Type::WRITE);
+    auto ctx = mgr->leaseManagedTrx(tid, AccessMode::Type::WRITE, false);
     ASSERT_NE(ctx.get(), nullptr);
 
     SingleCollectionTransaction trx(ctx, "testCollection", AccessMode::Type::WRITE);
@@ -249,7 +249,7 @@ TEST_F(TransactionManagerTest, simple_transaction_and_commit_is_follower) {
   ASSERT_TRUE(res.ok());
 
   {
-    auto ctx = mgr->leaseManagedTrx(tid, AccessMode::Type::WRITE);
+    auto ctx = mgr->leaseManagedTrx(tid, AccessMode::Type::WRITE, false);
     ASSERT_NE(ctx.get(), nullptr);
 
     SingleCollectionTransaction trx(ctx, "testCollection", AccessMode::Type::WRITE);
@@ -288,7 +288,7 @@ TEST_F(TransactionManagerTest, simple_transaction_and_commit_while_in_use) {
   ASSERT_TRUE(res.ok());
 
   {
-    auto ctx = mgr->leaseManagedTrx(tid, AccessMode::Type::WRITE);
+    auto ctx = mgr->leaseManagedTrx(tid, AccessMode::Type::WRITE, false);
     ASSERT_NE(ctx.get(), nullptr);
 
     SingleCollectionTransaction trx(ctx, "testCollection", AccessMode::Type::WRITE);
@@ -330,19 +330,19 @@ TEST_F(TransactionManagerTest, leading_multiple_readonly_transactions) {
   transaction::Options opts;
   bool responsible;
 
-  auto ctx = mgr->leaseManagedTrx(tid, AccessMode::Type::READ);
+  auto ctx = mgr->leaseManagedTrx(tid, AccessMode::Type::READ, false);
   ASSERT_NE(ctx.get(), nullptr);
   auto state1 = ctx->acquireState(opts, responsible);
   ASSERT_NE(state1.get(), nullptr);
   ASSERT_TRUE(!responsible);
 
-  auto ctx2 = mgr->leaseManagedTrx(tid, AccessMode::Type::READ);
+  auto ctx2 = mgr->leaseManagedTrx(tid, AccessMode::Type::READ, false);
   ASSERT_NE(ctx2.get(), nullptr);
   auto state2 = ctx2->acquireState(opts, responsible);
   EXPECT_EQ(state1.get(), state2.get());
   ASSERT_TRUE(!responsible);
 
-  auto ctx3 = mgr->leaseManagedTrx(tid, AccessMode::Type::READ);
+  auto ctx3 = mgr->leaseManagedTrx(tid, AccessMode::Type::READ, false);
   ASSERT_NE(ctx3.get(), nullptr);
   auto state3 = ctx3->acquireState(opts, responsible);
   EXPECT_EQ(state3.get(), state2.get());
@@ -369,12 +369,46 @@ TEST_F(TransactionManagerTest, lock_conflict) {
     transaction::Options opts;
     bool responsible;
 
-    auto ctx = mgr->leaseManagedTrx(tid, AccessMode::Type::WRITE);
+    auto ctx = mgr->leaseManagedTrx(tid, AccessMode::Type::WRITE, false);
     ASSERT_NE(ctx.get(), nullptr);
     auto state1 = ctx->acquireState(opts, responsible);
     ASSERT_NE(state1.get(), nullptr);
     ASSERT_TRUE(!responsible);
-    ASSERT_ANY_THROW(mgr->leaseManagedTrx(tid, AccessMode::Type::READ));
+    ASSERT_ANY_THROW(mgr->leaseManagedTrx(tid, AccessMode::Type::READ, false));
+  }
+  ASSERT_TRUE(mgr->abortManagedTrx(tid, vocbase.name()).ok());
+  ASSERT_EQ(mgr->getManagedTrxStatus(tid, vocbase.name()), transaction::Status::ABORTED);
+}
+
+TEST_F(TransactionManagerTest, lock_conflict_side_user) {
+  std::shared_ptr coll;
+  {
+    auto json =
+        VPackParser::fromJson("{ \"name\": \"testCollection\", \"id\": 42 }");
+    coll = vocbase.createCollection(json->slice());
+  }
+  ASSERT_NE(coll, nullptr);
+
+  auto json = arangodb::velocypack::Parser::fromJson(
+      "{ \"collections\":{\"write\": [\"42\"]}}");
+  Result res = mgr->ensureManagedTrx(vocbase, tid, json->slice(), false);
+  ASSERT_TRUE(res.ok());
+  {
+    transaction::Options opts;
+    bool responsible;
+
+    auto ctx = mgr->leaseManagedTrx(tid, AccessMode::Type::WRITE, false);
+    ASSERT_NE(ctx.get(), nullptr);
+    auto state1 = ctx->acquireState(opts, responsible);
+    ASSERT_NE(state1.get(), nullptr);
+    ASSERT_TRUE(!responsible);
+    ASSERT_ANY_THROW(mgr->leaseManagedTrx(tid, AccessMode::Type::READ, false));
+
+    auto ctxSide = mgr->leaseManagedTrx(tid, AccessMode::Type::READ, true);
+    ASSERT_NE(ctxSide.get(), nullptr);
+    auto state2 = ctxSide->acquireState(opts, responsible);
+    ASSERT_NE(state2.get(), nullptr);
+    ASSERT_TRUE(!responsible);
   }
   ASSERT_TRUE(mgr->abortManagedTrx(tid, vocbase.name()).ok());
   ASSERT_EQ(mgr->getManagedTrxStatus(tid, vocbase.name()), transaction::Status::ABORTED);
@@ -397,7 +431,7 @@ TEST_F(TransactionManagerTest, garbage_collection_shutdown) {
     transaction::Options opts;
     bool responsible;
 
-    auto ctx = mgr->leaseManagedTrx(tid, AccessMode::Type::WRITE);
+    auto ctx = mgr->leaseManagedTrx(tid, AccessMode::Type::WRITE, false);
     ASSERT_NE(ctx.get(), nullptr);
     auto state1 = ctx->acquireState(opts, responsible);
     ASSERT_NE(state1.get(), nullptr);
@@ -456,7 +490,7 @@ TEST_F(TransactionManagerTest, abort_transactions_with_matcher) {
   ASSERT_TRUE(res.ok());
 
   {
-    auto ctx = mgr->leaseManagedTrx(tid, AccessMode::Type::WRITE);
+    auto ctx = mgr->leaseManagedTrx(tid, AccessMode::Type::WRITE, false);
     ASSERT_NE(ctx.get(), nullptr);
 
     SingleCollectionTransaction trx(ctx, "testCollection", AccessMode::Type::WRITE);
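The tests above now pass a third argument to leaseManagedTrx, and the new lock_conflict_side_user test pins down what it controls: a second regular lease on a write-leased managed transaction still throws, while a read lease taken as a "side user" is allowed, which is what DOCUMENT() lookups inside the same streaming transaction rely on. A condensed usage sketch based only on the test above (includes from the ArangoDB tree omitted; the /*isSideUser*/ parameter name is an assumption, since the declaration is not shown in this patch):

// Sketch only: lease the same managed transaction twice, as the new test does.
// mgr and tid are assumed to be set up as in the test fixture.
void sideUserLeaseExample(arangodb::transaction::Manager& mgr,
                          arangodb::TransactionId tid) {
  // the main lease holds the managed transaction exclusively
  auto mainCtx = mgr.leaseManagedTrx(tid, arangodb::AccessMode::Type::WRITE,
                                     /*isSideUser*/ false);

  // a second regular lease would throw here; a side-user read lease is
  // allowed, e.g. for DOCUMENT() lookups inside the same streaming transaction
  auto sideCtx = mgr.leaseManagedTrx(tid, arangodb::AccessMode::Type::READ,
                                     /*isSideUser*/ true);

  // ... perform read-only lookups via sideCtx, writes via mainCtx ...
}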
diff --git a/tests/js/client/chaos/test-chaos-load-common.inc b/tests/js/client/chaos/test-chaos-load-common.inc
index 37f91d27a3a8..b1e759cc7503 100644
--- a/tests/js/client/chaos/test-chaos-load-common.inc
+++ b/tests/js/client/chaos/test-chaos-load-common.inc
@@ -141,6 +141,9 @@ function BaseChaosSuite(testOpts) {
         debugSetFailAt(getEndpointById(server.id), "replicateOperations_randomize_timeout");
         debugSetFailAt(getEndpointById(server.id), "delayed_synchronous_replication_request_processing");
         debugSetFailAt(getEndpointById(server.id), "Query::setupTimeout");
+        debugSetFailAt(getEndpointById(server.id), "Query::setupLockTimeout");
+        debugSetFailAt(getEndpointById(server.id), "Query::setupTimeoutFailSequence");
+        debugSetFailAt(getEndpointById(server.id), "Query::setupTimeoutFailSequenceRandom");
         debugSetFailAt(getEndpointById(server.id), "RocksDBCollection::insertFail1");
         debugSetFailAt(getEndpointById(server.id), "RocksDBCollection::insertFail2");
         debugSetFailAt(getEndpointById(server.id), "RocksDBCollection::modifyFail1");
@@ -153,6 +156,7 @@ function BaseChaosSuite(testOpts) {
         assertTrue(servers.length > 0);
         for (const server of servers) {
           debugSetFailAt(getEndpointById(server.id), "Query::setupTimeout");
+          debugSetFailAt(getEndpointById(server.id), "Query::setupTimeoutFailSequence");
         }
       }
     },
@@ -178,7 +182,7 @@ function BaseChaosSuite(testOpts) {
     const docs = () => {
       let result = [];
      const max = 2000;
-      const r = Math.floor(Math.random() * max) + 1;
+      let r = Math.floor(Math.random() * max) + 1;
      if (r > (max * 0.8)) {
        // we want ~20% of all requests to be single document operations
        r = 1;
@@ -247,16 +251,29 @@ function BaseChaosSuite(testOpts) {
          const limit = Math.floor(Math.random() * 2000);
          log(`RUNNING AQL REPLACE WITH LIMIT=${limit}. OPTIONS: ${JSON.stringify(o)}`);
          query("FOR doc IN " + c.name() + " LIMIT @limit REPLACE doc WITH { pfihg: 434, fjgjg: 555 } IN " + c.name(), {limit}, o);
+        } else if (d >= 0.68) {
+          const limit = Math.floor(Math.random() * 10) + 1;
+          log(`RUNNING DOCUMENT SINGLE LOOKUP QUERY WITH LIMIT=${limit}`);
+          query("FOR doc IN " + c.name() + " LIMIT @limit RETURN DOCUMENT(doc._id)", {limit});
+        } else if (d >= 0.66) {
+          const limit = Math.floor(Math.random() * 10) + 1;
+          log(`RUNNING DOCUMENT ARRAY LOOKUP QUERY WITH LIMIT=${limit}`);
+          query("LET keys = (FOR doc IN " + c.name() + " LIMIT @limit RETURN doc._id) RETURN DOCUMENT(keys)", {limit});
+        } else if (d >= 0.65) {
+          let o = opts();
+          const limit = Math.floor(Math.random() * 10) + 1;
+          log(`RUNNING DOCUMENT LOOKUP AND WRITE QUERY WITH LIMIT=${limit}. OPTIONS: ${JSON.stringify(o)}`);
+          query("FOR doc IN " + c.name() + " LIMIT @limit LET d = DOCUMENT(doc._id) INSERT UNSET(d, '_key') INTO " + c.name(), {limit}, o);
        } else if (d >= 0.25) {
          let o = opts();
          let d = docs();
          log(`RUNNING INSERT WITH ${d.length} DOCS. OPTIONS: ${JSON.stringify(o)}`);
-          d = d.length == 1 ? d[0] : d;
+          d = d.length === 1 ? d[0] : d;
          c.insert(d, o);
        } else {
          let d = docs();
          log(`RUNNING REMOVE WITH ${d.length} DOCS`);
-          d = d.length == 1 ? d[0] : d;
+          d = d.length === 1 ? d[0] : d;
          c.remove(d);
        }
      } catch (err) {}
@@ -302,7 +319,7 @@ const makeConfig = (paramValues) => {
     opts["with" + params[j]] = paramValues[j];
   }
   return { suffix: suffix, options: opts };
-}
+};
 
 const run = () => {
   if (!global.currentTestConfig) {
@@ -322,7 +339,7 @@ const run = () => {
 
   jsunity.run(func);
   return jsunity.done();
-}
+};
 
 module.exports.parameters = params;
 module.exports.makeConfig = makeConfig;
diff --git a/tests/js/client/shell/shell-transaction.js b/tests/js/client/shell/shell-transaction.js
index a65246c30805..f05f972f545e 100644
--- a/tests/js/client/shell/shell-transaction.js
+++ b/tests/js/client/shell/shell-transaction.js
@@ -4309,47 +4309,47 @@ function transactionTTLStreamSuite () {
   let c;
 
   return {
-
-    // //////////////////////////////////////////////////////////////////////////////
-    // / @brief set up
-    // //////////////////////////////////////////////////////////////////////////////
-
     setUp: function () {
       db._drop(cn);
       c = db._create(cn, {numberOfShards: 2, replicationFactor: 2});
     },
 
-    // //////////////////////////////////////////////////////////////////////////////
-    // / @brief tear down
-    // //////////////////////////////////////////////////////////////////////////////
-
     tearDown: function () {
       db._drop(cn);
     },
-
     // //////////////////////////////////////////////////////////////////////////////
     // / @brief test: abort idle transactions
     // //////////////////////////////////////////////////////////////////////////////
 
     testAbortIdleTrx: function () {
-      let trx = db._createTransaction({
-        collections: { write: cn }
-      });
+      try {
+        internal.debugSetFailAt("lowStreamingIdleTimeout");
+      } catch (err) {}
+
+      try {
+        let trx = db._createTransaction({
+          collections: { write: cn }
+        });
 
-      trx.collection(cn).save({value:'val'});
+        trx.collection(cn).save({value:'val'});
 
-      let x = 60;
-      do {
-        internal.sleep(1);
+        let x = 60;
+        do {
+          internal.sleep(1);
 
-        if (trx.status().status === "aborted") {
-          return;
-        }
+          if (trx.status().status === "aborted") {
+            return;
+          }
 
-      } while(--x > 0);
-      if (x <= 0) {
-        fail(); // should not be reached
+        } while(--x > 0);
+        if (x <= 0) {
+          fail(); // should not be reached
+        }
+      } finally {
+        try {
+          internal.debugRemoveFailAt("lowStreamingIdleTimeout");
+        } catch (err) {}
       }
     }
   };
 
@@ -4357,8 +4357,8 @@ function transactionTTLStreamSuite () {
 
 function transactionIteratorSuite() {
   'use strict';
-  var cn = 'UnitTestsTransaction';
-  var c = null;
+  let cn = 'UnitTestsTransaction';
+  let c;
 
   return {