8000 Merge branch 'feature/AqlSubqueryExecutionBlockImplExecuteImplementat… · arangodb/arangodb@e28c231 · GitHub
[go: up one dir, main page]

Skip to content

Commit e28c231

Browse files
committed
Merge branch 'feature/AqlSubqueryExecutionBlockImplExecuteImplementation' of ssh://github.com/arangodb/ArangoDB into feature/AqlSubqueryExecutionBlockImplExecuteImplementation-bypass-skip
2 parents 5659bc2 + fd763cb commit e28c231

10 files changed

+74
-35
lines changed

arangod/Aql/BlocksWithClients.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -211,15 +211,15 @@ auto BlocksWithClientsImpl<Executor>::executeWithoutTraceForClient(AqlCallStack
211211
std::string const& clientId)
212212
-> std::tuple<ExecutionState, SkipResult, SharedAqlItemBlockPtr> {
213213
TRI_ASSERT(!clientId.empty());
214-
if (clientId.empty()) {
214+
if (ADB_UNLIKELY(clientId.empty())) {
215215
// Security bailout to avoid UB
216216
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL,
217217
"got empty distribution id");
218218
}
219219

220220
auto it = _clientBlockData.find(clientId);
221221
TRI_ASSERT(it != _clientBlockData.end());
222-
if (it == _clientBlockData.end()) {
222+
if (ADB_UNLIKELY(it == _clientBlockData.end())) {
223223
// Security bailout to avoid UB
224224
std::string message("AQL: unknown distribution id ");
225225
message.append(clientId);
@@ -305,4 +305,4 @@ std::pair<ExecutionState, size_t> BlocksWithClientsImpl<Executor>::skipSomeForSh
305305
}
306306

307307
template class ::arangodb::aql::BlocksWithClientsImpl<ScatterExecutor>;
308-
template class ::arangodb::aql::BlocksWithClientsImpl<DistributeExecutor>;
308+
template class ::arangodb::aql::BlocksWithClientsImpl<DistributeExecutor>;

arangod/Aql/DistributeExecutor.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -454,4 +454,4 @@ std::pair<ExecutionState, arangodb::Result> ExecutionBlockImpl<DistributeExecuto
454454
455455
return {getHasMoreStateForClientId(clientId), TRI_ERROR_NO_ERROR};
456456
}
457-
*/
457+
*/

arangod/Aql/DistributeExecutor.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -116,9 +116,9 @@ class DistributeExecutor {
116116

117117
std::deque<std::pair<SharedAqlItemBlockPtr, std::vector<size_t>>> _queue;
118118

119-
// This is unique_ptr to get away with everything beeing forward declared...
119+
// This is unique_ptr to get away with everything being forward declared...
120120
std::unique_ptr<ExecutionBlock> _executor;
121-
bool _executorHasMore;
121+
bool _executorHasMore = false;
122122
};
123123

124124
DistributeExecutor(DistributeExecutorInfos const& infos);

arangod/Aql/ExecutionBlockImpl.cpp

Lines changed: 29 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -127,9 +127,6 @@ class TestLambdaSkipExecutor;
127127
} // namespace arangodb
128128
#endif
129129
130-
template <typename T, typename... Es>
131-
constexpr bool is_one_of_v = (std::is_same_v<T, Es> || ...);
132-
133130
/*
134131
* Determine whether an executor cannot bypass subquery skips.
135132
* This is if exection of this Executor does have side-effects
@@ -1253,11 +1250,25 @@ auto ExecutionBlockImpl<Executor>::executeFetcher(AqlCallStack& stack, size_t co
12531250
TRI_ASSERT(dependency < _dependencies.size());
12541251
_lastRange.resizeIfNecessary(ExecutorState::HASMORE, 0, _dependencies.size());
12551252

1256-
auto [state, skipped, range] = _rowFetcher.executeForDependency(dependency, stack);
1257-
1258-
_lastRange.setDependency(dependency, range);
1259-
1260-
return {state, skipped, _lastRange};
1253+
if constexpr (!isParallelExecutor) {
1254+
auto [state, skipped, range] = _rowFetcher.executeForDependency(dependency, stack);
1255+
_lastRange.setDependency(dependency, range);
1256+
return {state, skipped, _lastRange};
1257+
} else {
1258+
_callsInFlight.resize(_dependencyProxy.numberDependencies());
1259+
if (!_callsInFlight[dependency].has_value()) {
1260+
_callsInFlight[dependency] = stack;
1261+
}
1262+
TRI_ASSERT(_callsInFlight[dependency].has_value());
1263+
auto [state, skipped, range] =
1264+
_rowFetcher.executeForDependency(dependency,
1265+
_callsInFlight[dependency].value());
1266+
if (state != ExecutionState::WAITING) {
1267+
_callsInFlight[dependency] = std::nullopt;
1268+
}
1269+
_lastRange.setDependency(dependency, range);
1270+
return {state, skipped, _lastRange};
1271+
}
12611272
} else if constexpr (executorHasSideEffects<Executor>) {
12621273
// If the executor has side effects, we cannot bypass any subqueries
12631274
// by skipping them. SO we need to fetch all shadow rows in order to
@@ -1283,6 +1294,7 @@ auto ExecutionBlockImpl<Executor>::executeProduceRows(typename Fetcher::DataRang
12831294
-> std::tuple<ExecutorState, typename Executor::Stats, AqlCall, size_t> {
12841295
if constexpr (isNewStyleExecutor<Executor>) {
12851296
if constexpr (is_one_of_v<Executor, UnsortedGatherExecutor, SortingGatherExecutor, ParallelUnsortedGatherExecutor>) {
1297+
input.resizeIfNecessary(ExecutorState::HASMORE, 0, _dependencies.size());
12861298
return _executor.produceRows(input, output);
12871299
} else if constexpr (is_one_of_v<Executor, SubqueryExecutor<true>, SubqueryExecutor<false>>) {
12881300
// The SubqueryExecutor has it's own special handling outside.
@@ -1449,16 +1461,17 @@ auto ExecutionBlockImpl<SubqueryEndExecutor>::shadowRowForwarding() -> ExecState
14491461
}
14501462
}
14511463

1452-
template<class Executor>
1453-
auto ExecutionBlockImpl<Executor>::sideEffectShadowRowForwarding(AqlCallStack& stack) -> ExecState {
1454-
static_assert(executorHasSideEffects<Executor>);
1464+
template <class Executor>
1465+
auto ExecutionBlockImpl<Executor>::sideEffectShadowRowForwarding(AqlCallStack& stack)
1466+
-> ExecState {
1467+
TRI_ASSERT(executorHasSideEffects<Executor>);
14551468
if (!stack.needToSkipSubquery()) {
14561469
// We need to really produce things here
14571470
// fall back to original version as any other executor.
1458-
return shadowRowForwarding();
1471+
return shadowRowForwarding();
14591472
}
14601473
// TODO implemenet ShadowRowHandling
1461-
return shadowRowForwarding();
1474+
return shadowRowForwarding();
14621475
}
14631476

14641477
template <class Executor>
@@ -1696,9 +1709,9 @@ ExecutionBlockImpl<Executor>::executeWithoutTrace(AqlCallStack stack) {
16961709
}
16971710
case ExecState::SKIP: {
16981711
#ifdef ARANGODB_ENABLE_MAINTAINER_MODE
1699-
size_t offsetBefore = clientCall.getOffset();
1712+
auto const offsetBefore = clientCall.getOffset();
17001713
TRI_ASSERT(offsetBefore > 0);
1701-
size_t canPassFullcount =
1714+
bool const canPassFullcount =
17021715
clientCall.getLimit() == 0 && clientCall.needsFullCount();
17031716
#endif
17041717
LOG_QUERY("1f786", DEBUG) << printTypeInfo() << " call skipRows " << clientCall;
@@ -1711,7 +1724,7 @@ ExecutionBlockImpl<Executor>::executeWithoutTrace(AqlCallStack stack) {
17111724
if constexpr (is_one_of_v<Executor, SubqueryExecutor<true>>) {
17121725
// NOTE: The subquery Executor will by itself call EXECUTE on it's
17131726
// subquery. This can return waiting => we can get a WAITING state
1714-
// here. We can only get the waiting state for SUbquery executors.
1727+
// here. We can only get the waiting state for Subquery executors.
17151728
ExecutionState subqueryState = ExecutionState::HASMORE;
17161729
std::tie(subqueryState, stats, skippedLocal, call) =
17171730
_executor.skipRowsRange(_lastRange, clientCall);

arangod/Aql/ExecutionBlockImpl.h

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,10 @@ class OutputAqlItemRow;
5252
class Query;
5353
class ShadowAqlItemRow;
5454
class SkipResult;
55+
class ParallelUnsortedGatherExecutor;
56+
57+
template <typename T, typename... Es>
58+
constexpr bool is_one_of_v = (std::is_same_v<T, Es> || ...);
5559

5660
/**
5761
* @brief This is the implementation class of AqlExecutionBlocks.
@@ -136,6 +140,9 @@ class ExecutionBlockImpl final : public ExecutionBlock {
136140
DONE
137141
};
138142

143+
static constexpr bool isParallelExecutor =
144+
is_one_of_v<Executor, ParallelUnsortedGatherExecutor>;
145+
139146
public:
140147
/**
141148
* @brief Construct a new ExecutionBlock
@@ -363,6 +370,9 @@ class ExecutionBlockImpl final : public ExecutionBlock {
363370
bool _hasUsedDataRangeBlock;
364371

365372
bool _executorReturnedDone = false;
373+
374+
/// @brief Only needed for parallel executors; could be omitted otherwise
375+
std::vector<std::optional<AqlCallStack>> _callsInFlight;
366376
};
367377

368378
} // namespace arangodb::aql

arangod/Aql/MultiAqlItemBlockInputRange.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ auto MultiAqlItemBlockInputRange::hasDataRow() const noexcept -> bool {
6969

7070
auto MultiAqlItemBlockInputRange::rangeForDependency(size_t const dependency)
7171
-> AqlItemBlockInputRange& {
72+
TRI_ASSERT(dependency < _inputs.size());
7273
return _inputs.at(dependency);
7374
}
7475

arangod/Aql/ParallelUnsortedGatherExecutor.cpp

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,19 @@ ParallelUnsortedGatherExecutor::ParallelUnsortedGatherExecutor(Fetcher&, Infos&
4343

4444
ParallelUnsortedGatherExecutor::~ParallelUnsortedGatherExecutor() = default;
4545

46-
auto ParallelUnsortedGatherExecutor::upstreamCall(AqlCall const& clientCall) const
46+
auto ParallelUnsortedGatherExecutor::upstreamCallSkip(AqlCall const& clientCall) const
4747
noexcept -> AqlCall {
48+
// Only skip, don't ask for rows
49+
auto upstreamCall = clientCall;
50+
upstreamCall.softLimit = 0;
51+
upstreamCall.hardLimit = AqlCall::Infinity{};
52+
upstreamCall.fullCount = false;
53+
return upstreamCall;
54+
}
55+
56+
auto ParallelUnsortedGatherExecutor::upstreamCallProduce(AqlCall const& clientCall) const
57+
noexcept -> AqlCall {
58+
TRI_ASSERT(clientCall.getOffset() == 0);
4859
return clientCall;
4960
}
5061

@@ -87,7 +98,8 @@ auto ParallelUnsortedGatherExecutor::produceRows(typename Fetcher::DataRange& in
8798
TRI_ASSERT(waitingDep == input.numberDependencies());
8899
return {ExecutorState::DONE, NoStats{}, AqlCall{}, waitingDep};
89100
}
90-
return {ExecutorState::HASMORE, NoStats{}, upstreamCall(output.getClientCall()), waitingDep};
101+
return {ExecutorState::HASMORE, NoStats{},
102+
upstreamCallProduce(output.getClientCall()), waitingDep};
91103
}
92104

93105
auto ParallelUnsortedGatherExecutor::skipRowsRange(typename Fetcher::DataRange& input,
@@ -112,7 +124,7 @@ auto ParallelUnsortedGatherExecutor::skipRowsRange(typename Fetcher::DataRange&
112124
}
113125
if (range.hasDataRow()) {
114126
// We overfetched, skipLocally
115-
// By gurantee we will only see data, if
127+
// By guarantee we will only see data, if
116128
// we are past the offset phase.
117129
TRI_ASSERT(call.getOffset() == 0);
118130
} else {
@@ -131,5 +143,5 @@ auto ParallelUnsortedGatherExecutor::skipRowsRange(typename Fetcher::DataRange&
131143
return {ExecutorState::DONE, NoStats{}, call.getSkipCount(), AqlCall{}, waitingDep};
132144
}
133145
return {ExecutorState::HASMORE, NoStats{}, call.getSkipCount(),
134-
upstreamCall(call), waitingDep};
146+
upstreamCallSkip(call), waitingDep};
135147
}

arangod/Aql/ParallelUnsortedGatherExecutor.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,8 @@ class ParallelUnsortedGatherExecutor {
100100
-> std::tuple<ExecutorState, Stats, size_t, AqlCall, size_t>;
101101

102102
private:
103-
auto upstreamCall(AqlCall const& clientCall) const noexcept -> AqlCall;
103+
auto upstreamCallSkip(AqlCall const& clientCall) const noexcept -> AqlCall;
104+
auto upstreamCallProduce(AqlCall const& clientCall) const noexcept -> AqlCall;
104105
};
105106

106107
} // namespace aql

arangod/Aql/SortingGatherExecutor.cpp

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -218,19 +218,18 @@ auto SortingGatherExecutor::initialize(typename Fetcher::DataRange const& inputR
218218
TRI_ASSERT(_numberDependencies == 0 ||
219219
_numberDependencies == inputRange.numberDependencies());
220220
_numberDependencies = inputRange.numberDependencies();
221-
auto call = requiresMoreInput(inputRange);
222-
if (call.has_value()) {
223-
return call;
224-
}
225221
// If we have collected all ranges once, we can prepare the local data-structure copy
226222
_inputRows.reserve(_numberDependencies);
227223
for (size_t dep = 0; dep < _numberDependencies; ++dep) {
228224
auto const [state, row] = inputRange.peekDataRow(dep);
229225
_inputRows.emplace_back(dep, row, state);
230226
}
227+
auto call = requiresMoreInput(inputRange);
228+
if (call.has_value()) {
229+
return call;
230+
}
231231
_strategy->prepare(_inputRows);
232232
_initialized = true;
233-
_numberDependencies = inputRange.numberDependencies();
234233
}
235234
return {};
236235
}

arangod/Aql/TraversalExecutor.cpp

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -184,16 +184,19 @@ auto TraversalExecutor::doOutput(OutputAqlItemRow& output) -> void {
184184
// traverser now has next v, e, p values
185185
if (_infos.useVertexOutput()) {
186186
AqlValue vertex = _traverser.lastVertexToAqlValue();
187-
output.cloneValueInto(_infos.vertexRegister(), _inputRow, vertex);
187+
AqlValueGuard guard{vertex, true};
188+
output.moveValueInto(_infos.vertexRegister(), _inputRow, guard);
188189
}
189190
if (_infos.useEdgeOutput()) {
190191
AqlValue edge = _traverser.lastEdgeToAqlValue();
191-
output.cloneValueInto(_infos.edgeRegister(), _inputRow, edge);
192+
AqlValueGuard guard{edge, true};
193+
output.moveValueInto(_infos.edgeRegister(), _inputRow, guard);
192194
}
193195
if (_infos.usePathOutput()) {
194196
transaction::BuilderLeaser tmp(_traverser.trx());
195197
AqlValue path = _traverser.pathToAqlValue(*tmp.builder());
196-
output.cloneValueInto(_infos.pathRegister(), _inputRow, path);
198+
AqlValueGuard guard{path, true};
199+
output.moveValueInto(_infos.pathRegister(), _inputRow, guard);
197200
}
198201
output.advanceRow();
199202
}

0 commit comments

Comments
 (0)
0