8000 make iterator usage safer after intermediate commits by jsteemann · Pull Request #14346 · arangodb/arangodb · GitHub
[go: up one dir, main page]

Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
b0d7f00
make iterator usage safer after intermediate commits
jsteemann Jun 8, 2021
b61528e
more adjustments
jsteemann Jun 8, 2021
d79df00
more cleanup
jsteemann Jun 8, 2021
4ef5aa0
added CHANGELOG entry
jsteemann Jun 8, 2021
c727070
updated CHANGELOG
jsteemann Jun 8, 2021
670150d
Merge branch 'devel' of github.com:arangodb/arangodb into bug-fix/saf…
jsteemann Jun 10, 2021
b35fb33
revert most changes
jsteemann Jun 10, 2021
4ffe59b
Merge branch 'devel' of github.com:arangodb/arangodb into bug-fix/saf…
jsteemann Jun 14, 2021
174d1b1
Merge branch 'devel' of github.com:arangodb/arangodb into bug-fix/saf…
jsteemann Jun 14, 2021
0eb8ee8
fix issues with invalid iterator states
jsteemann Jun 14, 2021
aafea55
Merge branch 'devel' into bug-fix/safe-iterators-after-intermediate-c…
jsteemann Jul 7, 2021
777d07f
simplify PR
jsteemann Jul 7, 2021
9a48e3c
Merge remote-tracking branch 'origin' into bug-fix/safe-iterators-aft…
mpoeter Aug 19, 2021
a0f1596
Remove code duplication introduced by merge.
mpoeter Aug 19, 2021
f3f0dde
Merge remote-tracking branch 'origin/devel' into bug-fix/safe-iterato…
mpoeter Aug 19, 2021
201a018
Extend test suite
mpoeter Aug 19, 2021
18f0420
Introduce ReadOwnWrites flag to control what iterators/read operation…
mpoeter Aug 19, 2021
316ba3c
Introduce dontDisableIndexing operation option for UPSERT inserts.
mpoeter Aug 19, 2021
f92f765
Adapt unit tests to changed interface.
mpoeter Aug 19, 2021
3bbc8ee
Fix: iteratorMustCheckBounds must consider readOwnWrites flag.
mpoeter Aug 19, 2021
2e07cf3
Fix tests
mpoeter Aug 19, 2021
9138b6c
Fix read-own-write semantic for streaming transactions.
mpoeter Aug 19, 2021
6ccb926
Fix jslint errors
mpoeter Aug 20, 2021
ec31df3
Update CHANGELOG
mpoeter Aug 23, 2021
d99224b
Fix: recalc varUsage after inserting distribute input calculation.
mpoeter Aug 23, 2021
83b8e63
Disable intermediate commits for queries with UPSERTs
mpoeter Aug 23, 2021
79a17e2
Activate truncate operations in the chaos-load tests
mpoeter Aug 23, 2021
451b8cb
Update CHANGELOG
mpoeter Aug 23, 2021
5a08885
Fix non-maintainer build
mpoeter Aug 24, 2021
b819dc1
Reactivate intermediate commits in tests
mpoeter Aug 24, 2021
b6f1e58
Refactor disabling of intermediate commits in case of UPSERTs
mpoeter Aug 24, 2021
2fb8e5a
Merge remote-tracking branch 'origin/devel' into bug-fix/safe-iterato…
mpoeter Aug 24, 2021
0e4bc2b
Fix: only set INTERMEDIATE_COMMIT hint for follower trx if intermedia…
mpoeter Aug 24, 2021
8377b31
Fix: shell-iterator tests should not run in cluster.
mpoeter Aug 25, 2021
4e77316
Disable only tests that rely on unique secondary indexes.
mpoeter Aug 25, 2021
a60ff8d
Attempt to fix test based on edge index.
mpoeter Aug 25, 2021
124117b
Fix cluster tests
mpoeter Aug 25, 2021
70cfef9
Next attempt to fix cluster tests.
mpoeter Aug 27, 2021
082bc2a
Forgot some files.
mpoeter Aug 28, 2021
211fd86
Address review comments
mpoeter Aug 28, 2021
17fe27f
Address review comments and add more explanatory comments
mpoeter Aug 30, 2021
53aa2d6
Address review comments
mpoeter Aug 30, 2021
5820997
Merge remote-tracking branch 'origin/devel' into bug-fix/safe-iterato…
mpoeter Aug 30, 2021
0629da1
Merge remote-tracking branch 'origin/devel' into bug-fix/safe-iterato…
mpoeter Aug 30, 2021
d100fda
Merge branch 'devel' into bug-fix/safe-iterators-after-intermediate-c…
jsteemann Aug 31, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Refactor disabling of intermediate commits in case of UPSERTs
  • Loading branch information
