8000 [3.8] make AQL modification operations async by jsteemann · Pull Request #14525 · arangodb/arangodb · GitHub
[go: up one dir, main page]

Skip to content

[3.8] make AQL modification operations async #14525

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 35 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
e8a24df
make AQL modification operations async
jsteemann Jul 19, 2021
de80f05
push current state, with all debugging output etc.
jsteemann Jul 21, 2021
85b7e73
Fixed last red test, by removing a subtle difference in execution of …
mchacki Jul 21, 2021
57f759f
clean up a bit
jsteemann Jul 22, 2021
eb2d2c0
Fixed counting of skipped rows
goedderz Jul 22, 2021
9d1fac1
Removed unused member variable
goedderz Jul 22, 2021
9da31dd
fix compilation
jsteemann Jul 22, 2021
3cf4de4
debugging
jsteemann Jul 23, 2021
536e03b
fix attempt, not complete
jsteemann Jul 23, 2021
80a930f
added assertions
jsteemann Jul 23, 2021
8d8beea
Fix some problems, still WIP
goedderz Jul 27, 2021
8ddca41
Some more changes
goedderz Jul 27, 2021
4582aa0
WIP: Some minor changes and preparations for ModificationExecutor ref…
goedderz Aug 4, 2021
ea6e0d9
[WIP] First refactoring step complete; works on single-server only
goedderz Aug 5, 2021
8fe9b2d
[WIP] Second step of refactoring; still only works on single-server
goedderz Aug 5, 2021
495e01b
Minor change
goedderz Aug 5, 2021
b33a22b
Merge branch '3.8' of github.com:arangodb/arangodb into feature-3.8/a…
goedderz Aug 5, 2021
8a1834d
Implemented waiting and state variables, should now work in cluster a…
goedderz Aug 5, 2021
fbe539c
Minor cleanup
goedderz Aug 5, 2021
e2c867f
...emphasis on "should". Bugfixes.
goedderz Aug 5, 2021
c70ba98
Save the execution stack in ExecutionBlockImpl
goedderz Aug 5, 2021
611c517
Added a TODO note
goedderz Aug 5, 2021
3426ccc
Changed the ModificationExecutor to read as much input as possible be…
goedderz Aug 9, 2021
9f20fa5
Merge branch '3.8' of github.com:arangodb/arangodb into feature-3.8/a…
goedderz Aug 9, 2021
ecba60e
Declared operators to make gcc happy
goedderz Aug 10, 2021
6c86555
Update arangod/Aql/AqlItemMatrix.h
goedderz Aug 10, 2021
96a6fc1
Update arangod/Aql/DependencyProxy.cpp
goedderz Aug 10, 2021
0861e6c
Re-added assertions
goedderz Aug 10, 2021
6dd4234
Addressed remaining TODOs noticed during review
goedderz Aug 10, 2021
7f89502
Fix a deadlock
goedderz Aug 11, 2021
326356c
WIP Use thenFinal and assert on exception
goedderz Aug 12, 2021
d246ce9
Handled async exceptions in SimpleModifier
goedderz Aug 12, 2021
e41751a
Added a TODO note
goedderz Aug 12, 2021
e728b22
Added CHANGELOG entry
goedderz Aug 16, 2021
6701007
Merge branch '3.8' into feature-3.8/async-aql-modification-operations
KVS85 Aug 17, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
v3.8.1 (XXXX-XX-XX)
-------------------

* Make AQL modification operations in a cluster asynchronous. This allows to
free the thread for other work until both the write and synchronous
replication are complete.

* When creating Pregel memory-mapped files, create them with O_TMPFILE attribute
on Linux so that files are guaranteed to vanish even if a process dies.

Expand Down
2 changes: 1 addition & 1 deletion arangod/Aql/AqlCall.h
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ struct AqlCall {
TRI_ASSERT(n <= i);
i -= n;
},
[](auto) {},
[](Infinity) {},
};
std::visit(minus, softLimit);
std::visit(minus, hardLimit);
Expand Down
3 changes: 2 additions & 1 deletion arangod/Aql/AqlExecuteResult.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@

#include <map>
#include <string_view>
#include <utility>

