Introduce callstack splitting to avoid stackoverflows with large AQL queries by mpoeter · Pull Request #14351 · arangodb/arangodb · GitHub
[go: up one dir, main page]

Skip to content

Introduce callstack splitting to avoid stackoverflows with large AQL queries #14351

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Jun 30, 2021
8 changes: 7 additions & 1 deletion CHANGELOG
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
devel
-----

* Fix potential stack overflow when executing large queries. This is
achieved by splitting the callstack and moving part of the execution
to a separate thread. The number of execution nodes after which such
a callstack split should be performed can be configured via the query
option `maxNodesPerCallstack` and the command line option
`--query.max-nodes-per-callstack`; the default is 250.

* Bug-Fix: Pregel WCC algorithm could yield incorrect results if a
part of the connected component was only attached via OUTBOUND edges.
The underlying algorithm is now modified to properly retain INBOUND
Expand Down Expand Up @@ -76,7 +83,6 @@ devel
* Improve usability of hidden options: `--help` mentions that these exist
and how to display them.


* Fix DEVSUP-753: now it is safe to call visit on exhausted disjunction
iterator.

Expand Down
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -978,6 +978,7 @@ if (MSVC)
add_definitions(/Z7)
endif ()

add_definitions(/bigobj)
else () # NOT MSVC
set(EXTRA_C_FLAGS "")
set(EXTRA_CXX_FLAGS "")
Expand Down
80 changes: 79 additions & 1 deletion arangod/Aql/ExecutionBlockImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,10 @@ ExecutionBlockImpl<Executor>::ExecutionBlockImpl(ExecutionEngine* engine,
// Break the stack before waiting.
// We should not use this here.
_stackBeforeWaiting.popCall();

if (_exeNode->isCallstackSplitEnabled()) {
_callstackSplit = std::make_unique<CallstackSplit>(*this);
}
}

template <class Executor>
Expand Down Expand Up @@ -1518,7 +1522,14 @@ ExecutionBlockImpl<Executor>::executeWithoutTrace(AqlCallStack const& callStack)
auto subqueryLevelBefore = ctx.stack.subqueryLevel();
#endif
SkipResult skippedLocal;
std::tie(_upstreamState, skippedLocal, _lastRange) = executeFetcher(ctx, _upstreamRequest);
if (_callstackSplit) {
// we need to split the callstack to avoid stack overflows, so we move upstream
// execution into a separate thread
std::tie(_upstreamState, skippedLocal, _lastRange) = _callstackSplit->execute(ctx, _upstreamRequest);
} else {
std::tie(_upstreamState, skippedLocal, _lastRange) = executeFetcher(ctx, _upstreamRequest);
}

#ifdef ARANGODB_ENABLE_MAINTAINER_MODE
TRI_ASSERT(subqueryLevelBefore == ctx.stack.subqueryLevel());
#endif
Expand Down Expand Up @@ -1969,6 +1980,73 @@ void ExecutionBlockImpl<Executor>::PrefetchTask::execute(ExecutionBlockImpl& blo
}
}

/// @brief creates the split for `block` and immediately starts the thread
/// that will run CallstackSplit::run. The ExecContext that is current on the
/// constructing thread is handed to the new thread by reference.
template <class Executor>
ExecutionBlockImpl<Executor>::CallstackSplit::CallstackSplit(ExecutionBlockImpl& block)
    : _block(block),
      _thread(&CallstackSplit::run, this, std::cref(ExecContext::current())) {
}

/// @brief asks the thread to terminate and waits until it has done so.
template <class Executor>
ExecutionBlockImpl<Executor>::CallstackSplit::~CallstackSplit() {
  {
    // switch the state while holding the lock so the thread cannot miss the
    // change between evaluating its wait predicate and going to sleep
    std::lock_guard<std::mutex> guard(_lock);
    _state.store(State::Stopped);
  }

  // wake the thread up so that it observes `Stopped` and returns from run()
  _bell.notify_one();
  _thread.join();
}

