8000 [APM-84] Support for external result sets by cpjulia · Pull Request #16421 · arangodb/arangodb · GitHub
[go: up one dir, main page]

Skip to content

[APM-84] S 10000 upport for external result sets #16421

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 104 commits into from
Jul 17, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
104 commits
Select commit Hold shift + click to select a range
acbd780
Started creating API for rocksdb temp storage of query entries and mo…
cpjulia Jun 17, 2022
3fa9f08
Started a rocksdb instance using a feature as API wrapper instead of …
cpjulia Jun 20, 2022
cc18c9e
Merge branch 'devel' of github.com:arangodb/arangodb into feature/APM…
cpjulia Jun 20, 2022
770ca3f
Merge branch 'devel' of github.com:arangodb/arangodb into feature/APM…
cpjulia Jun 21, 2022
1e713e1
"Added sort executor to execution list"
cpjulia Jun 22, 2022
5384607
Merge branch 'devel' of github.com:arangodb/arangodb into feature/APM…
cpjulia Jun 22, 2022
4c4b07b
Merge branch 'devel' of github.com:arangodb/arangodb into feature/APM…
cpjulia Jun 23, 2022
fd2bb53
Implemented rocksdb storage for SORT entries over threshold
cpjulia Jun 23, 2022
f09a4a1
Merge branch 'devel' of github.com:arangodb/arangodb into feature/APM…
cpjulia Jun 23, 2022
711d971
Updated tests with new signature
cpjulia Jun 23, 2022
b82e51f
Added deletion from rocksdb, fixed typo
cpjulia Jun 24, 2022
b5fd404
Merge branch 'devel' of github.com:arangodb/arangodb into feature/APM…
cpjulia Jun 24, 2022
90108c5
Started using RocksDBSstFileMethods for insertions with SstFileWriter…
cpjulia Jun 27, 2022
3de891e
Merge branch 'devel' of github.com:arangodb/arangodb into feature/APM…
cpjulia Jun 28, 2022
4368dac
Merge branch 'devel' of github.com:arangodb/arangodb into feature/APM…
cpjulia Jun 30, 2022
d8bca10
Merge branch 'devel' of github.com:arangodb/arangodb into feature/APM…
cpjulia Jul 1, 2022
b861d4a
Merge branch 'devel' of github.com:arangodb/arangodb into feature/APM…
cpjulia Jul 4, 2022
222e9d5
Updated sort executor
cpjulia Jul 6, 2022
3e3870d
Merge branch 'devel' of github.com:arangodb/arangodb into feature/APM…
cpjulia Jul 6, 2022
da85037
Merge branch 'devel' of github.com:arangodb/arangodb into feature/APM…
cpjulia Jul 6, 2022
070ea0d
Updated sort executor for compilation
cpjulia Jul 6, 2022
d53332b
Merge branch 'devel' of github.com:arangodb/arangodb into feature/APM…
jsteemann Jul 6, 2022
e1e5201
Merge branch 'devel' of github.com:arangodb/arangodb into feature/APM…
jsteemann Jul 7, 2022
992e592
Merge branch 'devel' of github.com:arangodb/arangodb into feature/APM…
jsteemann Jul 7, 2022
e4f5002
create a storage-engine-agnostic TemporaryStorage feature
jsteemann Jul 7, 2022
497a1ac
Merge branch 'devel' of github.com:arangodb/arangodb into feature/APM…
jsteemann Jul 7, 2022
10fd9bb
Merge branch 'devel' of github.com:arangodb/arangodb into feature/APM…
jsteemann Jul 7, 2022
015a0b3
Merge branch 'devel' of github.com:arangodb/arangodb into feature/APM…
jsteemann Jul 7, 2022
b902ad6
Merge branch 'devel' of github.com:arangodb/arangodb into feature/APM…
jsteemann Jul 7, 2022
fa9a627
Merge branch 'devel' of github.com:arangodb/arangodb into feature/APM…
jsteemann Jul 7, 2022
adcf42d
add RocksDB implementation for sorted rows storage backend
jsteemann Jul 8, 2022
6a53d27
added two-stage storage backend, with spillover from memory to rocksdb
jsteemann Jul 8, 2022
b933c43
fix staged backend
jsteemann Jul 8, 2022
3acc65b
Merge branch 'devel' of github.com:arangodb/arangodb into feature/APM…
jsteemann Jul 8, 2022
bab4078
Merge branch 'devel' of github.com:arangodb/arangodb into feature/APM…
jsteemann Jul 10, 2022
1685275
allow using process id ('$PID') in startup parameter
jsteemann Jul 11, 2022
ea0c73b
allow using process id ('$PID') in startup parameter
jsteemann Jul 11, 2022
0b80e37
adjust CHANGELOG
jsteemann Jul 11, 2022
3adc358
Merge branch 'devel' of github.com:arangodb/arangodb into feature/APM…
jsteemann Jul 11, 2022
3f94343
fix encryption
jsteemann Jul 11, 2022
c6f6a90
prevent duplicate cleanup
jsteemann Jul 11, 2022
81d69ca
make log message less verbose
jsteemann Jul 11, 2022
036f37c
set several RocksDB options
jsteemann Jul 11, 2022
008060b
Merge branch 'devel' of github.com:arangodb/arangodb into feature/APM…
jsteemann Jul 11, 2022
7fd9621
Merge branch 'devel' of github.com:arangodb/arangodb into feature/APM…
cpjulia Jul 11, 2022
53011e2
Merge branch 'feature/APM-84-rocksdb-store' of github.com:arangodb/ar…
cpjulia Jul 11, 2022
9c9f2a5
Added the startup option of temp dir storage --temp.intermediate-resu…
cpjulia Jul 11, 2022
ce8c665
turn off intermediate storage on agents
jsteemann Jul 11, 2022
88b22d7
Merge branch 'devel' of github.com:arangodb/arangodb into feature/APM…
cpjulia Jul 11, 2022
425bb86
Merge branch 'feature/APM-84-rocksdb-store' of github.com:arangodb/ar…
cpjulia Jul 11, 2022
c543892
Changed name of rocksdb comparator for temp storage in queries
cpjulia Jul 11, 2022
08c8e08
fix performance regression in in-memory sorter
jsteemann Jul 11, 2022
adbdeea
Merge branch 'feature/APM-84-rocksdb-store' of github.com:arangodb/ar…
jsteemann Jul 11, 2022
ace79d3
fix assertion failure
jsteemann Jul 11, 2022
055d1bf
Fixed comparator name
cpjulia Jul 11, 2022
eed8d1c
simplification
jsteemann Jul 12, 2022
fccb108
follow coding guidelines
jsteemann Jul 12, 2022
4ea41e7
micro optimization for serialization
jsteemann Jul 12, 2022 8000
379219e
Added threshold memory usage parameter to query
cpjulia Jul 12, 2022
fdb7c9c
Added test, to be continued
cpjulia Jul 12, 2022
0615afb
Merge branch 'feature/APM-84-rocksdb-store' of github.com:arangodb/ar…
cpjulia Jul 12, 2022
8878517
fix broken tests
jsteemann Jul 12, 2022
c8a7247
split tests
jsteemann Jul 12, 2022
c0c3a76
Merge branch 'devel' of github.com:arangodb/arangodb into feature/APM…
jsteemann Jul 12, 2022
b9a9761
Merge branch 'devel' of github.com:arangodb/arangodb into feature/APM…
cpjulia Jul 12, 2022
1db6ac4
Merge branch 'feature/APM-84-rocksdb-store' of github.com:arangodb/ar…
cpjulia Jul 12, 2022
fe2580b
Merge branch 'devel' of github.com:arangodb/arangodb into feature/APM…
jsteemann Jul 12, 2022
cc4346b
Merge branch 'feature/APM-84-rocksdb-store' of github.com:arangodb/ar…
jsteemann Jul 12, 2022
3515aa9
small cleanup
jsteemann Jul 12, 2022
7dc1d60
fix test
jsteemann Jul 12, 2022
b99fe7e
cleanup
jsteemann Jul 12, 2022
efa8197
added to do
jsteemann Jul 12, 2022
427c105
cleanup, failure points
jsteemann Jul 12, 2022
ba502b0
renamed "thresholdNumRows" to "spillOverThresholdNumRows"
jsteemann Jul 12, 2022
9b0515a
Merge branch 'devel' of github.com:arangodb/arangodb into feature/APM…
jsteemann Jul 13, 2022
f4cdae4
Merge branch 'devel' of github.com:arangodb/arangodb into feature/APM…
cpjulia Jul 13, 2022
0d97c8a
Merge branch 'feature/APM-84-rocksdb-store' of github.com:arangodb/ar…
jsteemann Jul 13, 2022
b89bd10
Merge branch 'devel' of github.com:arangodb/arangodb into feature/APM…
jsteemann Jul 13, 2022
5f29b39
Merge branch 'devel' of github.com:arangodb/arangodb into feature/APM…
jsteemann Jul 13, 2022
e7a9bc0
Merge branch 'devel' of github.com:arangodb/arangodb into feature/APM…
jsteemann Jul 13, 2022
c29c5aa
Added max disk capacity control and failure tests
cpjulia Jul 13, 2022
8c87fa5
Merge branch 'feature/APM-84-rocksdb-store' of github.com:arangodb/ar…
cpjulia Jul 13, 2022
38b9b10
Merge branch 'devel' of github.com:arangodb/arangodb into feature/APM…
cpjulia Jul 13, 2022
a76eb31
mark options as experimental
jsteemann Jul 14, 2022
8153763
added code comments
jsteemann Jul 14, 2022
2487416
properly honor capacity limit
jsteemann Jul 14, 2022
0da00c5
add more startup options for thresholds
jsteemann Jul 14, 2022
7b298b3
Merge branch 'devel' of github.com:arangodb/arangodb into feature/APM…
jsteemann Jul 14, 2022
00cb64c
fix compile errors
jsteemann Jul 14, 2022
44b2dc9
added CHANGELOG
jsteemann Jul 14, 2022
3c2e943
fix compile error on macOS
jsteemann Jul 14, 2022
4037b3e
fix another compile error
jsteemann Jul 14, 2022
b80fd3f
Merge branch 'devel' of github.com:arangodb/arangodb into feature/APM…
cpjulia Jul 14, 2022
bd49144
Merge branch 'feature/APM-84-rocksdb-store' of github.com:arangodb/ar…
cpjulia Jul 14, 2022
df96e65
Merge branch 'devel' of github.com:arangodb/arangodb into feature/APM…
jsteemann Jul 14, 2022
8c0f8e5
Update CHANGELOG
cpjulia Jul 15, 2022
a850fbd
Update CHANGELOG
cpjulia Jul 15, 2022
8ca2916
Update CHANGELOG
cpjulia Jul 15, 2022
6fa39b3
Merge branch 'devel' of github.com:arangodb/arangodb into feature/APM…
cpjulia Jul 15, 2022
10000
3979d08
Merge branch 'feature/APM-84-rocksdb-store' of github.com:arangodb/ar…
cpjulia Jul 15, 2022
e5e4dc6
Addressed suggestion of reserve row indexes size
cpjulia Jul 15, 2022
b690358
Addressed hasReachedCapacityLimit comments
cpjulia Jul 15, 2022
290f2f1
Added comment explaining the bytes in rocksdb custom comparator
cpjulia Jul 15, 2022
6e7923a
Addressed suggestion to check if we actually have the prefixId's corr…
cpjulia Jul 15, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Started using RocksDBSstFileMethods for insertions with SstFileWriter…
… for optimizing, removed already inserted include
  • Loading branch information
