diff --git a/CHANGELOG b/CHANGELOG index df9724f46568..6b251f2307ac 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,6 +1,10 @@ devel ----- +* Make AQL modification operations in a cluster asynchronous. This allows to + free the thread for other work until both the write and synchronous + replication are complete. + * When creating Pregel memory-mapped files, create them with O_TMPFILE attribute on Linux so that files are guaranteed to vanish even if a process dies. diff --git a/arangod/Aql/AqlCall.h b/arangod/Aql/AqlCall.h index b830a4d1a1cb..79f5522fae8e 100644 --- a/arangod/Aql/AqlCall.h +++ b/arangod/Aql/AqlCall.h @@ -191,7 +191,7 @@ struct AqlCall { TRI_ASSERT(n <= i); i -= n; }, - [](auto) {}, + [](Infinity) {}, }; std::visit(minus, softLimit); std::visit(minus, hardLimit); diff --git a/arangod/Aql/AqlExecuteResult.cpp b/arangod/Aql/AqlExecuteResult.cpp index a5e9a45e0bd9..6428a77fa2d5 100644 --- a/arangod/Aql/AqlExecuteResult.cpp +++ b/arangod/Aql/AqlExecuteResult.cpp @@ -35,6 +35,7 @@ #include #include +#include using namespace arangodb; using namespace arangodb::aql; @@ -49,7 +50,7 @@ auto getStringView(velocypack::Slice slice) -> std::string_view { AqlExecuteResult::AqlExecuteResult(ExecutionState state, SkipResult skipped, SharedAqlItemBlockPtr&& block) - : _state(state), _skipped(skipped), _block(std::move(block)) { + : _state(state), _skipped(std::move(skipped)), _block(std::move(block)) { // Make sure we only produce a valid response // The block should have checked as well. // We must return skipped and/or data when reporting HASMORE diff --git a/arangod/Aql/AqlItemBlock.cpp b/arangod/Aql/AqlItemBlock.cpp index e81d4644be35..1c7d62923900 100644 --- a/arangod/Aql/AqlItemBlock.cpp +++ b/arangod/Aql/AqlItemBlock.cpp @@ -1169,7 +1169,7 @@ size_t AqlItemBlock::maxModifiedEntries() const noexcept { return _numRegisters size_t AqlItemBlock::capacity() const noexcept { return _data.capacity(); } -bool AqlItemBlock::isShadowRow(size_t row) const { +bool AqlItemBlock::isShadowRow(size_t row) const noexcept { return _shadowRows.is(row); } diff --git a/arangod/Aql/AqlItemBlock.h b/arangod/Aql/AqlItemBlock.h index fcedb1faf3d2..94aee60ff155 100644 --- a/arangod/Aql/AqlItemBlock.h +++ b/arangod/Aql/AqlItemBlock.h @@ -298,7 +298,7 @@ class AqlItemBlock { /// @brief test if the given row is a shadow row and conveys subquery /// information only. It should not be handed to any non-subquery executor. - bool isShadowRow(size_t row) const; + bool isShadowRow(size_t row) const noexcept; /// @brief get the ShadowRowDepth /// Does only work if this row is a shadow row diff --git a/arangod/Aql/AqlItemBlockInputMatrix.cpp b/arangod/Aql/AqlItemBlockInputMatrix.cpp index 4091ac601ff7..f1f70300ab61 100644 --- a/arangod/Aql/AqlItemBlockInputMatrix.cpp +++ b/arangod/Aql/AqlItemBlockInputMatrix.cpp @@ -56,7 +56,7 @@ AqlItemBlockInputRange& AqlItemBlockInputMatrix::getInputRange() { if (_lastRange.hasDataRow()) { return _lastRange; } - // Need initialze lastRange + // Need initialize lastRange if (_aqlItemMatrix->numberOfBlocks() == 0) { _lastRange = {AqlItemBlockInputRange{upstreamState()}}; } else { @@ -69,7 +69,6 @@ AqlItemBlockInputRange& AqlItemBlockInputMatrix::getInputRange() { std::pair AqlItemBlockInputMatrix::getMatrix() noexcept { TRI_ASSERT(_aqlItemMatrix != nullptr); - TRI_ASSERT(!_shadowRow.isInitialized()); // We are always done. This InputMatrix // guarantees that we have all data in our hand at once. @@ -95,7 +94,7 @@ bool AqlItemBlockInputMatrix::hasValidRow() const noexcept { bool AqlItemBlockInputMatrix::hasDataRow() const noexcept { return _aqlItemMatrix != nullptr && !hasShadowRow() && - ((_aqlItemMatrix->stoppedOnShadowRow()) || + (_aqlItemMatrix->stoppedOnShadowRow() || (_aqlItemMatrix->size() > 0 && _finalState == ExecutorState::DONE)); } diff --git a/arangod/Aql/AqlItemBlockInputMatrix.h b/arangod/Aql/AqlItemBlockInputMatrix.h index d712c86431ab..49b9d63f94f0 100644 --- a/arangod/Aql/AqlItemBlockInputMatrix.h +++ b/arangod/Aql/AqlItemBlockInputMatrix.h @@ -63,7 +63,6 @@ class AqlItemBlockInputMatrix { size_t skipAllShadowRowsOfDepth(size_t depth); - // Will return HASMORE if we were able to increase the row index. // Otherwise will return DONE. ExecutorState incrBlockIndex(); diff --git a/arangod/Aql/AqlItemMatrix.cpp b/arangod/Aql/AqlItemMatrix.cpp index cbe9d2791e28..835c1de57e4c 100644 --- a/arangod/Aql/AqlItemMatrix.cpp +++ b/arangod/Aql/AqlItemMatrix.cpp @@ -245,3 +245,94 @@ AqlItemMatrix::AqlItemMatrix(RegisterCount nrRegs) clear(); return {skipped, ShadowAqlItemRow{CreateInvalidShadowRowHint()}}; } + +AqlItemMatrix::RowIterator AqlItemMatrix::begin() const { + if (size() > 0) { + return {this, 0, _startIndexInFirstBlock}; + } else { + return end(); + } +} + +AqlItemMatrix::RowIterator AqlItemMatrix::end() const { + return {this, this->numberOfBlocks(), 0}; +} + +AqlItemMatrix::RowIterator::RowIterator(AqlItemMatrix const* matrix, size_t blockIndex, size_t rowIndex) + : _matrix(matrix), _blockIndex(blockIndex), _rowIndex(rowIndex) {} + +AqlItemMatrix::RowIterator::value_type AqlItemMatrix::RowIterator::next() noexcept { + auto& it = *this; + auto ret = *it; + ++it; + return ret; +} + +auto AqlItemMatrix::RowIterator::isInitialized() const noexcept -> bool { + return _matrix != nullptr; +} + +auto AqlItemMatrix::RowIterator::hasMore() const noexcept -> bool { + // _blockIndex == _matrix->size() => _rowIndex == 0 + TRI_ASSERT((_matrix != nullptr && _blockIndex < _matrix->numberOfBlocks()) || _rowIndex == 0); + // If _blockIndex is valid, _rowIndex must be, too. + return ADB_LIKELY(_matrix != nullptr) && _blockIndex < _matrix->numberOfBlocks(); +} + +AqlItemMatrix::RowIterator::value_type AqlItemMatrix::RowIterator::operator*() const noexcept { + return {_matrix->getBlock(_blockIndex).first, _rowIndex}; +} + +AqlItemMatrix::RowIterator& AqlItemMatrix::RowIterator::operator++() noexcept { + // Assume ++ is only called on a valid and dereferenceable iterator + TRI_ASSERT(_matrix != nullptr); + TRI_ASSERT(_blockIndex < _matrix->numberOfBlocks()); + auto const* block = _matrix->getBlockRef(_blockIndex).first; + TRI_ASSERT(_rowIndex < block->numRows()); + TRI_ASSERT(!block->isShadowRow(_rowIndex)); + + // Increase the row index + ++_rowIndex; + if (_rowIndex >= block->numRows()) { + // If the row index is invalid, move to the next block. + // If the block index is now invalid, this is equal to the "end()" + // iterator. + ++_blockIndex; + _rowIndex = 0; + } + + if (_blockIndex < _matrix->numberOfBlocks()) { + block = _matrix->getBlockRef(_blockIndex).first; + if (block->isShadowRow(_rowIndex)) { + // If we're at a shadow row, this must be the last block. + TRI_ASSERT(_blockIndex + 1 == _matrix->numberOfBlocks()); + // This makes this equal to the "end()" iterator. + ++_blockIndex; + _rowIndex = 0; + } + } + + return *this; +} + +auto AqlItemMatrix::RowIterator::operator++(int) & noexcept -> AqlItemMatrix::RowIterator { + auto tmp = *this; + ++(*this); + return tmp; +} + +AqlItemMatrix::RowIterator::operator bool() const noexcept { + return hasMore(); +} + +bool aql::operator==(AqlItemMatrix::RowIterator const& a, + AqlItemMatrix::RowIterator const& b) { + return ADB_LIKELY(a._matrix == b._matrix) && + (ADB_UNLIKELY(a._matrix == nullptr /* => b._matrix == nullptr */) || + (ADB_LIKELY(a._blockIndex == b._blockIndex) && a._rowIndex == b._rowIndex)); +} + +bool aql::operator!=(AqlItemMatrix::RowIterator const& a, + AqlItemMatrix::RowIterator const& b) { + return !(a == b); +} diff --git a/arangod/Aql/AqlItemMatrix.h b/arangod/Aql/AqlItemMatrix.h index b59432574668..3644c543ea31 100644 --- a/arangod/Aql/AqlItemMatrix.h +++ b/arangod/Aql/AqlItemMatrix.h @@ -29,8 +29,7 @@ #include #include -namespace arangodb { -namespace aql { +namespace arangodb::aql { class InputAqlItemRow; class SharedAqlItemBlockPtr; @@ -119,6 +118,49 @@ class AqlItemMatrix { [[nodiscard]] auto skipAllShadowRowsOfDepth(size_t depth) -> std::tuple; + class RowIterator { + public: + using value_type = InputAqlItemRow; + + RowIterator() = default; + RowIterator(AqlItemMatrix const* matrix, size_t blockIndex, size_t rowIndex); + + // Returns the current value, and move the iterator to the next value + value_type next() noexcept; + + auto isInitialized() const noexcept -> bool; + + // Returns whether the current value is valid, i.e. whether next() may be + // called + auto hasMore() const noexcept -> bool; + + value_type operator*() const noexcept; + + // This can't be implemented, as we can only create the InputAqlItemRow + // on-the-fly. + // pointer operator->(); + + // Prefix increment + RowIterator& operator++() noexcept; + + // Postfix increment. + auto operator++(int) & noexcept -> RowIterator; + + explicit operator bool() const noexcept; + + friend bool operator==(RowIterator const& a, RowIterator const& b); + friend bool operator!=(RowIterator const& a, RowIterator const& b); + + private: + AqlItemMatrix const* _matrix{}; + std::size_t _blockIndex{}; + // Invariant: _rowIndex is valid iff _blockIndex is valid. + std::size_t _rowIndex{}; + }; + + [[nodiscard]] RowIterator begin() const; + [[nodiscard]] RowIterator end() const; + private: std::vector _blocks; @@ -129,6 +171,7 @@ class AqlItemMatrix { size_t _stopIndexInLastBlock; }; -} // namespace aql -} // namespace arangodb +bool operator==(AqlItemMatrix::RowIterator const& a, AqlItemMatrix::RowIterator const& b); +bool operator!=(AqlItemMatrix::RowIterator const& a, AqlItemMatrix::RowIterator const& b); +} // namespace arangodb::aql diff --git a/arangod/Aql/ClusterNodes.cpp b/arangod/Aql/ClusterNodes.cpp index 91eb7f96e494..a00486f0a338 100644 --- a/arangod/Aql/ClusterNodes.cpp +++ b/arangod/Aql/ClusterNodes.cpp @@ -601,6 +601,7 @@ std::unique_ptr SingleRemoteOperationNode::createBlock( std::move(writableOutputRegisters)); auto executorInfos = SingleRemoteModificationInfos( + &engine, in, outputNew, outputOld, out, _plan->getAst()->query(), std::move(options), collection(), ConsultAqlWriteFilter(_options.consultAqlWriteFilter), IgnoreErrors(_options.ignoreErrors), diff --git a/arangod/Aql/ExecutionBlockImpl.cpp b/arangod/Aql/ExecutionBlockImpl.cpp index 038a99f24632..534f577fd16e 100644 --- a/arangod/Aql/ExecutionBlockImpl.cpp +++ b/arangod/Aql/ExecutionBlockImpl.cpp @@ -165,6 +165,17 @@ constexpr bool executorHasSideEffects = ModificationExecutor, ModificationExecutor, UpsertModifier>>; +template +constexpr bool executorCanReturnWaiting = + is_one_of_v, + ModificationExecutor, InsertModifier>, + ModificationExecutor, + ModificationExecutor, RemoveModifier>, + ModificationExecutor, + ModificationExecutor, UpdateReplaceModifier>, + ModificationExecutor, + ModificationExecutor, UpsertModifier>>; + template ExecutionBlockImpl::ExecutionBlockImpl(ExecutionEngine* engine, ExecutionNode const* node, @@ -602,9 +613,9 @@ static SkipRowsRangeVariant constexpr skipRowsType() { useExecutor == (is_one_of_v< Executor, FilterExecutor, ShortestPathExecutor, ReturnExecutor, - KShortestPathsExecutor, - KShortestPathsExecutor, KShortestPathsExecutor, - KShortestPathsExecutor, KShortestPathsExecutor, ParallelUnsortedGatherExecutor, + KShortestPathsExecutor, KShortestPathsExecutor, + KShortestPathsExecutor, KShortestPathsExecutor, + KShortestPathsExecutor, ParallelUnsortedGatherExecutor, IdExecutor>, IdExecutor, HashedCollectExecutor, AccuWindowExecutor, WindowExecutor, IndexExecutor, EnumerateCollectionExecutor, DistinctCollectExecutor, ConstrainedSortExecutor, CountCollectExecutor, @@ -686,7 +697,7 @@ static auto fastForwardType(AqlCall const& call, Executor const& e) -> FastForwa // TODO: We only need to do this if the executor is required to call. // e.g. Modifications and SubqueryStart will always need to be called. Limit only if it needs to report fullCount if constexpr (is_one_of_v || - executorHasSideEffects) { + executorHasSideEffects || executorCanReturnWaiting) { return FastForwardVariant::EXECUTOR; } return FastForwardVariant::FETCHER; @@ -799,6 +810,9 @@ auto ExecutionBlockImpl::executeProduceRows(typename Fetcher::DataRang if constexpr (isMultiDepExecutor) { TRI_ASSERT(input.numberDependencies() == _dependencies.size()); return _executor.produceRows(input, output); + } else if constexpr (executorCanReturnWaiting) { + TRI_ASSERT(false); + THROW_ARANGO_EXCEPTION(TRI_ERROR_INTERNAL_AQL); } else { return _executor.produceRows(input, output); } @@ -819,6 +833,9 @@ auto ExecutionBlockImpl::executeSkipRowsRange(typename Fetcher::DataRa auto res = _executor.skipRowsRange(inputRange, call); _executorReturnedDone = std::get(res) == ExecutorState::DONE; return res; + } else if constexpr (executorCanReturnWaiting) { + TRI_ASSERT(false); + THROW_ARANGO_EXCEPTION(TRI_ERROR_INTERNAL_AQL); } else { auto [state, stats, skipped, localCall] = _executor.skipRowsRange(inputRange, call); _executorReturnedDone = state == ExecutorState::DONE; @@ -1237,13 +1254,19 @@ ExecutionBlockImpl::executeWithoutTrace(AqlCallStack const& callStack) ExecutorState localExecutorState = ExecutorState::DONE; - // We can only have returned the following internal states - TRI_ASSERT(_execState == ExecState::CHECKCALL || _execState == ExecState::SHADOWROWS || - _execState == ExecState::UPSTREAM); + if constexpr (executorCanReturnWaiting) { + TRI_ASSERT(_execState == ExecState::CHECKCALL || _execState == ExecState::SHADOWROWS || + _execState == ExecState::UPSTREAM || _execState == ExecState::PRODUCE || + _execState == ExecState::SKIP || _execState == ExecState::FASTFORWARD); + } else { + // We can only have returned the following internal states + TRI_ASSERT(_execState == ExecState::CHECKCALL || _execState == ExecState::SHADOWROWS || + _execState == ExecState::UPSTREAM); - // Skip can only be > 0 if we are in upstream cases, or if we got injected a block - TRI_ASSERT(_skipped.nothingSkipped() || _execState == ExecState::UPSTREAM || - (std::is_same_v>)); + // Skip can only be > 0 if we are in upstream cases, or if we got injected a block + TRI_ASSERT(_skipped.nothingSkipped() || _execState == ExecState::UPSTREAM || + (std::is_same_v>)); + } if constexpr (Executor::Properties::allowsBlockPassthrough == BlockPassthrough::Disable && !executorHasSideEffects) { @@ -1336,6 +1359,24 @@ ExecutionBlockImpl::executeWithoutTrace(AqlCallStack const& callStack) ctx.stack = _stackBeforeWaiting; } + if constexpr (executorCanReturnWaiting) { + // If state is SKIP, PRODUCE or FASTFORWARD, we were WAITING. + // The call stack must be restored in all cases, but only SKIP needs to + // restore the clientCall. + switch (_execState) { + default: + break; + case ExecState::SKIP: + TRI_ASSERT(_clientRequest.requestLessDataThan(ctx.clientCall)); + ctx.clientCall = _clientRequest; + [[fallthrough]]; + case ExecState::PRODUCE: + case ExecState::FASTFORWARD: + TRI_ASSERT(_stackBeforeWaiting.requestLessDataThan(ctx.stack)); + ctx.stack = _stackBeforeWaiting; + } + } + auto returnToState = ExecState::CHECKCALL; LOG_QUERY("007ac", DEBUG) << "starting statemachine of executor " << printBlockInfo(); @@ -1368,8 +1409,35 @@ ExecutionBlockImpl::executeWithoutTrace(AqlCallStack const& callStack) #endif LOG_QUERY("1f786", DEBUG) << printTypeInfo() << " call skipRows " << ctx.clientCall; - // Execute skipSome - auto [state, stats, skippedLocal, call] = executeSkipRowsRange(_lastRange, ctx.clientCall); + ExecutorState state = ExecutorState::HASMORE; + typename Executor::Stats stats; + size_t skippedLocal = 0; + AqlCallType call{}; + if constexpr (executorCanReturnWaiting) { + TRI_DEFER(ctx.clientCall.resetSkipCount()); + ExecutionState executorState = ExecutionState::HASMORE; + std::tie(executorState, stats, skippedLocal, call) = + _executor.skipRowsRange(_lastRange, ctx.clientCall); + + if (executorState == ExecutionState::WAITING) { + // We need to persist the old call before we return. + // We might have some local accounting to this call. + _clientRequest = ctx.clientCall; + // We might also have some local accounting in this stack. + _stackBeforeWaiting = ctx.stack; + // We do not return anything in WAITING state, also NOT skipped. + TRI_ASSERT(skippedLocal == 0); + return {executorState, SkipResult{}, nullptr}; + } else if (executorState == ExecutionState::DONE) { + state = ExecutorState::DONE; + } else { + state = ExecutorState::HASMORE; + } + } else { + // Execute skipSome + std::tie(state, stats, skippedLocal, call) = + executeSkipRowsRange(_lastRange, ctx.clientCall); + } #ifdef ARANGODB_ENABLE_MAINTAINER_MODE // Assertion: We did skip 'skippedLocal' documents here. @@ -1431,9 +1499,30 @@ ExecutionBlockImpl::executeWithoutTrace(AqlCallStack const& callStack) } TRI_ASSERT(_outputItemRow); - // Execute getSome - auto [state, stats, call] = executeProduceRows(_lastRange, *_outputItemRow); - + TRI_ASSERT(!_executorReturnedDone); + ExecutorState state = ExecutorState::HASMORE; + typename Executor::Stats stats; + auto call = AqlCallType{}; + if constexpr (executorCanReturnWaiting) { + ExecutionState executorState = ExecutionState::HASMORE; + std::tie(executorState, stats, call) = + _executor.produceRows(_lastRange, *_outputItemRow); + + if (executorState == ExecutionState::WAITING) { + // We need to persist the old stack before we return. + // We might have some local accounting in this stack. + _stackBeforeWaiting = ctx.stack; + // We do not return anything in WAITING state, also NOT skipped. + return {executorState, SkipResult{}, nullptr}; + } else if (executorState == ExecutionState::DONE) { + state = ExecutorState::DONE; + } else { + state = ExecutorState::HASMORE; + } + } else { + // Execute getSome + std::tie(state, stats, call) = executeProduceRows(_lastRange, *_outputItemRow); + } _executorReturnedDone = state == ExecutorState::DONE; _blockStats += stats; localExecutorState = state; @@ -1476,8 +1565,41 @@ ExecutionBlockImpl::executeWithoutTrace(AqlCallStack const& callStack) } } - // Execute skipSome - auto [state, stats, skippedLocal, call] = executeFastForward(_lastRange, callCopy); + ExecutorState state = ExecutorState::HASMORE; + typename Executor::Stats stats; + size_t skippedLocal = 0; + AqlCallType call{}; + + if constexpr (executorCanReturnWaiting) { + ExecutionState executorState = ExecutionState::HASMORE; + + AqlCall dummy; + dummy.hardLimit = 0u; + dummy.fullCount = true; + std::tie(executorState, stats, skippedLocal, call) = + _executor.skipRowsRange(_lastRange, dummy); + if (executorState == ExecutionState::WAITING) { + // We need to persist the old stack before we return. + // We might have some local accounting in this stack. + _stackBeforeWaiting = ctx.stack; + // We do not return anything in WAITING state, also NOT skipped. + TRI_ASSERT(skippedLocal == 0); + return {executorState, SkipResult{}, nullptr}; + } else if (executorState == ExecutionState::DONE) { + state = ExecutorState::DONE; + } else { + state = ExecutorState::HASMORE; + } + + if (!callCopy.needsFullCount()) { + // We forget that we skipped + skippedLocal = 0; + } + } else { + // Execute skipSome + std::tie(state, stats, skippedLocal, call) = + executeFastForward(_lastRange, callCopy); + } if constexpr (executorHasSideEffects) { if (!ctx.stack.needToSkipSubquery()) { @@ -1737,7 +1859,7 @@ ExecutionBlockImpl::executeWithoutTrace(AqlCallStack const& callStack) _skipped.reset(); if (localExecutorState == ExecutorState::HASMORE || _lastRange.hasDataRow() || _lastRange.hasShadowRow()) { - // We have skipped or/and return data, otherwise we cannot return HASMORE + // We have skipped or/and returned data, otherwise we cannot return HASMORE TRI_ASSERT(!skipped.nothingSkipped() || (outputBlock != nullptr && outputBlock->numRows() > 0)); return {ExecutionState::HASMORE, skipped, std::move(outputBlock)}; diff --git a/arangod/Aql/ExecutionState.cpp b/arangod/Aql/ExecutionState.cpp index 35ed458c52c3..c8753ed13c72 100644 --- a/arangod/Aql/ExecutionState.cpp +++ b/arangod/Aql/ExecutionState.cpp @@ -25,8 +25,7 @@ #include -namespace arangodb { -namespace aql { +namespace arangodb::aql { std::ostream& operator<<(std::ostream& ostream, ExecutionState state) { switch (state) { @@ -51,12 +50,8 @@ std::ostream& operator<<(std::ostream& ostream, ExecutorState state) { case ExecutorState::HASMORE: ostream << "HASMORE"; break; - default: - ostream << " WAT WAT WAT"; - break; } return ostream; } -} // namespace aql -} // namespace arangodb +} // namespace arangodb::aql diff --git a/arangod/Aql/ExecutionState.h b/arangod/Aql/ExecutionState.h index ed5d6615e0ed..e2f9f8c962b7 100644 --- a/arangod/Aql/ExecutionState.h +++ b/arangod/Aql/ExecutionState.h @@ -25,8 +25,7 @@ #include -namespace arangodb { -namespace aql { +namespace arangodb::aql { enum class ExecutionState { // done with this block, definitely no more results @@ -57,5 +56,4 @@ std::ostream& operator<<(std::ostream& ostream, ExecutionState state); std::ostream& operator<<(std::ostream& ostream, ExecutorState state); -} // namespace aql -} // namespace arangodb +} // namespace arangodb::aql diff --git a/arangod/Aql/InsertModifier.cpp b/arangod/Aql/InsertModifier.cpp index 64a7147ced39..3eda23a2dc04 100644 --- a/arangod/Aql/InsertModifier.cpp +++ b/arangod/Aql/InsertModifier.cpp @@ -50,6 +50,6 @@ ModifierOperationType InsertModifierCompletion::accumulate(ModificationExecutorA } } -OperationResult InsertModifierCompletion::transact(transaction::Methods& trx, VPackSlice const& data) { - return trx.insert(_infos._aqlCollection->name(), data, _infos._options); +futures::Future InsertModifierCompletion::transact(transaction::Methods& trx, VPackSlice const& data) { + return trx.insertAsync(_infos._aqlCollection->name(), data, _infos._options); } diff --git a/arangod/Aql/InsertModifier.h b/arangod/Aql/InsertModifier.h index ad77b976ad6a..dc33fe36da11 100644 --- a/arangod/Aql/InsertModifier.h +++ b/arangod/Aql/InsertModifier.h @@ -26,9 +26,9 @@ #include "Aql/ModificationExecutor.h" #include "Aql/ModificationExecutorAccumulator.h" #include "Aql/ModificationExecutorInfos.h" +#include "Futures/Future.h" -namespace arangodb { -namespace aql { +namespace arangodb::aql { struct ModificationExecutorInfos; @@ -40,11 +40,10 @@ class InsertModifierCompletion { ModifierOperationType accumulate(ModificationExecutorAccumulator& accu, InputAqlItemRow& row); - OperationResult transact(transaction::Methods& trx, VPackSlice const& data); + futures::Future transact(transaction::Methods& trx, VPackSlice const& data); private: ModificationExecutorInfos& _infos; }; -} // namespace aql -} // namespace arangodb +} // namespace arangodb::aql diff --git a/arangod/Aql/ModificationExecutor.cpp b/arangod/Aql/ModificationExecutor.cpp index 91e88b4b0bd2..33c20accee8c 100644 --- a/arangod/Aql/ModificationExecutor.cpp +++ b/arangod/Aql/ModificationExecutor.cpp @@ -29,37 +29,38 @@ #include "Aql/OutputAqlItemRow.h" #include "Aql/QueryContext.h" #include "Aql/SingleRowFetcher.h" -#include "Basics/Common.h" -#include "Basics/VelocyPackHelper.h" +#include "Basics/application-exit.h" #include "StorageEngine/TransactionState.h" -#include "VocBase/LogicalCollection.h" -#include "Aql/InsertModifier.h" -#include "Aql/RemoveModifier.h" #include "Aql/SimpleModifier.h" -#include "Aql/UpdateReplaceModifier.h" #include "Aql/UpsertModifier.h" -#include "Logger/LogMacros.h" - +#include #include -#include "velocypack/velocypack-aliases.h" using namespace arangodb; using namespace arangodb::aql; using namespace arangodb::basics; -namespace arangodb { -namespace aql { +namespace arangodb::aql { + +namespace { +auto translateReturnType(ExecutorState state) noexcept -> ExecutionState { + if (state == ExecutorState::DONE) { + return ExecutionState::DONE; + } + return ExecutionState::HASMORE; +} +} // namespace ModifierOutput::ModifierOutput(InputAqlItemRow const& inputRow, Type type) - : _inputRow(std::move(inputRow)), _type(type), _oldValue(), _newValue() {} + : _inputRow(inputRow), _type(type), _oldValue(), _newValue() {} ModifierOutput::ModifierOutput(InputAqlItemRow&& inputRow, Type type) : _inputRow(std::move(inputRow)), _type(type), _oldValue(), _newValue() {} ModifierOutput::ModifierOutput(InputAqlItemRow const& inputRow, Type type, AqlValue const& oldValue, AqlValue const& newValue) - : _inputRow(std::move(inputRow)), + : _inputRow(inputRow), _type(type), _oldValue(oldValue), _oldValueGuard(std::in_place, _oldValue.value(), true), @@ -89,184 +90,308 @@ AqlValue const& ModifierOutput::getNewValue() const { } template -ModificationExecutor::ModificationExecutor(Fetcher& fetcher, Infos& infos) - : _trx(infos._query.newTrxContext()), _lastState(ExecutionState::HASMORE), _infos(infos), _modifier(infos) {} - -// Fetches as many rows as possible from upstream and accumulates results -// through the modifier -template -auto ModificationExecutor::doCollect(AqlItemBlockInputRange& input, - size_t maxOutputs) - -> void { - ExecutionState state = ExecutionState::HASMORE; - - // Maximum number of rows we can put into output - // So we only ever produce this many here - while (_modifier.nrOfOperations() < maxOutputs && input.hasDataRow()) { - auto [state, row] = input.nextDataRow(AqlItemBlockInputRange::HasDataRow{}); - - // Make sure we have a valid row - TRI_ASSERT(row.isInitialized()); - _modifier.accumulate(row); - } - TRI_ASSERT(state == ExecutionState::DONE || state == ExecutionState::HASMORE); -} +ModificationExecutor::ModificationExecutor(Fetcher& fetcher, + Infos& infos) + : _trx(infos._query.newTrxContext()), + _infos(infos), + _modifier(std::make_shared(infos)) {} -// Outputs accumulated results, and counts the statistics template -void ModificationExecutor::doOutput(OutputAqlItemRow& output, - Stats& stats) { - typename ModifierType::OutputIterator modifierOutputIterator(_modifier); - // We only accumulated as many items as we can output, so this - // should be correct - for (auto const& modifierOutput : modifierOutputIterator) { - TRI_ASSERT(!output.isFull()); - bool written = false; - switch (modifierOutput.getType()) { - case ModifierOutput::Type::ReturnIfRequired: - if (_infos._options.returnOld) { - output.cloneValueInto(_infos._outputOldRegisterId, modifierOutput.getInputRow(), - modifierOutput.getOldValue()); - written = true; - } - if (_infos._options.returnNew) { - output.cloneValueInto(_infos._outputNewRegisterId, modifierOutput.getInputRow(), - modifierOutput.getNewValue()); - written = true; - } - [[fallthrough]]; - case ModifierOutput::Type::CopyRow: - if (!written) { - output.copyRow(modifierOutput.getInputRow()); - } - output.advanceRow(); - break; - case ModifierOutput::Type::SkipRow: - // nothing. - break; - } +template +[[nodiscard]] auto ModificationExecutor::produceOrSkip( + typename FetcherType::DataRange& input, ProduceOrSkipData& produceOrSkipData) + -> std::tuple { + TRI_IF_FAILURE("ModificationBlock::getSome") { + THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG); } -} -template -[[nodiscard]] auto ModificationExecutor::produceRows( - typename FetcherType::DataRange& input, OutputAqlItemRow& output) - -> std::tuple { AqlCall upstreamCall{}; if constexpr (std::is_same_v && !std::is_same_v) { - upstreamCall.softLimit = _modifier.getBatchSize(); + upstreamCall.softLimit = _modifier->getBatchSize(); } + + // Try to initialize the RangeHandler + if (!_rangeHandler.init(input)) { + return {translateReturnType(_rangeHandler.upstreamState(input)), + ModificationStats{}, upstreamCall}; + } + auto stats = ModificationStats{}; - _modifier.reset(); - if (!input.hasDataRow()) { - // Input is empty - return {input.upstreamState(), stats, upstreamCall}; + auto const maxRows = std::invoke([&] { + if constexpr (std::is_same_v && + !std::is_same_v) { + return std::min(produceOrSkipData.maxOutputRows(), _modifier->getBatchSize()); + } else { + return produceOrSkipData.maxOutputRows(); + } + }); + + // Read as much input as possible + while (_rangeHandler.hasDataRow(input) && _modifier->nrOfOperations() < maxRows) { + auto row = _rangeHandler.nextDataRow(input); + + TRI_ASSERT(row.isInitialized()); + _modifier->accumulate(row); } - TRI_IF_FAILURE("ModificationBlock::getSome") { - THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG); + bool const inputDone = _rangeHandler.upstreamState(input) == ExecutorState::DONE; + bool const enoughOutputAvailable = maxRows == _modifier->nrOfOperations(); + bool const hasAtLeastOneModification = _modifier->nrOfOperations() > 0; + + // outputFull => enoughOutputAvailable + TRI_ASSERT(produceOrSkipData.needMoreOutput() || enoughOutputAvailable); + + if (!inputDone && !enoughOutputAvailable) { + // This case handles when there's still work to do, but no input in the + // current range. + TRI_ASSERT(!_rangeHandler.hasDataRow(input)); + + return {ExecutionState::HASMORE, stats, upstreamCall}; } - // only produce at most output.numRowsLeft() many results - ExecutorState upstreamState = ExecutorState::HASMORE; - if constexpr (std::is_same_v) { - auto& range = input.getInputRange(); - doCollect(range, output.numRowsLeft()); - upstreamState = range.upstreamState(); - if (upstreamState == ExecutorState::DONE) { - // We are done with this input. - // We need to forward it to the last ShadowRow. - input.skipAllRemainingDataRows(); + if (hasAtLeastOneModification) { + TRI_ASSERT(inputDone || enoughOutputAvailable); + ExecutionState modifierState = _modifier->transact(_trx); + + if (modifierState == ExecutionState::WAITING) { + TRI_ASSERT(!ServerState::instance()->isSingleServer()); + return {ExecutionState::WAITING, stats, upstreamCall}; } - } else { - doCollect(input, output.numRowsLeft()); - upstreamState = input.upstreamState(); - } - if (_modifier.nrOfOperations() > 0) { - _modifier.transact(_trx); + TRI_ASSERT(_modifier->hasResultOrException()); + + _modifier->checkException(); if (_infos._doCount) { - stats.addWritesExecuted(_modifier.nrOfWritesExecuted()); - stats.addWritesIgnored(_modifier.nrOfWritesIgnored()); + stats.addWritesExecuted(_modifier->nrOfWritesExecuted()); + stats.addWritesIgnored(_modifier->nrOfWritesIgnored()); } - doOutput(output, stats); + produceOrSkipData.doOutput(); + _modifier->reset(); } - return {upstreamState, stats, upstreamCall}; + TRI_ASSERT(_modifier->hasNeitherResultNorOperationPending()); + + return {translateReturnType(_rangeHandler.upstreamState(input)), stats, upstreamCall}; } template -[[nodiscard]] auto ModificationExecutor::skipRowsRange( - typename FetcherType::DataRange& input, AqlCall& call) - -> std::tuple { - AqlCall upstreamCall{}; - if constexpr (std::is_same_v && - !std::is_same_v) { - upstreamCall.softLimit = _modifier.getBatchSize(); +[[nodiscard]] auto ModificationExecutor::produceRows( + typename FetcherType::DataRange& input, OutputAqlItemRow& output) + -> std::tuple { + struct ProduceData final : public IProduceOrSkipData { + OutputAqlItemRow& _output; + ModificationExecutorInfos& _infos; + ModifierType& _modifier; + bool _first = true; + + ~ProduceData() final = default; + ProduceData(OutputAqlItemRow& output, ModificationExecutorInfos& infos, ModifierType& modifier) + : _output(output), _infos(infos), _modifier(modifier) {} + + void doOutput() final { + typename ModifierType::OutputIterator modifierOutputIterator(_modifier); + // We only accumulated as many items as we can output, so this + // should be correct + for (auto const& modifierOutput : modifierOutputIterator) { + TRI_ASSERT(!_output.isFull()); + bool written = false; + switch (modifierOutput.getType()) { + case ModifierOutput::Type::ReturnIfRequired: + if (_infos._options.returnOld) { + _output.cloneValueInto(_infos._outputOldRegisterId, + modifierOutput.getInputRow(), + modifierOutput.getOldValue()); + written = true; + } + if (_infos._options.returnNew) { + _output.cloneValueInto(_infos._outputNewRegisterId, + modifierOutput.getInputRow(), + modifierOutput.getNewValue()); + written = true; + } + [[fallthrough]]; + case ModifierOutput::Type::CopyRow: + if (!written) { + _output.copyRow(modifierOutput.getInputRow()); + } + _output.advanceRow(); + break; + case ModifierOutput::Type::SkipRow: + // nothing. + break; + } + } + } + auto maxOutputRows() -> std::size_t final { return _output.numRowsLeft(); } + auto needMoreOutput() -> bool final { return !_output.isFull(); } + }; + + auto produceData = ProduceData(output, _infos, *_modifier); + auto&& [state, localStats, upstreamCall] = produceOrSkip(input, produceData); + + _stats += localStats; + + if (state == ExecutionState::WAITING) { + return {state, ModificationStats{}, upstreamCall}; } auto stats = ModificationStats{}; + std::swap(stats, _stats); - TRI_IF_FAILURE("ModificationBlock::getSome") { - THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG); - } + return {state, stats, upstreamCall}; +} - // only produce at most output.numRowsLeft() many results - ExecutorState upstreamState = input.upstreamState(); - while (input.hasDataRow() && call.needSkipMore()) { - _modifier.reset(); - size_t toSkip = call.getOffset(); - if (call.getLimit() == 0 && call.hasHardLimit()) { - // We need to produce all modification operations. - // If we are bound by limits or not! - toSkip = ExecutionBlock::SkipAllSize(); +template +[[nodiscard]] auto ModificationExecutor::skipRowsRange( + typename FetcherType::DataRange& input, AqlCall& call) + -> std::tuple { + struct SkipData final : public IProduceOrSkipData { + AqlCall& _call; + ModifierType& _modifier; + + SkipData(AqlCall& call, ModifierType& modifier) + : _call(call), _modifier(modifier) {} + ~SkipData() final = default; + + void doOutput() final { + auto const skipped = std::invoke([&] { + if (_call.needsFullCount()) { + // If we need to do full count the nr of writes we did + // in this batch is always correct. + // If we are in offset phase and need to produce data + // after the toSkip is limited to offset(). + // otherwise we need to report everything we write + return _modifier.nrOfWritesExecuted(); + } else { + // If we do not need to report fullcount. + // we cannot report more than offset + // but also not more than the operations we + // have successfully executed + return (std::min)(_call.getOffset(), _modifier.nrOfWritesExecuted()); + } + }); + _call.didSkip(skipped); } - if constexpr (std::is_same_v) { - auto& range = input.getInputRange(); - if (range.hasDataRow()) { - doCollect(range, toSkip); - } - upstreamState = range.upstreamState(); - if (upstreamState == ExecutorState::DONE) { - // We are done with this input. - // We need to forward it to the last ShadowRow. - input.skipAllRemainingDataRows(); - TRI_ASSERT(input.upstreamState() == ExecutorState::DONE); + auto maxOutputRows() -> std::size_t final { + if (_call.getLimit() == 0 && _call.hasHardLimit()) { + // We need to produce all modification operations. + // If we are bound by limits or not! + return ExecutionBlock::SkipAllSize(); + } else { + return _call.getOffset(); } - } else { - doCollect(input, toSkip); - upstreamState = input.upstreamState(); } + auto needMoreOutput() -> bool final { return _call.needSkipMore(); } + }; - if (_modifier.nrOfOperations() > 0) { - _modifier.transact(_trx); + TRI_ASSERT(call.getSkipCount() == 0); + // "move" the previously saved skip count into the call + call.didSkip(_skipCount); + _skipCount = 0; - if (_infos._doCount) { - stats.addWritesExecuted(_modifier.nrOfWritesExecuted()); - stats.addWritesIgnored(_modifier.nrOfWritesIgnored()); - } + auto skipData = SkipData(call, *_modifier); + auto&& [state, localStats, upstreamCall] = produceOrSkip(input, skipData); + + _stats += localStats; + + if (state == ExecutionState::WAITING) { + // save the skip count until the next call + _skipCount = call.getSkipCount(); + return {state, ModificationStats{}, 0, upstreamCall}; + } + + auto stats = ModificationStats{}; + std::swap(stats, _stats); + + return {state, stats, call.getSkipCount(), upstreamCall}; +} + +template +auto ModificationExecutor::RangeHandler::nextDataRow( + typename FetcherType::DataRange& input) -> arangodb::aql::InputAqlItemRow { + if constexpr (inputIsMatrix) { + // Assume init() was called at least once + TRI_ASSERT(_iterator.isInitialized()); + // Assume hasDataRow() is true (caller has to make sure) + TRI_ASSERT(hasDataRow(input)); - if (call.needsFullCount()) { - // If we need to do full count the nr of writes we did - // in this batch is always correct. - // If we are in offset phase and need to produce data - // after the toSkip is limited to offset(). - // otherwise we need to report everything we write - call.didSkip(_modifier.nrOfWritesExecuted()); + auto ret = _iterator.next(); + + if (!_iterator) { + // We are done with this input. + // We need to forward it to the last ShadowRow. + // RangeHandler::hasDataRow() will continue to return false, even if + // the AqlItemBlockInputMatrix is now already in the next range. + input.skipAllRemainingDataRows(); + } + + return ret; + } else { + return input.nextDataRow(AqlItemBlockInputRange::HasDataRow{}).second; + } +} + +template +auto ModificationExecutor::RangeHandler::hasDataRow( + typename FetcherType::DataRange& input) const noexcept -> bool { + if constexpr (inputIsMatrix) { + // Assume init() was called at least once + TRI_ASSERT(_iterator.isInitialized()); + + return _iterator.hasMore(); + } else { + return input.hasDataRow(); + } +} + +template +bool ModificationExecutor::RangeHandler::init( + typename FetcherType::DataRange& input) { + if constexpr (inputIsMatrix) { + if (!_iterator.isInitialized()) { + if (input.hasValidRow()) { + auto&& [state, matrix] = input.getMatrix(); + TRI_ASSERT(state == ExecutorState::DONE); + _iterator = matrix->begin(); + if (!_iterator) { + input.skipAllRemainingDataRows(); + } + // initialized successfully + return true; } else { - // If we do not need to report fullcount. - // we cannot report more than offset - // but also not more than the operations we - // have successfully executed - call.didSkip((std::min)(call.getOffset(), _modifier.nrOfWritesExecuted())); + // can't initialize yet, no matrix + return false; } + } else { + // already initialized + return true; } + } else { + // no-op + return true; } +} - return {upstreamState, stats, call.getSkipCount(), upstreamCall}; +template +auto ModificationExecutor::RangeHandler::upstreamState( + typename FetcherType::DataRange& input) const noexcept -> ExecutorState { + if constexpr (inputIsMatrix) { + if (ADB_UNLIKELY(!_iterator.isInitialized())) { + // As long as the iterator isn't initialized, we need to pass the upstream + // state. In particular if the input is completely empty, we need to + // return DONE. + return input.upstreamState(); + } else if (hasDataRow(input)) { + return ExecutorState::HASMORE; + } else { + return ExecutorState::DONE; + } + } else { + return input.upstreamState(); + } } using NoPassthroughSingleRowFetcher = SingleRowFetcher; @@ -280,5 +405,4 @@ template class ::arangodb::aql::ModificationExecutor; template class ::arangodb::aql::ModificationExecutor; -} // namespace aql -} // namespace arangodb +} // namespace arangodb::aql diff --git a/arangod/Aql/ModificationExecutor.h b/arangod/Aql/ModificationExecutor.h index 694ec87ee4b3..f07626b43cf8 100644 --- a/arangod/Aql/ModificationExecutor.h +++ b/arangod/Aql/ModificationExecutor.h @@ -28,6 +28,7 @@ #include "Aql/ModificationExecutorInfos.h" #include "Aql/OutputAqlItemRow.h" #include "Aql/Stats.h" +#include "AqlItemMatrix.h" #include "Transaction/Methods.h" #include "Utils/OperationResult.h" @@ -36,13 +37,14 @@ #include #include +#include #include -namespace arangodb { -namespace aql { +namespace arangodb::aql { struct AqlCall; class AqlItemBlockInputRange; +class AqlItemBlockInputMatrix; class InputAqlItemRow; class OutputAqlItemRow; class RegisterInfos; @@ -171,25 +173,60 @@ class ModificationExecutor { ~ModificationExecutor() = default; [[nodiscard]] auto produceRows(typename FetcherType::DataRange& input, OutputAqlItemRow& output) - -> std::tuple; + -> std::tuple; [[nodiscard]] auto skipRowsRange(typename FetcherType::DataRange& inputRange, AqlCall& call) - -> std::tuple; + -> std::tuple; protected: - void doCollect(AqlItemBlockInputRange& input, size_t maxOutputs); - void doOutput(OutputAqlItemRow& output, Stats& stats); - + // Interface of the data structure that is passed by produceRows() and + // skipRowsRange() to produceOrSkip(), which does the actual work. + // The interface isn't technically necessary (as produceOrSkip is a template), + // but I kept it for the overview it provides. + struct IProduceOrSkipData { + virtual ~IProduceOrSkipData() = default; + virtual void doOutput() = 0; + virtual auto maxOutputRows() -> std::size_t = 0; + virtual auto needMoreOutput() -> bool = 0; + }; + + template + std::tuple produceOrSkip(typename FetcherType::DataRange& input, + ProduceOrSkipData& produceOrSkipData); + transaction::Methods _trx; - // The state that was returned on the last call to produceRows. For us - // this is relevant because we might have collected some documents in the - // modifier's accumulator, but not written them yet, because we ran into - // WAITING - ExecutionState _lastState; ModificationExecutorInfos& _infos; - ModifierType _modifier; + std::shared_ptr _modifier; + + // Used to unify input consumption of both AqlItemBlockInputRange and + // AqlItemBlockInputMatrix. + struct RangeHandler { + constexpr static bool inputIsMatrix = + std::is_same_v; + + // init must be called at least once with any input. It's a no-op for + // AqlItemBlockInputRange, and initializes an iterator for a + // AqlItemBlockInputMatrix. + // Returns false if the input isn't yet in a state in which it can be used + // for initialization (i.e. the matrix isn't there yet). + bool init(typename FetcherType::DataRange& input); + + // Returns true iff we may currently call nextDataRow() on this input; + // might return true on the next input. + [[nodiscard]] auto hasDataRow(typename FetcherType::DataRange& input) const noexcept + -> bool; + // Returns the next data row and moves the internal iterator + [[nodiscard]] auto nextDataRow(typename FetcherType::DataRange& input) + -> arangodb::aql::InputAqlItemRow; + + auto upstreamState(typename FetcherType::DataRange& input) const noexcept -> ExecutorState; + + AqlItemMatrix::RowIterator _iterator{}; + } _rangeHandler{}; + + std::size_t _skipCount{}; + Stats _stats{}; }; -} // namespace aql -} // namespace arangodb +} // namespace arangodb::aql diff --git a/arangod/Aql/ModificationExecutorAccumulator.h b/arangod/Aql/ModificationExecutorAccumulator.h index 5a15592397e7..9fa89ae30e67 100644 --- a/arangod/Aql/ModificationExecutorAccumulator.h +++ b/arangod/Aql/ModificationExecutorAccumulator.h @@ -25,12 +25,12 @@ #include "Basics/Common.h" #include "Basics/debugging.h" +#include "Logger/LogMacros.h" -#include +#include #include -namespace arangodb { -namespace aql { +namespace arangodb::aql { // Hack-i-ty-hack // @@ -43,8 +43,10 @@ class ModificationExecutorAccumulator { public: ModificationExecutorAccumulator() { reset(); } - VPackSlice closeAndGetContents() { + [[nodiscard]] VPackSlice closeAndGetContents() { + TRI_ASSERT(_accumulator.isOpenArray()); _accumulator.close(); + TRI_ASSERT(_accumulator.isClosed()); return _accumulator.slice(); } @@ -58,12 +60,13 @@ class ModificationExecutorAccumulator { _accumulator.openArray(); } - size_t nrOfDocuments() const { return _accumulator.slice().length(); } + [[nodiscard]] size_t nrOfDocuments() const { + TRI_ASSERT(_accumulator.isClosed()); + return _accumulator.slice().length(); + } private: - VPackBuilder _accumulator; + VPackBuilder _accumulator{}; }; -} // namespace aql -} // namespace arangodb - +} // namespace arangodb::aql diff --git a/arangod/Aql/ModificationExecutorInfos.cpp b/arangod/Aql/ModificationExecutorInfos.cpp index 5e0f101c5b5a..e2eb6221ed96 100644 --- a/arangod/Aql/ModificationExecutorInfos.cpp +++ b/arangod/Aql/ModificationExecutorInfos.cpp @@ -32,13 +32,15 @@ using namespace arangodb; using namespace arangodb::aql; ModificationExecutorInfos::ModificationExecutorInfos( + ExecutionEngine* engine, RegisterId input1RegisterId, RegisterId input2RegisterId, RegisterId input3RegisterId, RegisterId outputNewRegisterId, RegisterId outputOldRegisterId, RegisterId outputRegisterId, arangodb::aql::QueryContext& query, OperationOptions options, aql::Collection const* aqlCollection, ProducesResults producesResults, ConsultAqlWriteFilter consultAqlWriteFilter, IgnoreErrors ignoreErrors, DoCount doCount, IsReplace isReplace, IgnoreDocumentNotFound ignoreDocumentNotFound) - : _query(query), + : _engine(engine), + _query(query), _options(options), _aqlCollection(aqlCollection), _producesResults(ProducesResults(producesResults._value || !_options.silent)), diff --git a/arangod/Aql/ModificationExecutorInfos.h b/arangod/Aql/ModificationExecutorInfos.h index 8b71d13ee228..e68af138b1c5 100644 --- a/arangod/Aql/ModificationExecutorInfos.h +++ b/arangod/Aql/ModificationExecutorInfos.h @@ -36,6 +36,7 @@ namespace arangodb { namespace aql { +class ExecutionEngine; class QueryContext; struct BoolWrapper { @@ -64,7 +65,8 @@ struct IgnoreDocumentNotFound : BoolWrapper { }; struct ModificationExecutorInfos { - ModificationExecutorInfos(RegisterId input1RegisterId, RegisterId input2RegisterId, + ModificationExecutorInfos(ExecutionEngine* engine, + RegisterId input1RegisterId, RegisterId input2RegisterId, RegisterId input3RegisterId, RegisterId outputNewRegisterId, RegisterId outputOldRegisterId, RegisterId outputRegisterId, arangodb::aql::QueryContext& query, OperationOptions options, @@ -78,7 +80,10 @@ struct ModificationExecutorInfos { ModificationExecutorInfos(ModificationExecutorInfos const&) = delete; ~ModificationExecutorInfos() = default; + ExecutionEngine* engine() const { return _engine; } + /// @brief the variable produced by Return + arangodb::aql::ExecutionEngine* _engine; arangodb::aql::QueryContext& _query; OperationOptions _options; aql::Collection const* _aqlCollection; diff --git a/arangod/Aql/ModificationNodes.cpp b/arangod/Aql/ModificationNodes.cpp index 35831496ec9d..967a31cf6c4d 100644 --- a/arangod/Aql/ModificationNodes.cpp +++ b/arangod/Aql/ModificationNodes.cpp @@ -143,6 +143,7 @@ std::unique_ptr RemoveNode::createBlock( auto registerInfos = createRegisterInfos(std::move(readableInputRegisters), std::move(writableOutputRegisters)); auto executorInfos = ModificationExecutorInfos( + &engine, inDocRegister, RegisterPlan::MaxRegisterId, RegisterPlan::MaxRegisterId, outputNew, outputOld, RegisterPlan::MaxRegisterId /*output*/, _plan->getAst()->query(), std::move(options), collection(), @@ -231,6 +232,7 @@ std::unique_ptr InsertNode::createBlock( auto registerInfos = createRegisterInfos(std::move(readableInputRegisters), std::move(writableOutputRegisters)); ModificationExecutorInfos infos( + &engine, inputRegister, RegisterPlan::MaxRegisterId, RegisterPlan::MaxRegisterId, outputNew, outputOld, RegisterPlan::MaxRegisterId /*output*/, _plan->getAst()->query(), std::move(options), collection(), @@ -350,6 +352,7 @@ std::unique_ptr UpdateNode::createBlock( ModificationExecutorHelpers::convertOptions(_options, _outVariableNew, _outVariableOld); auto executorInfos = ModificationExecutorInfos( + &engine, inDocRegister, inKeyRegister, RegisterPlan::MaxRegisterId, outputNew, outputOld, RegisterPlan::MaxRegisterId /*output*/, _plan->getAst()->query(), std::move(options), collection(), ProducesResults(producesResults()), @@ -433,6 +436,7 @@ std::unique_ptr ReplaceNode::createBlock( ModificationExecutorHelpers::convertOptions(_options, _outVariableNew, _outVariableOld); auto executorInfos = ModificationExecutorInfos( + &engine, inDocRegister, inKeyRegister, RegisterPlan::MaxRegisterId, outputNew, outputOld, RegisterPlan::MaxRegisterId /*output*/, _plan->getAst()->query(), std::move(options), collection(), ProducesResults(producesResults()), @@ -538,6 +542,7 @@ std::unique_ptr UpsertNode::createBlock( ModificationExecutorHelpers::convertOptions(_options, _outVariableNew, _outVariableOld); auto executorInfos = ModificationExecutorInfos( + &engine, inDoc, insert, update, outputNew, outputOld, RegisterPlan::MaxRegisterId /*output*/, _plan->getAst()->query(), std::move(options), collection(), ProducesResults(producesResults()), diff --git a/arangod/Aql/RemoveModifier.cpp b/arangod/Aql/RemoveModifier.cpp index c970284922c0..de43284cbfaf 100644 --- a/arangod/Aql/RemoveModifier.cpp +++ b/arangod/Aql/RemoveModifier.cpp @@ -34,8 +34,6 @@ #include "Transaction/Methods.h" #include "VocBase/LogicalCollection.h" -class CollectionNameResolver; - using namespace arangodb; using namespace arangodb::aql; using namespace arangodb::aql::ModificationExecutorHelpers; @@ -73,6 +71,6 @@ ModifierOperationType RemoveModifierCompletion::accumulate(ModificationExecutorA } } -OperationResult RemoveModifierCompletion::transact(transaction::Methods& trx, VPackSlice const& data) { - return trx.remove(_infos._aqlCollection->name(), data, _infos._options); +futures::Future RemoveModifierCompletion::transact(transaction::Methods& trx, VPackSlice const& data) { + return trx.removeAsync(_infos._aqlCollection->name(), data, _infos._options); } diff --git a/arangod/Aql/RemoveModifier.h b/arangod/Aql/RemoveModifier.h index 8a1030886538..0dea099de2e8 100644 --- a/arangod/Aql/RemoveModifier.h +++ b/arangod/Aql/RemoveModifier.h @@ -26,11 +26,11 @@ #include "Aql/ModificationExecutor.h" #include "Aql/ModificationExecutorAccumulator.h" #include "Aql/ModificationExecutorInfos.h" +#include "Futures/Future.h" #include -namespace arangodb { -namespace aql { +namespace arangodb::aql { struct ModificationExecutorInfos; @@ -41,12 +41,11 @@ class RemoveModifierCompletion { ModifierOperationType accumulate(ModificationExecutorAccumulator& accu, InputAqlItemRow& row); - OperationResult transact(transaction::Methods& trx, VPackSlice const& data); + futures::Future transact(transaction::Methods& trx, VPackSlice const& data); private: ModificationExecutorInfos& _infos; arangodb::velocypack::Builder _keyDocBuilder; }; -} // namespace aql -} // namespace arangodb +} // namespace arangodb::aql diff --git a/arangod/Aql/SimpleModifier.cpp b/arangod/Aql/SimpleModifier.cpp index 362d617840bb..f7af05b8672c 100644 --- a/arangod/Aql/SimpleModifier.cpp +++ b/arangod/Aql/SimpleModifier.cpp @@ -25,12 +25,17 @@ #include "Aql/AqlValue.h" #include "Aql/Collection.h" +#include "Aql/ExecutionEngine.h" #include "Aql/ModificationExecutor.h" #include "Aql/ModificationExecutorHelpers.h" #include "Aql/OutputAqlItemRow.h" +#include "Aql/SharedQueryState.h" #include "Basics/Common.h" -#include "Basics/VelocyPackHelper.h" #include "Basics/StaticStrings.h" +#include "Basics/StringUtils.h" +#include "Basics/VelocyPackHelper.h" +#include "Basics/application-exit.h" +#include "Cluster/ServerState.h" #include "VocBase/LogicalCollection.h" #include @@ -120,25 +125,123 @@ SimpleModifier::OutputIterator::end() const { return it; } +template +void SimpleModifier::checkException() const { + throwOperationResultException(_infos, std::get(_results)); +} + +template +void SimpleModifier::resetResult() noexcept { + std::lock_guard guard(_resultMutex); + _results = NoResult{}; +} + template void SimpleModifier::reset() { +#ifdef ARANGODB_ENABLE_MAINTAINER_MODE + { + std::unique_lock guard(_resultMutex, std::try_to_lock); + TRI_ASSERT(guard.owns_lock()); + TRI_ASSERT(!std::holds_alternative(_results)); + } +#endif _accumulator.reset(); _operations.clear(); - _results.reset(); + resetResult(); } template -Result SimpleModifier::accumulate(InputAqlItemRow& row) { +void SimpleModifier::accumulate(InputAqlItemRow& row) { auto result = _completion.accumulate(_accumulator, row); _operations.push_back({result, row}); - return Result{}; } template -void SimpleModifier::transact(transaction::Methods& trx) { - _results = _completion.transact(trx, _accumulator.closeAndGetContents()); +ExecutionState SimpleModifier::transact(transaction::Methods& trx) { + std::unique_lock guard(_resultMutex); + if (std::holds_alternative(_results)) { + return ExecutionState::WAITING; + } else if (std::holds_alternative(_results)) { + return ExecutionState::DONE; + } else if (auto* ex = std::get_if(&_results); ex != nullptr) { + std::rethrow_exception(*ex); + } else { + TRI_ASSERT(std::holds_alternative(_results)); + } + + _results = NoResult{}; + + auto result = _completion.transact(trx, _accumulator.closeAndGetContents()); + + if (result.isReady()) { + _results = std::move(result.get()); + return ExecutionState::DONE; + } - throwOperationResultException(_infos, _results); + _results = Waiting{}; + + TRI_ASSERT(!ServerState::instance()->isSingleServer()); + TRI_ASSERT(_infos.engine() != nullptr); + TRI_ASSERT(_infos.engine()->sharedState() != nullptr); + + // The guard has to be unlocked before "thenValue" is called, otherwise locking + // the mutex there will cause a deadlock if the result is already available. + guard.unlock(); + + auto self = this->shared_from_this(); + std::move(result).thenFinal([self, sqs = _infos.engine()->sharedState()](futures::Try&& opRes) { + sqs->executeAndWakeup([&]() noexcept { + std::unique_lock guard(self->_resultMutex); + try { + TRI_ASSERT(std::holds_alternative(self->_results)); + if (std::holds_alternative(self->_results)) { + // get() will throw if opRes holds an exception, which is intended. + self->_results = std::move(opRes.get()); + } else { + // This can never happen. + using namespace std::string_literals; + auto state = + std::visit(overload{[&](NoResult) { return "NoResults"s; }, + [&](Waiting) { return "Waiting"s; }, + [&](OperationResult const&) { + return "Result"s; + }, + [&](std::exception_ptr const& ep) { + auto what = std::string{}; + try { + std::rethrow_exception(ep); + } catch (std::exception const& ex) { + what = ex.what(); + } catch (...) { + // Exception unknown, give up immediately. + LOG_TOPIC("4646a", FATAL, Logger::AQL) << "Caught an exception while handling another one, giving up."; + FATAL_ERROR_ABORT(); + } + return StringUtils::concatT("Exception: ", what); + }}, + self->_results); + auto message = StringUtils::concatT( + "Unexpected state when reporting modification result, expected " + "'Waiting' but got: ", + state); + LOG_TOPIC("1f48d", ERR, Logger::AQL) << message; + if (std::holds_alternative(self->_results)) { + // Avoid overwriting an exception with another exception. + LOG_TOPIC("2d310", FATAL, Logger::AQL) + << "Caught an exception while handling another one, giving up."; + FATAL_ERROR_ABORT(); + } + THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL_AQL, std::move(message)); + } + } catch(...) { + auto exptr = std::current_exception(); + self->_results = exptr; + } + return true; + }); + }); + + return ExecutionState::WAITING; } template @@ -153,17 +256,19 @@ size_t SimpleModifier::nrOfDocuments() const { template size_t SimpleModifier::nrOfResults() const { - if (_results.hasSlice() && _results.slice().isArray()) { - return _results.slice().length(); + if (auto* res = std::get_if(&_results); + res != nullptr && res->hasSlice() && res->slice().isArray()) { + return res->slice().length(); + } else { + return 0; } - return 0; } template size_t SimpleModifier::nrOfErrors() const { size_t nrOfErrors{0}; - for (auto const& pair : _results.countErrorCodes) { + for (auto const& pair : std::get(_results).countErrorCodes) { nrOfErrors += pair.second; } return nrOfErrors; @@ -198,12 +303,35 @@ bool SimpleModifier::resultAvailable() const { template VPackArrayIterator SimpleModifier::getResultsIterator() const { if (resultAvailable()) { - TRI_ASSERT(_results.hasSlice() && _results.slice().isArray()); - return VPackArrayIterator{_results.slice()}; + auto const& results = std::get(_results); + TRI_ASSERT(results.hasSlice() && results.slice().isArray()); + return VPackArrayIterator{results.slice()}; } return VPackArrayIterator(VPackArrayIterator::Empty{}); } +template +bool SimpleModifier::hasResultOrException() const noexcept { + return std::visit(overload{ + [](NoResult) { return false; }, + [](Waiting) { return false; }, + [](OperationResult const&) { return true; }, + [](std::exception_ptr const&) { return true; }, + }, + _results); +} + +template +bool SimpleModifier::hasNeitherResultNorOperationPending() const noexcept { + return std::visit(overload{ + [](NoResult) { return true; }, + [](Waiting) { return false; }, + [](OperationResult const&) { return false; }, + [](std::exception_ptr const&) { return false; }, + }, + _results); +} + template class ::arangodb::aql::SimpleModifier; template class ::arangodb::aql::SimpleModifier; template class ::arangodb::aql::SimpleModifier; diff --git a/arangod/Aql/SimpleModifier.h b/arangod/Aql/SimpleModifier.h index d5bc0423d63a..36b6fd8c286b 100644 --- a/arangod/Aql/SimpleModifier.h +++ b/arangod/Aql/SimpleModifier.h @@ -24,6 +24,9 @@ #pragma once #include "Aql/ExecutionBlock.h" +#include "Aql/ExecutionState.h" +#include "Aql/ExecutionEngine.h" + #include "Aql/ModificationExecutorAccumulator.h" #include "Aql/ModificationExecutorInfos.h" @@ -31,10 +34,10 @@ #include "Aql/RemoveModifier.h" #include "Aql/UpdateReplaceModifier.h" +#include #include -namespace arangodb { -namespace aql { +namespace arangodb::aql { struct ModificationExecutorInfos; @@ -71,11 +74,15 @@ struct is_modifier_completion_trait : std::true }; template ::value>> -class SimpleModifier { +class SimpleModifier : public std::enable_shared_from_this> { friend class InsertModifierCompletion; friend class RemoveModifierCompletion; friend class UpdateReplaceModifierCompletion; + struct NoResult {}; + struct Waiting {}; + using ResultType = std::variant; + public: using ModOp = std::pair; @@ -87,9 +94,9 @@ class SimpleModifier { OutputIterator& operator++(); bool operator!=(OutputIterator const& other) const noexcept; - ModifierOutput operator*() const; - OutputIterator begin() const; - OutputIterator end() const; + [[nodiscard]] ModifierOutput operator*() const; + [[nodiscard]] OutputIterator begin() const; + [[nodiscard]] OutputIterator end() const; private: OutputIterator& next(); @@ -103,37 +110,45 @@ class SimpleModifier { explicit SimpleModifier(ModificationExecutorInfos& infos) : _infos(infos), _completion(infos), - _results(Result(), infos._options), - _resultsIterator(VPackArrayIterator::Empty{}), - _batchSize(ExecutionBlock::DefaultBatchSize) {} - ~SimpleModifier() = default; + _batchSize(ExecutionBlock::DefaultBatchSize), + _results(NoResult{}) { + TRI_ASSERT(_infos.engine() != nullptr); + } + + virtual ~SimpleModifier() = default; void reset(); - Result accumulate(InputAqlItemRow& row); - void transact(transaction::Methods& trx); + void accumulate(InputAqlItemRow& row); + [[nodiscard]] ExecutionState transact(transaction::Methods& trx); + + void checkException() const; + void resetResult() noexcept; // The two numbers below may not be the same: Operations // can skip or ignore documents. // The number of operations - size_t nrOfOperations() const; + [[nodiscard]] size_t nrOfOperations() const; // The number of documents in the accumulator - size_t nrOfDocuments() const; + [[nodiscard]] size_t nrOfDocuments() const; // The number of entries in the results slice - size_t nrOfResults() const; + [[nodiscard]] [[maybe_unused]] size_t nrOfResults() const; + + [[nodiscard]] size_t nrOfErrors() const; - size_t nrOfErrors() const; + [[nodiscard]] size_t nrOfWritesExecuted() const; + [[nodiscard]] size_t nrOfWritesIgnored() const; - size_t nrOfWritesExecuted() const; - size_t nrOfWritesIgnored() const; + [[nodiscard]] ModificationExecutorInfos& getInfos() const noexcept; + [[nodiscard]] size_t getBatchSize() const noexcept; - ModificationExecutorInfos& getInfos() const noexcept; - size_t getBatchSize() const noexcept; + bool hasResultOrException() const noexcept; + bool hasNeitherResultNorOperationPending() const noexcept; private: - bool resultAvailable() const; - VPackArrayIterator getResultsIterator() const; + [[nodiscard]] bool resultAvailable() const; + [[nodiscard]] VPackArrayIterator getResultsIterator() const; ModificationExecutorInfos& _infos; ModifierCompletion _completion; @@ -141,18 +156,15 @@ class SimpleModifier { std::vector _operations; ModificationExecutorAccumulator _accumulator; - OperationResult _results; - - std::vector::const_iterator _operationsIterator; - VPackArrayIterator _resultsIterator; - size_t const _batchSize; + + /// @brief mutex that protects _results + mutable std::mutex _resultMutex; + ResultType _results; }; using InsertModifier = SimpleModifier; using RemoveModifier = SimpleModifier; using UpdateReplaceModifier = SimpleModifier; -} // namespace aql -} // namespace arangodb - +} // namespace arangodb::aql diff --git a/arangod/Aql/SingleRemoteModificationExecutor.h b/arangod/Aql/SingleRemoteModificationExecutor.h index 29e115652dd2..a5d5975fbc8b 100644 --- a/arangod/Aql/SingleRemoteModificationExecutor.h +++ b/arangod/Aql/SingleRemoteModificationExecutor.h @@ -34,9 +34,11 @@ namespace arangodb { namespace aql { +class ExecutionEngine; struct SingleRemoteModificationInfos : ModificationExecutorInfos { - SingleRemoteModificationInfos(RegisterId inputRegister, RegisterId outputNewRegisterId, + SingleRemoteModificationInfos(ExecutionEngine* engine, + RegisterId inputRegister, RegisterId outputNewRegisterId, RegisterId outputOldRegisterId, RegisterId outputRegisterId, arangodb::aql::QueryContext& query, OperationOptions options, aql::Collection const* aqlCollection, @@ -44,7 +46,7 @@ struct SingleRemoteModificationInfos : ModificationExecutorInfos { IgnoreErrors ignoreErrors, IgnoreDocumentNotFound ignoreDocumentNotFound, std::string key, bool hasParent, bool replaceIndex) - : ModificationExecutorInfos(inputRegister, RegisterPlan::MaxRegisterId, + : ModificationExecutorInfos(engine, inputRegister, RegisterPlan::MaxRegisterId, RegisterPlan::MaxRegisterId, outputNewRegisterId, outputOldRegisterId, outputRegisterId, query, std::move(options), aqlCollection, diff --git a/arangod/Aql/UpdateReplaceModifier.cpp b/arangod/Aql/UpdateReplaceModifier.cpp index b7a50aef92b5..7f16b9548bcf 100644 --- a/arangod/Aql/UpdateReplaceModifier.cpp +++ b/arangod/Aql/UpdateReplaceModifier.cpp @@ -107,10 +107,10 @@ ModifierOperationType UpdateReplaceModifierCompletion::accumulate( } } -OperationResult UpdateReplaceModifierCompletion::transact(transaction::Methods& trx, VPackSlice const data) { +futures::Future UpdateReplaceModifierCompletion::transact(transaction::Methods& trx, VPackSlice const data) { if (_infos._isReplace) { - return trx.replace(_infos._aqlCollection->name(), data, _infos._options); + return trx.replaceAsync(_infos._aqlCollection->name(), data, _infos._options); } else { - return trx.update(_infos._aqlCollection->name(), data, _infos._options); + return trx.updateAsync(_infos._aqlCollection->name(), data, _infos._options); } } diff --git a/arangod/Aql/UpdateReplaceModifier.h b/arangod/Aql/UpdateReplaceModifier.h index b3d9308597f6..56900ca76002 100644 --- a/arangod/Aql/UpdateReplaceModifier.h +++ b/arangod/Aql/UpdateReplaceModifier.h @@ -26,11 +26,11 @@ #include "Aql/ModificationExecutor.h" #include "Aql/ModificationExecutorAccumulator.h" #include "Aql/ModificationExecutorInfos.h" +#include "Futures/Future.h" #include -namespace arangodb { -namespace aql { +namespace arangodb::aql { struct ModificationExecutorInfos; @@ -42,7 +42,7 @@ class UpdateReplaceModifierCompletion { ModifierOperationType accumulate(ModificationExecutorAccumulator& accu, InputAqlItemRow& row); - OperationResult transact(transaction::Methods& trx, VPackSlice const data); + futures::Future transact(transaction::Methods& trx, VPackSlice data); private: ModificationExecutorInfos& _infos; @@ -50,5 +50,4 @@ class UpdateReplaceModifierCompletion { arangodb::velocypack::Builder _keyDocBuilder; }; -} // namespace aql -} // namespace arangodb +} // namespace arangodb::aql diff --git a/arangod/Aql/UpsertModifier.cpp b/arangod/Aql/UpsertModifier.cpp index 534cd8d69546..025e7c6e0888 100644 --- a/arangod/Aql/UpsertModifier.cpp +++ b/arangod/Aql/UpsertModifier.cpp @@ -33,6 +33,7 @@ #include "Basics/Common.h" #include "Basics/StaticStrings.h" #include "Basics/VelocyPackHelper.h" +#include "Basics/application-exit.h" #include "Transaction/Methods.h" #include "VocBase/LogicalCollection.h" @@ -41,8 +42,6 @@ #include "Logger/LogMacros.h" -class CollectionNameResolver; - using namespace arangodb; using namespace arangodb::aql; using namespace arangodb::aql::ModificationExecutorHelpers; @@ -130,13 +129,33 @@ typename UpsertModifier::OutputIterator UpsertModifier::OutputIterator::end() co return it; } +ModificationExecutorResultState UpsertModifier::resultState() const noexcept { + std::lock_guard guard(_resultStateMutex); + return _resultState; +} + void UpsertModifier::reset() { +#ifdef ARANGODB_ENABLE_MAINTAINER_MODE + { + std::unique_lock guard(_resultStateMutex, std::try_to_lock); + TRI_ASSERT(guard.owns_lock()); + TRI_ASSERT(_resultState != ModificationExecutorResultState::WaitingForResult); + } +#endif + _insertAccumulator.reset(); _insertResults.reset(); _updateAccumulator.reset(); _updateResults.reset(); _operations.clear(); + + resetResult(); +} + +void UpsertModifier::resetResult() noexcept { + std::lock_guard guard(_resultStateMutex); + _resultState = ModificationExecutorResultState::NoResult; } UpsertModifier::OperationType UpsertModifier::updateReplaceCase( @@ -214,7 +233,7 @@ VPackArrayIterator UpsertModifier::getInsertResultsIterator() const { return VPackArrayIterator(VPackArrayIterator::Empty{}); } -Result UpsertModifier::accumulate(InputAqlItemRow& row) { +void UpsertModifier::accumulate(InputAqlItemRow& row) { RegisterId const inDocReg = _infos._input1RegisterId; RegisterId const insertReg = _infos._input2RegisterId; RegisterId const updateReg = _infos._input3RegisterId; @@ -233,11 +252,29 @@ Result UpsertModifier::accumulate(InputAqlItemRow& row) { auto insertDoc = row.getValue(insertReg); result = insertCase(_insertAccumulator, insertDoc); } - _operations.push_back({result, row}); - return Result{}; + _operations.emplace_back(result, row); } -Result UpsertModifier::transact(transaction::Methods& trx) { +ExecutionState UpsertModifier::transact(transaction::Methods& trx) { + std::lock_guard guard(_resultStateMutex); + + switch (_resultState) { + case ModificationExecutorResultState::WaitingForResult: + // WAITING is not yet implemented for UpsertModifier, we shouldn't get here + TRI_ASSERT(false); + return ExecutionState::WAITING; + case ModificationExecutorResultState::HaveResult: + // WAITING is not yet implemented for UpsertModifier, we shouldn't get here + TRI_ASSERT(false); + return ExecutionState::DONE; + case ModificationExecutorResultState::NoResult: + // continue + break; + } + + TRI_ASSERT(_resultState == ModificationExecutorResultState::NoResult); + _resultState = ModificationExecutorResultState::NoResult; + auto toInsert = _insertAccumulator.closeAndGetContents(); if (toInsert.isArray() && toInsert.length() > 0) { _insertResults = @@ -256,8 +293,10 @@ Result UpsertModifier::transact(transaction::Methods& trx) { } throwOperationResultException(_infos, _updateResults); } + + _resultState = ModificationExecutorResultState::HaveResult; - return Result{}; + return ExecutionState::DONE; } size_t UpsertModifier::nrOfDocuments() const { @@ -297,3 +336,11 @@ size_t UpsertModifier::nrOfWritesExecuted() const { size_t UpsertModifier::nrOfWritesIgnored() const { return nrOfErrors(); } size_t UpsertModifier::getBatchSize() const { return _batchSize; } + +bool UpsertModifier::hasResultOrException() const noexcept { + return resultState() == ModificationExecutorResultState::HaveResult; +} + +bool UpsertModifier::hasNeitherResultNorOperationPending() const noexcept { + return resultState() == ModificationExecutorResultState::NoResult; +} diff --git a/arangod/Aql/UpsertModifier.h b/arangod/Aql/UpsertModifier.h index c56a1d157b23..020d1560a78c 100644 --- a/arangod/Aql/UpsertModifier.h +++ b/arangod/Aql/UpsertModifier.h @@ -30,13 +30,32 @@ #include "Aql/InsertModifier.h" #include "Aql/UpdateReplaceModifier.h" +#include + #include -namespace arangodb { -namespace aql { +namespace arangodb::aql { struct ModificationExecutorInfos; +// TODO Remove this state, and use a variant as in SimpleModifier. +// It makes most sense to do this when implementing async upsert operations, +// so I'm leaving it for now. +enum class ModificationExecutorResultState { + // State that is used when the Executor's modifier has not been + // asked to produce a result. + // this is also the initial state. + NoResult, + // State that is used when the Executor's modifier has been asked + // to produce a result, but it returned a WAITING status, i.e. the + // result is not yet ready to consume. + // This state cannot happen in single servers! + WaitingForResult, + // State that is used when the Executor's modifier has produced + // a result that is ready to consume. + HaveResult, + }; + class UpsertModifier { public: enum class OperationType { @@ -59,11 +78,11 @@ class UpsertModifier { OutputIterator& operator++(); bool operator!=(OutputIterator const& other) const noexcept; ModifierOutput operator*() const; - OutputIterator begin() const; - OutputIterator end() const; + [[nodiscard]] OutputIterator begin() const; + [[nodiscard]] OutputIterator end() const; private: - OutputIterator& next(); + [[nodiscard]] OutputIterator& next(); UpsertModifier const& _modifier; std::vector::const_iterator _operationsIterator; @@ -81,24 +100,32 @@ class UpsertModifier { // writes. // This behaviour could be improved, if we can prove that an UPSERT // does not need to see its own writes - _batchSize(1) {} + _batchSize(1), + _resultState(ModificationExecutorResultState::NoResult) {} ~UpsertModifier() = default; + ModificationExecutorResultState resultState() const noexcept; + + void checkException() const {} + void resetResult() noexcept; void reset(); - Result accumulate(InputAqlItemRow& row); - Result transact(transaction::Methods& trx); + void accumulate(InputAqlItemRow& row); + ExecutionState transact(transaction::Methods& trx); size_t nrOfOperations() const; size_t nrOfDocuments() const; - size_t nrOfResults() const; + [[maybe_unused]] size_t nrOfResults() const; size_t nrOfErrors() const; size_t nrOfWritesExecuted() const; size_t nrOfWritesIgnored() const; size_t getBatchSize() const; + bool hasResultOrException() const noexcept; + bool hasNeitherResultNorOperationPending() const noexcept; + private: bool resultAvailable() const; VPackArrayIterator getUpdateResultsIterator() const; @@ -119,7 +146,9 @@ class UpsertModifier { arangodb::velocypack::Builder _keyDocBuilder; size_t const _batchSize; + + mutable std::mutex _resultStateMutex; + ModificationExecutorResultState _resultState; }; -} // namespace aql -} // namespace arangodb +} // namespace arangodb::aql diff --git a/tests/Aql/AqlSharedExecutionBlockImplTest.cpp b/tests/Aql/AqlSharedExecutionBlockImplTest.cpp index 58a4b593d6da..1bd67ff20b66 100644 --- a/tests/Aql/AqlSharedExecutionBlockImplTest.cpp +++ b/tests/Aql/AqlSharedExecutionBlockImplTest.cpp @@ -260,7 +260,8 @@ class AqlSharedExecutionBlockImplTest : public ::testing::Test { auto col = collections.get(collectionName); TRI_ASSERT(col != nullptr); // failed to add collection OperationOptions opts{}; - ModificationExecutorInfos execInfos{0, + ModificationExecutorInfos execInfos{fakedQuery->rootEngine(), + 0, RegisterPlan::MaxRegisterId, RegisterPlan::MaxRegisterId, 0, diff --git a/tests/js/server/aql/aql-modify-subqueries.js b/tests/js/server/aql/aql-modify-subqueries.js index 4ef0ebad6e73..3d9e5cf0a3a8 100644 --- a/tests/js/server/aql/aql-modify-subqueries.js +++ b/tests/js/server/aql/aql-modify-subqueries.js @@ -1623,7 +1623,6 @@ function ahuacatlModifySuite () { c2.truncate({ compact: false }); } }, - }; } @@ -1746,6 +1745,14 @@ function ahuacatlModifySkipSuite () { assertEqual(1000, db[cn].count()); }, + + testSubqueryFullCount : function () { + const query = "LET sub = NOOPT(FOR doc IN " + cn + " REMOVE doc IN " + cn + " RETURN OLD) COLLECT WITH COUNT INTO l RETURN l"; + let result = AQL_EXECUTE(query, null, { optimizer: { rules: ["-all"] } }); + assertEqual(1, result.json.length); + assertEqual(1, result.json[0]); + }, + }; } @@ -1898,7 +1905,7 @@ function ahuacatlGeneratedSuite() { assertEqual(10, res.toArray().length); } }; -}; +} jsunity.run(ahuacatlModifySuite); jsunity.run(ahuacatlModifySkipSuite); diff --git a/tests/js/server/aql/aql-optimizer-rule-move-calculations-down.js b/tests/js/server/aql/aql-optimizer-rule-move-calculations-down.js index c3331f5aa3ee..70d3c56776f5 100644 --- a/tests/js/server/aql/aql-optimizer-rule-move-calculations-down.js +++ b/tests/js/server/aql/aql-optimizer-rule-move-calculations-down.js @@ -664,10 +664,15 @@ function optimizerRuleTestSuite () { expected.push("test" + i + "-" + i); } - var query = - "FOR i IN 0..99 LET result = (UPDATE {_key: CONCAT('test', TO_STRING(i))} WITH {updated: true} IN " + - cn + - " RETURN CONCAT(NEW._key, '-', NEW.value)) LIMIT 10 RETURN result[0]"; + var query = ` + FOR i IN 0..99 + LET result = ( + UPDATE {_key: CONCAT('test', TO_STRING(i))} WITH {updated: true} IN ${cn} + RETURN CONCAT(NEW._key, '-', NEW.value) + ) + LIMIT 10 + RETURN result[0] + `; var planDisabled = AQL_EXPLAIN(query, {}, paramDisabled); var planEnabled = AQL_EXPLAIN(query, {}, paramEnabled);