8000 Make AQL modification operations asynchronous (#21068) · arangodb/arangodb@a789fad · GitHub
[go: up one dir, main page]

Skip to content

Commit a789fad

Browse files
goedderzjsteemann
andauthored
Make AQL modification operations asynchronous (#21068)
* Try to avoid synchronous waiting in AQL, and fix transaction lifetime issues * Make the Query's resource monitor a shared_ptr * Revert "Try to avoid synchronous waiting in AQL, and fix transaction lifetime issues" This reverts commit c787f51. * Included review comments * Shorten BuilderLeaser lifetimes, and copy vocbase name * Added a maintainer-mode assertion * Serialize DBServer modification operations * Fixed gtests and CE compilation * Fixed gtests * Clean references to SharedAqlItemBlockPtr early enough * Fixed CE compilation * Make assertion concurrency-safe * Unify handling of BuilderLeasers in processors * Make internal gather nodes non-parallel in all cases * Update arangod/Transaction/Methods.cpp Co-authored-by: Jan <jsteemann@users.noreply.github.com> * Update arangod/Transaction/Methods.cpp Co-authored-by: Jan <jsteemann@users.noreply.github.com> * Added CHANGELOG entry * Avoid SIGSEGV when a database gets deleted mid-transaction --------- Co-authored-by: Jan <jsteemann@users.noreply.github.com>
1 parent 3100cf6 commit a789fad

30 files changed

+248
-121
lines changed

CHANGELOG

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
devel
22
-----
33

4+
* Document modification operations (insert, update, etc) during an AQL query on
5+
DBServers no longer block threads while waiting for replication.
6+
47
* Fixed a bug where accessing a collection right after creation can sometimes
58
fail.
69

arangod/Aql/AqlItemBlock.h

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -71,10 +71,6 @@ class AqlItemBlock {
7171
AqlItemBlock(AqlItemBlock const&) = delete;
7272
AqlItemBlock& operator=(AqlItemBlock const&) = delete;
7373

74-
/// @brief create the block
75-
AqlItemBlock(AqlItemBlockManager&, size_t numRows,
76-
RegisterCount numRegisters);
77-
7874
void initFromSlice(arangodb::velocypack::Slice);
7975

8076
/// @brief auxiliary struct to track how often the same AqlValue is
@@ -113,8 +109,13 @@ class AqlItemBlock {
113109
using ShadowRowIterator = std::vector<uint32_t>::const_iterator;
114110

115111
protected:
112+
/// @brief create the block
113+
/// Should only ever be called by AqlItemBlockManager, so it's protected
114+
AqlItemBlock(AqlItemBlockManager&, size_t numRows,
115+
RegisterCount numRegisters);
116+
116117
/// @brief destroy the block
117-
/// Should only ever be deleted by AqlItemManager::returnBlock, so the
118+
/// Should only ever be deleted by AqlItemBlockManager::returnBlock, so the
118119
/// destructor is protected.
119120
~AqlItemBlock();
120121

@@ -407,11 +408,12 @@ class AqlItemBlock {
407408
explicit OwnershipChecker(std::atomic<std::thread::id>& v) : _v(v) {
408409
auto old =
409410
_v.exchange(std::this_thread::get_id(), std::memory_order_relaxed);
410-
TRI_ASSERT(old == std::thread::id());
411+
TRI_ASSERT(old == std::thread::id()) << "old=" << old;
411412
}
412413
~OwnershipChecker() {
413414
auto old = _v.exchange(std::thread::id(), std::memory_order_relaxed);
414-
TRI_ASSERT(old == std::this_thread::get_id());
415+
TRI_ASSERT(old == std::this_thread::get_id())
416+
<< "old=" << old << ", this=" << std::this_thread::get_id();
415417
}
416418
std::atomic<std::thread::id>& _v;
417419
};

arangod/Aql/AqlItemBlockManager.cpp

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,13 @@ AqlItemBlockManager::AqlItemBlockManager(
3838
: _resourceMonitor(resourceMonitor) {}
3939

4040
/// @brief destroy the manager
41-
AqlItemBlockManager::~AqlItemBlockManager() { delete _constValueBlock; }
41+
AqlItemBlockManager::~AqlItemBlockManager() {
42+
#ifdef ARANGODB_ENABLE_MAINTAINER_MODE
43+
TRI_ASSERT(_leasedBlocks == 0) << "leased blocks left: " << _leasedBlocks;
44+
#endif
45+
46+
delete _constValueBlock;
47+
}
4248

4349
void AqlItemBlockManager::initializeConstValueBlock(RegisterCount nrRegs) {
4450
TRI_ASSERT(_constValueBlock == nullptr);
@@ -89,13 +95,24 @@ SharedAqlItemBlockPtr AqlItemBlockManager::requestBlock(
8995
TRI_ASSERT(block->getRefCount() == 0);
9096
TRI_ASSERT(block->hasShadowRows() == false);
9197

98+
#ifdef ARANGODB_ENABLE_MAINTAINER_MODE
99+
++_leasedBlocks;
100+
#endif
101+
92102
return SharedAqlItemBlockPtr{block};
93103
}
94104

95105
/// @brief return a block to the manager
96106
void AqlItemBlockManager::returnBlock(AqlItemBlock*& block) noexcept {
97107
TRI_ASSERT(block != nullptr);
98108
TRI_ASSERT(block->getRefCount() == 0);
109+
#ifdef ARANGODB_ENABLE_MAINTAINER_MODE
110+
auto leasedBlocks = _leasedBlocks.load();
111+
do {
112+
TRI_ASSERT(leasedBlocks > 0);
113+
} while (
114+
!_leasedBlocks.compare_exchange_weak(leasedBlocks, leasedBlocks - 1));
115+
#endif
99116

100117
size_t const targetSize = block->capacity();
101118
uint32_t const i = Bucket::getId(targetSize);

arangod/Aql/AqlItemBlockManager.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,9 @@ class AqlItemBlockManager {
8989

9090
static constexpr uint32_t numBuckets = 12;
9191
static constexpr size_t numBlocksPerBucket = 7;
92+
#ifdef ARANGODB_ENABLE_MAINTAINER_MODE
93+
std::atomic<size_t> _leasedBlocks = 0;
94+
#endif
9295

9396
struct Bucket {
9497
std::array<AqlItemBlock*, numBlocksPerBucket> blocks;

arangod/Aql/ExecutionNode/GatherNode.cpp

Lines changed: 22 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -59,18 +59,6 @@ constexpr std::string_view kParallelismParallel("parallel");
5959
constexpr std::string_view kParallelismSerial("serial");
6060
constexpr std::string_view kParallelismUndefined("undefined");
6161

62-
constexpr std::string_view toString(GatherNode::Parallelism value) noexcept {
63-
switch (value) {
64-
case GatherNode::Parallelism::Parallel:
65-
return kParallelismParallel;
66-
case GatherNode::Parallelism::Serial:
67-
return kParallelismSerial;
68-
case GatherNode::Parallelism::Undefined:
69-
default:
70-
return kParallelismUndefined;
71-
}
72-
}
73-
7462
constexpr GatherNode::Parallelism parallelismFromString(
7563
std::string_view value) noexcept {
7664
if (value == kParallelismParallel) {
@@ -100,7 +88,28 @@ bool toSortMode(std::string_view str, GatherNode::SortMode& mode) noexcept {
10088
return true;
10189
}
10290

103-
std::string_view toString(GatherNode::SortMode mode) noexcept {
91+
} // namespace
92+
93+
auto arangodb::aql::toString(GatherNode::Parallelism value) noexcept
94+
-> std::string_view {
95+
switch (value) {
96+
case GatherNode::Parallelism::Parallel:
97+
return kParallelismParallel;
98+
case GatherNode::Parallelism::Serial:
99+
return kParallelismSerial;
100+
case GatherNode::Parallelism::Undefined:
101+
return kParallelismUndefined;
102+
default:
103+
LOG_TOPIC("c9367", FATAL, Logger::AQL)
104+
<< "Invalid value for parallelism: "
105+
<< static_cast<std::underlying_type_t<GatherNode::Parallelism>>(
106+
value);
107+
FATAL_ERROR_ABORT();
108+
}
109+
}
110+
111+
auto arangodb::aql::toString(GatherNode::SortMode mode) noexcept
112+
-> std::string_view {
104113
switch (mode) {
105114
case GatherNode::SortMode::MinElement:
106115
return kSortModeMinElement;
@@ -114,8 +123,6 @@ std::string_view toString(GatherNode::SortMode mode) noexcept {
114123
}
115124
}
116125

117-
} // namespace
118-
119126
/*static*/ Collection const* GatherNode::findCollection(
120127
GatherNode const& root) noexcept {
121128
ExecutionNode const* node = root.getFirstDependency();

arangod/Aql/ExecutionNode/GatherNode.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,5 +147,8 @@ class GatherNode final : public ExecutionNode {
147147
size_t _limit;
148148
};
149149

150+
auto toString(GatherNode::SortMode mode) noexcept -> std::string_view;
151+
auto toString(GatherNode::Parallelism) noexcept -> std::string_view;
152+
150153
} // namespace aql
151154
} // namespace arangodb

arangod/Aql/Executor/ModificationExecutor.h

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,7 @@ class ModificationExecutor {
170170
using Stats = ModificationStats;
171171

172172
ModificationExecutor(FetcherType&, Infos&);
173-
~ModificationExecutor() = default;
173+
~ModificationExecutor();
174174

175175
[[nodiscard]] auto produceRows(typename FetcherType::DataRange& input,
176176
OutputAqlItemRow& output)
@@ -206,4 +206,12 @@ class ModificationExecutor {
206206
Stats _stats{};
207207
};
208208

209+
template<typename FetcherType, typename ModifierType>
210+
ModificationExecutor<FetcherType, ModifierType>::~ModificationExecutor() {
211+
// Clear all InputAqlItemRows the modifier still holds, and with it the
212+
// SharedAqlItemBlockPtrs. This is so the referenced AqlItemBlocks can be
213+
// returned to the AqlItemBlockManager now, while the latter still exists.
214+
_modifier->clearRows();
215+
}
216+
209217
} // namespace arangodb::aql

arangod/Aql/QuerySnippet.cpp

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323

2424
#include "QuerySnippet.h"
2525

26-
#include "Aql/Collection.h"
2726
#include "Aql/ExecutionNode/CollectionAccessingNode.h"
2827
#include "Aql/ExecutionNode/DistributeConsumerNode.h"
2928
#include "Aql/ExecutionNode/DistributeNode.h"
@@ -40,6 +39,8 @@
4039
#include "Basics/Exceptions.h"
4140
#include "Basics/StringUtils.h"
4241
#include "Cluster/ServerState.h"
42+
#include "Logger/LogLevel.h"
43+
#include "Logger/LogMacros.h"
4344

4445
#ifdef USE_ENTERPRISE
4546
#include "Enterprise/Aql/LocalGraphNode.h"
@@ -470,6 +471,15 @@ void QuerySnippet::serializeIntoBuilder(
470471
// it needs to expose its input register by all means
471472
internalGather->setVarsUsedLater(_nodes.front()->getVarsUsedLaterStack());
472473
internalGather->setRegsToClear({});
474+
// No DBServer-internal parallelism (yet)
475+
auto const parallelism = internalGather->parallelism();
476+
LOG_TOPIC_IF("dd0f1", DEBUG, Logger::QUERIES,
477+
parallelism != GatherNode::Parallelism::Serial)
478+
<< "Overriding parallelism of " << toString(parallelism) << " with "
479+
<< toString(GatherNode::Parallelism::Serial)
480+
<< " on the DBServer's gather node (belonging to " << _sinkNode->id()
481+
<< ")";
482+
internalGather->setParallelism(GatherNode::Parallelism::Serial);
473483
auto const reservedId = ExecutionNodeId::InternalNode;
474484
nodeAliases.try_emplace(internalGather->id(), reservedId);
475485

@@ -520,7 +530,7 @@ void QuerySnippet::serializeIntoBuilder(
520530
}
521531
} else {
522532
// In this case we actually do not care for the real value, we just need
523-
// to ensure that every client get's exactly one copy.
533+
// to ensure that every client gets exactly one copy.
524534
for (size_t i = 0; i < numberOfShardsToPermutate; i++) {
525535
distIds.emplace_back(StringUtils::itoa(i));
526536
}

arangod/Aql/SimpleModifier.cpp

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -186,13 +186,6 @@ ExecutionState SimpleModifier<ModifierCompletion, Enable>::transact(
186186

187187
auto result = _completion.transact(trx, _accumulator.closeAndGetContents());
188188

189-
// we are currently waiting here for the `result` future to get
190-
// ready before we continue. this makes the AQL modification
191-
// operations blocking as in previous versions of ArangoDB.
192-
// TODO: fix this and make it truly non-blocking (requires to
193-
// fix some lifecycle issues for AQL queries first).
194-
result.wait();
195-
196189
if (result.isReady()) {
197190
_results = std::move(result.waitAndGet());
198191
return ExecutionState::DONE;
@@ -360,6 +353,11 @@ bool SimpleModifier<ModifierCompletion,
360353
_results);
361354
}
362355

356+
template<typename ModifierCompletion, typename Enable>
357+
void SimpleModifier<ModifierCompletion, Enable>::clearRows() noexcept {
358+
_operations.clear();
359+
}
360+
363361
template class ::arangodb::aql::SimpleModifier<InsertModifierCompletion>;
364362
template class ::arangodb::aql::SimpleModifier<RemoveModifierCompletion>;
365363
template class ::arangodb::aql::SimpleModifier<UpdateReplaceModifierCompletion>;

arangod/Aql/SimpleModifier.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,11 @@ class SimpleModifier : public std::enable_shared_from_this<
150150
bool hasResultOrException() const noexcept;
151151
bool hasNeitherResultNorOperationPending() const noexcept;
152152

153+
// Destroy all InputAqlItemRows, and with it SharedAqlItemBlockPtrs, this
154+
// holds. This is necessary to ensure the lifetime of the AqlItemBlocks is
155+
// shorter than of the AqlItemBlockManager, to which they are returned.
156+
void clearRows() noexcept;
157+
153158
private:
154159
[[nodiscard]] bool resultAvailable() const;
155160
[[nodiscard]] VPackArrayIterator getResultsIterator() const;

arangod/Aql/UpsertModifier.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -368,3 +368,5 @@ bool UpsertModifier::hasResultOrException() const noexcept {
368368
bool UpsertModifier::hasNeitherResultNorOperationPending() const noexcept {
369369
return resultState() == ModificationExecutorResultState::NoResult;
370370
}
371+
372+
void UpsertModifier::clearRows() noexcept { _operations.clear(); }

arangod/Aql/UpsertModifier.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,11 @@ class UpsertModifier {
116116
bool hasResultOrException() const noexcept;
117117
bool hasNeitherResultNorOperationPending() const noexcept;
118118

119+
// Destroy all InputAqlItemRows, and with it SharedAqlItemBlockPtrs, this
120+
// holds. This is necessary to ensure the lifetime of the AqlItemBlocks is
121+
// shorter than of the AqlItemBlockManager, to which they are returned.
122+
void clearRows() noexcept;
123+
119124
private:
120125
bool resultAvailable() const;
121126
VPackArrayIterator getUpdateResultsIterator() const;

arangod/Transaction/Helpers.cpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -905,9 +905,12 @@ BuilderLeaser::BuilderLeaser(Context* transactionContext)
905905
BuilderLeaser::BuilderLeaser(Methods* trx)
906906
: BuilderLeaser{trx->transactionContextPtr()} {}
907907

908-
BuilderLeaser::~BuilderLeaser() {
908+
BuilderLeaser::~BuilderLeaser() { clear(); }
909+
910+
void BuilderLeaser::clear() {
909911
if (_builder != nullptr) {
910912
_transactionContext->returnBuilder(_builder);
913+
_builder = nullptr;
911914
}
912915
}
913916

arangod/Transaction/Helpers.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,8 @@ class BuilderLeaser {
189189
velocypack::Builder* get() const noexcept { return _builder; }
190190
velocypack::Builder* steal() { return std::exchange(_builder, nullptr); }
191191

192+
void clear();
193+
192194
private:
193195
Context* _transactionContext;
194196
velocypack::Builder* _builder;

0 commit comments

Comments
 (0)
0