8000 Feature/hybrid smart graph additional steptype by hkernbach · Pull Request #14474 · arangodb/arangodb · GitHub
[go: up one dir, main page]

Skip to content

Feature/hybrid smart graph additional steptype #14474

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

8000
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 67 additions & 53 deletions arangod/Aql/ExecutionBlockImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@
#include "Graph/Providers/SingleServerProvider.h"
#include "Graph/Queues/FifoQueue.h"
#include "Graph/Queues/QueueTracer.h"
#include "Graph/Steps/SingleServerProviderStep.h"
#include "Graph/algorithm-aliases.h"

#include <velocypack/Dumper.h>
Expand All @@ -92,11 +93,13 @@
#include <type_traits>

/* SingleServerProvider Section */
using SingleServerProviderStep = ::arangodb::graph::SingleServerProviderStep;

using KPathRefactored =
arangodb::graph::KPathEnumerator<arangodb::graph::SingleServerProvider>;
arangodb::graph::KPathEnumerator<arangodb::graph::SingleServerProvider<SingleServerProviderStep>>;

using KPathRefactoredTracer =
arangodb::graph::TracedKPathEnumerator<arangodb::graph::SingleServerProvider>;
arangodb::graph::TracedKPathEnumerator<arangodb::graph::SingleServerProvider<SingleServerProviderStep>>;

