8000 Improve internal AQL and transaction timeouts by jsteemann · Pull Request #14283 · arangodb/arangodb · GitHub
[go: up one dir, main page]

Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
5887944
Improve internal AQL and transaction timeouts
jsteemann May 31, 2021
4f5f9bb
Merge branch 'devel' of github.com:arangodb/arangodb into feature/inc…
jsteemann May 31, 2021
963879e
Merge branch 'devel' of github.com:arangodb/arangodb into feature/inc…
jsteemann Jun 28, 2021
95b6013
Merge branch 'devel' of github.com:arangodb/arangodb into feature/inc…
jsteemann Jun 29, 2021
6f409ae
fix reboot trackers, adjust timeout to 20 minutes
jsteemann Jun 29, 2021
f610e5e
Merge branch 'devel' of github.com:arangodb/arangodb into feature/inc…
jsteemann Jun 30, 2021
7e77857
try to fix clang compilation error
jsteemann Jun 30, 2021
8e419c3
Merge branch 'devel' of github.com:arangodb/arangodb into feature/inc…
jsteemann Jun 30, 2021
74901c4
fix wrong server name
jsteemann Jun 30, 2021
f5a50d4
Merge branch 'devel' of github.com:arangodb/arangodb into feature/inc…
jsteemann Jun 30, 2021
c284667
fix several issues
jsteemann Jun 30, 2021
0dff2bd
Merge branch 'feature/increase-internal-timeouts' of github.com:arang…
jsteemann Jun 30, 2021
39bac47
Merge branch 'devel' of github.com:arangodb/arangodb into feature/inc…
jsteemann Jul 1, 2021
006fb88
Merge branch 'devel' of github.com:arangodb/arangodb into feature/inc…
jsteemann Jul 9, 2021
8116702
Merge branch 'devel' of github.com:arangodb/arangodb into feature/inc…
jsteemann Jul 9, 2021
7ce4991
bump timeout from 3 to 5 minutes
jsteemann Jul 9, 2021
6fcdfe0
Merge branch 'devel' into feature/increase-internal-timeouts
jsteemann Jul 16, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 15 additions & 3 deletions CHANGELOG
Original file line number Diff line number Diff line change
@@ -1,6 +1,18 @@
devel
-----

* Slightly increase internal AQL query and transaction timeout on DB servers
from 3 to 5 minutes.
Previously, queries and transactions on DB servers could expire quicker,
which led to spurious "query ID not found" or "transaction ID not found"
errors on DB servers for multi-server queries/transactions with unbalanced
access patterns for the different participating DB servers.
The timeouts on coordinators remain unchanged, so any queries/transactions
that are abandoned will be aborted there, which will also be propagated to
DB servers. In addition, if a particpating server in an AQL query becomes
unavailable, the coordinator is now notified of that and will terminate the
query more eagerly.

