8000 Make AQL modification operations asynchronous by goedderz · Pull Request #21068 · arangodb/arangodb · GitHub
[go: up one dir, main page]

Skip to content

Make AQL modification operations asynchronous #21068

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 29 commits into from
Jun 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
c787f51
Try to avoid synchronous waiting in AQL, and fix transaction lifetime…
goedderz Jun 6, 2024
a610077
Make the Query's resource monitor a shared_ptr
goedderz Jun 11, 2024
e16a0b4
Merge branch 'devel' of github.com:arangodb/arangodb into feature/mak…
goedderz Jun 11, 2024
2bd1475
Revert "Try to avoid synchronous waiting in AQL, and fix transaction …
goedderz Jun 11, 2024
6537d4b
Merge branch 'feature/make-query-resource-monitor-shared' into featur…
goedderz Jun 11, 2024
ee1b40d
Merge branch 'devel' of github.com:arangodb/arangodb into feature/mak…
goedderz Jun 13, 2024
effb1b8
Included review comments
goedderz Jun 13, 2024
b3ca915
Merge branch 'feature/make-query-resource-monitor-shared' of github.c…
goedderz Jun 17, 2024
5096702
Shorten BuilderLeaser lifetimes, and copy vocbase name
goedderz Jun 17, 2024
843f341
Added a maintainer-mode assertion
goedderz Jun 17, 2024
39522be
Serialize DBServer modification operations
goedderz Jun 17, 2024
193dc63
Merge branch 'devel' of github.com:arangodb/arangodb into feature/mak…
goedderz Jun 17, 2024
413fabc
Fixed gtests and CE compilation
goedderz Jun 17, 2024
68556fd
Fixed gtests
goedderz Jun 17, 2024
261fba2
Clean references to SharedAqlItemBlockPtr early enough
goedderz Jun 17, 2024
807fcb6
Fixed CE compilation
goedderz Jun 17, 2024
b821948
Merge branch 'devel' of github.com:arangodb/arangodb into feature/mak…
goedderz Jun 17, 2024
5543ebc
Merge branch 'feature/make-query-resource-monitor-shared' of github.c…
goedderz Jun 17, 2024
fb5beea
Make assertion concurrency-safe
goedderz Jun 17, 2024
afedb52
Unify handling of BuilderLeasers in processors
goedderz Jun 17, 2024
c4e42e5
Make internal gather nodes non-parallel in all cases
goedderz Jun 19, 2024
1b2d37d
Merge branch 'devel' of github.com:arangodb/arangodb into feature/mak…
goedderz Jun 19, 2024
ec9f11b
Update arangod/Transaction/Methods.cpp
goedderz Jun 20, 2024
d999326
Update arangod/Transaction/Methods.cpp
goedderz Jun 20, 2024
8ee9cc7
Merge branch 'devel' of github.com:arangodb/arangodb into feature/mak…
goedderz Jun 20, 2024
d8f6c20
Added CHANGELOG entry
goedderz Jun 20, 2024
d9bd683
Merge branch 'feature/make-aql-modifications-async' of github.com:ara…
goedderz Jun 20, 2024
d515b99
Avoid SIGSEGV when a database gets deleted mid-transaction
goedderz Jun 20, 2024
af69ed3
Merge branch 'devel' of github.com:arangodb/arangodb into feature/mak…
goedderz Jun 20, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
devel
-----

* Document modification operations (insert, update, etc) during an AQL query on
DBServers no longer block threads while waiting for replication.

* Fixed a bug where accessing a collection right after creation can sometimes
fail.

