8000 Move ModificationExecutors to new interface by markuspf · Pull Request #11165 · arangodb/arangodb · GitHub
[go: up one dir, main page]

Skip to content

Move ModificationExecutors to new interface #11165

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

31 changes: 31 additions & 0 deletions arangod/Aql/AqlItemBlockInputMatrix.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,20 @@ AqlItemBlockInputMatrix::AqlItemBlockInputMatrix(ExecutorState state, AqlItemMat
}
}

// Hands out the matrix block at the current block index as an input range.
// If the matrix holds no blocks, a range built only from upstreamState() is
// returned instead. After the block has been fetched, the block index is
// advanced via incrBlockIndex(); its result becomes the state of the range.
AqlItemBlockInputRange AqlItemBlockInputMatrix::getNextInputRange() {
  TRI_ASSERT(_aqlItemMatrix != nullptr);

  bool const hasBlocks = _aqlItemMatrix->numberOfBlocks() != 0;
  if (!hasBlocks) {
    // Nothing to deliver: the range carries only the upstream state.
    return AqlItemBlockInputRange{upstreamState()};
  }

  SharedAqlItemBlockPtr currentBlock = _aqlItemMatrix->getBlock(_currentBlockRowIndex);
  // Only the beginning of the relevant range is used below.
  auto [rangeBegin, rangeEnd] = currentBlock->getRelevantRange();
  // Advance after fetching, so the next call sees the following block (if any).
  ExecutorState rangeState = incrBlockIndex();

  return {rangeState, 0, std::move(currentBlock), rangeBegin};
}

SharedAqlItemBlockPtr AqlItemBlockInputMatrix::getBlock() const noexcept {
TRI_ASSERT(_aqlItemMatrix == nullptr);
return _block;
Expand Down Expand Up @@ -94,6 +108,7 @@ std::pair<ExecutorState, ShadowAqlItemRow> AqlItemBlockInputMatrix::nextShadowRo
!_aqlItemMatrix->peekShadowRow().isRelevant()) {
// next row will be a shadow row
_shadowRow = _aqlItemMatrix->popShadowRow();
resetBlockIndex();
} else {
_shadowRow = ShadowAqlItemRow{CreateInvalidShadowRowHint()};
}
Expand Down Expand Up @@ -136,8 +151,24 @@ size_t AqlItemBlockInputMatrix::skipAllRemainingDataRows() {
TRI_ASSERT(_finalState == ExecutorState::DONE);
_aqlItemMatrix->clear();
}
resetBlockIndex();
}
// Else we did already skip once.
// nothing to do
return 0;
}

// Tries to move the block index forward by one.
// Returns HASMORE if the index was advanced, i.e. there is a block after the
// current one. Returns DONE and leaves the index untouched otherwise.
ExecutorState AqlItemBlockInputMatrix::incrBlockIndex() {
  TRI_ASSERT(_aqlItemMatrix != nullptr);

  bool const cannotAdvance = _currentBlockRowIndex + 1 >= _aqlItemMatrix->numberOfBlocks();
  if (cannotAdvance) {
    // The index already points at the end; it is not changed.
    return ExecutorState::DONE;
  }

  // The end has not been reached yet, so step to the next block.
  _currentBlockRowIndex++;
  return ExecutorState::HASMORE;
}

// Rewinds the block index by setting _currentBlockRowIndex back to 0, so the
// next getNextInputRange() call starts again at the first block.
void AqlItemBlockInputMatrix::resetBlockIndex() noexcept {
_currentBlockRowIndex = 0;
}
12 changes: 12 additions & 0 deletions arangod/Aql/AqlItemBlockInputMatrix.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#ifndef ARANGOD_AQL_AQLITEMBLOCKMATRIXITERATOR_H
#define ARANGOD_AQL_AQLITEMBLOCKMATRIXITERATOR_H