/// @brief performs the upstream call described by `ctx` and `aqlCall` on the
/// thread owned by this CallstackSplit and blocks until that call is done.
///
/// The call parameters are published via `_params`, the state is switched to
/// `Executing` and the thread is notified. Once the state is no longer
/// `Executing`, the stored result is returned; if the thread stored an
/// exception instead, it is rethrown on the calling thread.
///
/// @param ctx      execution context forwarded to executeFetcher
/// @param aqlCall  upstream call forwarded to executeFetcher
/// @return the tuple produced by executeFetcher on the other thread
template <class Executor>
auto ExecutionBlockImpl<Executor>::CallstackSplit::execute(ExecutionContext& ctx, AqlCallType const& aqlCall)
    -> UpstreamResult {
  std::variant<UpstreamResult, std::exception_ptr, std::nullopt_t> result{std::nullopt};
  Params params{result, ctx, aqlCall};

  {
    std::unique_lock<std::mutex> guard(_lock);
    _params = &params;
    _state.store(State::Executing);
  }

  _bell.notify_one();

  std::unique_lock<std::mutex> guard(_lock);
  _bell.wait(guard, [this]() {
    return _state.load(std::memory_order_acquire) != State::Executing;
  });
  TRI_ASSERT(_state.load() == State::Waiting);

  // `params` lives in this stack frame and dies when we return. Clear the
  // published pointer (we hold the lock) so that no dangling pointer is left
  // behind and the non-null assertion in run() stays meaningful for every
  // call, not just the first one.
  _params = nullptr;

  if (std::holds_alternative<std::exception_ptr>(result)) {
    std::rethrow_exception(std::get<std::exception_ptr>(result));
  }

  return std::get<UpstreamResult>(std::move(result));
}

/// @brief entry point of the thread owned by a CallstackSplit.
///
/// Sleeps until the state leaves `Waiting`. On `Stopped` the thread ends;
/// otherwise it performs the call described by `_params`, stores either the
/// result or the thrown exception, switches back to `Waiting` and notifies
/// the thread blocked in execute().
///
/// @param execContext context passed to the ExecContextScope that is kept
///        alive for the whole lifetime of this thread
template <class Executor>
void ExecutionBlockImpl<Executor>::CallstackSplit::run(ExecContext const& execContext) {
  ExecContextScope scope(&execContext);
  std::unique_lock<std::mutex> guard(_lock);
  for (;;) {
    // the lock is released only while we are blocked inside wait()
    _bell.wait(guard, [this]() {
      return _state.load(std::memory_order_relaxed) != State::Waiting;
    });
    if (_state.load() == State::Stopped) {
      break;
    }
    TRI_ASSERT(_params != nullptr);
    _state.store(State::Executing);

    try {
      _params->result = _block.executeFetcher(_params->ctx, _params->aqlCall);
    } catch (...) {
      // hand the exception over to the caller of execute(), which rethrows it
      _params->result = std::current_exception();
    }

    _state.store(State::Waiting, std::memory_order_relaxed);
    _bell.notify_one();
  }
}

template class ::arangodb::aql::ExecutionBlockImpl<CalculationExecutor<CalculationType::Condition>>;
template class ::arangodb::aql::ExecutionBlockImpl<CalculationExecutor<CalculationType::Reference>>;
template class ::arangodb::aql::ExecutionBlockImpl<CalculationExecutor<CalculationType::V8Condition>>;
Expand Down
65 changes: 63 additions & 2 deletions arangod/Aql/ExecutionBlockImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@
#include <memory>
#include <mutex>

namespace arangodb {
class ExecContext;
}

