diff --git a/arangod/Aql/AqlItemBlockInputMatrix.cpp b/arangod/Aql/AqlItemBlockInputMatrix.cpp index 956b98467c51..bbd495784e27 100644 --- a/arangod/Aql/AqlItemBlockInputMatrix.cpp +++ b/arangod/Aql/AqlItemBlockInputMatrix.cpp @@ -54,6 +54,20 @@ AqlItemBlockInputMatrix::AqlItemBlockInputMatrix(ExecutorState state, AqlItemMat } } +AqlItemBlockInputRange AqlItemBlockInputMatrix::getNextInputRange() { + TRI_ASSERT(_aqlItemMatrix != nullptr); + + if (_aqlItemMatrix->numberOfBlocks() == 0) { + return AqlItemBlockInputRange{upstreamState()}; + } + + SharedAqlItemBlockPtr blockPtr = _aqlItemMatrix->getBlock(_currentBlockRowIndex); + auto [start, end] = blockPtr->getRelevantRange(); + ExecutorState state = incrBlockIndex(); + + return {state, 0, std::move(blockPtr), start}; +} + SharedAqlItemBlockPtr AqlItemBlockInputMatrix::getBlock() const noexcept { TRI_ASSERT(_aqlItemMatrix == nullptr); return _block; @@ -94,6 +108,7 @@ std::pair AqlItemBlockInputMatrix::nextShadowRo !_aqlItemMatrix->peekShadowRow().isRelevant()) { // next row will be a shadow row _shadowRow = _aqlItemMatrix->popShadowRow(); + resetBlockIndex(); } else { _shadowRow = ShadowAqlItemRow{CreateInvalidShadowRowHint()}; } @@ -136,8 +151,24 @@ size_t AqlItemBlockInputMatrix::skipAllRemainingDataRows() { TRI_ASSERT(_finalState == ExecutorState::DONE); _aqlItemMatrix->clear(); } + resetBlockIndex(); } // Else we did already skip once. // nothing to do return 0; } + +ExecutorState AqlItemBlockInputMatrix::incrBlockIndex() { + TRI_ASSERT(_aqlItemMatrix != nullptr); + if (_currentBlockRowIndex + 1 < _aqlItemMatrix->numberOfBlocks()) { + _currentBlockRowIndex++; + // we were able to increase the size as we reached not the end yet + return ExecutorState::HASMORE; + } + // we could not increase the index, we already reached the end + return ExecutorState::DONE; +} + +void AqlItemBlockInputMatrix::resetBlockIndex() noexcept { + _currentBlockRowIndex = 0; +} diff --git a/arangod/Aql/AqlItemBlockInputMatrix.h b/arangod/Aql/AqlItemBlockInputMatrix.h index 80e37e40e1f9..6e638aa2dbb6 100644 --- a/arangod/Aql/AqlItemBlockInputMatrix.h +++ b/arangod/Aql/AqlItemBlockInputMatrix.h @@ -23,6 +23,7 @@ #ifndef ARANGOD_AQL_AQLITEMBLOCKMATRIXITERATOR_H #define ARANGOD_AQL_AQLITEMBLOCKMATRIXITERATOR_H +#include "Aql/AqlItemBlockInputRange.h" #include "Aql/AqlItemMatrix.h" #include "Aql/ExecutionState.h" #include "Aql/InputAqlItemRow.h" @@ -46,12 +47,22 @@ class AqlItemBlockInputMatrix { bool hasDataRow() const noexcept; arangodb::aql::SharedAqlItemBlockPtr getBlock() const noexcept; + + // Will provide access to the first block (from _aqlItemMatrix) + // After a block has been delivered, the block index will be increased. + // Next call then will deliver the next block etc. + AqlItemBlockInputRange getNextInputRange(); std::pair getMatrix() noexcept; ExecutorState upstreamState() const noexcept; bool upstreamHasMore() const noexcept; size_t skipAllRemainingDataRows(); + // Will return HASMORE if we were able to increase the row index. + // Otherwise will return DONE. + ExecutorState incrBlockIndex(); + void resetBlockIndex() noexcept; + private: arangodb::aql::SharedAqlItemBlockPtr _block{nullptr}; ExecutorState _finalState{ExecutorState::HASMORE}; @@ -59,6 +70,7 @@ class AqlItemBlockInputMatrix { // Only if _aqlItemMatrix is set (and NOT a nullptr), we have a valid and // usable DataRange object available to work with. AqlItemMatrix* _aqlItemMatrix; + size_t _currentBlockRowIndex = 0; ShadowAqlItemRow _shadowRow{CreateInvalidShadowRowHint{}}; }; diff --git a/arangod/Aql/ExecutionBlockImpl.cpp b/arangod/Aql/ExecutionBlockImpl.cpp index 54bc29742555..d27ce8534525 100644 --- a/arangod/Aql/ExecutionBlockImpl.cpp +++ b/arangod/Aql/ExecutionBlockImpl.cpp @@ -144,9 +144,16 @@ constexpr bool isNewStyleExecutor = is_one_of_v< TestLambdaExecutor, TestLambdaSkipExecutor, // we need one after these to avoid compile errors in non-test mode #endif - UnsortedGatherExecutor, SubqueryStartExecutor, SubqueryEndExecutor, TraversalExecutor, - KShortestPathsExecutor, ShortestPathExecutor, EnumerateListExecutor, LimitExecutor, SortExecutor, - IResearchViewExecutor, + ModificationExecutor, + ModificationExecutor, InsertModifier>, + ModificationExecutor, + ModificationExecutor, RemoveModifier>, + ModificationExecutor, + ModificationExecutor, UpdateReplaceModifier>, + ModificationExecutor, + ModificationExecutor, UpsertModifier>, SubqueryStartExecutor, + UnsortedGatherExecutor, SubqueryEndExecutor, TraversalExecutor, KShortestPathsExecutor, ShortestPathExecutor, EnumerateListExecutor, + LimitExecutor, SortExecutor, IResearchViewExecutor, IResearchViewExecutor, IResearchViewExecutor, IResearchViewExecutor, @@ -1120,9 +1127,16 @@ static SkipRowsRangeVariant constexpr skipRowsType() { #ifdef ARANGODB_USE_GOOGLE_TESTS TestLambdaSkipExecutor, #endif - UnsortedGatherExecutor, TraversalExecutor, EnumerateListExecutor, SubqueryStartExecutor, - SubqueryEndExecutor, SortedCollectExecutor, LimitExecutor, SortExecutor, - IResearchViewExecutor, + ModificationExecutor, + ModificationExecutor, InsertModifier>, + ModificationExecutor, + ModificationExecutor, RemoveModifier>, + ModificationExecutor, + ModificationExecutor, UpdateReplaceModifier>, + ModificationExecutor, + ModificationExecutor, UpsertModifier>, TraversalExecutor, + EnumerateListExecutor, SubqueryStartExecutor, SubqueryEndExecutor, SortedCollectExecutor, LimitExecutor, + UnsortedGatherExecutor, SortExecutor, IResearchViewExecutor, IResearchViewExecutor, IResearchViewExecutor, IResearchViewExecutor, @@ -1187,7 +1201,15 @@ static auto fastForwardType(AqlCall const& call, Executor const& e) -> FastForwa } // TODO: We only need to do this is the executor actually require to call. // e.g. Modifications will always need to be called. Limit only if it needs to report fullCount - if constexpr (is_one_of_v) { + if constexpr (is_one_of_v, + ModificationExecutor, InsertModifier>, + ModificationExecutor, + ModificationExecutor, RemoveModifier>, + ModificationExecutor, + ModificationExecutor, UpdateReplaceModifier>, + ModificationExecutor, + ModificationExecutor, UpsertModifier>>) { return FastForwardVariant::EXECUTOR; } return FastForwardVariant::FETCHER; @@ -1441,6 +1463,7 @@ auto ExecutionBlockImpl::executeFastForward(typename Fetcher::DataRang return {ExecutorState::DONE, NoStats{}, 0, AqlCall{}, 0}; } auto type = fastForwardType(clientCall, _executor); + switch (type) { case FastForwardVariant::FULLCOUNT: case FastForwardVariant::EXECUTOR: { diff --git a/arangod/Aql/ModificationExecutor.cpp b/arangod/Aql/ModificationExecutor.cpp index 76c8f1aac60b..5556ae39b93d 100644 --- a/arangod/Aql/ModificationExecutor.cpp +++ b/arangod/Aql/ModificationExecutor.cpp @@ -101,29 +101,23 @@ ModificationExecutor::ModificationExecutor(Fetcher& f // Fetches as many rows as possible from upstream using the fetcher's fetchRow // method and accumulates results through the modifier template -std::pair::Stats> -ModificationExecutor::doCollect(size_t maxOutputs) { +auto ModificationExecutor::doCollect(AqlItemBlockInputRange& input, + size_t maxOutputs) + -> void { // for fetchRow InputAqlItemRow row{CreateInvalidInputRowHint{}}; ExecutionState state = ExecutionState::HASMORE; // Maximum number of rows we can put into output // So we only ever produce this many here - // TODO: If we SKIP_IGNORE, then we'd be able to output more; - // this would require some counting to happen in the modifier - while (_modifier.nrOfOperations() < maxOutputs && state != ExecutionState::DONE) { - std::tie(state, row) = _fetcher.fetchRow(maxOutputs); - if (state == ExecutionState::WAITING) { - return {ExecutionState::WAITING, ModificationStats{}}; - } - if (row.isInitialized()) { - // Make sure we have a valid row - TRI_ASSERT(row.isInitialized()); - _modifier.accumulate(row); - } + while (_modifier.nrOfOperations() < maxOutputs && input.hasDataRow()) { + auto [state, row] = input.nextDataRow(); + + // Make sure we have a valid row + TRI_ASSERT(row.isInitialized()); + _modifier.accumulate(row); } TRI_ASSERT(state == ExecutionState::DONE || state == ExecutionState::HASMORE); - return {state, ModificationStats{}}; } // Outputs accumulated results, and counts the statistics @@ -131,7 +125,10 @@ template void ModificationExecutor::doOutput(OutputAqlItemRow& output, Stats& stats) { typename ModifierType::OutputIterator modifierOutputIterator(_modifier); + // We only accumulated as many items as we can output, so this + // should be correct for (auto const& modifierOutput : modifierOutputIterator) { + TRI_ASSERT(!output.isFull()); bool written = false; switch (modifierOutput.getType()) { case ModifierOutput::Type::ReturnIfRequired: @@ -157,53 +154,81 @@ void ModificationExecutor::doOutput(OutputAqlItemRow& break; } } - - if (_infos._doCount) { - stats.addWritesExecuted(_modifier.nrOfWritesExecuted()); - stats.addWritesIgnored(_modifier.nrOfWritesIgnored()); - } } template std::pair::Stats> ModificationExecutor::produceRows(OutputAqlItemRow& output) { + TRI_ASSERT(false); + + return {ExecutionState::DONE, ModificationStats{}}; +} + +template +[[nodiscard]] auto ModificationExecutor::produceRows( + typename FetcherType::DataRange& input, OutputAqlItemRow& output) + -> std::tuple { TRI_ASSERT(_infos._trx); - ModificationExecutor::Stats stats; + auto stats = ModificationStats{}; - const size_t maxOutputs = std::min(output.numRowsLeft(), _modifier.getBatchSize()); + _modifier.reset(); - // if we returned "WAITING" the last time we still have - // documents in the accumulator that we have not submitted - // yet - if (_lastState != ExecutionState::WAITING) { - _modifier.reset(); + ExecutorState upstreamState = ExecutorState::HASMORE; + // only produce at most output.numRowsLeft() many results + if constexpr (std::is_same_v) { + auto range = input.getNextInputRange(); + doCollect(range, output.numRowsLeft()); + upstreamState = range.upstreamState(); + } else { + doCollect(input, output.numRowsLeft()); + upstreamState = input.upstreamState(); } - TRI_IF_FAILURE("ModificationBlock::getSome") { - THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG); - } + if (_modifier.nrOfOperations() > 0) { + _modifier.transact(); - std::tie(_lastState, stats) = doCollect(maxOutputs); + if (_infos._doCount) { + stats.addWritesExecuted(_modifier.nrOfWritesExecuted()); + stats.addWritesIgnored(_modifier.nrOfWritesIgnored()); + } - if (_lastState == ExecutionState::WAITING) { - return {ExecutionState::WAITING, std::move(stats)}; + doOutput(output, stats); } - TRI_ASSERT(_lastState == ExecutionState::DONE || _lastState == ExecutionState::HASMORE); + return {upstreamState, stats, AqlCall{}}; +} - _modifier.transact(); +template +[[nodiscard]] auto ModificationExecutor::skipRowsRange( + typename FetcherType::DataRange& input, AqlCall& call) + -> std::tuple { + auto stats = ModificationStats{}; + _modifier.reset(); + + ExecutorState upstreamState = ExecutorState::HASMORE; + // only produce at most output.numRowsLeft() many results + if constexpr (std::is_same_v) { + auto range = input.getNextInputRange(); + doCollect(range, call.getOffset()); + upstreamState = range.upstreamState(); + } else { + doCollect(input, call.getOffset()); + upstreamState = input.upstreamState(); + } + + if (_modifier.nrOfOperations() > 0) { + _modifier.transact(); - // If the query is silent, there is no way to relate - // the results slice contents and the submitted documents - // If the query is *not* silent, we should get one result - // for every document. - // Yes. Really. - TRI_ASSERT(_infos._options.silent || _modifier.nrOfDocuments() == _modifier.nrOfResults()); + if (_infos._doCount) { + stats.addWritesExecuted(_modifier.nrOfWritesExecuted()); + stats.addWritesIgnored(_modifier.nrOfWritesIgnored()); + } - doOutput(output, stats); + call.didSkip(_modifier.nrOfOperations()); + } - return {_lastState, std::move(stats)}; + return {upstreamState, stats, _modifier.nrOfOperations(), AqlCall{}}; } using NoPassthroughSingleRowFetcher = SingleRowFetcher; diff --git a/arangod/Aql/ModificationExecutor.h b/arangod/Aql/ModificationExecutor.h index eeaeb5f298cb..deec6db66e29 100644 --- a/arangod/Aql/ModificationExecutor.h +++ b/arangod/Aql/ModificationExecutor.h @@ -39,6 +39,16 @@ namespace arangodb { namespace aql { + +struct AqlCall; +class AqlItemBlockInputRange; +class InputAqlItemRow; +class OutputAqlItemRow; +class ExecutorInfos; +class FilterStats; +template +class SingleRowFetcher; + // // ModificationExecutor is the "base" class for the Insert, Remove, // UpdateReplace and Upsert executors. @@ -163,8 +173,14 @@ class ModificationExecutor { std::pair produceRows(OutputAqlItemRow& output); + [[nodiscard]] auto produceRows(typename FetcherType::DataRange& input, OutputAqlItemRow& output) + -> std::tuple; + + [[nodiscard]] auto skipRowsRange(typename FetcherType::DataRange& inputRange, AqlCall& call) + -> std::tuple; + protected: - std::pair doCollect(size_t maxOutputs); + void doCollect(AqlItemBlockInputRange& input, size_t maxOutputs); void doOutput(OutputAqlItemRow& output, Stats& stats); // The state that was returned on the last call to produceRows. For us