cpjulia committed Jun 27, 2022
commit 90108c5ea63c97e23cf436cd6a85e97e797eb734
133 changes: 105 additions & 28 deletions arangod/Aql/SortExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#include "Aql/SingleRowFetcher.h"
#include "Aql/SortRegister.h"
#include "Aql/Stats.h"
#include "Basics/application-exit.h"
#include "Basics/ResourceUsage.h"
#include "RocksDBEngine/RocksDBFormat.h"

Expand Down Expand Up @@ -140,10 +141,15 @@ size_t SortExecutorInfos::limit() const noexcept { return _limit; }

SortExecutor::SortExecutor(Fetcher&, SortExecutorInfos& infos)
: _infos(infos),
_input(nullptr),
_currentRow(CreateInvalidInputRowHint{}),
_returnNext(0),
_memoryUsageForRowIndexes(0),
_rocksDBfeature(_infos.getStorageFeature()),
_tempDB(_rocksDBfeature.tempDB()),
_cfHandle(_rocksDBfeature.cfHandles()[0]),
_methods(std::make_unique<RocksDBSstFileMethods>(
_tempDB, _cfHandle, _rocksDBfeature.tempDBOptions(),
_rocksDBfeature.dataPath())),
_keyPrefix(++::keyPrefixCounter) {
rocksutils::uintToPersistentBigEndian<std::uint64_t>(_upperBoundPrefix,
_keyPrefix + 1);
Expand All @@ -154,16 +160,10 @@ SortExecutor::SortExecutor(Fetcher&, SortExecutorInfos& infos)
}

