diff --git a/arangod/Aql/AqlItemBlockInputRange.cpp b/arangod/Aql/AqlItemBlockInputRange.cpp index b0329dfd94d7..6e90698e2bc0 100644 --- a/arangod/Aql/AqlItemBlockInputRange.cpp +++ b/arangod/Aql/AqlItemBlockInputRange.cpp @@ -181,3 +181,23 @@ auto AqlItemBlockInputRange::skipAll() noexcept -> std::size_t { _skipped = 0; return skipped; } + +auto AqlItemBlockInputRange::countDataRows() const noexcept -> std::size_t { + if (_block == nullptr) { + return 0; + } + auto const& block = getBlock(); + return block->size() - block->getShadowRowIndexes().size(); +} + +auto AqlItemBlockInputRange::countShadowRows() const noexcept -> std::size_t { + if (_block == nullptr) { + return 0; + } + auto const& block = getBlock(); + return block->getShadowRowIndexes().size(); +} + +[[nodiscard]] auto AqlItemBlockInputRange::finalState() const noexcept -> ExecutorState { + return _finalState; +} diff --git a/arangod/Aql/AqlItemBlockInputRange.h b/arangod/Aql/AqlItemBlockInputRange.h index ef2bfd6b70e6..6a03bb1e6942 100644 --- a/arangod/Aql/AqlItemBlockInputRange.h +++ b/arangod/Aql/AqlItemBlockInputRange.h @@ -72,6 +72,22 @@ class AqlItemBlockInputRange { [[nodiscard]] auto skippedInFlight() const noexcept -> std::size_t; + /** + * @brief Count how many datarows are expected in this range + * Used to estimate amount of produced rows + * @return std::size_t + */ + [[nodiscard]] auto countDataRows() const noexcept -> std::size_t; + + /** + * @brief Count how many shadowRows are expected in this range + * Used to estimate amount of produced rows + * @return std::size_t + */ + [[nodiscard]] auto countShadowRows() const noexcept -> std::size_t; + + [[nodiscard]] auto finalState() const noexcept -> ExecutorState; + private: bool isIndexValid(std::size_t index) const noexcept; diff --git a/arangod/Aql/CountCollectExecutor.cpp b/arangod/Aql/CountCollectExecutor.cpp index ff6bb3ad4130..b66ce1e61662 100644 --- a/arangod/Aql/CountCollectExecutor.cpp +++ b/arangod/Aql/CountCollectExecutor.cpp @@ 
-108,6 +108,12 @@ std::pair CountCollectExecutor::expectedNumberOfRows(siz return {ExecutionState::HASMORE, 1}; } +auto CountCollectExecutor::expectedNumberOfRowsNew(AqlItemBlockInputRange const& input, + AqlCall const& call) const + noexcept -> size_t { + return std::min(1, call.getLimit()); +} + const CountCollectExecutor::Infos& CountCollectExecutor::infos() const noexcept { return _infos; } diff --git a/arangod/Aql/CountCollectExecutor.h b/arangod/Aql/CountCollectExecutor.h index 2fad185c917c..21e51a9e29f8 100644 --- a/arangod/Aql/CountCollectExecutor.h +++ b/arangod/Aql/CountCollectExecutor.h @@ -105,6 +105,9 @@ class CountCollectExecutor { std::pair expectedNumberOfRows(size_t atMost) const; + [[nodiscard]] auto expectedNumberOfRowsNew(AqlItemBlockInputRange const& input, + AqlCall const& call) const noexcept -> size_t; + private: Infos const& infos() const noexcept; diff --git a/arangod/Aql/DistinctCollectExecutor.cpp b/arangod/Aql/DistinctCollectExecutor.cpp index bb6dc923baf1..30cffcd2e3f6 100644 --- a/arangod/Aql/DistinctCollectExecutor.cpp +++ b/arangod/Aql/DistinctCollectExecutor.cpp @@ -85,6 +85,18 @@ std::pair DistinctCollectExecutor::expectedNumberOfRows( return _fetcher.preFetchNumberOfRows(atMost); } +[[nodiscard]] auto DistinctCollectExecutor::expectedNumberOfRowsNew( + AqlItemBlockInputRange const& input, AqlCall const& call) const noexcept -> size_t { + if (input.finalState() == ExecutorState::DONE) { + // Worst case assumption: + // For every input row we have a new group. + // We will never produce more then asked for + return std::min(call.getLimit(), input.countDataRows()); + } + // Otherwise we do not know. 
+ return call.getLimit(); +} + void DistinctCollectExecutor::destroyValues() { // destroy all AqlValues captured for (auto& value : _seen) { diff --git a/arangod/Aql/DistinctCollectExecutor.h b/arangod/Aql/DistinctCollectExecutor.h index def1af36cf13..00197b40d4b6 100644 --- a/arangod/Aql/DistinctCollectExecutor.h +++ b/arangod/Aql/DistinctCollectExecutor.h @@ -120,6 +120,9 @@ class DistinctCollectExecutor { std::pair expectedNumberOfRows(size_t atMost) const; + [[nodiscard]] auto expectedNumberOfRowsNew(AqlItemBlockInputRange const& input, + AqlCall const& call) const noexcept -> size_t; + private: Infos const& infos() const noexcept; void destroyValues(); diff --git a/arangod/Aql/ExecutionBlockImpl.cpp b/arangod/Aql/ExecutionBlockImpl.cpp index 711af02ac9d7..e8e8e68d60bf 100644 --- a/arangod/Aql/ExecutionBlockImpl.cpp +++ b/arangod/Aql/ExecutionBlockImpl.cpp @@ -114,6 +114,7 @@ CREATE_HAS_MEMBER_CHECK(skipRows, hasSkipRows); CREATE_HAS_MEMBER_CHECK(fetchBlockForPassthrough, hasFetchBlockForPassthrough); CREATE_HAS_MEMBER_CHECK(expectedNumberOfRows, hasExpectedNumberOfRows); CREATE_HAS_MEMBER_CHECK(skipRowsRange, hasSkipRowsRange); +CREATE_HAS_MEMBER_CHECK(expectedNumberOfRowsNew, hasExpectedNumberOfRowsNew); #ifdef ARANGODB_USE_GOOGLE_TESTS // Forward declaration of Test Executors. 
@@ -1032,7 +1033,7 @@ std::pair ExecutionBlockImpl::r // TODO: We need to define the size of this block based on Input / Executor / Subquery depth template -auto ExecutionBlockImpl::allocateOutputBlock(AqlCall&& call) +auto ExecutionBlockImpl::allocateOutputBlock(AqlCall&& call, DataRange const& inputRange) -> std::unique_ptr { if constexpr (Executor::Properties::allowsBlockPassthrough == BlockPassthrough::Enable) { SharedAqlItemBlockPtr newBlock{nullptr}; @@ -1047,8 +1048,45 @@ auto ExecutionBlockImpl::allocateOutputBlock(AqlCall&& call) return createOutputRow(newBlock, std::move(call)); } else { + if constexpr (isMultiDepExecutor) { + // MultiDepExecutor would require dependency handling. + // We do not have it here. + if (!inputRange.hasShadowRow() && !inputRange.hasDataRow()) { + // On empty input do not yet create output. + // We are going to ask again later + SharedAqlItemBlockPtr newBlock{nullptr}; + return createOutputRow(newBlock, std::move(call)); + } + } else { + if (!inputRange.hasShadowRow() && !inputRange.hasDataRow() && + inputRange.upstreamState() == ExecutorState::HASMORE) { + // On empty input do not yet create output. + // We are going to ask again later + SharedAqlItemBlockPtr newBlock{nullptr}; + return createOutputRow(newBlock, std::move(call)); + } + } + // Non-Passthrough variant, we need to allocate the block ourselfs size_t blockSize = ExecutionBlock::DefaultBatchSize; + if constexpr (hasExpectedNumberOfRowsNew::value) { + blockSize = _executor.expectedNumberOfRowsNew(inputRange, call); + // The executor cannot expect to produce more then the limit! 
+ if constexpr (!std::is_same_v) { + // Except the subqueryStartExecutor, its limit differs + // from its output (it needs to count the new ShadowRows in addition) + TRI_ASSERT(blockSize <= call.getLimit()); + } + + blockSize += inputRange.countShadowRows(); + // We have an upper bound by DefaultBatchSize; + blockSize = std::min(ExecutionBlock::DefaultBatchSize, blockSize); + } + if (blockSize == 0) { + // There is no data to be produced + SharedAqlItemBlockPtr newBlock{nullptr}; + return createOutputRow(newBlock, std::move(call)); + } SharedAqlItemBlockPtr newBlock = _engine->itemBlockManager().requestBlock(blockSize, _infos.numberOfOutputRegisters()); return createOutputRow(newBlock, std::move(call)); @@ -1056,9 +1094,10 @@ auto ExecutionBlockImpl::allocateOutputBlock(AqlCall&& call) } template -void ExecutionBlockImpl::ensureOutputBlock(AqlCall&& call) { +void ExecutionBlockImpl::ensureOutputBlock(AqlCall&& call, + DataRange const& inputRange) { if (_outputItemRow == nullptr || !_outputItemRow->isInitialized()) { - _outputItemRow = allocateOutputBlock(std::move(call)); + _outputItemRow = allocateOutputBlock(std::move(call), inputRange); } else { _outputItemRow->setCall(std::move(call)); } @@ -1251,7 +1290,7 @@ auto ExecutionBlockImpl::executeFetcher(AqlCallStack& stack, AqlCallTy return {state, skipped, _lastRange}; } else if constexpr (executorHasSideEffects) { // If the executor has side effects, we cannot bypass any subqueries - // by skipping them. SO we need to fetch all shadow rows in order to + // by skipping them. So we need to fetch all shadow rows in order to // trigger this Executor with everthing from above. // NOTE: The Executor needs to discard shadowRows, and do the accouting.
static_assert(std::is_same_v>); @@ -1939,9 +1978,9 @@ ExecutionBlockImpl::executeWithoutTrace(AqlCallStack stack) { TRI_ASSERT(!stack.empty()); AqlCall const& subqueryCall = stack.peek(); AqlCall copyCall = subqueryCall; - ensureOutputBlock(std::move(copyCall)); + ensureOutputBlock(std::move(copyCall), _lastRange); } else { - ensureOutputBlock(std::move(clientCall)); + ensureOutputBlock(std::move(clientCall), _lastRange); } TRI_ASSERT(_outputItemRow); TRI_ASSERT(!_executorReturnedDone); @@ -2133,9 +2172,9 @@ ExecutionBlockImpl::executeWithoutTrace(AqlCallStack stack) { TRI_ASSERT(!stack.empty()); AqlCall const& subqueryCall = stack.peek(); AqlCall copyCall = subqueryCall; - ensureOutputBlock(std::move(copyCall)); + ensureOutputBlock(std::move(copyCall), _lastRange); } else { - ensureOutputBlock(std::move(clientCall)); + ensureOutputBlock(std::move(clientCall), _lastRange); } TRI_ASSERT(!_outputItemRow->allRowsUsed()); diff --git a/arangod/Aql/ExecutionBlockImpl.h b/arangod/Aql/ExecutionBlockImpl.h index 1b4365c78c40..ff6b7f63cf0c 100644 --- a/arangod/Aql/ExecutionBlockImpl.h +++ b/arangod/Aql/ExecutionBlockImpl.h @@ -309,12 +309,12 @@ class ExecutionBlockImpl final : public ExecutionBlock { [[nodiscard]] ExecutionState fetchShadowRowInternal(); // Allocate an output block and install a call in it - [[nodiscard]] auto allocateOutputBlock(AqlCall&& call) + [[nodiscard]] auto allocateOutputBlock(AqlCall&& call, DataRange const& inputRange) -> std::unique_ptr; - // Ensure that we have an output block of the desired dimenstions + // Ensure that we have an output block of the desired dimensions // Will as a side effect modify _outputItemRow - void ensureOutputBlock(AqlCall&& call); + void ensureOutputBlock(AqlCall&& call, DataRange const& inputRange); // Compute the next state based on the given call. 
// Can only be one of Skip/Produce/FullCount/FastForward/Done diff --git a/arangod/Aql/FilterExecutor.cpp b/arangod/Aql/FilterExecutor.cpp index 516c2ea4678a..f151119acfde 100644 --- a/arangod/Aql/FilterExecutor.cpp +++ b/arangod/Aql/FilterExecutor.cpp @@ -153,3 +153,14 @@ auto FilterExecutor::produceRows(AqlItemBlockInputRange& inputRange, OutputAqlIt (std::min)(clientCall.softLimit, clientCall.hardLimit); return {inputRange.upstreamState(), stats, upstreamCall}; } + +[[nodiscard]] auto FilterExecutor::expectedNumberOfRowsNew(AqlItemBlockInputRange const& input, + AqlCall const& call) const + noexcept -> size_t { + if (input.finalState() == ExecutorState::DONE) { + return std::min(call.getLimit(), input.countDataRows()); + } + // We do not know how many more rows will be returned from upstream. + // So we can only overestimate + return call.getLimit(); +} diff --git a/arangod/Aql/FilterExecutor.h b/arangod/Aql/FilterExecutor.h index 4b7350088180..8c443b69c5c3 100644 --- a/arangod/Aql/FilterExecutor.h +++ b/arangod/Aql/FilterExecutor.h @@ -108,6 +108,9 @@ class FilterExecutor { [[nodiscard]] std::pair expectedNumberOfRows(size_t atMost) const; + [[nodiscard]] auto expectedNumberOfRowsNew(AqlItemBlockInputRange const& input, + AqlCall const& call) const noexcept -> size_t; + private: Infos& _infos; Fetcher& _fetcher; diff --git a/arangod/Aql/HashedCollectExecutor.cpp b/arangod/Aql/HashedCollectExecutor.cpp index 6f8e086fb6f6..4efd9f0f50f1 100644 --- a/arangod/Aql/HashedCollectExecutor.cpp +++ b/arangod/Aql/HashedCollectExecutor.cpp @@ -360,6 +360,28 @@ std::pair HashedCollectExecutor::expectedNumberOfRows(si THROW_ARANGO_EXCEPTION(TRI_ERROR_NOT_IMPLEMENTED); } +[[nodiscard]] auto HashedCollectExecutor::expectedNumberOfRowsNew( + AqlItemBlockInputRange const& input, AqlCall const& call) const noexcept -> size_t { + if (!_isInitialized) { + if (input.finalState() == ExecutorState::DONE) { + // Worst case assumption: + // For every input row we have a new group. 
+ // We will never produce more than asked for + auto estOnInput = input.countDataRows(); + if (estOnInput == 0 && _infos.getGroupRegisters().empty()) { + // Special case, on empty input we will produce 1 output + estOnInput = 1; + } + return std::min(call.getLimit(), estOnInput); + } + // Otherwise we do not know. + return call.getLimit(); + } + // We know how many groups we have left + return std::min(call.getLimit(), + std::distance(_currentGroup, _allGroups.end())); +} + const HashedCollectExecutor::Infos& HashedCollectExecutor::infos() const noexcept { return _infos; } diff --git a/arangod/Aql/HashedCollectExecutor.h b/arangod/Aql/HashedCollectExecutor.h index 1ff7767d1f74..49cd7a047910 100644 --- a/arangod/Aql/HashedCollectExecutor.h +++ b/arangod/Aql/HashedCollectExecutor.h @@ -168,6 +168,15 @@ class HashedCollectExecutor { */ std::pair expectedNumberOfRows(size_t atMost) const; + /** + * @brief This Executor does not know how many distinct rows will be fetched + * from upstream, it can only report how many it has found by itself, plus + * it knows that it can only create as many new rows as pulled from upstream. + * So it will overestimate. + */ + [[nodiscard]] auto expectedNumberOfRowsNew(AqlItemBlockInputRange const& input, + AqlCall const& call) const noexcept -> size_t; + private: using AggregateValuesType = std::vector>; using GroupKeyType = std::vector; @@ -215,7 +224,7 @@ class HashedCollectExecutor { /// @brief hashmap of all encountered groups GroupMapType _allGroups; - GroupMapType::iterator _currentGroup; + GroupMapType::const_iterator _currentGroup; bool _isInitialized; // init() was called successfully (e.g.
it returned DONE) diff --git a/arangod/Aql/IdExecutor.cpp b/arangod/Aql/IdExecutor.cpp index 1350038678e0..0fd9bd1aaafa 100644 --- a/arangod/Aql/IdExecutor.cpp +++ b/arangod/Aql/IdExecutor.cpp @@ -88,25 +88,22 @@ auto IdExecutor::produceRows(AqlItemBlockInputRange& inputRange, -> std::tuple { CountStats stats; TRI_ASSERT(output.numRowsWritten() == 0); - // TODO: We can implement a fastForward copy here. - // We know that all rows we have will fit into the output - while (!output.isFull() && inputRange.hasDataRow()) { - auto const& [state, inputRow] = inputRange.nextDataRow(); - TRI_ASSERT(inputRow); - + if (inputRange.hasDataRow()) { + TRI_ASSERT(!output.isFull()); TRI_IF_FAILURE("SingletonBlock::getOrSkipSome") { THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG); } + auto const& [state, inputRow] = inputRange.peekDataRow(); + + output.fastForwardAllRows(inputRow, inputRange.countDataRows()); - /*Second parameter are to ignore registers that should be kept but are missing in the input row*/ - output.copyRow(inputRow, std::is_same_v); - TRI_ASSERT(output.produced()); - output.advanceRow(); + std::ignore = inputRange.skipAllRemainingDataRows(); TRI_IF_FAILURE("SingletonBlock::getOrSkipSomeSet") { THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG); } } + TRI_ASSERT(!inputRange.hasDataRow()); if (_infos.doCount()) { stats.addCounted(output.numRowsWritten()); } diff --git a/arangod/Aql/NoResultsExecutor.cpp b/arangod/Aql/NoResultsExecutor.cpp index 0291a889bd31..a7620d3d843f 100644 --- a/arangod/Aql/NoResultsExecutor.cpp +++ b/arangod/Aql/NoResultsExecutor.cpp @@ -42,4 +42,11 @@ auto NoResultsExecutor::skipRowsRange(AqlItemBlockInputRange& inputRange, AqlCal noexcept -> std::tuple { return {inputRange.upstreamState(), NoStats{}, 0, AqlCall{0, false, 0, AqlCall::LimitType::HARD}}; -}; \ No newline at end of file +}; + +[[nodiscard]] auto NoResultsExecutor::expectedNumberOfRowsNew(AqlItemBlockInputRange const& input, + AqlCall const& call) const + noexcept -> size_t { + // Well nevermind the 
input, but we will always return 0 rows here. + return 0; +} \ No newline at end of file diff --git a/arangod/Aql/NoResultsExecutor.h b/arangod/Aql/NoResultsExecutor.h index db5f7ed91059..c12bd06842a2 100644 --- a/arangod/Aql/NoResultsExecutor.h +++ b/arangod/Aql/NoResultsExecutor.h @@ -76,6 +76,9 @@ class NoResultsExecutor { // Well nevermind the input, but we will always return 0 rows here. return {ExecutionState::DONE, 0}; } + + [[nodiscard]] auto expectedNumberOfRowsNew(AqlItemBlockInputRange const& input, + AqlCall const& call) const noexcept -> size_t; }; } // namespace aql } // namespace arangodb diff --git a/arangod/Aql/OutputAqlItemRow.cpp b/arangod/Aql/OutputAqlItemRow.cpp index 9eddbcf13a1a..b7d933018c25 100644 --- a/arangod/Aql/OutputAqlItemRow.cpp +++ b/arangod/Aql/OutputAqlItemRow.cpp @@ -166,6 +166,30 @@ void OutputAqlItemRow::copyRow(ItemRowType const& sourceRow, bool ignoreMissing) doCopyRow(sourceRow, ignoreMissing); } +auto OutputAqlItemRow::fastForwardAllRows(InputAqlItemRow const& sourceRow, size_t rows) + -> void { +#ifdef ARANGODB_ENABLE_MAINTAINER_MODE + TRI_ASSERT(sourceRow.internalBlockIs(_block)); +#endif + TRI_ASSERT(_doNotCopyInputRow); + TRI_ASSERT(_call.getLimit() >= rows); + TRI_ASSERT(rows > 0); + // We have the guarantee that we have all data in our block. + // We only need to adjust internal indexes. +#ifdef ARANGODB_ENABLE_MAINTAINER_MODE + // Safely assert that the API is not misused.
+ TRI_ASSERT(_baseIndex + rows <= _block->size()); + for (size_t i = _baseIndex; i < _baseIndex + rows; ++i) { + TRI_ASSERT(!_block->isShadowRow(i)); + } +#endif + _baseIndex += rows; + TRI_ASSERT(_baseIndex > 0); + _lastBaseIndex = _baseIndex - 1; + _lastSourceRow = InputAqlItemRow{_block, _lastBaseIndex}; + _call.didProduce(rows); +} + void OutputAqlItemRow::copyBlockInternalRegister(InputAqlItemRow const& sourceRow, RegisterId input, RegisterId output) { // This method is only allowed if the block of the input row is the same as diff --git a/arangod/Aql/OutputAqlItemRow.h b/arangod/Aql/OutputAqlItemRow.h index 29b9e26017bf..baee6f7d44bf 100644 --- a/arangod/Aql/OutputAqlItemRow.h +++ b/arangod/Aql/OutputAqlItemRow.h @@ -99,6 +99,19 @@ class OutputAqlItemRow { void copyBlockInternalRegister(InputAqlItemRow const& sourceRow, RegisterId input, RegisterId output); + /** + * @brief This function will only work if input and output + * have the same block. + * We will "copy" all rows in the input block into the output. + * No Registers will be modified. + * + * @param sourceRow The input source row. 
+ * Requirements: + * - Input and Output blocks are the same + * - Input and Output row position are identical + */ + auto fastForwardAllRows(InputAqlItemRow const& sourceRow, size_t rows) -> void; + [[nodiscard]] std::size_t getNrRegisters() const { return block().getNrRegs(); } diff --git a/arangod/Aql/ParallelUnsortedGatherExecutor.cpp b/arangod/Aql/ParallelUnsortedGatherExecutor.cpp index 457d6daa61c1..8f9130034b70 100644 --- a/arangod/Aql/ParallelUnsortedGatherExecutor.cpp +++ b/arangod/Aql/ParallelUnsortedGatherExecutor.cpp @@ -91,19 +91,16 @@ auto ParallelUnsortedGatherExecutor::produceRows(typename Fetcher::DataRange& in auto callSet = AqlCallSet{}; for (size_t dep = 0; dep < input.numberDependencies(); ++dep) { - while (!output.isFull()) { - auto [state, row] = input.nextDataRow(dep); - if (row) { - output.copyRow(row); - output.advanceRow(); - } else { - // This output did not produce anything - if (state == ExecutorState::HASMORE) { - callSet.calls.emplace_back( - AqlCallSet::DepCallPair{dep, upstreamCallProduce(clientCall)}); - } - break; - } + auto& range = input.rangeForDependency(dep); + while (!output.isFull() && range.hasDataRow()) { + auto [state, row] = range.nextDataRow(); + TRI_ASSERT(row); + output.copyRow(row); + output.advanceRow(); + } + // Produce a new call if necessary (we consumed all, and the state is still HASMORE) + if (!range.hasDataRow() && range.upstreamState() == ExecutorState::HASMORE) { + callSet.calls.emplace_back(AqlCallSet::DepCallPair{dep, upstreamCallProduce(clientCall)}); } } diff --git a/arangod/Aql/ReturnExecutor.cpp b/arangod/Aql/ReturnExecutor.cpp index 52a03761d39d..7a5fc4344cde 100644 --- a/arangod/Aql/ReturnExecutor.cpp +++ b/arangod/Aql/ReturnExecutor.cpp @@ -131,3 +131,13 @@ auto ReturnExecutor::produceRows(AqlItemBlockInputRange& inputRange, OutputAqlIt return {inputRange.upstreamState(), stats, output.getClientCall()}; } + +[[nodiscard]] auto ReturnExecutor::expectedNumberOfRowsNew(AqlItemBlockInputRange const& 
input, + AqlCall const& call) const + noexcept -> size_t { + if (input.finalState() == ExecutorState::DONE) { + return input.countDataRows(); + } + // Otherwise we do not know. + return call.getLimit(); +} diff --git a/arangod/Aql/ReturnExecutor.h b/arangod/Aql/ReturnExecutor.h index c2d46ae638c1..69d03fa7accd 100644 --- a/arangod/Aql/ReturnExecutor.h +++ b/arangod/Aql/ReturnExecutor.h @@ -115,6 +115,9 @@ class ReturnExecutor { [[nodiscard]] auto expectedNumberOfRows(size_t atMost) const -> std::pair; + [[nodiscard]] auto expectedNumberOfRowsNew(AqlItemBlockInputRange const& input, + AqlCall const& call) const noexcept -> size_t; + private: ReturnExecutorInfos& _infos; Fetcher& _fetcher; diff --git a/arangod/Aql/SortedCollectExecutor.cpp b/arangod/Aql/SortedCollectExecutor.cpp index d23975c5f72c..cc12029222a0 100644 --- a/arangod/Aql/SortedCollectExecutor.cpp +++ b/arangod/Aql/SortedCollectExecutor.cpp @@ -299,6 +299,29 @@ std::pair SortedCollectExecutor::expectedNumberOfRows(si THROW_ARANGO_EXCEPTION(TRI_ERROR_NOT_IMPLEMENTED); } +[[nodiscard]] auto SortedCollectExecutor::expectedNumberOfRowsNew( + AqlItemBlockInputRange const& input, AqlCall const& call) const noexcept -> size_t { + if (input.finalState() == ExecutorState::DONE) { + // Worst case assumption: + // For every input row we have a new group. + // If we have an open group right now, we need to add 1 to this estimate. + // We will never produce more than asked for + auto estOnInput = input.countDataRows(); + if (_currentGroup.isValid()) { + // Have one group still to write, + // that is not part of this input. + estOnInput += 1; + } + if (estOnInput == 0 && _infos.getGroupRegisters().empty()) { + // Special case, on empty input we will produce 1 output + estOnInput = 1; + } + return std::min(call.getLimit(), estOnInput); + } + // Otherwise we do not know.
+ return call.getLimit(); +} + auto SortedCollectExecutor::produceRows(AqlItemBlockInputRange& inputRange, OutputAqlItemRow& output) -> std::tuple { diff --git a/arangod/Aql/SortedCollectExecutor.h b/arangod/Aql/SortedCollectExecutor.h index 55e28d0585af..21c633176d55 100644 --- a/arangod/Aql/SortedCollectExecutor.h +++ b/arangod/Aql/SortedCollectExecutor.h @@ -209,6 +209,9 @@ class SortedCollectExecutor { [[nodiscard]] auto expectedNumberOfRows(size_t atMost) const -> std::pair; + [[nodiscard]] auto expectedNumberOfRowsNew(AqlItemBlockInputRange const& input, + AqlCall const& call) const noexcept -> size_t; + private: Infos const& infos() const noexcept { return _infos; }; diff --git a/arangod/Aql/SubqueryExecutor.cpp b/arangod/Aql/SubqueryExecutor.cpp index 76be9ef6c6cc..a48d517ad51a 100644 --- a/arangod/Aql/SubqueryExecutor.cpp +++ b/arangod/Aql/SubqueryExecutor.cpp @@ -341,6 +341,16 @@ auto SubqueryExecutor::skipRowsRange<>(AqlItemBlockInputRange& inputRange, _skipped = 0; return {translatedReturnType(), NoStats{}, call.getSkipCount(), getUpstreamCall()}; } +template +[[nodiscard]] auto SubqueryExecutor::expectedNumberOfRowsNew( + AqlItemBlockInputRange const& input, AqlCall const& call) const noexcept -> size_t { + if constexpr (isModificationSubquery) { + // This executor might skip data. + // It could overfetch it before. 
+ return std::min(call.getLimit(), input.countDataRows()); + } + return input.countDataRows(); +} template class ::arangodb::aql::SubqueryExecutor; template class ::arangodb::aql::SubqueryExecutor; diff --git a/arangod/Aql/SubqueryExecutor.h b/arangod/Aql/SubqueryExecutor.h index b28823180849..b9659b5a2178 100644 --- a/arangod/Aql/SubqueryExecutor.h +++ b/arangod/Aql/SubqueryExecutor.h @@ -133,6 +133,9 @@ class SubqueryExecutor { auto initializeSubquery(AqlItemBlockInputRange& input) -> std::tuple; + [[nodiscard]] auto expectedNumberOfRowsNew(AqlItemBlockInputRange const& input, + AqlCall const& call) const noexcept -> size_t; + private: Fetcher& _fetcher; SubqueryExecutorInfos& _infos; diff --git a/arangod/Aql/SubqueryStartExecutor.cpp b/arangod/Aql/SubqueryStartExecutor.cpp index f854d404496d..1af0c2fe806f 100644 --- a/arangod/Aql/SubqueryStartExecutor.cpp +++ b/arangod/Aql/SubqueryStartExecutor.cpp @@ -47,8 +47,8 @@ auto SubqueryStartExecutor::produceRows(AqlItemBlockInputRange& input, OutputAql return {ExecutorState::DONE, NoStats{}, AqlCall{}}; } TRI_ASSERT(!_inputRow.isInitialized()); - TRI_ASSERT(!output.isFull()); if (input.hasDataRow()) { + TRI_ASSERT(!output.isFull()); std::tie(_upstreamState, _inputRow) = input.peekDataRow(); output.copyRow(_inputRow); output.advanceRow(); @@ -100,3 +100,20 @@ auto SubqueryStartExecutor::expectedNumberOfRows(size_t atMost) const TRI_ASSERT(false); return {ExecutionState::DONE, 0}; } + +[[nodiscard]] auto SubqueryStartExecutor::expectedNumberOfRowsNew( + AqlItemBlockInputRange const& input, AqlCall const& call) const noexcept -> size_t { + // NOTE: + // As soon as we can overfetch data, this needs to be modified to + // returnValue * countDataRows(); + if (input.countDataRows() > 0) { + // We will write one ShadowRow + if (call.getLimit() > 0) { + // We will write one DataRow + return 2; + } + return 1; + } + // Nothing to create here.
+ return 0; +} diff --git a/arangod/Aql/SubqueryStartExecutor.h b/arangod/Aql/SubqueryStartExecutor.h index 1cdd6f6fbeb0..0182e3153514 100644 --- a/arangod/Aql/SubqueryStartExecutor.h +++ b/arangod/Aql/SubqueryStartExecutor.h @@ -73,7 +73,9 @@ class SubqueryStartExecutor { std::pair expectedNumberOfRows(size_t atMost) const; - private: + [[nodiscard]] auto expectedNumberOfRowsNew(AqlItemBlockInputRange const& input, + AqlCall const& call) const noexcept -> size_t; + private: // Upstream state, used to determine if we are done with all subqueries ExecutorState _upstreamState{ExecutorState::HASMORE}; diff --git a/tests/Aql/EnumerateCollectionExecutorTest.cpp b/tests/Aql/EnumerateCollectionExecutorTest.cpp index fe4a027271c2..45f7cb5cb8e2 100644 --- a/tests/Aql/EnumerateCollectionExecutorTest.cpp +++ b/tests/Aql/EnumerateCollectionExecutorTest.cpp @@ -68,7 +68,7 @@ static const std::string GetAllDocs = using CursorType = arangodb::transaction::Methods::CursorType; -class EnumerateCollectionExecutorTest : public AqlExecutorTestCase { +class EnumerateCollectionExecutorTest : public AqlExecutorTestCase { protected: ExecutionState state; AqlItemBlockManager itemBlockManager; @@ -381,7 +381,10 @@ TEST_P(EnumerateCollectionExecutorTestProduce, DISABLED_produce_all_documents) { TEST_P(EnumerateCollectionExecutorTestProduce, DISABLED_produce_5_documents) { auto [split] = GetParam(); + uint64_t numberOfDocumentsToInsert = 10; std::vector queryResults; + // auto vpackOptions = insertDocuments(numberOfDocumentsToInsert, queryResults); + std::ignore = insertDocuments(numberOfDocumentsToInsert, queryResults); makeExecutorTestHelper<1, 1>() .setInputValue({{RowBuilder<1>{R"({ "cid" : "1337", "name": "UnitTestCollection" })"}}}) @@ -399,7 +402,9 @@ TEST_P(EnumerateCollectionExecutorTestProduce, DISABLED_produce_5_documents) { TEST_P(EnumerateCollectionExecutorTestProduce, DISABLED_skip_5_documents_default) { auto [split] = GetParam(); + uint64_t numberOfDocumentsToInsert = 10; 
std::vector queryResults; + std::ignore = insertDocuments(numberOfDocumentsToInsert, queryResults); makeExecutorTestHelper<1, 1>() .setInputValue({{RowBuilder<1>{R"({ "cid" : "1337", "name": diff --git a/tests/Aql/ExecutionBlockImplTest.cpp b/tests/Aql/ExecutionBlockImplTest.cpp index 97bf70ad6ec8..b99a66db3b5c 100644 --- a/tests/Aql/ExecutionBlockImplTest.cpp +++ b/tests/Aql/ExecutionBlockImplTest.cpp @@ -549,7 +549,7 @@ class SharedExecutionBlockImplTest { * @return ProduceCall The call ready to hand over to the LambdaExecutorInfos */ static auto generateProduceCall(size_t& nrCalls, AqlCall expectedCall, - size_t numRowsLeftNoInput = ExecutionBlock::DefaultBatchSize, + size_t numRowsLeftNoInput = 0, size_t numRowsLeftWithInput = ExecutionBlock::DefaultBatchSize) -> ProduceCall { return [&nrCalls, numRowsLeftNoInput, numRowsLeftWithInput, @@ -793,7 +793,7 @@ TEST_P(ExecutionBlockImplExecuteSpecificTest, test_toplevel_softlimit_call) { // in the executor. // Non passthrough the available lines (visible to executor) are only the given soft limit. ProduceCall execImpl = GetParam() ? generateProduceCall(nrCalls, fullCall, 0, 1) - : generateProduceCall(nrCalls, fullCall, 20, 20); + : generateProduceCall(nrCalls, fullCall, 0, 20); SkipCall skipCall = generateNeverSkipCall(); auto [state, skipped, block] = runTest(execImpl, skipCall, fullCall); @@ -815,7 +815,7 @@ TEST_P(ExecutionBlockImplExecuteSpecificTest, test_toplevel_hardlimit_call) { // in the executor. // Non passthrough the available lines (visible to executor) are only the given soft limit. ProduceCall execImpl = GetParam() ? 
generateProduceCall(nrCalls, fullCall, 0, 1) - : generateProduceCall(nrCalls, fullCall, 20, 20); + : generateProduceCall(nrCalls, fullCall, 0, 20); SkipCall skipCall = generateNeverSkipCall(); auto [state, skipped, block] = runTest(execImpl, skipCall, fullCall); diff --git a/tests/Aql/IdExecutorTest.cpp b/tests/Aql/IdExecutorTest.cpp index b507798d7603..1a2db1655a55 100644 --- a/tests/Aql/IdExecutorTest.cpp +++ b/tests/Aql/IdExecutorTest.cpp @@ -50,8 +50,7 @@ namespace arangodb::tests::aql { using TestParam = std::tuple number ExecutorState, // The upstream state AqlCall, // The client Call, - bool, // flag to decide if we need to do couting - OutputAqlItemRow::CopyRowBehavior // How the data is handled within outputRow + bool // flag to decide if we need to do couting >; class IdExecutorTestCombiner : public AqlExecutorTestCaseWithParam { @@ -72,7 +71,7 @@ class IdExecutorTestCombiner : public AqlExecutorTestCaseWithParam { } auto doCount() -> bool { - auto const& [a, b, c, doCount, d] = GetParam(); + auto const& [a, b, c, doCount] = GetParam(); return doCount; } @@ -81,17 +80,17 @@ class IdExecutorTestCombiner : public AqlExecutorTestCaseWithParam { } auto getInput() -> size_t { - auto const& [input, a, b, c, d] = GetParam(); + auto const& [input, a, b, c] = GetParam(); return input; } auto getCall() -> AqlCall { - auto const& [a, b, call, c, d] = GetParam(); + auto const& [a, b, call, c] = GetParam(); return call; } auto getUpstreamState() -> ExecutorState { - auto const& [a, state, b, c, d] = GetParam(); + auto const& [a, state, b, c] = GetParam(); return state; } @@ -153,23 +152,11 @@ class IdExecutorTestCombiner : public AqlExecutorTestCaseWithParam { auto toWrite = make_shared_unordered_set({}); auto toKeep = make_shared_unordered_set({0}); auto toClear = make_shared_unordered_set(); - auto const& [unused, upstreamState, clientCall, unused2, copyBehaviour] = GetParam(); + auto const& [unused, upstreamState, clientCall, unused2] = GetParam(); AqlCall callCopy 
= clientCall; - if (copyBehaviour == OutputAqlItemRow::CopyRowBehavior::DoNotCopyInputRows) { - // For passthrough we reuse the block - return OutputAqlItemRow(input, toWrite, toKeep, toClear, - std::move(callCopy), copyBehaviour); - } - // Otherwise we need to create a fresh block (or forward nullptr) - if (input == nullptr) { - SharedAqlItemBlockPtr outBlock{nullptr}; - return OutputAqlItemRow(outBlock, toWrite, toKeep, toClear, - std::move(callCopy), copyBehaviour); - } - SharedAqlItemBlockPtr outBlock{ - new AqlItemBlock(manager(), input->size(), input->getNrRegs())}; - return OutputAqlItemRow(outBlock, toWrite, toKeep, toClear, - std::move(callCopy), copyBehaviour); + // For passthrough we reuse the block + return OutputAqlItemRow(input, toWrite, toKeep, toClear, std::move(callCopy), + OutputAqlItemRow::CopyRowBehavior::DoNotCopyInputRows); } }; @@ -270,13 +257,9 @@ auto clientCalls = testing::Values(AqlCall{}, // unlimited call AqlCall{0, AqlCall::Infinity{}, 7, true} // hardlimit call (note this is larger than length of input data), with fullcount ); -auto copyBehaviours = testing::Values(OutputAqlItemRow::CopyRowBehavior::CopyInputRows, // Create a new row and write the data - OutputAqlItemRow::CopyRowBehavior::DoNotCopyInputRows // Just passthrough (production) -); - INSTANTIATE_TEST_CASE_P(IdExecutorTest, IdExecutorTestCombiner, ::testing::Combine(inputs, upstreamStates, clientCalls, - ::testing::Bool(), copyBehaviours)); + ::testing::Bool())); class IdExecutionBlockTest : public AqlExecutorTestCase<> {};