Expand Down
16 changes: 9 additions & 7 deletions arangod/Aql/AqlItemBlock.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,6 @@ class AqlItemBlock {
AqlItemBlock(AqlItemBlock const&) = delete;
AqlItemBlock& operator=(AqlItemBlock const&) = delete;

/// @brief create the block
AqlItemBlock(AqlItemBlockManager&, size_t numRows,
RegisterCount numRegisters);

void initFromSlice(arangodb::velocypack::Slice);

/// @brief auxiliary struct to track how often the same AqlValue is
Expand Down Expand Up @@ -113,8 +109,13 @@ class AqlItemBlock {
using ShadowRowIterator = std::vector<uint32_t>::const_iterator;

protected:
/// @brief create the block
/// Should only ever be called by AqlItemBlockManager, so it's protected
AqlItemBlock(AqlItemBlockManager&, size_t numRows,
RegisterCount numRegisters);

/// @brief destroy the block
/// Should only ever be deleted by AqlItemManager::returnBlock, so the
/// Should only ever be deleted by AqlItemBlockManager::returnBlock, so the
/// destructor is protected.
~AqlItemBlock();

Expand Down Expand Up @@ -407,11 +408,12 @@ class AqlItemBlock {
explicit OwnershipChecker(std::atomic<std::thread::id>& v) : _v(v) {
auto old =
_v.exchange(std::this_thread::get_id(), std::memory_order_relaxed);
TRI_ASSERT(old == std::thread::id());
TRI_ASSERT(old == std::thread::id()) << "old=" << old;
}
~OwnershipChecker() {
auto old = _v.exchange(std::thread::id(), std::memory_order_relaxed);
TRI_ASSERT(old == std::this_thread::get_id());
TRI_ASSERT(old == std::this_thread::get_id())
<< "old=" << old << ", this=" << std::this_thread::get_id();
}
std::atomic<std::thread::id>& _v;
};
Expand Down
19 changes: 18 additions & 1 deletion arangod/Aql/AqlItemBlockManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,13 @@ AqlItemBlockManager::AqlItemBlockManager(
: _resourceMonitor(resourceMonitor) {}

/// @brief destroy the manager
AqlItemBlockManager::~AqlItemBlockManager() { delete _constValueBlock; }
AqlItemBlockManager::~AqlItemBlockManager() {
#ifdef ARANGODB_ENABLE_MAINTAINER_MODE
TRI_ASSERT(_leasedBlocks == 0) << "leased blocks left: " << _leasedBlocks;
#endif

delete _constValueBlock;
}

void AqlItemBlockManager::initializeConstValueBlock(RegisterCount nrRegs) {
TRI_ASSERT(_constValueBlock == nullptr);
Expand Down Expand Up @@ -89,13 +95,24 @@ SharedAqlItemBlockPtr AqlItemBlockManager::requestBlock(
TRI_ASSERT(block->getRefCount() == 0);
TRI_ASSERT(block->hasShadowRows() == false);

#ifdef ARANGODB_ENABLE_MAINTAINER_MODE
++_leasedBlocks;
#endif

return SharedAqlItemBlockPtr{block};
}

/// @brief return a block to the manager
void AqlItemBlockManager::returnBlock(AqlItemBlock*& block) noexcept {
TRI_ASSERT(block != nullptr);
TRI_ASSERT(block->getRefCount() == 0);
#ifdef ARANGODB_ENABLE_MAINTAINER_MODE
auto leasedBlocks = _leasedBlocks.load();
do {
TRI_ASSERT(leasedBlocks > 0);
} while (
!_leasedBlocks.compare_exchange_weak(leasedBlocks, leasedBlocks - 1));
#endif

size_t const targetSize = block->capacity();
uint32_t const i = Bucket::getId(targetSize);
Expand Down
3 changes: 3 additions & 0 deletions arangod/Aql/AqlItemBlockManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,9 @@ class AqlItemBlockManager {

static constexpr uint32_t numBuckets = 12;
static constexpr size_t numBlocksPerBucket = 7;
#ifdef ARANGODB_ENABLE_MAINTAINER_MODE
std::atomic<size_t> _leasedBlocks = 0;
#endif

struct Bucket {
std::array<AqlItemBlock*, numBlocksPerBucket> blocks;
Expand Down
37 changes: 22 additions & 15 deletions arangod/Aql/ExecutionNode/GatherNode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,18 +59,6 @@ constexpr std::string_view kParallelismParallel("parallel");
constexpr std::string_view kParallelismSerial("serial");
constexpr std::string_view kParallelismUndefined("undefined");

constexpr std::string_view toString(GatherNode::Parallelism value) noexcept {
switch (value) {
case GatherNode::Parallelism::Parallel:
return kParallelismParallel;
case GatherNode::Parallelism::Serial:
return kParallelismSerial;
case GatherNode::Parallelism::Undefined:
default:
return kParallelismUndefined;
}
}

constexpr GatherNode::Parallelism parallelismFromString(
std::string_view value) noexcept {
if (value == kParallelismParallel) {
Expand Down Expand Up @@ -100,7 +88,28 @@ bool toSortMode(std::string_view str, GatherNode::SortMode& mode) noexcept {
return true;
}

std::string_view toString(GatherNode::SortMode mode) noexcept {
} // namespace

auto arangodb::aql::toString(GatherNode::Parallelism value) noexcept
-> std::string_view {
switch (value) {
case GatherNode::Parallelism::Parallel:
return kParallelismParallel;
case GatherNode::Parallelism::Serial:
return kParallelismSerial;
case GatherNode::Parallelism::Undefined:
return kParallelismUndefined;
default:
LOG_TOPIC("c9367", FATAL, Logger::AQL)
<< "Invalid value for parallelism: "
<< static_cast<std::underlying_type_t<GatherNode::Parallelism>>(
value);
FATAL_ERROR_ABORT();
}
}

auto arangodb::aql::toString(GatherNode::SortMode mode) noexcept
-> std::string_view {
switch (mode) {
case GatherNode::SortMode::MinElement:
return kSortModeMinElement;
Expand All @@ -114,8 +123,6 @@ std::string_view toString(GatherNode::SortMode mode) noexcept {
}
}

} // namespace

/*static*/ Collection const* GatherNode::findCollection(
GatherNode const& root) noexcept {
ExecutionNode const* node = root.getFirstDependency();
Expand Down
3 changes: 3 additions & 0 deletions arangod/Aql/ExecutionNode/GatherNode.h
10000
Original file line number Diff line number Diff line change
Expand Up @@ -147,5 +147,8 @@ class GatherNode final : public ExecutionNode {
size_t _limit;
};

auto toString(GatherNode::SortMode mode) noexcept -> std::string_view;
auto toString(GatherNode::Parallelism) noexcept -> std::string_view;

} // namespace aql
} // namespace arangodb
10 changes: 9 additions & 1 deletion arangod/Aql/Executor/ModificationExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ class ModificationExecutor {
using Stats = ModificationStats;

ModificationExecutor(FetcherType&, Infos&);
~ModificationExecutor() = default;
~ModificationExecutor();

[[nodiscard]] auto produceRows(typename FetcherType::DataRange& input,
OutputAqlItemRow& output)
Expand Down Expand Up @@ -206,4 +206,12 @@ class ModificationExecutor {
Stats _stats{};
};

template<typename FetcherType, typename ModifierType>
ModificationExecutor<FetcherType, ModifierType>::~ModificationExecutor() {
// Clear all InputAqlItemRows the modifier still holds, and with it the
// SharedAqlItemBlockPtrs. This is so the referenced AqlItemBlocks can be
// returned to the AqlItemBlockManager now, while the latter still exists.
_modifier->clearRows();
}

} // namespace arangodb::aql
14 changes: 12 additions & 2 deletions arangod/Aql/QuerySnippet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@

#include "QuerySnippet.h"

#include "Aql/Collection.h"
#include "Aql/ExecutionNode/CollectionAccessingNode.h"
#include "Aql/ExecutionNode/DistributeConsumerNode.h"
#include "Aql/ExecutionNode/DistributeNode.h"
Expand All @@ -40,6 +39,8 @@
#include "Basics/Exceptions.h"
#include "Basics/StringUtils.h"
#include "Cluster/ServerState.h"
#include "Logger/LogLevel.h"
#include "Logger/LogMacros.h"

#ifdef USE_ENTERPRISE
#include "Enterprise/Aql/LocalGraphNode.h"
Expand Down Expand Up @@ -470,6 +471,15 @@ void QuerySnippet::serializeIntoBuilder(
// it needs to expose its input register by all means
internalGather->setVarsUsedLater(_nodes.front()->getVarsUsedLaterStack());
internalGather->setRegsToClear({});
// No DBServer-internal parallelism (yet)
auto const parallelism = internalGather->parallelism();
LOG_TOPIC_IF("dd0f1", DEBUG, Logger::QUERIES,
parallelism != GatherNode::Parallelism::Serial)
<< "Overriding parallelism of " << toString(parallelism) << " with "
<< toString(GatherNode::Parallelism::Serial)
<< " on the DBServer's gather node (belonging to " << _sinkNode->id()
<< ")";
internalGather->setParallelism(GatherNode::Parallelism::Serial);
auto const reservedId = ExecutionNodeId::InternalNode;
nodeAliases.try_emplace(internalGather->id(), reservedId);

Expand Down Expand Up @@ -520,7 +530,7 @@ void QuerySnippet::serializeIntoBuilder(
}
} else {
// In this case we actually do not care for the real value, we just need
// to ensure that every client get's exactly one copy.
// to ensure that every client gets exactly one copy.
for (size_t i = 0; i < numberOfShardsToPermutate; i++) {
distIds.emplace_back(StringUtils::itoa(i));
}
Expand Down
12 changes: 5 additions & 7 deletions arangod/Aql/SimpleModifier.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -186,13 +186,6 @@ ExecutionState SimpleModifier<ModifierCompletion, Enable>::transact(

auto result = _completion.transact(trx, _accumulator.closeAndGetContents());

// we are currently waiting here for the `result` future to get
// ready before we continue. this makes the AQL modification
// operations blocking as in previous versions of ArangoDB.
// TODO: fix this and make it truly non-blocking (requires to
// fix some lifecycle issues for AQL queries first).
result.wait();

if (result.isReady()) {
_results = std::move(result.waitAndGet());
return ExecutionState::DONE;
Expand Down Expand Up @@ -360,6 +353,11 @@ bool SimpleModifier<ModifierCompletion,
_results);
}

template<typename ModifierCompletion, typename Enable>
void SimpleModifier<ModifierCompletion, Enable>::clearRows() noexcept {
_operations.clear();
}

template class ::arangodb::aql::SimpleModifier<InsertModifierCompletion>;
template class ::arangodb::aql::SimpleModifier<RemoveModifierCompletion>;
template class ::arangodb::aql::SimpleModifier<UpdateReplaceModifierCompletion>;
5 changes: 5 additions & 0 deletions arangod/Aql/SimpleModifier.h
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,11 @@ class SimpleModifier : public std::enable_shared_from_this<
bool hasResultOrException() const noexcept;
bool hasNeitherResultNorOperationPending() const noexcept;

// Destroy all InputAqlItemRows, and with it SharedAqlItemBlockPtrs, this
// holds. This is necessary to ensure the lifetime of the AqlItemBlocks is
// shorter than of the AqlItemBlockManager, to which they are returned.
void clearRows() noexcept;

private:
[[nodiscard]] bool resultAvailable() const;
[[nodiscard]] VPackArrayIterator getResultsIterator() const;
Expand Down
2 changes: 2 additions & 0 deletions arangod/Aql/UpsertModifier.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -368,3 +368,5 @@ bool UpsertModifier::hasResultOrException() const noexcept {
bool UpsertModifier::hasNeitherResultNorOperationPending() const noexcept {
return resultState() == ModificationExecutorResultState::NoResult;
}

void UpsertModifier::clearRows() noexcept { _operations.clear(); }
5 changes: 5 additions & 0 deletions arangod/Aql/UpsertModifier.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,11 @@ class UpsertModifier {
bool hasResultOrException() const noexcept;
bool hasNeitherResultNorOperationPending() const noexcept;

// Destroy all InputAqlItemRows, and with it SharedAqlItemBlockPtrs, this
// holds. This is necessary to ensure the lifetime of the AqlItemBlocks is
// shorter than of the AqlItemBlockManager, to which they are returned.
void clearRows() noexcept;

private:
bool resultAvailable() const;
VPackArrayIterator getUpdateResultsIterator() const;
Expand Down
5 changes: 4 additions & 1 deletion arangod/Transaction/Helpers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -905,9 +905,12 @@ BuilderLeaser::BuilderLeaser(Context* transactionContext)
BuilderLeaser::BuilderLeaser(Methods* trx)
: BuilderLeaser{trx->transactionContextPtr()} {}

BuilderLeaser::~BuilderLeaser() {
BuilderLeaser::~BuilderLeaser() { clear(); }

void BuilderLeaser::clear() {
if (_builder != nullptr) {
_transactionContext->returnBuilder(_builder);
_builder = nullptr;
}
}

Expand Down
2 changes: 2 additions & 0 deletions arangod/Transaction/Helpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,8 @@ class BuilderLeaser {
velocypack::Builder* get() const noexcept { return _builder; }
velocypack::Builder* steal() { return std::exchange(_builder, nullptr); }

void clear();

private:
Context* _transactionContext;
velocypack::Builder* _builder;
Expand Down
Loading
0