SortExecutor::~SortExecutor() {
_inputReady = false;
_mustStoreInput = false;
_rowIndexes.clear();
_inputBlocks.clear();
_infos.getResourceMonitor().decreaseMemoryUsage(_memoryUsageForRowIndexes);
}

void SortExecutor::consumeInputForStorage() {
auto& rocksDBfeature = _infos.getStorageFeature();
rocksdb::DB* tempDB = rocksDBfeature.tempDB();
std::string keyWithPrefix;

size_t const avgSliceSize = 50;
Expand All @@ -190,13 +190,27 @@ void SortExecutor::consumeInputForStorage() {
builder.openObject();
newRow.toVelocyPack(_infos.vpackOptions(), builder);
builder.close();
rocksdb::Slice keySlice = keyWithPrefix;
RocksDBKey rocksDBkey(keySlice);
auto res = _methods->Put(
_cfHandle, rocksDBkey,
{builder.slice().startAs<char const>(), builder.slice().byteSize()},
true);
if (!res.ok()) {
LOG_TOPIC("20e7f", FATAL, arangodb::Logger::STARTUP)
<< "unable to write in entry: "
<< rocksutils::convertStatus(res).errorMessage();
FATAL_ERROR_EXIT();
}
/*
_batch.Put(
rocksDBfeature.cfHandles()[0], keyWithPrefix,
_cfHandle, keyWithPrefix,
{builder.slice().startAs<char const>(), builder.slice().byteSize()});
*/
++_inputCounterForStorage;
}
rocksdb::Status s = tempDB->Write(rocksdb::WriteOptions(), &_batch);
_batch.Clear();
// rocksdb::Status s = _tempDB->Write(rocksdb::WriteOptions(), &_batch);
// _batch.Clear();
_rowIndexes.clear();
_inputBlocks.clear();
}
Expand All @@ -215,7 +229,8 @@ void SortExecutor::consumeInput(AqlItemBlockInputRange& inputRange,
// This executor is passthrough. it has enough place to write.
_rowIndexes.emplace_back(
std::make_pair(_inputBlocks.size() - 1, inputRange.getRowIndex()));
if (_memoryUsage >= kMemoryLowerBound) {
if (_rowIndexes.size() >= 5) {
// if (_memoryUsage >= kMemoryLowerBound) {
_mustStoreInput = true;
consumeInputForStorage();
}
Expand All @@ -231,16 +246,58 @@ void SortExecutor::consumeInput(AqlItemBlockInputRange& inputRange,
_memoryUsageForRowIndexes += memoryUsageForRowIndexes;
}

rocksdb::Status SortExecutor::deleteRangeInStorage() {
return rocksdb::DeleteFilesInRange(_infos.getStorageFeature().tempDB(),
_infos.getStorageFeature().cfHandles()[0],
&_lowerBoundSlice, &_upperBoundSlice,
true);
Result SortExecutor::deleteRangeInStorage() { // return our result instead of
// rocksdb one

rocksdb::Status s = rocksdb::DeleteFilesInRange(
_tempDB, _cfHandle, &_lowerBoundSlice, &_upperBoundSlice, false);
if (!s.ok()) {
return rocksutils::convertStatus(s);
}
s = _tempDB->DeleteRange(rocksdb::WriteOptions(), _cfHandle, _lowerBoundSlice,
_upperBoundSlice);

rocksdb::ReadOptions readOptions;
readOptions.iterate_upper_bound = &_upperBoundSlice;

readOptions.prefix_same_as_start = true;
auto it = std::unique_ptr<rocksdb::Iterator>(
_tempDB->NewIterator(readOptions, _cfHandle));

/*
return _infos.getStorageFeature().tempDB()->DeleteRange(
rocksdb::WriteOptions(), _infos.getStorageFeature().cfHandles()[0],
_lowerBoundSlice, _upperBoundSlice);
it->Seek(_lowerBoundSlice);
std::uint64_t counter = 0;
for (; it->Valid(); it->Next()) {
++counter;
}
LOG_DEVEL << "COUNTER " << counter;
*/

return rocksutils::convertStatus(s);
}

Result SortExecutor::ingestFilesForStorage() {
std::vector<std::string> fileNames;
Result res = static_cast<RocksDBSstFileMethods*>(_methods.get())
->stealFileNames(fileNames);
rocksdb::Status s;
if (res.ok() && !fileNames.empty()) {
rocksdb::IngestExternalFileOptions ingestOptions;
ingestOptions.move_files = true;
ingestOptions.failed_move_fall_back_to_copy = true;
ingestOptions.snapshot_consistency = false;
ingestOptions.write_global_seqno = false;
ingestOptions.verify_checksums_before_ingest = false;

s = _tempDB->IngestExternalFile(_cfHandle, fileNames,
std::move(ingestOptions));
}
if (!s.ok()) {
res = rocksutils::convertStatus(s);
LOG_TOPIC("11080", WARN, Logger::ENGINES)
<< "Error in file ingestion: " << res.errorMessage();
}
return res;
}

std::tuple<ExecutorState, NoStats, AqlCall> SortExecutor::produceRows(
Expand Down Expand Up @@ -268,7 +325,9 @@ std::tuple<ExecutorState, NoStats, AqlCall> SortExecutor::produceRows(
auto status = deleteRangeInStorage();
if (!status.ok()) {
LOG_TOPIC("20e7f", FATAL, arangodb::Logger::STARTUP)
<< "unable to initialize RocksDB engine: " << status.ToString();
<< "unable to delete range in RocksDB storage: "
<< status.errorMessage();
FATAL_ERROR_EXIT();
}
}
return {ExecutorState::DONE, NoStats{}, upstreamCall};
Expand All @@ -286,21 +345,27 @@ std::tuple<ExecutorState, NoStats, AqlCall> SortExecutor::produceRows(
}
_inputReady = true;
if (_mustStoreInput) {
auto status = ingestFilesForStorage();
if (!status.ok()) {
LOG_TOPIC("b9731", FATAL, arangodb::Logger::STARTUP)
<< "unable to ingest files in RocksDB storage: "
<< status.errorMessage();
FATAL_ERROR_EXIT();
}
rocksdb::ReadOptions readOptions;

std::string keyPrefix;
rocksutils::uintToPersistentBigEndian<std::uint64_t>(keyPrefix,
_keyPrefix);

readOptions.auto_prefix_mode = true;
// readOptions.auto_prefix_mode = true;

readOptions.iterate_upper_bound = &_upperBoundSlice;

readOptions.prefix_same_as_start = true;

_curIt = std::unique_ptr<rocksdb::Iterator>(
_infos.getStorageFeature().tempDB()->NewIterator(
readOptions, _infos.getStorageFeature().cfHandles()[0]));
_tempDB->NewIterator(readOptions, _cfHandle));
_curIt->Seek(keyPrefix);
}
}
Expand Down Expand Up @@ -341,7 +406,9 @@ std::tuple<ExecutorState, NoStats, AqlCall> SortExecutor::produceRows(
auto status = deleteRangeInStorage();
if (!status.ok()) {
LOG_TOPIC("5a64a", FATAL, arangodb::Logger::STARTUP)
<< "unable to initialize RocksDB engine: " << status.ToString();
<< "unable to delete range in RocksDB storage: "
<< status.errorMessage();
FATAL_ERROR_EXIT();
}
}
state = ExecutorState::DONE;
Expand Down Expand Up @@ -394,7 +461,9 @@ std::tuple<ExecutorState, NoStats, size_t, AqlCall> SortExecutor::skipRowsRange(
auto status = deleteRangeInStorage();
if (!status.ok()) {
LOG_TOPIC("c7490", FATAL, arangodb::Logger::STARTUP)
<< "unable to initialize RocksDB engine: " << status.ToString();
<< "unable to delete range in RocksDB storage: "
<< status.errorMessage();
FATAL_ERROR_EXIT();
}
}
return {ExecutorState::DONE, NoStats{}, 0, upstreamCall};
Expand All @@ -410,6 +479,13 @@ std::tuple<ExecutorState, NoStats, size_t, AqlCall> SortExecutor::skipRowsRange(
}
_inputReady = true;
if (_mustStoreInput) {
auto status = ingestFilesForStorage();
if (!status.ok()) {
LOG_TOPIC("59437", FATAL, arangodb::Logger::STARTUP)
<< "unable to ingest files in RocksDB storage: "
<< status.errorMessage();
FATAL_ERROR_EXIT();
}
rocksdb::ReadOptions readOptions;

readOptions.iterate_upper_bound = &_upperBoundSlice;
Expand All @@ -420,8 +496,7 @@ std::tuple<ExecutorState, NoStats, size_t, AqlCall> SortExecutor::skipRowsRange(
readOptions.prefix_same_as_start = true;

_curIt = std::unique_ptr<rocksdb::Iterator>(
_infos.getStorageFeature().tempDB()->NewIterator(
readOptions, _infos.getStorageFeature().cfHandles()[0]));
_tempDB->NewIterator(readOptions, _cfHandle));
_curIt->Seek(keyPrefix);
}
}
Expand Down Expand Up @@ -450,7 +525,9 @@ std::tuple<ExecutorState, NoStats, size_t, AqlCall> SortExecutor::skipRowsRange(
auto status = deleteRangeInStorage();
if (!status.ok()) {
LOG_TOPIC("43bf7", FATAL, arangodb::Logger::STARTUP)
<< "unable to initialize RocksDB engine: " << status.ToString();
<< "unable to delete range in RocksDB storage: "
<< status.errorMessage();
FATAL_ERROR_EXIT();
}
}
return {ExecutorState::DONE, NoStats{}, call.getSkipCount(), upstreamCall};
Expand Down
9 changes: 7 additions & 2 deletions arangod/Aql/SortExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
#include "Aql/InputAqlItemRow.h"
#include "Aql/RegisterInfos.h"
#include "RestServer/RocksDBTempStorageFeature.h"
#include "RocksDBEngine/RocksDBMethods.h"
#include "RocksDBEngine/Methods/RocksDBSstFileMethods.h"
#include <rocksdb/db.h>
#include <rocksdb/iterator.h>

Expand Down Expand Up @@ -156,15 +158,15 @@ class SortExecutor {
void doSorting();
void consumeInput(AqlItemBlockInputRange& inputRange, ExecutorState& state);
void consumeInputForStorage();
rocksdb::Status deleteRangeInStorage();
Result deleteRangeInStorage();
Result ingestFilesForStorage();

private:
static constexpr size_t kMemoryLowerBound = 1024 * 1024;
bool _inputReady = false;
bool _mustStoreInput = false;
Infos& _infos;

AqlItemMatrix const* _input;
InputAqlItemRow _currentRow;

std::vector<AqlItemMatrix::RowIndex> _sortedIndexes;
Expand All @@ -175,8 +177,11 @@ class SortExecutor {
std::vector<SharedAqlItemBlockPtr> _inputBlocks;
std::vector<AqlItemMatrix::RowIndex> _rowIndexes;
std::unique_ptr<rocksdb::Iterator> _curIt;
RocksDBTempStorageFeature& _rocksDBfeature;
SharedAqlItemBlockPtr _curBlock = nullptr;
rocksdb::DB* _tempDB;
rocksdb::ColumnFamilyHandle* _cfHandle = nullptr;
std::unique_ptr<RocksDBSstFileMethods> _methods;
std::uint64_t const _keyPrefix;
std::string _lowerBoundPrefix;
rocksdb::WriteBatch _batch;
Expand Down
8 changes: 6 additions & 2 deletions arangod/RestServer/RocksDBTempStorageFeature.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -141,11 +141,15 @@ void RocksDBTempStorageFeature::start() {
std::string path(dataPath());
cleanUpTempStorageFiles(path);

RocksDBEngine& rocksDBEngine = (RocksDBEngine&)(engine);
RocksDBEngine& rocksDBEngine = static_cast<RocksDBEngine&>(engine);
_dbOptions = rocksDBEngine.rocksDBOptions();
_options.create_missing_column_families = true;
_options.create_if_missing = true;
_options.env = _dbOptions.env;
_options.env = rocksdb::Env::Default();

#ifdef USE_ENTERPRISE
rocksDBEngine.configureEnterpriseRocksDBOptions(_options, true);
#endif
_options.prefix_extractor.reset(
rocksdb::NewFixedPrefixTransform(sizeof(uint64_t)));

Expand Down
26 changes: 20 additions & 6 deletions arangod/RocksDBEngine/Methods/RocksDBSstFileMethods.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,22 +41,36 @@ RocksDBSstFileMethods::RocksDBSstFileMethods(
_isForeground(isForeground),
_rootDB(rootDB),
_trxColl(trxColl),
_ridx(ridx),
_ridx(&ridx),
_cf(ridx.columnFamily()),
_sstFileWriter(rocksdb::EnvOptions(dbOptions), dbOptions,
ridx.columnFamily()->GetComparator(), ridx.columnFamily()),
_cf(ridx.columnFamily()),
_idxPath(idxPath) {}

RocksDBSstFileMethods::RocksDBSstFileMethods(rocksdb::DB* rootDB,
rocksdb::ColumnFamilyHandle* cf,
rocksdb::Options const& dbOptions,
std::string const& idxPath)
: RocksDBMethods(),
_isForeground(false),
_rootDB(rootDB),
_trxColl(nullptr),
// _ridx(nullptr),
_cf(cf),
_sstFileWriter(rocksdb::EnvOptions(dbOptions), dbOptions,
_cf->GetComparator(), _cf),
_idxPath(idxPath) {}

RocksDBSstFileMethods::~RocksDBSstFileMethods() { cleanUpFiles(); }

void RocksDBSstFileMethods::insertEstimators() {
auto ops = _trxColl->stealTrackedIndexOperations();
if (!ops.empty()) {
TRI_ASSERT(_ridx.hasSelectivityEstimate() && ops.size() == 1);
TRI_ASSERT(_ridx->hasSelectivityEstimate() && ops.size() == 1);
auto it = ops.begin();
TRI_ASSERT(_ridx.id() == it->first);
TRI_ASSERT(_ridx->id() == it->first);

auto* estimator = _ridx.estimator();
auto* estimator = _ridx->estimator();
if (estimator != nullptr) {
if (_isForeground) {
estimator->insert(it->second.inserts);
Expand Down Expand Up @@ -102,7 +116,7 @@ rocksdb::Status RocksDBSstFileMethods::writeToFile() {
}
if (!res.ok()) {
cleanUpFiles();
} else {
} else if (_ridx.get() != nullptr) {
insertEstimators();
}
}
Expand Down
8 changes: 6 additions & 2 deletions arangod/RocksDBEngine/Methods/RocksDBSstFileMethods.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ class RocksDBSstFileMethods final : public RocksDBMethods {
RocksDBIndex& ridx,
rocksdb::Options const& dbOptions,
std::string const& idxPath);
explicit RocksDBSstFileMethods(rocksdb::DB* rootDB,
rocksdb::ColumnFamilyHandle* cf,
rocksdb::Options const& dbOptions,
std::string const& idxPath);
~RocksDBSstFileMethods();

rocksdb::Status Get(rocksdb::ColumnFamilyHandle*, rocksdb::Slice const&,
Expand Down Expand Up @@ -76,9 +80,9 @@ class RocksDBSstFileMethods final : public RocksDBMethods {
bool _isForeground;
rocksdb::DB* _rootDB;
RocksDBTransactionCollection* _trxColl;
RocksDBIndex& _ridx;
rocksdb::SstFileWriter _sstFileWriter;
std::unique_ptr<RocksDBIndex> _ridx;
rocksdb::ColumnFamilyHandle* _cf;
rocksdb::SstFileWriter _sstFileWriter;
std::string _idxPath;
std::vector<std::string> _sstFileNames;
std::vector<std::pair<std::string, std::string>> _keyValPairs;
Expand Down
1 change: 0 additions & 1 deletion arangod/RocksDBEngine/RocksDBBuilderIndex.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@
#include "RocksDBEngine/RocksDBCuckooIndexEstimator.h"
#include "RocksDBEngine/RocksDBEngine.h"
#include "RocksDBEngine/RocksDBLogValue.h"
#include "RocksDBEngine/RocksDBMethods.h"
#include "RocksDBEngine/RocksDBTransactionCollection.h"
#include "RocksDBEngine/RocksDBTransactionState.h"
#include "StorageEngine/EngineSelectorFeature.h"
Expand Down
Loading
0