using namespace arangodb;
using namespace arangodb::aql;
Expand All @@ -49,7 +50,7 @@ auto getStringView(velocypack::Slice slice) -> std::string_view {

AqlExecuteResult::AqlExecuteResult(ExecutionState state, SkipResult skipped,
SharedAqlItemBlockPtr&& block)
: _state(state), _skipped(skipped), _block(std::move(block)) {
: _state(state), _skipped(std::move(skipped)), _block(std::move(block)) {
// Make sure we only produce a valid response
// The block should have checked as well.
// We must return skipped and/or data when reporting HASMORE
Expand Down
2 changes: 1 addition & 1 deletion arangod/Aql/AqlItemBlock.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1169,7 +1169,7 @@ size_t AqlItemBlock::maxModifiedEntries() const noexcept { return _numRegisters

size_t AqlItemBlock::capacity() const noexcept { return _data.capacity(); }

bool AqlItemBlock::isShadowRow(size_t row) const {
bool AqlItemBlock::isShadowRow(size_t row) const noexcept {
return _shadowRows.is(row);
}

Expand Down
2 changes: 1 addition & 1 deletion arangod/Aql/AqlItemBlock.h
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ class AqlItemBlock {

/// @brief test if the given row is a shadow row and conveys subquery
/// information only. It should not be handed to any non-subquery executor.
bool isShadowRow(size_t row) const;
bool isShadowRow(size_t row) const noexcept;

/// @brief get the ShadowRowDepth
/// Does only work if this row is a shadow row
Expand Down
9 changes: 4 additions & 5 deletions arangod/Aql/AqlItemBlockInputMatrix.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ AqlItemBlockInputMatrix::AqlItemBlockInputMatrix(ExecutorState state)
}

// only used for block passthrough
AqlItemBlockInputMatrix::AqlItemBlockInputMatrix(arangodb::aql::SharedAqlItemBlockPtr const& block)
: _block{block}, _aqlItemMatrix{nullptr} {
AqlItemBlockInputMatrix::AqlItemBlockInputMatrix(arangodb::aql::SharedAqlItemBlockPtr block)
: _block{std::move(block)}, _aqlItemMatrix{nullptr} {
TRI_ASSERT(_aqlItemMatrix == nullptr);
TRI_ASSERT(!hasDataRow());
}
Expand All @@ -64,7 +64,7 @@ AqlItemBlockInputRange& AqlItemBlockInputMatrix::getInputRange() {
if (_lastRange.hasDataRow()) {
return _lastRange;
}
// Need initialze lastRange
// Need initialize lastRange
if (_aqlItemMatrix->numberOfBlocks() == 0) {
_lastRange = {AqlItemBlockInputRange{upstreamState()}};
} else {
Expand All @@ -83,7 +83,6 @@ SharedAqlItemBlockPtr AqlItemBlockInputMatrix::getBlock() const noexcept {
std::pair<ExecutorState, AqlItemMatrix const*> AqlItemBlockInputMatrix::getMatrix() noexcept {
TRI_ASSERT(_aqlItemMatrix != nullptr);
TRI_ASSERT(_block == nullptr);
TRI_ASSERT(!_shadowRow.isInitialized());

// We are always done. This InputMatrix
// guarantees that we have all data in our hand at once.
Expand All @@ -109,7 +108,7 @@ bool AqlItemBlockInputMatrix::hasValidRow() const noexcept {

bool AqlItemBlockInputMatrix::hasDataRow() const noexcept {
return _aqlItemMatrix != nullptr && !hasShadowRow() &&
((_aqlItemMatrix->stoppedOnShadowRow()) ||
(_aqlItemMatrix->stoppedOnShadowRow() ||
(_aqlItemMatrix->size() > 0 && _finalState == ExecutorState::DONE));
}

Expand Down
8 changes: 2 additions & 6 deletions arangod/Aql/AqlItemBlockInputMatrix.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@
/// @author Tobias Gödderz
////////////////////////////////////////////////////////////////////////////////

#ifndef ARANGOD_AQL_AQLITEMBLOCKMATRIXITERATOR_H
#define ARANGOD_AQL_AQLITEMBLOCKMATRIXITERATOR_H
#pragma once

#include "Aql/AqlItemBlockInputRange.h"
#include "Aql/AqlItemMatrix.h"
Expand All @@ -38,7 +37,7 @@ class AqlItemBlockInputMatrix {
public:
explicit AqlItemBlockInputMatrix(ExecutorState state);

AqlItemBlockInputMatrix(arangodb::aql::SharedAqlItemBlockPtr const&);
explicit AqlItemBlockInputMatrix(arangodb::aql::SharedAqlItemBlockPtr);

AqlItemBlockInputMatrix(ExecutorState state, AqlItemMatrix* aqlItemMatrix);

Expand All @@ -63,7 +62,6 @@ class AqlItemBlockInputMatrix {

size_t skipAllShadowRowsOfDepth(size_t depth);


// Will return HASMORE if we were able to increase the row index.
// Otherwise will return DONE.
ExecutorState incrBlockIndex();
Expand Down Expand Up @@ -101,5 +99,3 @@ class AqlItemBlockInputMatrix {
};

} // namespace arangodb::aql

#endif // ARANGOD_AQL_AQLITEMBLOCKINPUTITERATOR_H
91 changes: 91 additions & 0 deletions arangod/Aql/AqlItemMatrix.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -245,3 +245,94 @@ AqlItemMatrix::AqlItemMatrix(RegisterCount nrRegs)
clear();
return {skipped, ShadowAqlItemRow{CreateInvalidShadowRowHint()}};
}

AqlItemMatrix::RowIterator AqlItemMatrix::begin() const {
if (size() > 0) {
return {this, 0, _startIndexInFirstBlock};
} else {
return end();
}
}

AqlItemMatrix::RowIterator AqlItemMatrix::end() const {
return {this, this->numberOfBlocks(), 0};
}

AqlItemMatrix::RowIterator::RowIterator(AqlItemMatrix const* matrix, size_t blockIndex, size_t rowIndex)
: _matrix(matrix), _blockIndex(blockIndex), _rowIndex(rowIndex) {}

AqlItemMatrix::RowIterator::value_type AqlItemMatrix::RowIterator::next() noexcept {
auto& it = *this;
auto ret = *it;
++it;
return ret;
}

auto AqlItemMatrix::RowIterator::isInitialized() const noexcept -> bool {
return _matrix != nullptr;
}

auto AqlItemMatrix::RowIterator::hasMore() const noexcept -> bool {
// _blockIndex == _matrix->size() => _rowIndex == 0
TRI_ASSERT((_matrix != nullptr && _blockIndex < _matrix->numberOfBlocks()) || _rowIndex == 0);
// If _blockIndex is valid, _rowIndex must be, too.
return ADB_LIKELY(_matrix != nullptr) && _blockIndex < _matrix->numberOfBlocks();
}

AqlItemMatrix::RowIterator::value_type AqlItemMatrix::RowIterator::operator*() const noexcept {
return {_matrix->getBlock(_blockIndex).first, _rowIndex};
}

AqlItemMatrix::RowIterator& AqlItemMatrix::RowIterator::operator++() noexcept {
// Assume ++ is only called on a valid and dereferenceable iterator
TRI_ASSERT(_matrix != nullptr);
TRI_ASSERT(_blockIndex < _matrix->numberOfBlocks());
auto const* block = _matrix->getBlockRef(_blockIndex).first;
TRI_ASSERT(_rowIndex < block->numRows());
TRI_ASSERT(!block->isShadowRow(_rowIndex));

// Increase the row index
++_rowIndex;
if (_rowIndex >= block->numRows()) {
// If the row index is invalid, move to the next block.
// If the block index is now invalid, this is equal to the "end()"
// iterator.
++_blockIndex;
_rowIndex = 0;
}

if (_blockIndex < _matrix->numberOfBlocks()) {
block = _matrix->getBlockRef(_blockIndex).first;
if (block->isShadowRow(_rowIndex)) {
// If we're at a shadow row, this must be the last block.
TRI_ASSERT(_blockIndex + 1 == _matrix->numberOfBlocks());
// This makes this equal to the "end()" iterator.
++_blockIndex;
_rowIndex = 0;
}
}

return *this;
}

auto AqlItemMatrix::RowIterator::operator++(int) & noexcept -> AqlItemMatrix::RowIterator {
auto tmp = *this;
++(*this);
return tmp;
}

AqlItemMatrix::RowIterator::operator bool() const noexcept {
return hasMore();
}

bool aql::operator==(AqlItemMatrix::RowIterator const& a,
AqlItemMatrix::RowIterator const& b) {
return ADB_LIKELY(a._matrix == b._matrix) &&
(ADB_UNLIKELY(a._matrix == nullptr /* => b._matrix == nullptr */) ||
(ADB_LIKELY(a._blockIndex == b._blockIndex) && a._rowIndex == b._rowIndex));
}

bool aql::operator!=(AqlItemMatrix::RowIterator const& a,
AqlItemMatrix::RowIterator const& b) {
return !(a == b);
}
55 changes: 48 additions & 7 deletions arangod/Aql/AqlItemMatrix.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,15 @@
/// @author Michael Hackstein
////////////////////////////////////////////////////////////////////////////////

#ifndef ARANGOD_AQL_AQL_ITEM_MATRIX_H
#define ARANGOD_AQL_AQL_ITEM_MATRIX_H 1
#pragma once

#include "Aql/ShadowAqlItemRow.h"

#include <cstdint>
#include <utility>
#include <vector>

namespace arangodb {
namespace aql {
namespace arangodb::aql {

class InputAqlItemRow;
class SharedAqlItemBlockPtr;
Expand Down Expand Up @@ -120,6 +118,49 @@ class AqlItemMatrix {
[[nodiscard]] auto skipAllShadowRowsOfDepth(size_t depth)
-> std::tuple<size_t, ShadowAqlItemRow>;

class RowIterator {
public:
using value_type = InputAqlItemRow;

RowIterator() = default;
RowIterator(AqlItemMatrix const* matrix, size_t blockIndex, size_t rowIndex);

// Returns the current value, and move the iterator to the next value
value_type next() noexcept;

auto isInitialized() const noexcept -> bool;

// Returns whether the current value is valid, i.e. whether next() may be
// called
auto hasMore() const noexcept -> bool;

value_type operator*() const noexcept;

// This can't be implemented, as we can only create the InputAqlItemRow
// on-the-fly.
// pointer operator->();

// Prefix increment
RowIterator& operator++() noexcept;

// Postfix increment.
auto operator++(int) & noexcept -> RowIterator;

explicit operator bool() const noexcept;

friend bool operator==(RowIterator const& a, RowIterator const& b);
friend bool operator!=(RowIterator const& a, RowIterator const& b);

private:
AqlItemMatrix const* _matrix{};
std::size_t _blockIndex{};
// Invariant: _rowIndex is valid iff _blockIndex is valid.
std::size_t _rowIndex{};
};

[[nodiscard]] RowIterator begin() const;
[[nodiscard]] RowIterator end() const;

private:
std::vector<SharedAqlItemBlockPtr> _blocks;

Expand All @@ -130,7 +171,7 @@ class AqlItemMatrix {
size_t _stopIndexInLastBlock;
};

} // namespace aql
} // namespace arangodb
bool operator==(AqlItemMatrix::RowIterator const& a, AqlItemMatrix::RowIterator const& b);
bool operator!=(AqlItemMatrix::RowIterator const& a, AqlItemMatrix::RowIterator const& b);

#endif
} // namespace arangodb::aql
1 change: 1 addition & 0 deletions arangod/Aql/ClusterNodes.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -612,6 +612,7 @@ std::unique_ptr<ExecutionBlock> SingleRemoteOperationNode::createBlock(
std::move(writableOutputRegisters));

auto executorInfos = SingleRemoteModificationInfos(
&engine,
in, outputNew, outputOld, out, _plan->getAst()->query(), std::move(options),
collection(), ConsultAqlWriteFilter(_options.consultAqlWriteFilter),
IgnoreErrors(_options.ignoreErrors),
Expand Down
Loading
0