8000 fix locking of write transactions on DB servers by jsteemann · Pull Request #14450 · arangodb/arangodb · GitHub
[go: up one dir, main page]

Skip to content

fix locking of write transactions on DB servers #14450

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 16 commits into from
Jul 9, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
devel
-----

* Fix locking of AQL write queries on DB servers.

* APM-112: invalid use of OPTIONS in AQL queries will now raise a warning in
the query.
The feature is useful to detect misspelled attribute names in OPTIONS, e.g.
Expand Down
8 changes: 8 additions & 0 deletions arangod/Aql/ClusterQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include "Aql/QueryRegistry.h"
#include "Aql/QueryProfile.h"
#include "Cluster/ServerState.h"
#include "Random/RandomGenerator.h"
#include "StorageEngine/TransactionState.h"
#include "Transaction/Context.h"
#include "RestServer/QueryRegistryFeature.h"
Expand Down Expand Up @@ -102,6 +103,13 @@ void ClusterQuery::prepareClusterQuery(VPackSlice querySlice,
if (_trx->state()->isDBServer()) {
_trx->state()->acceptAnalyzersRevision(analyzersRevision);
}

TRI_IF_FAILURE("Query::setupLockTimeout") {
if (RandomGenerator::interval(uint32_t(100)) >= 95) {
THROW_ARANGO_EXCEPTION(TRI_ERROR_LOCK_TIMEOUT);
}
}

Result res = _trx->begin();
if (!res.ok()) {
THROW_ARANGO_EXCEPTION(res);
Expand Down
89 changes: 75 additions & 14 deletions arangod/Aql/EngineInfoContainerDBServerServerBased.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -402,8 +402,20 @@ Result EngineInfoContainerDBServerServerBased::buildEngines(
ErrorCode cleanupReason = TRI_ERROR_CLUSTER_TIMEOUT;

auto cleanupGuard = scopeGuard([this, &serverToQueryId, &cleanupReason]() {
// Fire and forget
std::ignore = cleanupEngines(cleanupReason, _query.vocbase().name(), serverToQueryId);
try {
transaction::Methods& trx = _query.trxForOptimization();
auto requests = cleanupEngines(cleanupReason, _query.vocbase().name(), serverToQueryId);
if (!trx.isMainTransaction()) {
// for AQL queries in streaming transactions, we will wait for the
// complete shutdown to have finished before we return to the caller.
// this is done so that there will be no 2 AQL queries in the same
// streaming transaction at the same time
futures::collectAll(requests).wait();
}
} catch (std::exception const& ex) {
LOG_TOPIC("2a9fe", WARN, Logger::AQL)
<< "unable to clean up query snippets: " << ex.what();
}
});

NetworkFeature const& nf = _query.vocbase().server().getFeature<NetworkFeature>();
Expand All @@ -412,6 +424,9 @@ Result EngineInfoContainerDBServerServerBased::buildEngines(
// nullptr only happens on controlled shutdown
return {TRI_ERROR_SHUTTING_DOWN};
}

// remember which servers we add during our setup request
::arangodb::containers::HashSet<std::string> serversAdded;

transaction::Methods& trx = _query.trxForOptimization();
std::vector<arangodb::futures::Future<Result>> networkCalls{};
Expand All @@ -427,9 +442,15 @@ Result EngineInfoContainerDBServerServerBased::buildEngines(
}

TRI_IF_FAILURE("Query::setupTimeoutFailSequence") {
options.timeout = network::Timeout(0.5);
double t = 0.5;
TRI_IF_FAILURE("Query::setupTimeoutFailSequenceRandom") {
if (RandomGenerator::interval(uint32_t(100)) >= 95) {
t = 3.0;
}
}
options.timeout = network::Timeout(t);
}

/// cluster global query id, under which the query will be registered
/// on DB servers from 3.8 onwards.
QueryId clusterQueryId = _query.vocbase().server().getFeature<ClusterFeature>().clusterInfo().uniqid();
Expand All @@ -450,6 +471,13 @@ Result EngineInfoContainerDBServerServerBased::buildEngines(
continue;
}

if (!trx.state()->knownServers().contains(server)) {
// we are about to add this server to the transaction.
// remember it, so we can roll the addition back for
// the second setup request if we need to
serversAdded.emplace(server);
}

networkCalls.emplace_back(
buildSetupRequest(trx, server, infoSlice, didCreateEngine, snippetIds,
serverToQueryId, serverToQueryIdLock, pool, options));
Expand All @@ -463,7 +491,7 @@ Result EngineInfoContainerDBServerServerBased::buildEngines(
// We can directly report a non TRI_ERROR_LOCK_TIMEOUT
// error as we need to abort after.
// Otherwise we need to report
Result res{TRI_ERROR_NO_ERROR};
Result res;
for (auto const& tryRes : responses) {
auto response = tryRes.get();
if (response.fail()) {
Expand All @@ -478,7 +506,7 @@ Result EngineInfoContainerDBServerServerBased::buildEngines(
}
// Return what we have, this will be ok() if and only
// if none of the requests failed.
// If will be LOCK_TIMEOUT if and only if the only error
// It will be LOCK_TIMEOUT if and only if the only error
// we see was LOCK_TIMEOUT.
return res;
});
Expand All @@ -490,26 +518,59 @@ Result EngineInfoContainerDBServerServerBased::buildEngines(
return fastPathResult.get();
}

// we got a lock timeout response for the fast path locking...
{
// in case of fast path failure, we need to cleanup engines
auto requests = cleanupEngines(fastPathResult.get().errorNumber(), _query.vocbase().name(), serverToQueryId);
// Wait for all requests to complete.
// Wait for all cleanup requests to complete.
// So we know that all Transactions are aborted.
// We do NOT care for the actual result.
futures::collectAll(requests).wait();
snippetIds.clear();
Result res;
for (auto& tryRes : requests) {
network::Response const& response = tryRes.get();
if (response.fail()) {
// note first error, but continue iterating over all results
LOG_TOPIC("2d319", DEBUG, Logger::AQL)
<< "received error from server " << response.destination
<< " during query cleanup: " << response.combinedResult().errorMessage();
res.reset(response.combinedResult());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are effectively losing information, since we only report the last error, so I would like to add some debug/trace logging here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, added.

}
}
if (res.fail()) {
// unable to do a proper cleanup.
// it is not safe to go on here.
cleanupGuard.cancel();
cleanupReason = res.errorNumber();
return res;
}
}


// fast path locking rolled back successfully!
snippetIds.clear();

// revert the addition of servers by us
for (auto const& s : serversAdded) {
trx.state()->removeKnownServer(s);
}

// we must generate a new query id, because the fast path setup has failed
clusterQueryId = _query.vocbase().server().getFeature<ClusterFeature>().clusterInfo().uniqid();

if (trx.isMainTransaction() && !trx.state()->isReadOnlyTransaction()) {
// when we are not in a streaming transaction, it is ok to roll a new trx id.
// it is not ok to change the trx id inside a streaming transaction,
// because then the caller would not be able to "talk" to the transaction
// any further.
// note: read-only transactions do not need to reroll their id, as there will
// be no locks taken.
trx.state()->coordinatorRerollTransactionId();
}

// set back to default lock timeout for slow path fallback
_query.setLockTimeout(oldLockTimeout);
LOG_TOPIC("f5022", DEBUG, Logger::AQL)
<< "Potential deadlock detected, using slow path for locking. This "
"is expected if exclusive locks are used.";

trx.state()->coordinatorRerollTransactionId();

// Make sure we always use the same ordering on servers
std::sort(engineInformation.begin(), engineInformation.end(),
Expand Down Expand Up @@ -566,7 +627,7 @@ Result EngineInfoContainerDBServerServerBased::parseResponse(
QueryId& globalQueryId) const {
if (!response.isObject() || !response.get("result").isObject()) {
LOG_TOPIC("0c3f2", WARN, Logger::AQL) << "Received error information from "
<< server << " : " << response.toJson();
<< server << ": " << response.toJson();
if (response.hasKey(StaticStrings::ErrorNum) &&
response.hasKey(StaticStrings::ErrorMessage)) {
return network::resultFromBody(response, TRI_ERROR_CLUSTER_AQL_COMMUNICATION)
Expand Down Expand Up @@ -680,7 +741,7 @@ std::vector<arangodb::network::FutureRes> EngineInfoContainerDBServerServerBased
VPackBuffer<uint8_t> body;
VPackBuilder builder(body);
builder.openObject();
builder.add("code", VPackValue(to_string(errorCode)));
builder.add("code", VPackValue(errorCode));
builder.close();
requests.reserve(serverQueryIds.size());
for (auto const& [server, queryId] : serverQueryIds) {
Expand Down
2 changes: 1 addition & 1 deletion arangod/Aql/ExecutionEngine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ Result ExecutionEngine::createBlocks(std::vector<ExecutionNode*> const& nodes,
// put it into our cache:
cache.try_emplace(en, eb);
}
return {TRI_ERROR_NO_ERROR};
return {};
}

/// @brief create the engine
Expand Down
3 changes: 1 addition & 2 deletions arangod/Aql/ExecutionEngine.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,7 @@ class ExecutionEngine {
EngineId engineId() const {
return _engineId;
}



/// @brief get the root block
TEST_VIRTUAL ExecutionBlock* root() const;

Expand Down
16 changes: 10 additions & 6 deletions arangod/Aql/Functions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -845,7 +845,8 @@ void unsetOrKeep(transaction::Methods* trx, VPackSlice const& value,

/// @brief Helper function to get a document by its identifier
/// Lazy Locks the collection if necessary.
void getDocumentByIdentifier(transaction::Methods* trx, std::string& collectionName,
void getDocumentByIdentifier(transaction::Methods* trx, OperationOptions const& options,
std::string& collectionName,
std::string const& identifier, bool ignoreError,
VPackBuilder& result) {
transaction::BuilderLeaser searchBuilder(trx);
Expand Down Expand Up @@ -874,7 +875,7 @@ void getDocumentByIdentifier(transaction::Methods* trx, std::string& collectionN

Result res;
try {
res = trx->documentFastPath(collectionName, searchBuilder->slice(), result);
res = trx->documentFastPath(collectionName, searchBuilder->slice(), options, result);
} catch (arangodb::basics::Exception const& ex) {
res.reset(ex.code());
}
Expand Down Expand Up @@ -6604,6 +6605,9 @@ AqlValue Functions::Document(ExpressionContext* expressionContext, AstNode const
// cppcheck-suppress variableScope
static char const* AFN = "DOCUMENT";

OperationOptions options;
options.documentCallFromAql = true;

transaction::Methods* trx = &expressionContext->trx();
auto* vopts = &trx->vpackOptions();
if (parameters.size() == 1) {
Expand All @@ -6612,7 +6616,7 @@ AqlValue Functions::Document(ExpressionContext* expressionContext, AstNode const
if (id.isString()) {
std::string identifier(id.slice().copyString());
std::string colName;
::getDocumentByIdentifier(trx, colName, identifier, true, *builder.get());
::getDocumentByIdentifier(trx, options, colName, identifier, true, *builder.get());
if (builder->isEmpty()) {
// not found
return AqlValue(AqlValueHintNull());
Expand All @@ -6627,7 +6631,7 @@ AqlValue Functions::Document(ExpressionContext* expressionContext, AstNode const
if (next.isString()) {
std::string identifier = next.copyString();
std::string colName;
::getDocumentByIdentifier(trx, colName, identifier, true, *builder.get());
::getDocumentByIdentifier(trx, options, colName, identifier, true, *builder.get());
}
}
builder->close();
Expand All @@ -6647,7 +6651,7 @@ AqlValue Functions::Document(ExpressionContext* expressionContext, AstNode const
if (id.isString()) {
transaction::BuilderLeaser builder(trx);
std::string identifier(id.slice().copyString());
::getDocumentByIdentifier(trx, collectionName, identifier, true, *builder.get());
::getDocumentByIdentifier(trx, options, collectionName, identifier, true, *builder.get());
if (builder->isEmpty()) {
return AqlValue(AqlValueHintNull());
}
Expand All @@ -6663,7 +6667,7 @@ AqlValue Functions::Document(ExpressionContext* expressionContext, AstNode const
for (auto const& next : VPackArrayIterator(idSlice)) {
if (next.isString()) {
std::string identifier(next.copyString());
::getDocumentByIdentifier(trx, collectionName, identifier, true,
::getDocumentByIdentifier(trx, options, collectionName, identifier, true,
*builder.get());
}
}
Expand Down
1 change: 1 addition & 0 deletions arangod/Aql/Query.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1177,6 +1177,7 @@ ExecutionState Query::cleanupPlanAndEngine(ErrorCode errorCode, bool sync) {
_sharedState->waitForAsyncWakeup();
state = cleanupTrxAndEngines(errorCode);
}
return state;
}

return cleanupTrxAndEngines(errorCode);
Expand Down
1 change: 0 additions & 1 deletion arangod/Aql/Query.h
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,6 @@ class Query : public QueryContext {
aql::ServerQueryIdList& serverQueryIds() { return _serverQueryIds; }
aql::ExecutionStats& executionStats() { return _execStats; }


// Debug method to kill a query at a specific position
// during execution. It internally asserts that the query
// is actually visible through other APIS (e.g. current queries)
Expand Down
12 changes: 10 additions & 2 deletions arangod/Aql/QueryRegistry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -483,6 +483,7 @@ void QueryRegistry::registerSnippets(SnippetList const& snippets) {

void QueryRegistry::unregisterSnippets(SnippetList const& snippets) noexcept {
TRI_ASSERT(ServerState::instance()->isCoordinator());
int iterations = 0;

while (true) {
WRITE_LOCKER(guard, _lock);
Expand All @@ -494,8 +495,6 @@ void QueryRegistry::unregisterSnippets(SnippetList const& snippets) noexcept {
continue;
}
if (it->second._isOpen) { // engine still in use
LOG_TOPIC("33cfb", WARN, arangodb::Logger::AQL)
<< "engine snippet '" << it->first << "' is still in use";
continue;
}
_engines.erase(it);
Expand All @@ -505,6 +504,15 @@ void QueryRegistry::unregisterSnippets(SnippetList const& snippets) noexcept {
if (remain == 0) {
break;
}
if (iterations == 0) {
LOG_TOPIC("33cfb", DEBUG, arangodb::Logger::AQL)
<< remain << " engine snippet(s) still in use on query shutdown";
} else if (iterations == 100) {
LOG_TOPIC("df7c7", WARN, arangodb::Logger::AQL)
<< remain << " engine snippet(s) still in use on query shutdown";
}
++iterations;

std::this_thread::yield();
}
}
Expand Down
Loading
0