#include "Aql/AqlItemBlockInputRange.h"
#include "Aql/AqlItemMatrix.h"
#include "Aql/ExecutionState.h"
#include "Aql/InputAqlItemRow.h"
Expand All @@ -46,19 +47,30 @@ class AqlItemBlockInputMatrix {
bool hasDataRow() const noexcept;

arangodb::aql::SharedAqlItemBlockPtr getBlock() const noexcept;

// Provides access to the block at the current block index (initially the
// first block of _aqlItemMatrix). After a block has been delivered, the block
// index is advanced if another block exists, so the next call delivers the
// next block, and so on.
AqlItemBlockInputRange getNextInputRange();
std::pair<ExecutorState, AqlItemMatrix const*> getMatrix() noexcept;

ExecutorState upstreamState() const noexcept;
bool upstreamHasMore() const noexcept;
size_t skipAllRemainingDataRows();

// Will return HASMORE if we were able to increase the row index.
// Otherwise will return DONE.
ExecutorState incrBlockIndex();
void resetBlockIndex() noexcept;

private:
arangodb::aql::SharedAqlItemBlockPtr _block{nullptr};
ExecutorState _finalState{ExecutorState::HASMORE};

// Only if _aqlItemMatrix is set (and NOT a nullptr), we have a valid and
// usable DataRange object available to work with.
AqlItemMatrix* _aqlItemMatrix;
size_t _currentBlockRowIndex = 0;
ShadowAqlItemRow _shadowRow{CreateInvalidShadowRowHint{}};
};

