8000 [APM-84] Support for external result sets (#16421) · arangodb/arangodb@b43fbec · GitHub
[go: up one dir, main page]

Skip to content

Commit b43fbec

Browse files
cpjuliajsteemannneunhoef
authored
[APM-84] Support for external result sets (#16421)
* Started creating API for rocksdb temp storage of query entries and modified executor to handle inputs incrementally intead of matrix * Started a rocksdb instance using a feature as API wrapper instead of RocksDBEngine * "Added sort executor to execution list" * Implemented rocksdb storage for SORT entries over threshold * Updated tests with new signature * Added deletion from rocksdb, fixed typo * Started using RocksDBSstFileMethods for insertions with SstFileWriter for optimizing, removed already inserted include * Updated sort executor * Updated sort executor for compilation * create a storage-engine-agnostic TemporaryStorage feature This is WIP, as the feature does not nothing so far except creating a temp directory and cleaning it. Next step is to wire the storage engine. * add RocksDB implementation for sorted rows storage backend * added two-stage storage backend, with spillover from memory to rocksdb * fix staged backend * allow using process id ('$PID') in startup parameter * allow using process id ('$PID') in startup parameter * adjust CHANGELOG * fix encryption * prevent duplicate cleanup * make log message less verbose * set several RocksDB options * Added the startup option of temp dir storage --temp.intermediate-results-path to server startup for unittests * turn off intermediate storage on agents improve efficiency of inserts into intermediate storage * Changed name of rocksdb comparator for temp storage in queries * fix performance regression in in-memory sorter * fix assertion failure * Fixed comparator name * simplification * follow coding guidelines * micro optimization for serialization * Added threshold memory usage parameter to query * Added test, to be continued * fix broken tests * split tests * small cleanup * fix test * cleanup * added to do * cleanup, failure points * renamed "thresholdNumRows" to "spillOverThresholdNumRows" and added "spillOverThresholdMemoryUsage" * Added max disk capacity control and failure tests * mark options as experimental * added code comments * properly honor capacity limit * add more startup options for thresholds * fix compile errors * added CHANGELOG * fix compile error on macOS * fix another compile error * Update CHANGELOG Co-authored-by: Max Neunhöffer <max@arangodb.com> * Update CHANGELOG Co-authored-by: Max Neunhöffer <max@arangodb.com> * Update CHANGELOG Co-authored-by: Max Neunhöffer <max@arangodb.com> * Addressed suggestion of reserve row indexes size * Addressed hasReachedCapacityLimit comments * Added comment explaining the bytes in rocksdb custom comparator * Addressed suggestion to check if we actually have the prefixId's correct size so don't read out of scope in the comparator Co-authored-by: jsteemann <jsteemann@users.noreply.github.com> Co-authored-by: Max Neunhöffer <max@arangodb.com>
1 parent ed1fc6f commit b43fbec

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

46 files changed

+2595
-220
lines changed

CHANGELOG

Lines changed: 48 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,53 @@
11
devel
22
-----
33

4+
* APM-84: Added option to spill intermediate AQL query results from RAM to
5+
disk when their size exceeds certain thresholds. Currently the only AQL
6+
operation that can make use of this is the SortExecutor (AQL SORT operation
7+
without using a LIMIT). Further AQL executor types will be supported in
8+
future releases.
9+
10+
Spilling over query results from RAM to disk is off by default and currently
11+
in an experimental stage. In order to opt in to the feature, it is required
12+
to set the following startup option `--temp.intermediate-results-path`.
13+
The directory specified here must not be located underneath the instance's
14+
database directory.
15+
When this startup option is specified, ArangoDB assumes ownership of that
16+
directory and will wipe its contents on startup and shutdown. The directory
17+
can be placed on ephemeral storage, as the data stored inside it is there
18+
only temporarily, while the instance is running. It does not need to be
19+
persisted across instance restarts and does not need to be backed up.
20+
21+
When a directory is specified via the startup option, the following
22+
additional configuration options can be used to control the threshold
23+
values for spilling over data:
24+
25+
* `--temp.intermediate-results-capacity`: maximum on-disk size (in bytes)
26+
for intermediate results. If set to 0, it means that the on-disk size
27+
is not constrained. It can be set to a value other than 0 to restrict the
28+
size of the temporary directory. Once the cumulated on-disk size of
29+
intermediate results reaches the configured maximum capacity, the
30+
query will be aborted with failure "disk capacity limit for intermediate
31+
results exceeded".
32+
* `--temp.intermediate-results-spillover-threshold-num-rows`: number of
33+
result rows from which on a spillover from RAM to disk will happen.
34+
* `--temp.intermediate-results-spillover-threshold-memory-usage`: memory
35+
usage (in bytes) after which a spillover from RAM to disk will happen.
36+
* `--temp.intermediate-results-encryption`: whether or not the on-disk
37+
data should be encrypted. This option is only available in the Enterprise
38+
Edition.
39+
* `--temp.-intermediate-results-encryption-hardware-acceleration`: whether
40+
or not to use hardware acceleration for the on-disk encryption. This
41+
option is only available in the Enterprise Edition.
42+
43+
Please note that the feature is currently still experimental and may slightly
44+
change in future releases. As mentioned, the only Executor that can make
45+
use of spilling data to disk is the SortExecutor (SORT without LIMIT).
46+
Also note that the query results will still be built up entirely in RAM
47+
on coordinators and single servers for non-streaming queries. In order to
48+
avoid the buildup of the entire query result in RAM, a streaming query
49+
should be used.
50+
451
* Enterprise only: Added `MINHASH`, `MINHASH_MATCH`, `MINHASH_ERROR`,
552
`MINHASH_COUNT` AQL functions.
653

@@ -20,7 +67,7 @@ devel
2067
values.
2168

2269
* Added startup option `--rocksdb.compaction-style` to configure the compaction
23-
style which is used to pick the next file(s) to be compacted.
70+
style which is used to pick the next file(s) to be compacted.
2471

2572
* BugFix in Pregel's Label Propagation: the union of three undirected cliques
2673
of size at least three connected by an undirected triangle now returns

arangod/Aql/AqlItemBlock.cpp

Lines changed: 18 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ AqlItemBlock::AqlItemBlock(AqlItemBlockManager& manager, size_t numRows,
9898
}
9999

100100
/// @brief init the block from VelocyPack, note that this can throw
101-
void AqlItemBlock::initFromSlice(VPackSlice const slice) {
101+
void AqlItemBlock::initFromSlice(VPackSlice slice) {
102102
int64_t numRows =
103103
VelocyPackHelper::getNumericValue<int64_t>(slice, "nrItems", 0);
104104
if (numRows <= 0) {
@@ -871,25 +871,23 @@ void AqlItemBlock::toVelocyPack(size_t from, size_t to,
871871
}
872872

873873
void AqlItemBlock::rowToSimpleVPack(
874-
size_t const row, velocypack::Options const* options,
874+
size_t row, velocypack::Options const* options,
875875
arangodb::velocypack::Builder& builder) const {
876-
{
877-
VPackArrayBuilder rowBuilder{&builder};
876+
VPackArrayBuilder rowBuilder{&builder};
878877

879-
if (isShadowRow(row)) {
880-
builder.add(VPackValue(getShadowRowDepth(row)));
878+
if (isShadowRow(row)) {
879+
builder.add(VPackValue(getShadowRowDepth(row)));
880+
} else {
881+
builder.add(VPackSlice::nullSlice());
882+
}
883+
auto const n = numRegisters();
884+
for (RegisterId::value_t reg = 0; reg < n; ++reg) {
885+
AqlValue const& ref = getValueReference(row, reg);
886+
if (ref.isEmpty()) {
887+
builder.add(VPackSlice::noneSlice());
881888
} else {
882-
builder.add(VPackSlice::nullSlice());
883-
}
884-
auto const n = numRegisters();
885-
for (RegisterId::value_t reg = 0; reg < n; ++reg) {
886-
AqlValue const& ref = getValueReference(row, reg);
887-
if (ref.isEmpty()) {
888-
builder.add(VPackSlice::noneSlice());
889-
} else {
890-
ref.toVelocyPack(options, builder, /*resolveExternals*/ false,
891-
/*allowUnindexed*/ true);
892-
}
889+
ref.toVelocyPack(options, builder, /*resolveExternals*/ false,
890+
/*allowUnindexed*/ true);
893891
}
894892
}
895893
}
@@ -963,10 +961,13 @@ AqlItemBlock::~AqlItemBlock() {
963961

964962
void AqlItemBlock::increaseMemoryUsage(size_t value) {
965963
resourceMonitor().increaseMemoryUsage(value);
964+
_memoryUsage += value;
966965
}
967966

968967
void AqlItemBlock::decreaseMemoryUsage(size_t value) noexcept {
969968
resourceMonitor().decreaseMemoryUsage(value);
969+
TRI_ASSERT(_memoryUsage >= value);
970+
_memoryUsage -= value;
970971
}
971972

972973
AqlValue AqlItemBlock::getValue(size_t index, RegisterId varNr) const {

arangod/Aql/AqlItemBlock.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -323,6 +323,9 @@ class AqlItemBlock {
323323
/// @brief return the number of ShadowRows
324324
size_t numShadowRows() const noexcept;
325325

326+
/// @brief get the current memory usage
327+
std::uint64_t getMemoryUsage() const noexcept { return _memoryUsage; }
328+
326329
/// @brief Moves all values *from* source *to* this block.
327330
/// Returns the row index of the last written row plus one (may equal size()).
328331
/// Expects size() - targetRow >= source->size(); and, of course, an equal
@@ -376,6 +379,9 @@ class AqlItemBlock {
376379
/// should be added to this map. Other types (VPACK_INLINE) are not supported.
377380
containers::FlatHashMap<void const*, ValueInfo> _valueCount;
378381

382+
/// @brief _memoryUsage, memory usage
383+
uint64_t _memoryUsage = 0;
384+
379385
/// @brief _numRows, number of rows
380386
size_t _numRows = 0;
381387

arangod/Aql/MultiAqlItemBlockInputRange.cpp

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ auto MultiAqlItemBlockInputRange::resizeOnce(MainQueryState state,
6666
}
6767

6868
auto MultiAqlItemBlockInputRange::upstreamState(
69-
size_t const dependency) const noexcept -> ExecutorState {
69+
size_t dependency) const noexcept -> ExecutorState {
7070
TRI_ASSERT(dependency < _inputs.size());
7171
return _inputs.at(dependency).upstreamState();
7272
}
@@ -83,37 +83,37 @@ auto MultiAqlItemBlockInputRange::hasDataRow() const noexcept -> bool {
8383
[](AqlItemBlockInputRange const& i) -> bool { return i.hasDataRow(); });
8484
}
8585

86-
auto MultiAqlItemBlockInputRange::hasDataRow(
87-
size_t const dependency) const noexcept -> bool {
86+
auto MultiAqlItemBlockInputRange::hasDataRow(size_t dependency) const noexcept
87+
-> bool {
8888
TRI_ASSERT(dependency < _inputs.size());
8989
return _inputs.at(dependency).hasDataRow();
9090
}
9191

92-
auto MultiAqlItemBlockInputRange::rangeForDependency(size_t const dependency)
92+
auto MultiAqlItemBlockInputRange::rangeForDependency(size_t dependency)
9393
-> AqlItemBlockInputRange& {
9494
TRI_ASSERT(dependency < _inputs.size());
9595
return _inputs.at(dependency);
9696
}
9797

98-
auto MultiAqlItemBlockInputRange::peekDataRow(size_t const dependency) const
98+
auto MultiAqlItemBlockInputRange::peekDataRow(size_t dependency) const
9999
-> std::pair<ExecutorState, arangodb::aql::InputAqlItemRow> {
100100
TRI_ASSERT(dependency < _inputs.size());
101101
return _inputs.at(dependency).peekDataRow();
102102
}
103103

104-
auto MultiAqlItemBlockInputRange::skipAll(size_t const dependency) noexcept
104+
auto MultiAqlItemBlockInputRange::skipAll(size_t dependency) noexcept
105105
-> std::size_t {
106106
TRI_ASSERT(dependency < _inputs.size());
107107
return _inputs.at(dependency).skipAll();
108108
}
109109

110110
auto MultiAqlItemBlockInputRange::skippedInFlight(
111-
size_t const dependency) const noexcept -> std::size_t {
111+
size_t dependency) const noexcept -> std::size_t {
112112
TRI_ASSERT(dependency < _inputs.size());
113113
return _inputs.at(dependency).skippedInFlight();
114114
}
115115

116-
auto MultiAqlItemBlockInputRange::nextDataRow(size_t const dependency)
116+
auto MultiAqlItemBlockInputRange::nextDataRow(size_t dependency)
117117
-> std::pair<ExecutorState, arangodb::aql::InputAqlItemRow> {
118118
TRI_ASSERT(dependency < _inputs.size());
119119
return _inputs.at(dependency).nextDataRow();
@@ -220,14 +220,14 @@ auto MultiAqlItemBlockInputRange::nextShadowRow()
220220
return {state, std::move(shadowRow)};
221221
}
222222

223-
auto MultiAqlItemBlockInputRange::getBlock(
224-
size_t const dependency) const noexcept -> SharedAqlItemBlockPtr {
223+
auto MultiAqlItemBlockInputRange::getBlock(size_t dependency) const noexcept
224+
-> SharedAqlItemBlockPtr {
225225
TRI_ASSERT(dependency < _inputs.size());
226226
return _inputs[dependency].getBlock();
227227
}
228228

229229
auto MultiAqlItemBlockInputRange::setDependency(
230-
size_t const dependency, AqlItemBlockInputRange const& range) -> void {
230+
size_t dependency, AqlItemBlockInputRange const& range) -> void {
231231
TRI_ASSERT(dependency < _inputs.size());
232232
_inputs.at(dependency) = range;
233233
}
@@ -267,14 +267,14 @@ auto MultiAqlItemBlockInputRange::skipAllShadowRowsOfDepth(size_t depth)
267267
}
268268

269269
// Subtract up to count rows from the local _skipped state
270-
auto MultiAqlItemBlockInputRange::skipForDependency(size_t const dependency,
270+
auto MultiAqlItemBlockInputRange::skipForDependency(size_t dependency,
271271
size_t count) -> size_t {
272272
TRI_ASSERT(dependency < _inputs.size());
273273
return _inputs.at(dependency).skip(count);
274274
}
275275

276276
// Skip all that is available
277-
auto MultiAqlItemBlockInputRange::skipAllForDependency(size_t const dependency)
277+
auto MultiAqlItemBlockInputRange::skipAllForDependency(size_t dependency)
278278
-> size_t {
279279
TRI_ASSERT(dependency < _inputs.size());
280280
return _inputs.at(dependency).skipAll();

arangod/Aql/MultiAqlItemBlockInputRange.h

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -51,13 +51,13 @@ class MultiAqlItemBlockInputRange {
5151
* @return AqlItemBlockInputRange& Modifyable reference to the input data
5252
* stream
5353
*/
54-
auto rangeForDependency(size_t const dependency) -> AqlItemBlockInputRange&;
54+
auto rangeForDependency(size_t dependency) -> AqlItemBlockInputRange&;
5555

5656
std::pair<ExecutorState, arangodb::aql::InputAqlItemRow> peekDataRow(
57-
size_t const dependency) const;
57+
size_t dependency) const;
5858
std::pair<ExecutorState, arangodb::aql::InputAqlItemRow> nextDataRow(
59-
size_t const dependency);
60-
auto skipAll(size_t const dependency) noexcept -> std::size_t;
59+
size_t dependency);
60+
auto skipAll(size_t dependency) noexcept -> std::size_t;
6161

6262
[[nodiscard]] auto skippedInFlight(size_t dependency) const noexcept
6363
-> std::size_t;
@@ -88,9 +88,9 @@ class MultiAqlItemBlockInputRange {
8888
auto skipAllShadowRowsOfDepth(size_t depth) -> std::vector<size_t>;
8989

9090
// Subtract up to count rows from the local _skipped state
91-
auto skipForDependency(size_t const dependency, size_t count) -> size_t;
91+
auto skipForDependency(size_t dependency, size_t count) -> size_t;
9292
// Skipp all that is available
93-
auto skipAllForDependency(size_t const dependency) -> size_t;
93+
auto skipAllForDependency(size_t dependency) -> size_t;
9494

9595
auto numberDependencies() const noexcept -> size_t;
9696

arangod/Aql/MultiDependencySingleRowFetcher.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -167,9 +167,9 @@ auto MultiDependencySingleRowFetcher::executeForDependency(
167167
}
168168
TRI_ASSERT(block != nullptr);
169169
auto [start, end] = block->getRelevantRange();
170-
return {
171-
state, skipped,
172-
AqlItemBlockInputRange{execState, skipped.getSkipCount(), block, start}};
170+
return {state, skipped,
171+
AqlItemBlockInputRange{execState, skipped.getSkipCount(),
172+
std::move(block), start}};
173173
}
174174

175175
auto MultiDependencySingleRowFetcher::execute(AqlCallStack const& stack,

arangod/Aql/OutputAqlItemRow.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -206,7 +206,7 @@ void OutputAqlItemRow::copyOrMoveRow(ItemRowType& sourceRow,
206206
// We either have a shadowRow, or we need to have all values written
207207
TRI_ASSERT((std::is_same_v<ItemRowType, ShadowAqlItemRow>) ||
208208
allValuesWritten());
209-
if (_inputRowCopied) {
209+
if (ADB_UNLIKELY(_inputRowCopied)) {
210210
_lastBaseIndex = _baseIndex;
211211
return;
212212
}
@@ -575,10 +575,10 @@ template void OutputAqlItemRow::copyRow<InputAqlItemRow>(
575575
template void OutputAqlItemRow::copyRow<ShadowAqlItemRow>(
576576
ShadowAqlItemRow const& sourceRow, bool ignoreMissing);
577577
template void OutputAqlItemRow::cloneValueInto<InputAqlItemRow>(
578-
RegisterId registerId, const InputAqlItemRow& sourceRow,
578+
RegisterId registerId, InputAqlItemRow const& sourceRow,
579579
AqlValue const& value);
580580
template void OutputAqlItemRow::cloneValueInto<ShadowAqlItemRow>(
581-
RegisterId registerId, const ShadowAqlItemRow& sourceRow,
581+
RegisterId registerId, ShadowAqlItemRow const& sourceRow,
582582
AqlValue const& value);
583583
template void OutputAqlItemRow::moveValueInto<InputAqlItemRow, AqlValueGuard>(
584584
RegisterId registerId, InputAqlItemRow const& sourceRow,

arangod/Aql/QueryOptions.cpp

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,9 @@
2323

2424
#include "QueryOptions.h"
2525

26-
#include "ApplicationFeatures/ApplicationServer.h"
2726
#include "Aql/QueryCache.h"
2827
#include "Aql/QueryRegistry.h"
2928
#include "Basics/StaticStrings.h"
30-
#include "RestServer/QueryRegistryFeature.h"
3129

3230
#include <velocypack/Builder.h>
3331
#include <velocypack/Iterator.h>
@@ -44,6 +42,8 @@ size_t QueryOptions::defaultMaxNodesPerCallstack = 150;
4442
#else
4543
size_t QueryOptions::defaultMaxNodesPerCallstack = 250;
4644
#endif
45+
size_t QueryOptions::defaultSpillOverThresholdNumRows = 5000000;
46+
size_t QueryOptions::defaultSpillOverThresholdMemoryUsage = 128 * 1024 * 1024;
4747
double QueryOptions::defaultMaxRuntime = 0.0;
4848
double QueryOptions::defaultTtl;
4949
bool QueryOptions::defaultFailOnWarning = false;
@@ -54,6 +54,9 @@ QueryOptions::QueryOptions()
5454
maxNumberOfPlans(QueryOptions::defaultMaxNumberOfPlans),
5555
maxWarningCount(10),
5656
maxNodesPerCallstack(QueryOptions::defaultMaxNodesPerCallstack),
57+
spillOverThresholdNumRows(QueryOptions::defaultSpillOverThresholdNumRows),
58+
spillOverThresholdMemoryUsage(
59+
QueryOptions::defaultSpillOverThresholdMemoryUsage),
5760
maxRuntime(0.0),
5861
satelliteSyncWait(60.0),
5962
ttl(QueryOptions::defaultTtl), // get global default ttl
@@ -141,6 +144,16 @@ void QueryOptions::fromVelocyPack(VPackSlice slice) {
141144
maxNodesPerCallstack = value.getNumber<size_t>();
142145
}
143146

147+
value = slice.get("spillOverThresholdNumRows");
148+
if (value.isNumber()) {
149+
spillOverThresholdNumRows = value.getNumber<size_t>();
150+
}
151+
152+
value = slice.get("spillOverThresholdMemoryUsage");
153+
if (value.isNumber()) {
154+
spillOverThresholdMemoryUsage = value.getNumber<size_t>();
155+
}
156+
144157
value = slice.get("maxRuntime");
145158
if (value.isNumber()) {
146159
maxRuntime = value.getNumber<double>();
@@ -261,6 +274,10 @@ void QueryOptions::toVelocyPack(VPackBuilder& builder,
261274
builder.add("maxNumberOfPlans", VPackValue(maxNumberOfPlans));
262275
builder.add("maxWarningCount", VPackValue(maxWarningCount));
263276
builder.add("maxNodesPerCallstack", VPackValue(maxNodesPerCallstack));
277+
builder.add("spillOverThresholdNumRows",
278+
VPackValue(spillOverThresholdNumRows));
279+
builder.add("spillOverThresholdMemoryUsage",
280+
VPackValue(spillOverThresholdMemoryUsage));
264281
builder.add("maxRuntime", VPackValue(maxRuntime));
265282
builder.add("satelliteSyncWait", VPackValue(satelliteSyncWait));
266283
builder.add("ttl", VPackValue(ttl));

arangod/Aql/QueryOptions.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,8 @@ struct QueryOptions {
7676
size_t maxNumberOfPlans;
7777
size_t maxWarningCount;
7878
size_t maxNodesPerCallstack;
79+
size_t spillOverThresholdNumRows;
80+
size_t spillOverThresholdMemoryUsage;
7981
double maxRuntime; // query has to execute within the given time or will be
8082
// killed
8183
double satelliteSyncWait;
@@ -125,6 +127,8 @@ struct QueryOptions {
125127
static size_t defaultMemoryLimit;
126128
static size_t defaultMaxNumberOfPlans;
127129
static size_t defaultMaxNodesPerCallstack;
130+
static size_t defaultSpillOverThresholdNumRows;
131+
static size_t defaultSpillOverThresholdMemoryUsage;
128132
static double defaultMaxRuntime;
129133
static double defaultTtl;
130134
static bool defaultFailOnWarning;

0 commit comments

Comments
 (0)
0