namespace arangodb::aql {

template <BlockPassthrough passThrough>
Expand Down Expand Up @@ -346,6 +350,61 @@ class ExecutionBlockImpl final : public ExecutionBlock {
std::condition_variable _bell;
std::optional<PrefetchResult> _result;
};

/**
 * @brief The CallstackSplit class is used for execution blocks that need to
 * perform their calls to upstream nodes in a separate thread in order to
 * prevent stack overflows.
 *
 * Execution blocks for which the callstack split has been enabled create a
 * single CallstackSplit instance upon creation. The CallstackSplit instance
 * manages the new thread for the upstream execution. Instead of calling
 * executeFetcher directly, we call CallstackSplit::execute which stores the
 * call parameters, signals the thread and then blocks. The other thread
 * fetches the parameters, performs the call to executeFetcher, stores the
 * result and notifies the original thread.
 *
 * This way we can split the callstack over multiple threads and thereby
 * avoid stack overflows.
 *
 */
struct CallstackSplit {
  explicit CallstackSplit(ExecutionBlockImpl& block);
  ~CallstackSplit();

  // the thread started by the constructor is handed `this`, so an instance
  // must never be copied
  CallstackSplit(CallstackSplit const&) = delete;
  CallstackSplit& operator=(CallstackSplit const&) = delete;

  using UpstreamResult =
      std::tuple<ExecutionState, SkipResult, typename Fetcher::DataRange>;

  /// @brief runs the upstream call on the separate thread and blocks until
  /// the result is available; an exception thrown by the upstream call is
  /// rethrown to the caller.
  UpstreamResult execute(ExecutionContext& ctx, AqlCallType const& aqlCall);

 private:
  enum class State {
    /// @brief the thread is waiting to be notified about a pending upstream
    /// call or that it should terminate.
    Waiting,
    /// @brief the thread is currently executing an upstream call. As long
    /// as we are in state `Executing` the previous thread is blocked waiting
    /// for the result.
    Executing,
    /// @brief the CallstackSplit instance is getting destroyed and the thread
    /// must terminate
    Stopped
  };

  /// @brief bundles references to the arguments of one execute() call and to
  /// the slot that receives its outcome (result or exception).
  struct Params {
    std::variant<UpstreamResult, std::exception_ptr, std::nullopt_t>& result;
    ExecutionContext& ctx;
    AqlCallType const& aqlCall;
  };

  /// @brief main function of the thread
  void run(ExecContext const& execContext);

  std::atomic<State> _state{State::Waiting};
  /// @brief parameters of the pending call, set by execute(). Initialised to
  /// nullptr so that the non-null assertion in run() never inspects an
  /// indeterminate value.
  Params* _params{nullptr};
  ExecutionBlockImpl& _block;

  std::mutex _lock;
  std::condition_variable _bell;

