10000 Introduce callstack splitting to avoid stackoverflows with large AQL … · arangodb/arangodb@7584472 · GitHub
[go: up one dir, main page]

Skip to content

Commit 7584472

Browse files
mpoeterjsteemann
andauthored
Introduce callstack splitting to avoid stackoverflows with large AQL queries (#14351)
Co-authored-by: Jan <jsteemann@users.noreply.github.com>
1 parent c055bcc commit 7584472

16 files changed

+297
-7
lines changed

CHANGELOG

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,13 @@
11
devel
22
-----
33

4+
* Fix potential stack overflow when executing large queries. This is
5+
achieved by splitting the callstack and moving part of the execution
6+
to a separate thread. The number of execution nodes after which such
7+
a callstack split should be performed can be configured via the query
8+
option `maxNodesPerCallstack` and the command line option
9+
`--query.max-nodes-per-callstack`; the default is 250.
10+
411
* Bug-Fix: Pregel WCC algorithm could yield incorrect results if a
512
part of the connected component was only attached via OUTBOUND edges.
613
The underlying algorithm is now modified to properly retain INBOUND
@@ -76,7 +83,6 @@ devel
7683
* Improve usability of hidden options: `--help` mentions that these exist
7784
and how to display them.
7885

79-
8086
* Fix DEVSUP-753: now it is safe to call visit on exhausted disjunction
8187
iterator.
8288

CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -978,6 +978,7 @@ if (MSVC)
978978
add_definitions(/Z7)
979979
endif ()
980980

981+
add_definitions(/bigobj)
981982
else () # NOT MSVC
982983
set(EXTRA_C_FLAGS "")
983984
set(EXTRA_CXX_FLAGS "")

arangod/Aql/ExecutionBlockImpl.cpp

Lines changed: 79 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,10 @@ ExecutionBlockImpl<Executor>::ExecutionBlockImpl(ExecutionEngine* engine,
188188
// Break the stack before waiting.
189189
// We should not use this here.
190190
_stackBeforeWaiting.popCall();
191+
192+
if (_exeNode->isCallstackSplitEnabled()) {
193+
_callstackSplit = std::make_unique<CallstackSplit>(*this);
194+
}
191195
}
192196
193197
template <class Executor>
@@ -1518,7 +1522,14 @@ ExecutionBlockImpl<Executor>::executeWithoutTrace(AqlCallStack const& callStack)
15181522
auto subqueryLevelBefore = ctx.stack.subqueryLevel();
15191523
#endif
15201524
SkipResult skippedLocal;
1521-
std::tie(_upstreamState, skippedLocal, _lastRange) = executeFetcher(ctx, _upstreamRequest);
1525+
if (_callstackSplit) {
1526+
// we need to split the callstack to avoid stack overflows, so we move upstream
1527+
// execution into a separate thread
1528+
std::tie(_upstreamState, skippedLocal, _lastRange) = _callstackSplit->execute(ctx, _upstreamRequest);
1529+
} else {
1530+
std::tie(_upstreamState, skippedLocal, _lastRange) = executeFetcher(ctx, _upstreamRequest);
1531+
}
1532+
15221533
#ifdef ARANGODB_ENABLE_MAINTAINER_MODE
15231534
TRI_ASSERT(subqueryLevelBefore == ctx.stack.subqueryLevel());
15241535
#endif
@@ -1969,6 +1980,73 @@ void ExecutionBlockImpl<Executor>::PrefetchTask::execute(ExecutionBlockImpl& blo
19691980
}
19701981
}
19711982