/* ClusterProvider Section */
using KPathRefactoredCluster =
Expand Down Expand Up @@ -188,7 +191,7 @@ ExecutionBlockImpl<Executor>::ExecutionBlockImpl(ExecutionEngine* engine,
// Break the stack before waiting.
// We should not use this here.
_stackBeforeWaiting.popCall();

if (_exeNode->isCallstackSplitEnabled()) {
_callstackSplit = std::make_unique<CallstackSplit>(*this);
}
Expand Down Expand Up @@ -342,7 +345,7 @@ ExecutionBlockImpl<Executor>::execute(AqlCallStack const& stack) {
TRI_IF_FAILURE("ExecutionBlock::getOrSkipSome3") {
THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG);
}

// check if this block failed already.
if (_firstFailure.fail()) {
// if so, just return the stored error.
Expand Down Expand Up @@ -380,13 +383,17 @@ ExecutionBlockImpl<Executor>::execute(AqlCallStack const& stack) {
TRI_ASSERT(_firstFailure.ok());
// store only the first failure we got
_firstFailure = {ex.code(), ex.what()};
LOG_QUERY("7289a", DEBUG) << printBlockInfo() << " local statemachine failed with exception: " << ex.what();
LOG_QUERY("7289a", DEBUG)
<< printBlockInfo()
<< " local statemachine failed with exception: " << ex.what();
throw;
} catch (std::exception const& ex) {
TRI_ASSERT(_firstFailure.ok());
// store only the first failure we got
_firstFailure = {TRI_ERROR_INTERNAL, ex.what()};
LOG_QUERY("2bbd5", DEBUG) << printBlockInfo() << " local statemachine failed with exception: " << ex.what();
LOG_QUERY("2bbd5", DEBUG)
<< printBlockInfo()
<< " local statemachine failed with exception: " << ex.what();
// Rewire the error, to be consistent with potentially next caller.
THROW_ARANGO_EXCEPTION(_firstFailure);
}
Expand Down Expand Up @@ -527,7 +534,7 @@ auto ExecutionBlockImpl<Executor>::allocateOutputBlock(AqlCall&& call)
}

template <class Executor>
void ExecutionBlockImpl<Executor>::ensureOutputBlock(AqlCall&& call) {
void ExecutionBlockImpl<Executor>::ensureOutputBlock(AqlCall&& call) {
if (_outputItemRow == nullptr || !_outputItemRow->isInitialized()) {
_outputItemRow = allocateOutputBlock(std::move(call));
} else {
Expand Down Expand Up @@ -602,12 +609,12 @@ static SkipRowsRangeVariant constexpr skipRowsType() {
useExecutor ==
(is_one_of_v<
Executor, FilterExecutor, ShortestPathExecutor, ReturnExecutor,
KShortestPathsExecutor<graph::KShortestPathsFinder>,
KShortestPathsExecutor<KPathRefactored>, KShortestPathsExecutor<KPathRefactoredTracer>,
KShortestPathsExecutor<KPathRefactoredCluster>, KShortestPathsExecutor<KPathRefactoredClusterTracer>, ParallelUnsortedGatherExecutor,
IdExecutor<SingleRowFetcher<BlockPassthrough::Enable>>, IdExecutor<ConstFetcher>, HashedCollectExecutor,
AccuWindowExecutor, WindowExecutor, IndexExecutor, EnumerateCollectionExecutor, DistinctCollectExecutor,
ConstrainedSortExecutor, CountCollectExecutor,
KShortestPathsExecutor<graph::KShortestPathsFinder>, KShortestPathsExecutor<KPathRefactored>,
KShortestPathsExecutor<KPathRefactoredTracer>, KShortestPathsExecutor<KPathRefactoredCluster>,
KShortestPathsExecutor<KPathRefactoredClusterTracer>, ParallelUnsortedGatherExecutor,
IdExecutor<SingleRowFetcher<BlockPassthrough::Enable>>, IdExecutor<ConstFetcher>,
HashedCollectExecutor, AccuWindowExecutor, WindowExecutor, IndexExecutor, EnumerateCollectionExecutor,
DistinctCollectExecutor, ConstrainedSortExecutor, CountCollectExecutor,
#ifdef ARANGODB_USE_GOOGLE_TESTS
TestLambdaSkipExecutor,
#endif
Expand Down Expand Up @@ -734,7 +741,8 @@ auto ExecutionBlockImpl<Executor>::executeFetcher(ExecutionContext& ctx,
// SubqueryStart and the partnered SubqueryEnd by *not*
// pushing the upstream request.
if constexpr (!std::is_same_v<Executor, SubqueryStartExecutor>) {
ctx.stack.pushCall(createUpstreamCall(std::move(aqlCall), ctx.clientCallList.hasMoreCalls()));
ctx.stack.pushCall(createUpstreamCall(std::move(aqlCall),
ctx.clientCallList.hasMoreCalls()));
}

auto const result = std::invoke([&]() {
Expand All @@ -759,23 +767,24 @@ auto ExecutionBlockImpl<Executor>::executeFetcher(ExecutionContext& ctx,

// TODO - we should avoid flooding the queue with too many tasks as that
// can significantly delay processing of user REST requests.

// we can safely ignore the result here, because we will try to
// claim the task ourselves anyway.
std::ignore = 8000 SchedulerFeature::SCHEDULER->queue(
RequestLane::INTERNAL_LOW,
[block = this, task = _prefetchTask, stack = ctx.stack]() mutable {
if (!task->tryClaim()) {
return;
}
// task is a copy of the PrefetchTask shared_ptr, and we will only
// attempt to execute the task if we successfully claimed the task.
// i.e., it does not matter if this task lingers around in the
// scheduler queue even after the execution block has been destroyed,
// because in this case we will not be able to claim the task and
// simply return early without accessing the block.
task->execute(*block, stack);
});
std::ignore =
SchedulerFeature::SCHEDULER->queue(RequestLane::INTERNAL_LOW,
[block = this, task = _prefetchTask,
stack = ctx.stack]() mutable {
if (!task->tryClaim()) {
return;
}
// task is a copy of the PrefetchTask shared_ptr, and we will only
// attempt to execute the task if we successfully claimed the task.
// i.e., it does not matter if this task lingers around in the
// scheduler queue even after the execution block has been destroyed,
// because in this case we will not be able to claim the task and
// simply return early without accessing the block.
task->execute(*block, stack);
});
}

if constexpr (!std::is_same_v<Executor, SubqueryStartExecutor>) {
Expand Down Expand Up @@ -1231,10 +1240,10 @@ auto ExecutionBlockImpl<Executor>::executeFastForward(typename Fetcher::DataRang
template <class Executor>
std::tuple<ExecutionState, SkipResult, SharedAqlItemBlockPtr>
ExecutionBlockImpl<Executor>::executeWithoutTrace(AqlCallStack const& callStack) {
// We can only work on a Stack that has valid calls for all levels.
// We can only work on a Stack that has valid calls for all levels.
TRI_ASSERT(callStack.hasAllValidCalls());
ExecutionContext ctx(*this, callStack);

ExecutorState localExecutorState = ExecutorState::DONE;

// We can only have returned the following internal states
Expand Down Expand Up @@ -1344,8 +1353,8 @@ ExecutionBlockImpl<Executor>::executeWithoutTrace(AqlCallStack const& callStack)
TRI_ASSERT(ctx.clientCall.getSkipCount() == 0);
switch (_execState) {
case ExecState::CHECKCALL: {
LOG_QUERY("cfe46", DEBUG)
<< printTypeInfo() << " determine next action on call " << ctx.clientCall;
LOG_QUERY("cfe46", DEBUG) << printTypeInfo() << " determine next action on call "
<< ctx.clientCall;

if constexpr (executorHasSideEffects<Executor>) {
// If the executor has sideEffects, and we need to skip the results we would
Expand All @@ -1369,7 +1378,8 @@ ExecutionBlockImpl<Executor>::executeWithoutTrace(AqlCallStack const& callStack)
LOG_QUERY("1f786", DEBUG) << printTypeInfo() << " call skipRows " << ctx.clientCall;

// Execute skipSome
auto [state, stats, skippedLocal, call] = executeSkipRowsRange(_lastRange, ctx.clientCall);
auto [state, stats, skippedLocal, call] =
executeSkipRowsRange(_lastRange, ctx.clientCall);

#ifdef ARANGODB_ENABLE_MAINTAINER_MODE
// Assertion: We did skip 'skippedLocal' documents here.
Expand Down Expand Up @@ -1517,19 +1527,21 @@ ExecutionBlockImpl<Executor>::executeWithoutTrace(AqlCallStack const& callStack)
// executors.
TRI_ASSERT(isMultiDepExecutor<Executor> || !lastRangeHasDataRow());
TRI_ASSERT(!_lastRange.hasShadowRow());

#ifdef ARANGODB_ENABLE_MAINTAINER_MODE
auto subqueryLevelBefore = ctx.stack.subqueryLevel();
#endif
SkipResult skippedLocal;
if (_callstackSplit) {
// we need to split the callstack to avoid stack overflows, so we move upstream
// execution into a separate thread
std::tie(_upstreamState, skippedLoc 9E88 al, _lastRange) = _callstackSplit->execute(ctx, _upstreamRequest);
// we need to split the callstack to avoid stack overflows, so we move
// upstream execution into a separate thread
std::tie(_upstreamState, skippedLocal, _lastRange) =
_callstackSplit->execute(ctx, _upstreamRequest);
} else {
std::tie(_upstreamState, skippedLocal, _lastRange) = executeFetcher(ctx, _upstreamRequest);
std::tie(_upstreamState, skippedLocal, _lastRange) =
executeFetcher(ctx, _upstreamRequest);
}

#ifdef ARANGODB_ENABLE_MAINTAINER_MODE
TRI_ASSERT(subqueryLevelBefore == ctx.stack.subqueryLevel());
#endif
Expand Down Expand Up @@ -1891,8 +1903,8 @@ auto ExecutionBlockImpl<Executor>::testInjectInputRange(DataRange range, SkipRes
#endif

template <class Executor>
ExecutionBlockImpl<Executor>::ExecutionContext::ExecutionContext(
ExecutionBlockImpl& block, AqlCallStack const& callstack)
ExecutionBlockImpl<Executor>::ExecutionContext::ExecutionContext(ExecutionBlockImpl& block,
AqlCallStack const& callstack)
: stack(callstack), clientCallList(this->stack.popCall()) {
if constexpr (std::is_same_v<Executor, SubqueryEndExecutor>) {
// In subqeryEndExecutor we actually manage two calls.
Expand All @@ -1913,7 +1925,7 @@ ExecutionBlockImpl<Executor>::ExecutionContext::ExecutionContext(
// We got called with a skip count already set!
// Caller is wrong fix it.
TRI_ASSERT(clientCall.getSkipCount() == 0);

TRI_ASSERT(!(clientCall.getOffset() == 0 && clientCall.softLimit == AqlCall::Limit{0u}));
TRI_ASSERT(!(clientCall.hasSoftLimit() && clientCall.fullCount));
TRI_ASSERT(!(clientCall.hasSoftLimit() && clientCall.hasHardLimit()));
Expand Down Expand Up @@ -1960,7 +1972,8 @@ auto ExecutionBlockImpl<Executor>::PrefetchTask::stealResult() noexcept -> Prefe
}

template <class Executor>
void ExecutionBlockImpl<Executor>::PrefetchTask::execute(ExecutionBlockImpl& block, AqlCallStack& stack) {
void ExecutionBlockImpl<Executor>::PrefetchTask::execute(ExecutionBlockImpl& block,
AqlCallStack& stack) {
if constexpr (std::is_same_v<Fetcher, MultiDependencySingleRowFetcher> ||
executorHasSideEffects<Executor>) {
TRI_ASSERT(false);
Expand All @@ -1981,12 +1994,12 @@ void ExecutionBlockImpl<Executor>::PrefetchTask::execute(ExecutionBlockImpl& blo
}

template <class Executor>
ExecutionBlockImpl<Executor>::CallstackSplit::CallstackSplit(ExecutionBlockImpl& block) :
_block(block),
_thread(&CallstackSplit::run, this, std::cref(ExecContext::current())) {}
ExecutionBlockImpl<Executor>::CallstackSplit::CallstackSplit(ExecutionBlockImpl& block)
: _block(block),
_thread(&CallstackSplit::run, this, std::cref(ExecContext::current())) {}

template <class Executor>
ExecutionBlockImpl<Executor>::CallstackSplit::~CallstackSplit() {
ExecutionBlockImpl<Executor>::CallstackSplit::~CallstackSplit() {
_lock.lock();
_state.store(State::Stopped);
_lock.unlock();
Expand All @@ -1996,7 +2009,8 @@ ExecutionBlockImpl<Executor>::CallstackSplit::~CallstackSplit() {
}

template <class Executor>
auto ExecutionBlockImpl<Executor>::CallstackSplit::execute(ExecutionContext& ctx, AqlCallType const& aqlCall)
auto ExecutionBlockImpl<Executor>::CallstackSplit::execute(ExecutionContext& ctx,
AqlCallType const& aqlCall)
-> UpstreamResult {
std::variant<UpstreamResult, std::exception_ptr, std::nullopt_t> result{std::nullopt};
Params params{result, ctx, aqlCall};
Expand All @@ -2008,7 +2022,7 @@ auto ExecutionBlockImpl<Executor>::CallstackSplit::execute(ExecutionContext& ctx
}

_bell.notify_one();

std::unique_lock<std::mutex> guard(_lock);
_bell.wait(guard, [this]() {
return _state.load(std::memory_order_acquire) != State::Executing;
Expand All @@ -2021,7 +2035,7 @@ auto ExecutionBlockImpl<Executor>::CallstackSplit::execute(ExecutionContext& ctx

return std::get<UpstreamResult>(std::move(result));
}

template <class Executor>
void ExecutionBlockImpl<Executor>::CallstackSplit::run(ExecContext const& execContext) {
ExecContextScope scope(&execContext);
Expand All @@ -2038,7 +2052,7 @@ void ExecutionBlockImpl<Executor>::CallstackSplit::run(ExecContext const& execCo

try {
_params->result = _block.executeFetcher(_params->ctx, _params->aqlCall);
} catch(...) {
} catch (...) {
_params->result = std::current_exception();
}

Expand Down
21 changes: 11 additions & 10 deletions arangod/Aql/KShortestPathsExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
#include "Graph/Queues/QueueTracer.h"
#include "Graph/ShortestPathOptions.h"
#include "Graph/ShortestPathResult.h"
#include "Graph/Steps/SingleServerProviderStep.h"
#include "Transaction/Helpers.h"

#include "Graph/algorithm-aliases.h"
Expand Down Expand Up @@ -143,8 +144,8 @@ auto KShortestPathsExecutorInfos<FinderType>::getTargetVertex() const noexcept

template <class FinderType>
auto KShortestPathsExecutorInfos<FinderType>::cache() const -> graph::TraverserCache* {
if constexpr (std::is_same_v<FinderType, KPathEnumerator<SingleServerProvider>> ||
std::is_same_v<FinderType, TracedKPathEnumerator<SingleServerProvider>> ||
if constexpr (std::is_same_v<FinderType, KPathEnumerator<SingleServerProvider<SingleServerProviderStep>>> ||
std::is_same_v<FinderType, TracedKPathEnumerator<SingleServerProvider<SingleServerProviderStep>>> ||
std::is_same_v<FinderType, KPathEnumerator<ClusterProvider>> ||
std::is_same_v<FinderType, TracedKPathEnumerator<ClusterProvider>>

Expand Down Expand Up @@ -267,8 +268,8 @@ template <class FinderType>
auto KShortestPathsExecutor<FinderType>::doOutputPath(OutputAqlItemRow& output) -> void {
transaction::BuilderLeaser tmp{&_trx};
tmp->clear();
if constexpr (std::is_same_v<FinderType, KPathEnumerator<SingleServerProvider>> ||
std::is_same_v<FinderType, TracedKPathEnumerator<SingleServerProvider>> ||
if constexpr (std::is_same_v<FinderType, KPathEnumerator<SingleServerProvider<SingleServerProviderStep>>> ||
std::is_same_v<FinderType, TracedKPathEnumerator<SingleServerProvider<SingleServerProviderStep>>> ||
std::is_same_v<FinderType, KPathEnumerator<ClusterProvider>> ||
std::is_same_v<FinderType, TracedKPathEnumerator<ClusterProvider>>) {
if (_finder.getNextPath(*tmp.builder())) {
Expand Down Expand Up @@ -306,8 +307,8 @@ auto KShortestPathsExecutor<FinderType>::getVertexId(InputVertex const& vertex,
try {
std::string idString;
// TODO: calculate expression once e.g. header constexpr bool and check then here
if constexpr (std::is_same_v<FinderType, KPathEnumerator<SingleServerProvider>> ||
std::is_same_v<FinderType, TracedKPathEnumerator<SingleServerProvider>> ||
if constexpr (std::is_same_v<FinderType, KPathEnumerator<SingleServerProvider<SingleServerProviderStep>>> ||
std::is_same_v<FinderType, TracedKPathEnumerator<SingleServerProvider<SingleServerProviderStep>>> ||
std::is_same_v<FinderType, KPathEnumerator<ClusterProvider>> ||
std::is_same_v<FinderType, TracedKPathEnumerator<ClusterProvider>>) {
idString = _trx.extractIdString(in.slice());
Expand Down Expand Up @@ -378,11 +379,11 @@ template class ::arangodb::aql::KShortestPathsExecutorInfos<arangodb::graph::KSh

/* SingleServerProvider Section */

template class ::arangodb::aql::KShortestPathsExecutorInfos<KPathEnumerator<SingleServerProvider>>;
template class ::arangodb::aql::KShortestPathsExecutorInfos<TracedKPathEnumerator<SingleServerProvider>>;
template class ::arangodb::aql::KShortestPathsExecutorInfos<KPathEnumerator<SingleServerProvider<SingleServerProviderStep>>>;
template class ::arangodb::aql::KShortestPathsExecutorInfos<TracedKPathEnumerator<SingleServerProvider<SingleServerProviderStep>>>;

template class ::arangodb::aql::KShortestPathsExecutor<KPathEnumerator<SingleServerProvider>>;
template class ::arangodb::aql::KShortestPathsExecutor<TracedKPathEnumerator<SingleServerProvider>>;
template class ::arangodb::aql::KShortestPathsExecutor<KPathEnumerator<SingleServerProvider<SingleServerProviderStep>>>;
template class ::arangodb::aql::KShortestPathsExecutor<TracedKPathEnumerator<SingleServerProvider<SingleServerProviderStep>>>;

/* ClusterProvider Section */

Expand Down
Loading
0