  // must remain the last member: the thread starts running in the
  // constructor's initializer list and uses the members declared above
  std::thread _thread;
};

RegisterInfos _registerInfos;

Expand Down Expand Up @@ -393,9 +452,11 @@ class ExecutionBlockImpl final : public ExecutionBlock {
AqlCallStack _stackBeforeWaiting;

std::shared_ptr<PrefetchTask> _prefetchTask;


std::unique_ptr<CallstackSplit> _callstackSplit;

Result _firstFailure;

bool _hasMemoizedCall{false};

// Only used in passthrough variant.
Expand Down
4 changes: 4 additions & 0 deletions arangod/Aql/ExecutionNode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -505,6 +505,9 @@ ExecutionNode::ExecutionNode(ExecutionPlan* plan, VPackSlice const& slice)

_isAsyncPrefetchEnabled =
VelocyPackHelper::getBooleanValue(slice, "isAsyncPrefetchEnabled", false);

_isCallstackSplitEnabled =
VelocyPackHelper::getBooleanValue(slice, "isCallstackSplitEnabled", false);
}

ExecutionNode::ExecutionNode(ExecutionPlan& plan, ExecutionNode const& other)
Expand Down Expand Up @@ -946,6 +949,7 @@ void ExecutionNode::toVelocyPackHelperGeneric(VPackBuilder& nodes, unsigned flag

nodes.add("isInSplicedSubquery", VPackValue(_isInSplicedSubquery));
nodes.add("isAsyncPrefetchEnabled", VPackValue(_isAsyncPrefetchEnabled));
nodes.add("isCallstackSplitEnabled", VPackValue(_isCallstackSplitEnabled));
}
TRI_ASSERT(nodes.isOpenObject());
}
Expand Down
8 changes: 8 additions & 0 deletions arangod/Aql/ExecutionNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,10 @@ class ExecutionNode {

void setIsAsyncPrefetchEnabled(bool v) noexcept { _isAsyncPrefetchEnabled = v; }

/// @brief returns whether the callstack split has been enabled for this node
bool isCallstackSplitEnabled() const noexcept { return _isCallstackSplitEnabled; }

/// @brief enables the callstack split for this node; this class offers no
/// counterpart to switch it off again
void enableCallstackSplit() noexcept { _isCallstackSplitEnabled = true; }

[[nodiscard]] static bool isIncreaseDepth(NodeType type);
[[nodiscard]] bool isIncreaseDepth() const;
[[nodiscard]] static bool alwaysCopiesRows(NodeType type);
Expand Down Expand Up @@ -547,6 +551,10 @@ class ExecutionNode {
/// @brief whether or not asynchronous prefetching is enabled for this node
bool _isAsyncPrefetchEnabled{false};

/// @brief whether or not this node should split calls to upstream nodes to a
/// separate thread to avoid the problem of stackoverflows in large queries.
bool _isCallstackSplitEnabled{false};

/// @brief _plan, the ExecutionPlan object
ExecutionPlan* _plan;

Expand Down
1 change: 1 addition & 0 deletions arangod/Aql/Optimizer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,7 @@ void Optimizer::createPlans(std::unique_ptr<ExecutionPlan> plan,
void Optimizer::finalizePlans() {
for (auto& plan : _plans.list) {
insertDistributeInputCalculation(*plan.first);
activateCallstackSplit(*plan.first);
if (plan.first->isAsyncPrefetchEnabled()) {
enableAsyncPrefetching(*plan.first);
}
Expand Down
47 changes: 47 additions & 0 deletions arangod/Aql/OptimizerRules.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,22 @@

namespace {

/// @brief returns true if at least one CalculationNode reached by walking the
/// plan from its root has an expression that reports willUseV8().
bool willUseV8(arangodb::aql::ExecutionPlan const& plan) {
  namespace aql = arangodb::aql;

  struct V8Checker : aql::WalkerWorkerBase<aql::ExecutionNode> {
    bool before(aql::ExecutionNode* n) override {
      if (n->getType() != aql::ExecutionNode::CALCULATION) {
        return false;
      }
      auto* calculation = static_cast<aql::CalculationNode*>(n);
      if (!calculation->expression()->willUseV8()) {
        return false;
      }
      // one hit is enough, so remember it and signal the walker via `true`
      result = true;
      return true;
    }

    bool result{false};
  };

  V8Checker checker{};
  plan.root()->walk(checker);
  return checker.result;
}

bool accessesCollectionVariable(arangodb::aql::ExecutionPlan const* plan,
arangodb::aql::ExecutionNode const* node,
arangodb::aql::VarSet& vars) {
Expand Down Expand Up @@ -7889,6 +7905,37 @@ void arangodb::aql::enableAsyncPrefetching(ExecutionPlan& plan) {
plan.root()->walk(walker);
}

/// @brief walks the plan from its root and enables the callstack split on
/// every node at which the running node counter reaches the query option
/// `maxNodesPerCallstack`. Does nothing if the plan will use V8.
void arangodb::aql::activateCallstackSplit(ExecutionPlan& plan) {
  if (willUseV8(plan)) {
    // V8 requires thread local context configuration, so we cannot
    // use our thread based split solution...
    return;
  }

  struct CallstackSplitter : WalkerWorkerBase<ExecutionNode> {
    explicit CallstackSplitter(size_t maxNodes) : limit(maxNodes) {}

    bool before(ExecutionNode* n) override {
      // This rule must be executed after subquery splicing, so we must not
      // see any subqueries here!
      TRI_ASSERT(n->getType() != EN::SUBQUERY);

      if (n->getType() == EN::REMOTE) {
        // RemoteNodes provide a natural split in the callstack, so we can
        // reset the counter here!
        seen = 0;
        return false;
      }

      ++seen;
      if (seen >= limit) {
        // limit reached: split at this node and start counting afresh
        seen = 0;
        n->enableCallstackSplit();
      }
      return false;
    }

    size_t limit;
    size_t seen = 0;
  };

  auto const& options = plan.getAst()->query().queryOptions();
  CallstackSplitter splitter(options.maxNodesPerCallstack);
  plan.root()->walk(splitter);
}

namespace {

void findSubqueriesSuitableForSplicing(ExecutionPlan const& plan,
Expand Down
1 change: 1 addition & 0 deletions arangod/Aql/OptimizerRules.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ Collection* addCollectionToQuery(QueryContext& query, std::string const& cname,
void insertDistributeInputCalculation(ExecutionPlan& plan);

void enableAsyncPrefetching(ExecutionPlan& plan);
void activateCallstackSplit(ExecutionPlan& plan);

/// @brief adds a SORT operation for IN right-hand side operands
void sortInValuesRule(Optimizer*, std::unique_ptr<ExecutionPlan>, OptimizerRule const&);
Expand Down
8 changes: 8 additions & 0 deletions arangod/Aql/QueryOptions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ using namespace arangodb::aql;

size_t QueryOptions::defaultMemoryLimit = 0;
size_t QueryOptions::defaultMaxNumberOfPlans = 128;
size_t QueryOptions::defaultMaxNodesPerCallstack = 250;
double QueryOptions::defaultMaxRuntime = 0.0;
double QueryOptions::defaultTtl;
bool QueryOptions::defaultFailOnWarning = false;
Expand All @@ -47,6 +48,7 @@ QueryOptions::QueryOptions()
: memoryLimit(0),
maxNumberOfPlans(QueryOptions::defaultMaxNumberOfPlans),
maxWarningCount(10),
maxNodesPerCallstack(QueryOptions::defaultMaxNodesPerCallstack),
maxRuntime(0.0),
satelliteSyncWait(60.0),
ttl(QueryOptions::defaultTtl), // get global default ttl
Expand Down Expand Up @@ -128,6 +130,11 @@ void QueryOptions::fromVelocyPack(VPackSlice slice) {
maxWarningCount = value.getNumber<size_t>();
}

value = slice.get("maxNodesPerCallstack");
if (value.isNumber()) {
maxNodesPerCallstack = value.getNumber<size_t>();
}

value = slice.get("maxRuntime");
if (value.isNumber()) {
maxRuntime = value.getNumber<double>();
Expand Down Expand Up @@ -251,6 +258,7 @@ void QueryOptions::toVelocyPack(VPackBuilder& builder, bool disableOptimizerRule
builder.add("memoryLimit", VPackValue(memoryLimit));
builder.add("maxNumberOfPlans", VPackValue(maxNumberOfPlans));
builder.add("maxWarningCount", VPackValue(maxWarningCount));
builder.add("maxNodesPerCallstack", VPackValue(maxNodesPerCallstack));
builder.add("maxRuntime", VPackValue(maxRuntime));
builder.add("satelliteSyncWait", VPackValue(satelliteSyncWait));
builder.add("ttl", VPackValue(ttl));
Expand Down
2 changes: 2 additions & 0 deletions arangod/Aql/QueryOptions.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ struct QueryOptions {
size_t memoryLimit;
size_t maxNumberOfPlans;
size_t maxWarningCount;
size_t maxNodesPerCallstack;
double maxRuntime; // query has to execute within the given time or will be killed
double satelliteSyncWait;
double ttl; // time until query cursor expires - avoids coursors to
Expand Down Expand Up @@ -106,6 +107,7 @@ struct QueryOptions {

static size_t defaultMemoryLimit;
static size_t defaultMaxNumberOfPlans;
static size_t defaultMaxNodesPerCallstack;
static double defaultMaxRuntime;
static double defaultTtl;
static bool defaultFailOnWarning;
Expand Down
Loading
0