1983+
template <class Executor>
1984+
ExecutionBlockImpl<Executor>::CallstackSplit::CallstackSplit(ExecutionBlockImpl& block) :
1985+
_block(block),
1986+
_thread(&CallstackSplit::run, this, std::cref(ExecContext::current())) {}
1987+
1988+
template <class Executor>
1989+
ExecutionBlockImpl<Executor>::CallstackSplit::~CallstackSplit() {
1990+
_lock.lock();
1991+
_state.store(State::Stopped);
1992+
_lock.unlock();
1993+
1994+
_bell.notify_one();
1995+
_thread.join();
1996+
}
1997+
1998+
template <class Executor>
1999+
auto ExecutionBlockImpl<Executor>::CallstackSplit::execute(ExecutionContext& ctx, AqlCallType const& aqlCall)
2000+
-> UpstreamResult {
2001+
std::variant<UpstreamResult, std::exception_ptr, std::nullopt_t> result{std::nullopt};
2002+
Params params{result, ctx, aqlCall};
2003+
2004+
{
2005+
std::unique_lock guard(_lock);
2006+
_params = &params;
2007+
_state.store(State::Executing);
2008+
}
2009+
2010+
_bell.notify_one();
2011+
2012+
std::unique_lock<std::mutex> guard(_lock);
2013+
_bell.wait(guard, [this]() {
2014+
return _state.load(std::memory_order_acquire) != State::Executing;
2015+
});
2016+
TRI_ASSERT(_state.load() == State::Waiting);
2017+
2018+
if (std::holds_alternative<std::exception_ptr>(result)) {
2019+
std::rethrow_exception(std::get<std::exception_ptr>(result));
2020+
}
2021+
2022+
return std::get<UpstreamResult>(std::move(result));
2023+
}
2024+
2025+
template <class Executor>
2026+
void ExecutionBlockImpl<Executor>::CallstackSplit::run(ExecContext const& execContext) {
2027+
ExecContextScope scope(&execContext);
2028+
std::unique_lock<std::mutex> guard(_lock);
2029+
while (true) {
2030+
_bell.wait(guard, [this]() {
2031+
return _state.load(std::memory_order_relaxed) != State::Waiting;
2032+
});
2033+
if (_state == State::Stopped) {
2034+
return;
2035+
}
2036+
TRI_ASSERT(_params != nullptr);
2037+
_state = State::Executing;
2038+
2039+
try {
2040+
_params->result = _block.executeFetcher(_params->ctx, _params->aqlCall);
2041+
} catch(...) {
2042+
_params->result = std::current_exception();
2043+
}
2044+
2045+
_state.store(State::Waiting, std::memory_order_relaxed);
2046+
_bell.notify_one();
2047+
}
2048+
}
2049+
19722050
template class ::arangodb::aql::ExecutionBlockImpl<CalculationExecutor<CalculationType::Condition>>;
19732051
template class ::arangodb::aql::ExecutionBlockImpl<CalculationExecutor<CalculationType::Reference>>;
19742052
template class ::arangodb::aql::ExecutionBlockImpl<CalculationExecutor<CalculationType::V8Condition>>;

arangod/Aql/ExecutionBlockImpl.h

Lines changed: 63 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,10 @@
3939
#include <memory>
4040
#include <mutex>
4141

42+
namespace arangodb {
43+
class ExecContext;
44+
}
45+
4246
namespace arangodb::aql {
4347

4448
template <BlockPassthrough passThrough>
@@ -346,6 +350,61 @@ class ExecutionBlockImpl final : public ExecutionBlock {
346350
std::condition_variable _bell;
347351
std::optional<PrefetchResult> _result;
348352
};
353+
354+
/**
355+
* @brief The CallstackSplit class is used for execution blocks that need to
356+
* perform their calls to upstream nodes in a separate thread in order to
357+
* prevent stack overflows.
358+
*
359+
* Execution blocks for which the callstack split has been enabled create a
360+
* single CallstackSplit instance upon creation. The CallstackSplit instance
361+
* manages the new thread for the upstream execution. Instead of calling
362+
* executeFetcher directly, we call Callsplit::execute which stores the call
363+
* parameter, signals the thread and then blocks. The other thread fetches
364+
* the parameters, performs the call to executeFetcher, stores the result and
365+
* notifies the original thread.
366+
*
367+
* This way we can split the callstack over multiple threads and thereby
368+
* avoid stack overflows.
369+
*
370+
*/
371+
struct CallstackSplit {
372+
explicit CallstackSplit(ExecutionBlockImpl& block);
373+
~CallstackSplit();
374+
375+
using UpstreamResult =
376+
std::tuple<ExecutionState, SkipResult, typename Fetcher::DataRange>;
377+
UpstreamResult execute(ExecutionContext& ctx, AqlCallType const& aqlCall);
378+
379+
private:
380+
enum class State {
381+
/// @brief the thread is waiting to be notified about a pending upstream
382+
/// call or that it should terminate.
383+
Waiting,
384+
/// @brief the thread is currently executing an upstream call. As long
385+
/// as we are in state `Executing` the previous thread is blocked waiting
386+
/// for the result.
387+
Executing,
388+
/// @brief the CallstackSplit instance is getting destroyed and the thread
389+
/// must terminate
390+
Stopped
391+
};
392+
struct Params {
393+
std::variant<UpstreamResult, std::exception_ptr, std::nullopt_t>& result;
394+
ExecutionContext& ctx;
395+
AqlCallType const& aqlCall;
396+
};
397+
398+
void run(ExecContext const& execContext);
399+
std::atomic<State> _state{State::Waiting};
400+
Params* _params;
401+
ExecutionBlockImpl& _block;
402+
403+
std::mutex _lock;
404+
std::condition_variable _bell;
405+
406+
std::thread _thread;
407+
};
349408

350409
RegisterInfos _registerInfos;
351410

@@ -393,9 +452,11 @@ class ExecutionBlockImpl final : public ExecutionBlock {
393452
AqlCallStack _stackBeforeWaiting;
394453

395454
std::shared_ptr<PrefetchTask> _prefetchTask;
396-
455+
456+
std::unique_ptr<CallstackSplit> _callstackSplit;
457+
397458
Result _firstFailure;
398-
459+
399460
bool _hasMemoizedCall{false};
400461

401462
// Only used in passthrough variant.

arangod/Aql/ExecutionNode.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -505,6 +505,9 @@ ExecutionNode::ExecutionNode(ExecutionPlan* plan, VPackSlice const& slice)
505505

506506
_isAsyncPrefetchEnabled =
507507
VelocyPackHelper::getBooleanValue(slice, "isAsyncPrefetchEnabled", false);
508+
509+
_isCallstackSplitEnabled =
510+
VelocyPackHelper::getBooleanValue(slice, "isCallstackSplitEnabled", false);
508511
}
509512

510513
ExecutionNode::ExecutionNode(ExecutionPlan& plan, ExecutionNode const& other)
@@ -946,6 +949,7 @@ void ExecutionNode::toVelocyPackHelperGeneric(VPackBuilder& nodes, unsigned flag
946949

947950
nodes.add("isInSplicedSubquery", VPackValue(_isInSplicedSubquery));
948951
nodes.add("isAsyncPrefetchEnabled", VPackValue(_isAsyncPrefetchEnabled));
952+
nodes.add("isCallstackSplitEnabled", VPackValue(_isCallstackSplitEnabled));
949953
}
950954
TRI_ASSERT(nodes.isOpenObject());
951955
}