Expand Down
37 changes: 30 additions & 7 deletions arangod/Aql/ExecutionBlockImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -144,9 +144,16 @@ constexpr bool isNewStyleExecutor = is_one_of_v<
TestLambdaExecutor,
TestLambdaSkipExecutor, // we need one after these to avoid compile errors in non-test mode
#endif
UnsortedGatherExecutor, SubqueryStartExecutor, SubqueryEndExecutor, TraversalExecutor,
KShortestPathsExecutor, ShortestPathExecutor, EnumerateListExecutor, LimitExecutor, SortExecutor,
IResearchViewExecutor<false, arangodb::iresearch::MaterializeType::NotMaterialize>,
ModificationExecutor<AllRowsFetcher, InsertModifier>,
ModificationExecutor<SingleRowFetcher<BlockPassthrough::Disable>, InsertModifier>,
ModificationExecutor<AllRowsFetcher, RemoveModifier>,
ModificationExecutor<SingleRowFetcher<BlockPassthrough::Disable>, RemoveModifier>,
ModificationExecutor<AllRowsFetcher, UpdateReplaceModifier>,
ModificationExecutor<SingleRowFetcher<BlockPassthrough::Disable>, UpdateReplaceModifier>,
ModificationExecutor<AllRowsFetcher, UpsertModifier>,
ModificationExecutor<SingleRowFetcher<BlockPassthrough::Disable>, UpsertModifier>, SubqueryStartExecutor,
UnsortedGatherExecutor, SubqueryEndExecutor, TraversalExecutor, KShortestPathsExecutor, ShortestPathExecutor, EnumerateListExecutor,
LimitExecutor, SortExecutor, IResearchViewExecutor<false, arangodb::iresearch::MaterializeType::NotMaterialize>,
IResearchViewExecutor<false, arangodb::iresearch::MaterializeType::LateMaterialize>,
IResearchViewExecutor<false, arangodb::iresearch::MaterializeType::Materialize>,
IResearchViewExecutor<false, arangodb::iresearch::MaterializeType::NotMaterialize | arangodb::iresearch::MaterializeType::UseStoredValues>,
Expand Down Expand Up @@ -1120,9 +1127,16 @@ static SkipRowsRangeVariant constexpr skipRowsType() {
#ifdef ARANGODB_USE_GOOGLE_TESTS
TestLambdaSkipExecutor,
#endif
UnsortedGatherExecutor, TraversalExecutor, EnumerateListExecutor, SubqueryStartExecutor,
SubqueryEndExecutor, SortedCollectExecutor, LimitExecutor, SortExecutor,
IResearchViewExecutor<false, arangodb::iresearch::MaterializeType::NotMaterialize>,
ModificationExecutor<AllRowsFetcher, InsertModifier>,
ModificationExecutor<SingleRowFetcher<BlockPassthrough::Disable>, InsertModifier>,
ModificationExecutor<AllRowsFetcher, RemoveModifier>,
ModificationExecutor<SingleRowFetcher<BlockPassthrough::Disable>, RemoveModifier>,
ModificationExecutor<AllRowsFetcher, UpdateReplaceModifier>,
ModificationExecutor<SingleRowFetcher<BlockPassthrough::Disable>, UpdateReplaceModifier>,
ModificationExecutor<AllRowsFetcher, UpsertModifier>,
ModificationExecutor<SingleRowFetcher<BlockPassthrough::Disable>, UpsertModifier>, TraversalExecutor,
EnumerateListExecutor, SubqueryStartExecutor, SubqueryEndExecutor, SortedCollectExecutor, LimitExecutor,
UnsortedGatherExecutor, SortExecutor, IResearchViewExecutor<false, arangodb::iresearch::MaterializeType::NotMaterialize>,
IResearchViewExecutor<false, arangodb::iresearch::MaterializeType::LateMaterialize>,
IResearchViewExecutor<false, arangodb::iresearch::MaterializeType::Materialize>,
IResearchViewExecutor<false, arangodb::iresearch::MaterializeType::NotMaterialize | arangodb::iresearch::MaterializeType::UseStoredValues>,
Expand Down Expand Up @@ -1187,7 +1201,15 @@ static auto fastForwardType(AqlCall const& call, Executor const& e) -> FastForwa
}
// TODO: We only need to do this if the executor actually requires to be called.
// E.g. Modifications always need to be called; Limit only if it needs to report fullCount.
if constexpr (is_one_of_v<Executor, LimitExecutor>) {
if constexpr (is_one_of_v<Executor, LimitExecutor,
ModificationExecutor<AllRowsFetcher, InsertModifier>,
ModificationExecutor<SingleRowFetcher<BlockPassthrough::Disable>, InsertModifier>,
ModificationExecutor<AllRowsFetcher, RemoveModifier>,
ModificationExecutor<SingleRowFetcher<BlockPassthrough::Disable>, RemoveModifier>,
ModificationExecutor<AllRowsFetcher, UpdateReplaceModifier>,
ModificationExecutor<SingleRowFetcher<BlockPassthrough::Disable>, UpdateReplaceModifier>,
ModificationExecutor<AllRowsFetcher, UpsertModifier>,
ModificationExecutor<SingleRowFetcher<BlockPassthrough::Disable>, UpsertModifier>>) {
return FastForwardVariant::EXECUTOR;
}
return FastForwardVariant::FETCHER;
Expand Down Expand Up @@ -1441,6 +1463,7 @@ auto ExecutionBlockImpl<Executor>::executeFastForward(typename Fetcher::DataRang
return {ExecutorState::DONE, NoStats{}, 0, AqlCall{}, 0};
}
auto type = fastForwardType(clientCall, _executor);

switch (type) {
case FastForwardVariant::FULLCOUNT:
case FastForwardVariant::EXECUTOR: {
Expand Down
111 changes: 68 additions & 43 deletions arangod/Aql/ModificationExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -101,37 +101,34 @@ ModificationExecutor<FetcherType, ModifierType>::ModificationExecutor(Fetcher& f
// Consumes data rows from the given input range, as long as it has data rows
// and fewer than maxOutputs operations are accumulated, and accumulates them
// through the modifier
template <typename FetcherType, typename ModifierType>
std::pair<ExecutionState, typename ModificationExecutor<FetcherType, ModifierType>::Stats>
ModificationExecutor<FetcherType, ModifierType>::doCollect(size_t maxOutputs) {
auto ModificationExecutor<FetcherType, ModifierType>::doCollect(AqlItemBlockInputRange& input,
size_t maxOutputs)
-> void {
// for fetchRow
InputAqlItemRow row{CreateInvalidInputRowHint{}};
ExecutionState state = ExecutionState::HASMORE;

// Maximum number of rows we can put into output
// So we only ever produce this many here
// TODO: If we SKIP_IGNORE, then we'd be able to output more;
// this would require some counting to happen in the modifier
while (_modifier.nrOfOperations() < maxOutputs && state != ExecutionState::DONE) {
std::tie(state, row) = _fetcher.fetchRow(maxOutputs);
if (state == ExecutionState::WAITING) {
return {ExecutionState::WAITING, ModificationStats{}};
}
if (row.isInitialized()) {
// Make sure we have a valid row
TRI_ASSERT(row.isInitialized());
_modifier.accumulate(row);
}
while (_modifier.nrOfOperations() < maxOutputs && input.hasDataRow()) {
auto [state, row] = input.nextDataRow();

// Make sure we have a valid row
TRI_ASSERT(row.isInitialized());
_modifier.accumulate(row);
}
TRI_ASSERT(state == ExecutionState::DONE || state == ExecutionState::HASMORE);
return {state, ModificationStats{}};
}

// Outputs accumulated results, and counts the statistics
template <typename FetcherType, typename ModifierType>
void ModificationExecutor<FetcherType, ModifierType>::doOutput(OutputAqlItemRow& output,
Stats& stats) {
typename ModifierType::OutputIterator modifierOutputIterator(_modifier);
// We only accumulated as many items as we can output, so this
// should be correct
for (auto const& modifierOutput : modifierOutputIterator) {
TRI_ASSERT(!output.isFull());
bool written = false;
switch (modifierOutput.getType()) {
case ModifierOutput::Type::ReturnIfRequired:
Expand All @@ -157,53 +154,81 @@ void ModificationExecutor<FetcherType, ModifierType>::doOutput(OutputAqlItemRow&
break;
}
}

if (_infos._doCount) {
stats.addWritesExecuted(_modifier.nrOfWritesExecuted());
stats.addWritesIgnored(_modifier.nrOfWritesIgnored());
}
}

// produceRows variant that takes only the output row (no input range).
// Calling it trips TRI_ASSERT(false); the return statement after the assertion
// yields DONE together with default-constructed ModificationStats.
// NOTE(review): presumably kept only to satisfy the old executor interface
// while the range-based produceRows overload does the actual work -- confirm.
template <typename FetcherType, typename ModifierType>
std::pair<ExecutionState, typename ModificationExecutor<FetcherType, ModifierType>::Stats>
ModificationExecutor<FetcherType, ModifierType>::produceRows(OutputAqlItemRow& output) {
TRI_ASSERT(false);

return {ExecutionState::DONE, ModificationStats{}};
}

template <typename FetcherType, typename ModifierType>
[[nodiscard]] auto ModificationExecutor<FetcherType, ModifierType>::produceRows(
typename FetcherType::DataRange& input, OutputAqlItemRow& output)
-> std::tuple<ExecutorState, ModificationStats, AqlCall> {
TRI_ASSERT(_infos._trx);

ModificationExecutor::Stats stats;
auto stats = ModificationStats{};

const size_t maxOutputs = std::min(output.numRowsLeft(), _modifier.getBatchSize());
_modifier.reset();

// if we returned "WAITING" the last time we still have
// documents in the accumulator that we have not submitted
// yet
if (_lastState != ExecutionState::WAITING) {
_modifier.reset();
ExecutorState upstreamState = ExecutorState::HASMORE;
// only produce at most output.numRowsLeft() many results
if constexpr (std::is_same_v<typename FetcherType::DataRange, AqlItemBlockInputMatrix>) {
auto range = input.getNextInputRange();
doCollect(range, output.numRowsLeft());
upstreamState = range.upstreamState();
} else {
doCollect(input, output.numRowsLeft());
upstreamState = input.upstreamState();
}

TRI_IF_FAILURE("ModificationBlock::getSome") {
THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG);
}
if (_modifier.nrOfOperations() > 0) {
_modifier.transact();

std::tie(_lastState, stats) = doCollect(maxOutputs);
if (_infos._doCount) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is that not included in doOutput anymore?

stats.addWritesExecuted(_modifier.nrOfWritesExecuted());
stats.addWritesIgnored(_modifier.nrOfWritesIgnored());
}

if (_lastState == ExecutionState::WAITING) {
return {ExecutionState::WAITING, std::move(stats)};
doOutput(output, stats);
}

TRI_ASSERT(_lastState == ExecutionState::DONE || _lastState == ExecutionState::HASMORE);
return {upstreamState, stats, AqlCall{}};
}

_modifier.transact();
template <typename FetcherType, typename ModifierType>
[[nodiscard]] auto ModificationExecutor<FetcherType, ModifierType>::skipRowsRange(
typename FetcherType::DataRange& input, AqlCall& call)
-> std::tuple<ExecutorState, Stats, size_t, AqlCall> {
auto stats = ModificationStats{};
_modifier.reset();

ExecutorState upstreamState = ExecutorState::HASMORE;
// only collect (and thereby skip) at most call.getOffset() many rows
if constexpr (std::is_same_v<typename FetcherType::DataRange, AqlItemBlockInputMatrix>) {
auto range = input.getNextInputRange();
doCollect(range, call.getOffset());
upstreamState = range.upstreamState();
} else {
doCollect(input, call.getOffset());
upstreamState = input.upstreamState();
}

if (_modifier.nrOfOperations() > 0) {
_modifier.transact();

// If the query is silent, there is no way to relate
// the results slice contents and the submitted documents
// If the query is *not* silent, we should get one result
// for every document.
// Yes. Really.
TRI_ASSERT(_infos._options.silent || _modifier.nrOfDocuments() == _modifier.nrOfResults());
if (_infos._doCount) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is that not included in doOutput anymore?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because we need it in the skip path as well.
doOutput no longer needs the Stats, which makes it clearer.

stats.addWritesExecuted(_modifier.nrOfWritesExecuted());
stats.addWritesIgnored(_modifier.nrOfWritesIgnored());
}

doOutput(output, stats);
call.didSkip(_modifier.nrOfOperations());
}

return {_lastState, std::move(stats)};
return {upstreamState, stats, _modifier.nrOfOperations(), AqlCall{}};
}

using NoPassthroughSingleRowFetcher = SingleRowFetcher<BlockPassthrough::Disable>;
Expand Down
18 changes: 17 additions & 1 deletion arangod/Aql/ModificationExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,16 @@

namespace arangodb {
namespace aql {

struct AqlCall;
class AqlItemBlockInputRange;
class InputAqlItemRow;
class OutputAqlItemRow;
class ExecutorInfos;
class FilterStats;
template <BlockPassthrough>
class SingleRowFetcher;

//
// ModificationExecutor is the "base" class for the Insert, Remove,
// UpdateReplace and Upsert executors.
Expand Down Expand Up @@ -163,8 +173,14 @@ class ModificationExecutor {

std::pair<ExecutionState, Stats> produceRows(OutputAqlItemRow& output);

[[nodiscard]] auto produceRows(typename FetcherType::DataRange& input, OutputAqlItemRow& output)
-> std::tuple<ExecutorState, Stats, AqlCall>;

[[nodiscard]] auto skipRowsRange(typename FetcherType::DataRange& inputRange, AqlCall& call)
-> std::tuple<ExecutorState, Stats, size_t, AqlCall>;

protected:
std::pair<ExecutionState, Stats> doCollect(size_t maxOutputs);
void doCollect(AqlItemBlockInputRange& input, size_t maxOutputs);
void doOutput(OutputAqlItemRow& output, Stats& stats);

// The state that was returned on the last call to produceRows. For us
Expand Down
0