* Add hard-coded complexity limits for AQL queries, in order to prevent
programmatically generated large queries from causing trouble (too deep
recursion, enormous memory usage, long query optimization and distribution
Expand Down Expand Up @@ -342,12 +354,12 @@ devel
indefinitely when LIMIT is less than number of available documents.

* Bug-Fix (MacOs): In MacOs there is an upper bound for descriptors defined by
the system, which is independend of the settings in `ulimit -n`. If the
the system, which is independent of the settings in `ulimit -n`. If the
hard limit is set above this upper bound value ArangoDB tries to raise the
soft limit to the hard limit on boot. This will fail due to the system
limit. This could cause ArangoDB to not start, asking you to lower the
minimum of required file descriptors. The system set upper bound is now
honored and the soft limit will be set to either hard limit or system limit
minimum of required file descriptors. The system-set upper bound is now
honored and the soft limit will be set to either hard limit or system limit,
whichever is lower.

* Fix BTS-409: return error 1948 when a negative edge was detected during or was
Expand Down
76 changes: 55 additions & 21 deletions arangod/Aql/EngineInfoContainerDBServerServerBased.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

#include "Aql/Ast.h"
#include "Aql/GraphNode.h"
#include "Basics/StaticStrings.h"
#include "Basics/StringUtils.h"
#include "Cluster/ClusterFeature.h"
#include "Cluster/ClusterTrxMethods.h"
Expand All @@ -34,6 +35,7 @@
#include "Network/Utils.h"
#include "Random/RandomGenerator.h"
#include "StorageEngine/TransactionState.h"
#include "Transaction/Manager.h"
#include "Utils/CollectionNameResolver.h"

#include <set>
Expand All @@ -47,6 +49,9 @@ namespace {
const double SETUP_TIMEOUT = 60.0;
// Wait 2s to get the Lock in FastPath, otherwise assume dead-lock.
const double FAST_PATH_LOCK_TIMEOUT = 2.0;

std::string const finishUrl("/_api/aql/finish/");
std::string const traverserUrl("/_internal/traverser/");

Result ExtractRemoteAndShard(VPackSlice keySlice, ExecutionNodeId& remoteId,
std::string& shardId) {
Expand Down Expand Up @@ -302,7 +307,8 @@ arangodb::futures::Future<Result> EngineInfoContainerDBServerServerBased::buildS
std::vector<bool> didCreateEngine, MapRemoteToSnippet& snippetIds,
aql::ServerQueryIdList& serverToQueryId, std::mutex& serverToQueryIdLock,
network::ConnectionPool* pool, network::RequestOptions const& options) const {
std::string serverDest = "server:" + server;

TRI_ASSERT(server.substr(0, 7) != "server:");

VPackBuffer<uint8_t> buffer(infoSlice.byteSize());
buffer.append(infoSlice.begin(), infoSlice.byteSize());
Expand All @@ -315,11 +321,14 @@ arangodb::futures::Future<Result> EngineInfoContainerDBServerServerBased::buildS
QueryId globalId = infoSlice.get("clusterQueryId").getNumber<QueryId>();

auto buildCallback =
[this, server, serverDest, didCreateEngine = std::move(didCreateEngine),
[this, server, didCreateEngine = std::move(didCreateEngine),
&serverToQueryId, &serverToQueryIdLock, &snippetIds, globalId](
arangodb::futures::Try<arangodb::network::Response> const& response) -> Result {
auto const& resolvedResponse = response.get();
auto queryId = globalId;
RebootId rebootId{0};

TRI_ASSERT(server.substr(0, 7) != "server:");

std::unique_lock<std::mutex> guard{serverToQueryIdLock};

Expand All @@ -329,22 +338,22 @@ arangodb::futures::Future<Result> EngineInfoContainerDBServerServerBased::buildS
<< server << " responded with " << res.errorNumber() << ": "
<< res.errorMessage();

serverToQueryId.emplace_back(serverDest, globalId);
serverToQueryId.emplace_back(ServerQueryIdEntry{ server, globalId, rebootId });
return res;
}

VPackSlice responseSlice = resolvedResponse.slice();
if (responseSlice.isNone()) {
return {TRI_ERROR_INTERNAL, "malformed response while building engines"};
}
auto result = parseResponse(responseSlice, snippetIds, server, serverDest,
didCreateEngine, queryId);
serverToQueryId.emplace_back(serverDest, queryId);
auto result = parseResponse(responseSlice, snippetIds, server,
didCreateEngine, queryId, rebootId);
serverToQueryId.emplace_back(ServerQueryIdEntry{ server, queryId, rebootId });

return result;
};

return network::sendRequestRetry(pool, serverDest, fuerte::RestVerb::Post,
return network::sendRequestRetry(pool, "server:" + server, fuerte::RestVerb::Post,
"/_api/aql/setup", std::move(buffer), options,
std::move(headers))
.then([buildCallback = std::move(buildCallback)](futures::Try<network::Response>&& resp) mutable {
Expand Down Expand Up @@ -425,6 +434,21 @@ Result EngineInfoContainerDBServerServerBased::buildEngines(
return {TRI_ERROR_SHUTTING_DOWN};
}

double oldTtl = _query.queryOptions().ttl;
// we use a timeout of at least the trx timeout for DB server snippets.
// we assume this is safe because the RebootTracker on the coordinator
// will abort all snippets of failed coordinators eventually.
// the ttl on the coordinator is not that high, i.e. if an AQL query
// is abandoned by a client application or an end user, the coordinator
// ttl should kick in a lot earlier and also terminate the query on the
// DB server(s).
_query.queryOptions().ttl = std::max<double>(oldTtl, transaction::Manager::idleTTLDBServer);

auto ttlGuard = scopeGuard([this, oldTtl]() {
// restore previous TTL value
_query.queryOptions().ttl = oldTtl;
});

// remember which servers we add during our setup request
::arangodb::containers::HashSet<std::string> serversAdded;

Expand All @@ -435,7 +459,6 @@ Result EngineInfoContainerDBServerServerBased::buildEngines(
options.database = _query.vocbase().name();
options.timeout = network::Timeout(SETUP_TIMEOUT);
options.skipScheduler = true; // hack to speed up future.get()
options.param("ttl", std::to_string(_query.queryOptions().ttl));
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

note: this request parameter was set, but not read by RestAqlHandler.


TRI_IF_FAILURE("Query::setupTimeout") {
options.timeout = network::Timeout(0.01 + (double) RandomGenerator::interval(uint32_t(10)));
Expand All @@ -461,6 +484,8 @@ Result EngineInfoContainerDBServerServerBased::buildEngines(
std::mutex serverToQueryIdLock{};
std::vector<std::tuple<ServerID, std::shared_ptr<VPackBuffer<uint8_t>>, std::vector<bool>>> engineInformation;
engineInformation.reserve(dbServers.size());
serverToQueryId.reserve(dbServers.size());

for (ServerID const& server : dbServers) {
// Build Lookup Infos
VPackBuilder infoBuilder;
Expand Down Expand Up @@ -544,13 +569,15 @@ Result EngineInfoContainerDBServerServerBased::buildEngines(
}
}

// fast path locking rolled back successfully!
snippetIds.clear();

// revert the addition of servers by us
for (auto const& s : serversAdded) {
trx.state()->removeKnownServer(s);
}

// fast path locking rolled back successfully!
TRI_ASSERT(serverToQueryId.empty());

// we must generate a new query id, because the fast path setup has failed
clusterQueryId = _query.vocbase().server().getFeature<ClusterFeature>().clusterInfo().uniqid();
Expand Down Expand Up @@ -623,8 +650,10 @@ Result EngineInfoContainerDBServerServerBased::buildEngines(

Result EngineInfoContainerDBServerServerBased::parseResponse(
VPackSlice response, MapRemoteToSnippet& queryIds, ServerID const& server,
std::string const& serverDest, std::vector<bool> const& didCreateEngine,
QueryId& globalQueryId) const {
std::vector<bool> const& didCreateEngine,
QueryId& globalQueryId, RebootId& rebootId) const {
TRI_ASSERT(server.substr(0, 7) != "server:");

if (!response.isObject() || !response.get("result").isObject()) {
LOG_TOPIC("0c3f2", WARN, Logger::AQL) << "Received error information from "
<< server << ": " << response.toJson();
Expand Down Expand Up @@ -655,6 +684,11 @@ Result EngineInfoContainerDBServerServerBased::parseResponse(
// had prescribed
globalQueryId = queryIdSlice.getNumber<QueryId>();
}

VPackSlice rebootIdSlice = result.get(StaticStrings::RebootId);
if (rebootIdSlice.isNumber()) {
rebootId = RebootId(rebootIdSlice.getNumber<uint64_t>());
}

VPackSlice snippets = result.get("snippets");
// Link Snippets to their sinks
Expand All @@ -667,15 +701,15 @@ Result EngineInfoContainerDBServerServerBased::parseResponse(
server};
}
auto remoteId = ExecutionNodeId{0};
std::string shardId = "";
std::string shardId;
auto res = ExtractRemoteAndShard(resEntry.key, remoteId, shardId);
if (!res.ok()) {
return res;
}
TRI_ASSERT(remoteId != ExecutionNodeId{0});
TRI_ASSERT(!shardId.empty());
auto& remote = queryIds[remoteId];
auto& thisServer = remote[serverDest];
auto& thisServer = remote["server:" + server];
thisServer.emplace_back(resEntry.value.copyString());
}

Expand Down Expand Up @@ -707,7 +741,7 @@ Result EngineInfoContainerDBServerServerBased::parseResponse(
// We need to consume all traverser engines
TRI_ASSERT(!idIter.valid());
}
return {TRI_ERROR_NO_ERROR};
return {};
}

/**
Expand Down Expand Up @@ -737,29 +771,29 @@ std::vector<arangodb::network::FutureRes> EngineInfoContainerDBServerServerBased
options.skipScheduler = true; // hack to speed up future.get()

// Shutdown query snippets
std::string url("/_api/aql/finish/");
VPackBuffer<uint8_t> body;
VPackBuilder builder(body);
builder.openObject();
builder.add("code", VPackValue(errorCode));
builder.add(StaticStrings::Code, VPackValue(errorCode));
builder.close();
requests.reserve(serverQueryIds.size());
for (auto const& [server, queryId] : serverQueryIds) {
requests.emplace_back(network::sendRequestRetry(pool, server, fuerte::RestVerb::Delete,
url + std::to_string(queryId),
for (auto const& [server, queryId, rebootId] : serverQueryIds) {
TRI_ASSERT(server.substr(0, 7) != "server:");
requests.emplace_back(network::sendRequestRetry(pool, "server:" + server, fuerte::RestVerb::Delete,
::finishUrl + std::to_string(queryId),
/*copy*/ body, options));
}
_query.incHttpRequests(static_cast<unsigned>(serverQueryIds.size()));

// Shutdown traverser engines
url = "/_internal/traverser/";
VPackBuffer<uint8_t> noBody;

for (auto& gn : _graphNodes) {
auto allEngines = gn->engines();
for (auto const& engine : *allEngines) {
TRI_ASSERT(engine.first.substr(0, 7) != "server:");
requests.emplace_back(network::sendRequestRetry(pool, "server:" + engine.first, fuerte::RestVerb::Delete,
url + basics::StringUtils::itoa(engine.second), noBody, options));
::traverserUrl + basics::StringUtils::itoa(engine.second), noBody, options));
}
_query.incHttpRequests(static_cast<unsigned>(allEngines->size()));
gn->clearEngines();
Expand Down
5 changes: 3 additions & 2 deletions arangod/Aql/EngineInfoContainerDBServerServerBased.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include "Aql/ShardLocking.h"
#include "Aql/types.h"
#include "Cluster/ClusterInfo.h"
#include "Cluster/ClusterTypes.h"
#include "VocBase/AccessMode.h"

#include <map>
Expand Down Expand Up @@ -210,9 +211,9 @@ class EngineInfoContainerDBServerServerBased {

// Parse the response of a DBServer to a setup request
Result parseResponse(VPackSlice response, MapRemoteToSnippet& queryIds,
ServerID const& server, std::string const& serverDest,
ServerID const& server,
std::vector<bool> const& didCreateEngine,
QueryId& globalQueryId) const;
QueryId& globalQueryId, RebootId& rebootId) const;

void injectVertexCollections(GraphNode* node);

Expand Down
46 changes: 44 additions & 2 deletions arangod/Aql/ExecutionEngine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,13 @@
#include "Aql/SkipResult.h"
#include "Aql/SharedQueryState.h"
#include "Basics/ScopeGuard.h"
#include "Cluster/ClusterFeature.h"
#include "Cluster/ClusterInfo.h"
#include "Cluster/RebootTracker.h"
#include "Cluster/ServerState.h"
#include "Futures/Utilities.h"
#include "Logger/LogMacros.h"
#include "VocBase/Methods/Queries.h"

using namespace arangodb;
using namespace arangodb::aql;
Expand Down Expand Up @@ -490,12 +494,46 @@ struct DistributedQueryInstanciator final
TRI_ASSERT(snippets.size() > 0);
TRI_ASSERT(snippets[0]->engineId() == 0);


{
// install reboot trackers for all participating DB servers.
// we do this so we have a quick shutdown of queries if one of the participating DB servers fails.
// while it is not necessary for correctness to fail quickly, it can be beneficial
// to avoid carrying out a lot of operations on other servers only to realize at
// query end that the query cannot be committed everywhere.
auto engine = snippets[0].get();

ClusterInfo& ci = _query.vocbase().server().getFeature<ClusterFeature>().clusterInfo();
engine->rebootTrackers().reserve(srvrQryId.size());
for (auto const& [server, queryId, rebootId] : srvrQryId) {
TRI_ASSERT(server.substr(0, 7) != "server:");
std::string comment = std::string("AQL query from coordinator ") + ServerState::instance()->getId();

std::function<void(void)> f = [srvr = server, id = _query.id(), &vocbase = _query.vocbase()]() {
LOG_TOPIC("d2554", INFO, Logger::QUERIES)
<< "killing query " << id << " because participating DB server "
<< srvr << " is unavailable";
try {
methods::Queries::kill(vocbase, id, false);
} catch (...) {
// it does not really matter if this fails.
// if the coordinator contacts the failed DB server next time, it will
// realize it has failed.
}
};

engine->rebootTrackers().emplace_back(
ci.rebootTracker().callMeOnChange(cluster::RebootTracker::PeerState(server, rebootId),
std::move(f), std::move(comment)));
}
}

bool knowsAllQueryIds = snippetIds.empty() || !srvrQryId.empty();
TRI_ASSERT(knowsAllQueryIds);
for (auto const& [serverDst, queryId] : srvrQryId) {
for (auto const& [server, queryId, rebootId] : srvrQryId) {
if (queryId == 0) {
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL,
std::string("no query ID known for ") + serverDst);
std::string("no query ID known for ") + server);
}
}

Expand Down Expand Up @@ -808,3 +846,7 @@ bool ExecutionEngine::waitForSatellites(QueryContext& /*query*/, Collection cons
return true;
}
#endif

std::vector<arangodb::cluster::CallbackGuard>& ExecutionEngine::rebootTrackers() {
return _rebootTrackers;
}
6 changes: 6 additions & 0 deletions arangod/Aql/ExecutionEngine.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include "Aql/WalkerWorker.h"
#include "Basics/Common.h"
#include "Basics/Result.h"
#include "Cluster/CallbackGuard.h"
#include "Containers/SmallVector.h"

#include <memory>
Expand Down Expand Up @@ -137,6 +138,8 @@ class ExecutionEngine {

bool waitForSatellites(aql::QueryContext& query, Collection const* collection) const;

std::vector<arangodb::cluster::CallbackGuard>& rebootTrackers();

#ifdef USE_ENTERPRISE
static void parallelizeTraversals(aql::Query& query, ExecutionPlan& plan,
std::map<aql::ExecutionNodeId, aql::ExecutionNodeId>& aliases);
Expand Down Expand Up @@ -172,6 +175,9 @@ class ExecutionEngine {

/// @brief root block of the engine
ExecutionBlock* _root;

/// @brief reboot trackers for DB servers participating in the query
std::vector<arangodb::cluster::CallbackGuard> _rebootTrackers;

/// @brief the register the final result of the query is stored in
RegisterId _resultRegister;
Expand Down
Loading
0