mpoeter committed Aug 24, 2021
commit b6f1e5822cbc6883c13e2a1373f802be66f9c46f
8 changes: 6 additions & 2 deletions arangod/Aql/AqlTransaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,9 @@ AqlTransaction::AqlTransaction(
std::shared_ptr<transaction::Context> const& transactionContext,
transaction::Options const& options)
: transaction::Methods(transactionContext, options) {
addHint(transaction::Hints::Hint::INTERMEDIATE_COMMITS);
if (options.isIntermediateCommitEnabled()) {
addHint(transaction::Hints::Hint::INTERMEDIATE_COMMITS);
}
}

/// protected so we can create different subclasses
Expand All @@ -64,7 +66,9 @@ AqlTransaction::AqlTransaction(
aql::Collections const& collections,
transaction::Options const& options)
: transaction::Methods(transactionContext, options) {
addHint(transaction::Hints::Hint::INTERMEDIATE_COMMITS);
if (options.isIntermediateCommitEnabled()) {
addHint(transaction::Hints::Hint::INTERMEDIATE_COMMITS);
}

collections.visit([this](std::string const&, aql::Collection& collection) {
Result res = processCollection(collection);
Expand Down
8 changes: 8 additions & 0 deletions arangod/Aql/Ast.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -562,6 +562,8 @@ AstNode* Ast::createNodeUpsert(AstNodeType type, AstNode const* docVariable,
node->addMember(createNodeReference(Variable::NAME_OLD));
node->addMember(createNodeVariable(TRI_CHAR_LENGTH_PAIR(Variable::NAME_NEW), false));

this->setContainsUpsertNode();

return node;
}

Expand Down Expand Up @@ -4184,6 +4186,12 @@ void Ast::setContainsModificationNode() noexcept {
_containsModificationNode = true;
}

bool Ast::containsUpsertNode() const noexcept { return _containsUpsertNode; }

void Ast::setContainsUpsertNode() noexcept {
_containsUpsertNode = true;
}

void Ast::setContainsParallelNode() noexcept {
#ifdef USE_ENTERPRISE
_containsParallelNode = true;
Expand Down
4 changes: 4 additions & 0 deletions arangod/Aql/Ast.h
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,8 @@ class Ast {
/// @brief whether or not query contains any modification operations
bool containsModificationNode() const noexcept;
void setContainsModificationNode() noexcept;
bool containsUpsertNode() const noexcept;
void setContainsUpsertNode() noexcept;
void setContainsParallelNode() noexcept;
bool willUseV8() const noexcept;
void setWillUseV8() noexcept;
Expand Down Expand Up @@ -647,6 +649,8 @@ class Ast {
/// @brief contains INSERT / UPDATE / REPLACE / REMOVE
bool _containsModificationNode;

bool _containsUpsertNode{false};

/// @brief contains a parallel traversal
bool _containsParallelNode;

Expand Down
6 changes: 0 additions & 6 deletions arangod/Aql/ExecutionPlan.h
Original file line number Diff line number Diff line change
Expand Up @@ -173,10 +173,6 @@ class ExecutionPlan {
void enableAsyncPrefetching() noexcept { _isAsyncPrefetchEnabled = true; }

bool isAsyncPrefetchEnabled() const noexcept { return _isAsyncPrefetchEnabled; }

void disableIntermediateCommits() noexcept { _isIntermediateCommitAllowed = false; }

bool isIntermediateCommitAllowed() const noexcept { return _isIntermediateCommitAllowed; }

/// @brief get the node where variable with id <id> is introduced . . .
ExecutionNode* getVarSetBy(VariableId id) const {
Expand Down Expand Up @@ -420,8 +416,6 @@ class ExecutionPlan {
/// @brief flag to indicate whether the postprocessing step to enable async
/// prefetching on the node level should be executed.
bool _isAsyncPrefetchEnabled{false};

bool _isIntermediateCommitAllowed{true};

/// @brief current nesting level while building the plan
int _nestingLevel;
Expand Down
7 changes: 0 additions & 7 deletions arangod/Aql/OptimizerRules.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7878,13 +7878,6 @@ void arangodb::aql::enableReadOwnWritesForUpsertSubquery(ExecutionPlan& plan) {
::arangodb::containers::SmallVector<ExecutionNode*> nodes{a};
plan.findNodesOfType(nodes, EN::UPSERT, true);

if (!nodes.empty()) {
// UPSERTs and intermediate commits do not play nice together, because the
// intermediate commit invalidates the read-own-write iterator required by
// the subquery, so we simply disable them.
plan.disableIntermediateCommits();
}

for (auto n : nodes) {
auto node = ExecutionNode::castTo<UpsertNode const*>(n);
auto inputVar = node->inDocVariable();
Expand Down
13 changes: 9 additions & 4 deletions arangod/Aql/Query.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,15 @@ std::unique_ptr<ExecutionPlan> Query::preparePlan() {
// put in bind parameters
parser.ast()->injectBindParameters(_bindParameters, this->resolver());

if (parser.ast()->containsUpsertNode()) {
// UPSERTs and intermediate commits do not play nice together, because the
// intermediate commit invalidates the read-own-write iterator required by
// the subquery. Setting intermediateCommitSize and intermediateCommitCount
// to UINT64_MAX allows us to disable intermediate commits.
_queryOptions.transactionOptions.intermediateCommitSize = UINT64_MAX;
_queryOptions.transactionOptions.intermediateCommitCount = UINT64_MAX;
}

TRI_ASSERT(_trx == nullptr);
// needs to be created after the AST collected all collections
std::unordered_set<std::string> inaccessibleCollections;
Expand Down Expand Up @@ -376,10 +385,6 @@ std::unique_ptr<ExecutionPlan> Query::preparePlan() {

TRI_ASSERT(plan != nullptr);

if (!plan->isIntermediateCommitAllowed()) {
_trx->state()->disableIntermediateCommits();
}

// return the V8 context if we are in one
exitV8Context();

Expand Down
8 changes: 0 additions & 8 deletions arangod/RocksDBEngine/Methods/RocksDBTrxMethods.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,14 +66,6 @@ Result RocksDBTrxMethods::beginTransaction() {
return result;
}

void RocksDBTrxMethods::disableIntermediateCommits() {
TRI_ASSERT(!hasIntermediateCommitsEnabled());
if (_iteratorReadSnapshot) {
_db->ReleaseSnapshot(_iteratorReadSnapshot);
_iteratorReadSnapshot = nullptr;
}
}

rocksdb::ReadOptions RocksDBTrxMethods::iteratorReadOptions() const {
if (hasIntermediateCommitsEnabled()) {
rocksdb::ReadOptions ro = _readOptions;
Expand Down
2 changes: 0 additions & 2 deletions arangod/RocksDBEngine/Methods/RocksDBTrxMethods.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,6 @@ class RocksDBTrxMethods : public RocksDBTrxBaseMethods {

Result beginTransaction() override;

void disableIntermediateCommits() override;

rocksdb::ReadOptions iteratorReadOptions() const override;

void prepareOperation(DataSourceId cid, RevisionId rid, TRI_voc_document_operation_e operationType) override;
Expand Down
2 changes: 0 additions & 2 deletions arangod/RocksDBEngine/RocksDBTransactionMethods.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,6 @@ class RocksDBTransactionMethods : public RocksDBMethods {
virtual Result commitTransaction() = 0;

virtual Result abortTransaction() = 0;

virtual void disableIntermediateCommits() {}

// Only relevant for RocksDBTrxMethods
virtual Result checkIntermediateCommit(bool& hasPerformedIntermediateCommit) { return {}; }
Expand Down
7 changes: 0 additions & 7 deletions arangod/RocksDBEngine/RocksDBTransactionState.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -302,13 +302,6 @@ Result RocksDBTransactionState::abortTransaction(transaction::Methods* activeTrx
return result;
}

void RocksDBTransactionState::disableIntermediateCommits() {
TransactionState::disableIntermediateCommits();
if (_rocksMethods) {
_rocksMethods->disableIntermediateCommits();
}
}

void RocksDBTransactionState::beginQuery(bool isModificationQuery) {
auto* trxMethods = dynamic_cast<RocksDBTrxMethods*>(_rocksMethods.get());
if (trxMethods) {
Expand Down
2 changes: 0 additions & 2 deletions arangod/RocksDBEngine/RocksDBTransactionState.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,6 @@ class RocksDBTransactionState final : public TransactionState {
/// @brief abort a transaction
Result abortTransaction(transaction::Methods* trx) override;

void disableIntermediateCommits() override;

/// @returns tick of last operation in a transaction
/// @note the value is guaranteed to be valid only after
/// transaction is committed
Expand Down
2 changes: 0 additions & 2 deletions arangod/StorageEngine/TransactionState.h
Original file line number Diff line number Diff line change
Expand Up @@ -177,8 +177,6 @@ class TransactionState {
/// @brief whether or not a specific hint is set for the transaction
bool hasHint(transaction::Hints::Hint hint) const { return _hints.has(hint); }

virtual void disableIntermediateCommits() { _hints.unset(transaction::Hints::Hint::INTERMEDIATE_COMMITS); }

/// @brief begin a transaction
virtual arangodb::Result beginTransaction(transaction::Hints hints) = 0;

Expand Down
4 changes: 4 additions & 0 deletions arangod/Transaction/Options.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,10 @@ void Options::setLimits(uint64_t maxTransactionSize, uint64_t intermediateCommit
defaultIntermediateCommitCount = intermediateCommitCount;
}

bool Options::isIntermediateCommitEnabled() const noexcept {
return intermediateCommitSize != UINT64_MAX || intermediateCommitCount != UINT64_MAX;
}

void Options::fromVelocyPack(arangodb::velocypack::Slice const& slice) {
VPackSlice value;

Expand Down
2 changes: 2 additions & 0 deletions arangod/Transaction/Options.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ struct Options {
static void adjustIntermediateCommitCount(Options& options);
#endif

bool isIntermediateCommitEnabled() const noexcept;

static constexpr double defaultLockTimeout = 900.0;
static uint64_t defaultMaxTransactionSize;
static uint64_t defaultIntermediateCommitSize;
Expand Down
0