fix locking of write transactions on DB servers (#14450) · arangodb/arangodb@477186f · GitHub
[go: up one dir, main page]

Skip to content

Commit 477186f

Browse files
authored
fix locking of write transactions on DB servers (#14450)
1 parent 5e917d4 commit 477186f

33 files changed

+526
-213
lines changed

CHANGELOG

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ devel
77
* Lower log level to warning, when take over shard leadership finds
88
an agency Current entry is missing the server taking over.
99

10+
* Fix locking of AQL write queries on DB servers.
11+
1012
* APM-112: invalid use of OPTIONS in AQL queries will now raise a warning in
1113
the query.
1214
The feature is useful to detect misspelled attribute names in OPTIONS, e.g.

arangod/Aql/ClusterQuery.cpp

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
#include "Aql/QueryRegistry.h"
3131
#include "Aql/QueryProfile.h"
3232
#include "Cluster/ServerState.h"
33+
#include "Random/RandomGenerator.h"
3334
#include "StorageEngine/TransactionState.h"
3435
#include "Transaction/Context.h"
3536
#include "RestServer/QueryRegistryFeature.h"
@@ -102,6 +103,13 @@ void ClusterQuery::prepareClusterQuery(VPackSlice querySlice,
102103
if (_trx->state()->isDBServer()) {
103104
_trx->state()->acceptAnalyzersRevision(analyzersRevision);
104105
}
106+
107+
TRI_IF_FAILURE("Query::setupLockTimeout") {
108+
if (RandomGenerator::interval(uint32_t(100)) >= 95) {
109+
THROW_ARANGO_EXCEPTION(TRI_ERROR_LOCK_TIMEOUT);
110+
}
111+
}
112+
105113
Result res = _trx->begin();
106114
if (!res.ok()) {
107115
THROW_ARANGO_EXCEPTION(res);

arangod/Aql/EngineInfoContainerDBServerServerBased.cpp

Lines changed: 75 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -402,8 +402,20 @@ Result EngineInfoContainerDBServerServerBased::buildEngines(
402402
ErrorCode cleanupReason = TRI_ERROR_CLUSTER_TIMEOUT;
403403

404404
auto cleanupGuard = scopeGuard([this, &serverToQueryId, &cleanupReason]() {
405-
// Fire and forget
406-
std::ignore = cleanupEngines(cleanupReason, _query.vocbase().name(), serverToQueryId);
405+
try {
406+
transaction::Methods& trx = _query.trxForOptimization();
407+
auto requests = cleanupEngines(cleanupReason, _query.vocbase().name(), serverToQueryId);
408+
if (!trx.isMainTransaction()) {
409+
// for AQL queries in streaming transactions, we will wait for the
410+
// complete shutdown to have finished before we return to the caller.
411+
// this is done so that there will be no 2 AQL queries in the same
412+
// streaming transaction at the same time
413+
futures::collectAll(requests).wait();
414+
}
415+
} catch (std::exception const& ex) {
416+
LOG_TOPIC("2a9fe", WARN, Logger::AQL)
417+
<< "unable to clean up query snippets: " << ex.what();
418+
}
407419
});
408420

409421
NetworkFeature const& nf = _query.vocbase().server().getFeature<NetworkFeature>();
@@ -412,6 +424,9 @@ Result EngineInfoContainerDBServerServerBased::buildEngines(
412424
// nullptr only happens on controlled shutdown
413425
return {TRI_ERROR_SHUTTING_DOWN};
414426
}
427+
428+
// remember which servers we add during our setup request
429+
::arangodb::containers::HashSet<std::string> serversAdded;
415430

416431
transaction::Methods& trx = _query.trxForOptimization();
417432
std::vector<arangodb::futures::Future<Result>> networkCalls{};
@@ -427,9 +442,15 @@ Result EngineInfoContainerDBServerServerBased::buildEngines(
427442
}
428443

429444
TRI_IF_FAILURE("Query::setupTimeoutFailSequence") {
430-
options.timeout = network::Timeout(0.5);
445+
double t = 0.5;
446+
TRI_IF_FAILURE("Query::setupTimeoutFailSequenceRandom") {
447+
if (RandomGenerator::interval(uint32_t(100)) >= 95) {
448+
t = 3.0;
449+
}
450+
}
451+
options.timeout = network::Timeout(t);
431452
}
432-
453+
433454
/// cluster global query id, under which the query will be registered
434455
/// on DB servers from 3.8 onwards.
435456
QueryId clusterQueryId = _query.vocbase().server().getFeature<ClusterFeature>().clusterInfo().uniqid();
@@ -450,6 +471,13 @@ Result EngineInfoContainerDBServerServerBased::buildEngines(
450471
continue;
451472
}
452473

474+
if (!trx.state()->knownServers().contains(server)) {
475+
// we are about to add this server to the transaction.
476+
// remember it, so we can roll the addition back for
477+
// the second setup request if we need to
478+
serversAdded.emplace(server);
479+
}
480+
453481
networkCalls.emplace_back(
454482
buildSetupRequest(trx, server, infoSlice, didCreateEngine, snippetIds,
455483
serverToQueryId, serverToQueryIdLock, pool, options));
@@ -463,7 +491,7 @@ Result EngineInfoContainerDBServerServerBased::buildEngines(
463491
// We can directly report a non TRI_ERROR_LOCK_TIMEOUT
464492
// error as we need to abort after.
465493
// Otherwise we need to report
466-
Result res{TRI_ERROR_NO_ERROR};
494+
Result res;
467495
for (auto const& tryRes : responses) {
468496
auto response = tryRes.get();
469497
if (response.fail()) {
@@ -478,7 +506,7 @@ Result EngineInfoContainerDBServerServerBased::buildEngines(
478506
}
479507
// Return what we have, this will be ok() if and only
480508
// if none of the requests failed.
481-
// If will be LOCK_TIMEOUT if and only if the only error
509+
// It will be LOCK_TIMEOUT if and only if the only error
482510
// we see was LOCK_TIMEOUT.
483511
return res;
484512
});
@@ -490,26 +518,59 @@ Result EngineInfoContainerDBServerServerBased::buildEngines(
490518
return fastPathResult.get();
491519
}
492520

521+
// we got a lock timeout response for the fast path locking...
493522
{
494523
// in case of fast path failure, we need to cleanup engines
495524
auto requests = cleanupEngines(fastPathResult.get().errorNumber(), _query.vocbase().name(), serverToQueryId);
496-
// Wait for all requests to complete.
525+
// Wait for all cleanup requests to complete.
497526
// So we know that all Transactions are aborted.
498-
// We do NOT care for the actual result.
499-
futures::collectAll(requests).wait();
500-
snippetIds.clear();
527+
Result res;
528+
for (auto& tryRes : requests) {
529+
network::Response const& response = tryRes.get();
530+
if (response.fail()) {
531+
// note first error, but continue iterating over all results
532+
LOG_TOPIC("2d319", DEBUG, Logger::AQL)
533+
<< "received error from server " << response.destination
534+
<< " during query cleanup: " << response.combinedResult().errorMessage();
535+
res.reset(response.combinedResult());
536+
}
537+
}
538+
if (res.fail()) {
539+
// unable to do a proper cleanup.
540+
// it is not safe to go on here.
541+
cleanupGuard.cancel();
542+
cleanupReason = res.errorNumber();
543+
return res;
544+
}
501545
}
502-
546+
547+
// fast path locking rolled back successfully!
548+
snippetIds.clear();
549+
550+
// revert the addition of servers by us
551+
for (auto const& s : serversAdded) {
552+
trx.state()->removeKnownServer(s);
553+
}
554+
503555
// we must generate a new query id, because the fast path setup has failed
504556
clusterQueryId = _query.vocbase().server().getFeature<ClusterFeature>().clusterInfo().uniqid();
505557

558+
if (trx.isMainTransaction() && !trx.state()->isReadOnlyTransaction()) {
559+
// when we are not in a streaming transaction, it is ok to roll a new trx id.
560+
// it is not ok to change the trx id inside a streaming transaction,
561+
// because then the caller would not be able to "talk" to the transaction
562+
// any further.
563+
// note: read-only transactions do not need to reroll their id, as there will
564+
// be no locks taken.
565+
trx.state()->coordinatorRerollTransactionId();
566+
}
567+
506568
// set back to default lock timeout for slow path fallback
507569
_query.setLockTimeout(oldLockTimeout);
508570
LOG_TOPIC("f5022", DEBUG, Logger::AQL)
509571
<< "Potential deadlock detected, using slow path for locking. This "
510572
"is expected if exclusive locks are used.";
511573

512-
trx.state()->coordinatorRerollTransactionId();
513574

514575
// Make sure we always use the same ordering on servers
515576
std::sort(engineInformation.begin(), engineInformation.end(),
@@ -566,7 +627,7 @@ Result EngineInfoContainerDBServerServerBased::parseResponse(
566627
QueryId& globalQueryId) const {
567628
if (!response.isObject() || !response.get("result").isObject()) {
568629
LOG_TOPIC("0c3f2", WARN, Logger::AQL) << "Received error information from "
569-
<< server << " : " << response.toJson();
630+
<< server << ": " << response.toJson();
570631
if (response.hasKey(StaticStrings::ErrorNum) &&
571632
response.hasKey(StaticStrings::ErrorMessage)) {
572633
return network::resultFromBody(response, TRI_ERROR_CLUSTER_AQL_COMMUNICATION)
@@ -680,7 +741,7 @@ std::vector<arangodb::network::FutureRes> EngineInfoContainerDBServerServerBased
680741
VPackBuffer<uint8_t> body;
681742
VPackBuilder builder(body);
682743
builder.openObject();
683-
builder.add("code", VPackValue(to_string(errorCode)));
744+
builder.add("code", VPackValue(errorCode));
684745
builder.close();
685746
requests.reserve(serverQueryIds.size());
686747
for (auto const& [server, queryId] : serverQueryIds) {

arangod/Aql/ExecutionEngine.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,7 @@ Result ExecutionEngine::createBlocks(std::vector<ExecutionNode*> const& nodes,
178178
// put it into our cache:
179179
cache.try_emplace(en, eb);
180180
}
181-
return {TRI_ERROR_NO_ERROR};
181+
return {};
182182
}
183183

184184
/// @brief create the engine

arangod/Aql/ExecutionEngine.h

Lines changed: 1 addition & 2 deletions
10000
Original file line numberDiff line numberDiff line change
@@ -84,8 +84,7 @@ class ExecutionEngine {
8484
EngineId engineId() const {
8585
return _engineId;
8686
}
87-
88-
87+
8988
/// @brief get the root block
9089
TEST_VIRTUAL ExecutionBlock* root() const;
9190

arangod/Aql/Functions.cpp

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -845,7 +845,8 @@ void unsetOrKeep(transaction::Methods* trx, VPackSlice const& value,
845845

846846
/// @brief Helper function to get a document by its identifier
847847
/// Lazily locks the collection if necessary.
848-
void getDocumentByIdentifier(transaction::Methods* trx, std::string& collectionName,
848+
void getDocumentByIdentifier(transaction::Methods* trx, OperationOptions const& options,
849+
std::string& collectionName,
849850
std::string const& identifier, bool ignoreError,
850851
VPackBuilder& result) {
851852
transaction::BuilderLeaser searchBuilder(trx);
@@ -874,7 +875,7 @@ void getDocumentByIdentifier(transaction::Methods* trx, std::string& collectionN
874875

875876
Result res;
876877
try {
877-
res = trx->documentFastPath(collectionName, searchBuilder->slice(), result);
878+
res = trx->documentFastPath(collectionName, searchBuilder->slice(), options, result);
878879
} catch (arangodb::basics::Exception const& ex) {
879880
res.reset(ex.code());
880881
}
@@ -6604,6 +6605,9 @@ AqlValue Functions::Document(ExpressionContext* expressionContext, AstNode const
66046605
// cppcheck-suppress variableScope
66056606
static char const* AFN = "DOCUMENT";
66066607

6608+
OperationOptions options;
6609+
options.documentCallFromAql = true;
6610+
66076611
transaction::Methods* trx = &expressionContext->trx();
66086612
auto* vopts = &trx->vpackOptions();
66096613
if (parameters.size() == 1) {
@@ -6612,7 +6616,7 @@ AqlValue Functions::Document(ExpressionContext* expressionContext, AstNode const
66126616
if (id.isString()) {
66136617
std::string identifier(id.slice().copyString());
66146618
std::string colName;
6615-
::getDocumentByIdentifier(trx, colName, identifier, true, *builder.get());
6619+
::getDocumentByIdentifier(trx, options, colName, identifier, true, *builder.get());
66166620
if (builder->isEmpty()) {
66176621
// not found
66186622
return AqlValue(AqlValueHintNull());
@@ -6627,7 +6631,7 @@ AqlValue Functions::Document(ExpressionContext* expressionContext, AstNode const
66276631
if (next.isString()) {
66286632
std::string identifier = next.copyString();
66296633
std::string colName;
6630-
::getDocumentByIdentifier(trx, colName, identifier, true, *builder.get());
6634+
::getDocumentByIdentifier(trx, options, colName, identifier, true, *builder.get());
66316635
}
66326636
}
66336637
builder->close();
@@ -6647,7 +6651,7 @@ AqlValue Functions::Document(ExpressionContext* expressionContext, AstNode const
66476651
if (id.isString()) {
66486652
transaction::BuilderLeaser builder(trx);
66496653
std::string identifier(id.slice().copyString());
6650-
::getDocumentByIdentifier(trx, collectionName, identifier, true, *builder.get());
6654+
::getDocumentByIdentifier(trx, options, collectionName, identifier, true, *builder.get());
66516655
if (builder->isEmpty()) {
66526656
return AqlValue(AqlValueHintNull());
66536657
}
@@ -6663,7 +6667,7 @@ AqlValue Functions::Document(ExpressionContext* expressionContext, AstNode const
66636667
for (auto const& next : VPackArrayIterator(idSlice)) {
66646668
if (next.isString()) {
66656669
std::string identifier(next.copyString());
6666-
::getDocumentByIdentifier(trx, collectionName, identifier, true,
6670+
::getDocumentByIdentifier(trx, options, collectionName, identifier, true,
66676671
*builder.get());
66686672
}
66696673
}

arangod/Aql/Query.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1177,6 +1177,7 @@ ExecutionState Query::cleanupPlanAndEngine(ErrorCode errorCode, bool sync) {
11771177
_sharedState->waitForAsyncWakeup();
11781178
state = cleanupTrxAndEngines(errorCode);
11791179
}
1180+
return state;
11801181
}
11811182

11821183
return cleanupTrxAndEngines(errorCode);

arangod/Aql/Query.h

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -206,7 +206,6 @@ class Query : public QueryContext {
206206
aql::ServerQueryIdList& serverQueryIds() { return _serverQueryIds; }
207207
aql::ExecutionStats& executionStats() { return _execStats; }
208208

209-
210209
// Debug method to kill a query at a specific position
211210
// during execution. It internally asserts that the query
212211
// is actually visible through other APIS (e.g. current queries)

arangod/Aql/QueryRegistry.cpp

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -483,6 +483,7 @@ void QueryRegistry::registerSnippets(SnippetList const& snippets) {
483483

484484
void QueryRegistry::unregisterSnippets(SnippetList const& snippets) noexcept {
485485
TRI_ASSERT(ServerState::instance()->isCoordinator());
486+
int iterations = 0;
486487

487488
while (true) {
488489
WRITE_LOCKER(guard, _lock);
@@ -494,8 +495,6 @@ void QueryRegistry::unregisterSnippets(SnippetList const& snippets) noexcept {
494495
continue;
495496
}
496497
if (it->second._isOpen) { // engine still in use
497-
LOG_TOPIC("33cfb", WARN, arangodb::Logger::AQL)
498-
<< "engine snippet '" << it->first << "' is still in use";
499498
continue;
500499
}
501500
_engines.erase(it);
@@ -505,6 +504,15 @@ void QueryRegistry::unregisterSnippets(SnippetList const& snippets) noexcept {
505504
if (remain == 0) {
506505
break;
507506
}
507+
if (iterations == 0) {
508+
LOG_TOPIC("33cfb", DEBUG, arangodb::Logger::AQL)
509+
<< remain << " engine snippet(s) still in use on query shutdown";
510+
} else if (iterations == 100) {
511+
LOG_TOPIC("df7c7", WARN, arangodb::Logger::AQL)
512+
<< remain << " engine snippet(s) still in use on query shutdown";
513+
}
514+
++iterations;
515+
508516
std::this_thread::yield();
509517
}
510518
}

0 commit comments

Comments
 (0)
0