arangod/Aql/ExecutionNode.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -475,6 +475,10 @@ class ExecutionNode {
475475

476476
void setIsAsyncPrefetchEnabled(bool v) noexcept { _isAsyncPrefetchEnabled = v; }
477477

478+
bool isCallstackSplitEnabled() const noexcept { return _isCallstackSplitEnabled; }
479+
480+
void enableCallstackSplit() noexcept { _isCallstackSplitEnabled = true; }
481+
478482
[[nodiscard]] static bool isIncreaseDepth(NodeType type);
479483
[[nodiscard]] bool isIncreaseDepth() const;
480484
[[nodiscard]] static bool alwaysCopiesRows(NodeType type);
@@ -547,6 +551,10 @@ class ExecutionNode {
547551
/// @brief whether or not asynchronous prefetching is enabled for this node
548552
bool _isAsyncPrefetchEnabled{false};
549553

554+
/// @brief whether or not this node should split calls to upstream nodes to a
555+
/// separate thread to avoid the problem of stackoverflows in large queries.
556+
bool _isCallstackSplitEnabled{false};
557+
550558
/// @brief _plan, the ExecutionPlan object
551559
ExecutionPlan* _plan;
552560

arangod/Aql/Optimizer.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -375,6 +375,7 @@ void Optimizer::createPlans(std::unique_ptr<ExecutionPlan> plan,
375375
void Optimizer::finalizePlans() {
376376
for (auto& plan : _plans.list) {
377377
insertDistributeInputCalculation(*plan.first);
378+
activateCallstackSplit(*plan.first);
378379
if (plan.first->isAsyncPrefetchEnabled()) {
379380
enableAsyncPrefetching(*plan.first);
380381
}

arangod/Aql/OptimizerRules.cpp

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,22 @@
7979

8080
namespace {
8181

82+
bool willUseV8(arangodb::aql::ExecutionPlan const& plan) {
83+
struct V8Checker : arangodb::aql::WalkerWorkerBase<arangodb::aql::ExecutionNode> {
84+
bool before(arangodb::aql::ExecutionNode* n) override {
85+
if (n->getType() == arangodb::aql::ExecutionNode::CALCULATION &&
86+
static_cast<arangodb::aql::CalculationNode*>(n)->expression()->willUseV8()) {
87+
result = true;
88+
return true;
89+
}
90+
return false;
91+
}
92+
bool result{false};
93+
} walker{};
94+
plan.root()->walk(walker);
95+
return walker.result;
96+
}
97+
8298
bool accessesCollectionVariable(arangodb::aql::ExecutionPlan const* plan,
8399
arangodb::aql::ExecutionNode const* node,
84100
arangodb::aql::VarSet& vars) {
@@ -7889,6 +7905,37 @@ void arangodb::aql::enableAsyncPrefetching(ExecutionPlan& plan) {
78897905
plan.root()->walk(walker);
78907906
}
78917907

7908+
void arangodb::aql::activateCallstackSplit(ExecutionPlan& plan) {
7909+
if (willUseV8(plan)) {
7910+
// V8 requires thread local context configuration, so we cannot
7911+
// use our thread based split solution...
7912+
return;
7913+
}
7914+
7915+
auto const& options = plan.getAst()->query().queryOptions();
7916+
struct CallstackSplitter : WalkerWorkerBase<ExecutionNode> {
7917+
explicit CallstackSplitter(size_t maxNodes) : maxNodesPerCallstack(maxNodes) {}
7918+
bool before(ExecutionNode* n) override {
7919+
// This rule must be executed after subquery splicing, so we must not see any subqueries here!
7920+
TRI_ASSERT(n->getType() != EN::SUBQUERY);
7921+
7922+
if (n->getType() == EN::REMOTE) {
7923+
// RemoteNodes provide a natural split in the callstack, so we can reset the counter here!
7924+
count = 0;
7925+
} else if (++count >= maxNodesPerCallstack) {
7926+
count = 0;
7927+
n->enableCallstackSplit();
7928+
}
7929+
return false;
7930+
}
7931+
size_t maxNodesPerCallstack;
7932+
size_t count = 0;
7933+
};
7934+
7935+
CallstackSplitter walker(options.maxNodesPerCallstack);
7936+
plan.root()->walk(walker);
7937+
}
7938+
78927939
namespace {
78937940

78947941
void findSubqueriesSuitableForSplicing(ExecutionPlan const& plan,

arangod/Aql/OptimizerRules.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ Collection* addCollectionToQuery(QueryContext& query, std::string const& cname,
4646
void insertDistributeInputCalculation(ExecutionPlan& plan);
4747

4848
void enableAsyncPrefetching(ExecutionPlan& plan);
49+
void activateCallstackSplit(ExecutionPlan& plan);
4950

5051
/// @brief adds a SORT operation for IN right-hand side operands
5152
void sortInValuesRule(Optimizer*, std::unique_ptr<ExecutionPlan>, OptimizerRule const&);

arangod/Aql/QueryOptions.cpp

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ using namespace arangodb::aql;
3838

3939
size_t QueryOptions::defaultMemoryLimit = 0;
4040
size_t QueryOptions::defaultMaxNumberOfPlans = 128;
41+
size_t QueryOptions::defaultMaxNodesPerCallstack = 250;
4142
double QueryOptions::defaultMaxRuntime = 0.0;
4243
double QueryOptions::defaultTtl;
4344
bool QueryOptions::defaultFailOnWarning = false;
@@ -47,6 +48,7 @@ QueryOptions::QueryOptions()
4748
: memoryLimit(0),
4849
maxNumberOfPlans(QueryOptions::defaultMaxNumberOfPlans),
4950
maxWarningCount(10),
51+
maxNodesPerCallstack(QueryOptions::defaultMaxNodesPerCallstack),
5052
maxRuntime(0.0),
5153
satelliteSyncWait(60.0),
5254
ttl(QueryOptions::defaultTtl), // get global default ttl
@@ -128,6 +130,11 @@ void QueryOptions::fromVelocyPack(VPackSlice slice) {
128130
maxWarningCount = value.getNumber<size_t>();
129131
}
130132

133+
value = slice.get("maxNodesPerCallstack");
134+
if (value.isNumber()) {
135+
maxNodesPerCallstack = value.getNumber<size_t>();
136+
}
137+
131138
value = slice.get("maxRuntime");
132139
if (value.isNumber()) {
133140
maxRuntime = value.getNumber<double>();
@@ -251,6 +258,7 @@ void QueryOptions::toVelocyPack(VPackBuilder& builder, bool disableOptimizerRule
251258
builder.add("memoryLimit", VPackValue(memoryLimit));
252259
builder.add("maxNumberOfPlans", VPackValue(maxNumberOfPlans));
253260
builder.add("maxWarningCount", VPackValue(maxWarningCount));
261+
builder.add("maxNodesPerCallstack", VPackValue(maxNodesPerCallstack));
254262
builder.add("maxRuntime", VPackValue(maxRuntime));
255263
builder.add("satelliteSyncWait", VPackValue(satelliteSyncWait));
256264
builder.add("ttl", VPackValue(ttl));

arangod/Aql/QueryOptions.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ struct QueryOptions {
7272
size_t memoryLimit;
7373
size_t maxNumberOfPlans;
7474
size_t maxWarningCount;
75+
size_t maxNodesPerCallstack;
7576
double maxRuntime; // query has to execute within the given time or will be killed
7677
double satelliteSyncWait;
7778
double ttl; // time until query cursor expires - avoids coursors to
@@ -106,6 +107,7 @@ struct QueryOptions {
106107

107108
static size_t defaultMemoryLimit;
108109
static size_t defaultMaxNumberOfPlans;
110+
static size_t defaultMaxNodesPerCallstack;
109111
static double defaultMaxRuntime;
110112
static double defaultTtl;
111113
static bool defaultFailOnWarning;

0 commit comments

Comments
 (0)
0