10000 Bug-fix-3.8/make iterator usage safer after intermediate commits (#14… · arangodb/arangodb@e028d82 · GitHub
[go: up one dir, main page]

Skip to content

Commit e028d82

Browse files
mpoetergoedderzKVS85jsteemann
authored
Bug-fix-3.8/make iterator usage safer after intermediate commits (#14729)
* [3.8] Lower priority of AQL lanes (#14699) * Lower priority of AQL lanes * Added CHANGELOG entry * Improved comments Co-authored-by: Vadim <vadim@arangodb.com> * added a test for statistics behavior (#14703) * properly rename test file (#14705) * Refactor rocks db transaction state and rocks db methods (#14543) * make iterator usage safer after intermediate commits Co-authored-by: mpoeter <manuel@arangodb.com> * Fix build * Fix handling of old subqueries * Attempt to fix chaos load tests. * Fix assertion * Revert last two commits. * Next attempt to fix chaos load tests. * Fix: truncate must not trigger intermediate commits in streaming trx. * Update chaos-load tests * fix method signature * fix test compilation * Update CHANGELOG * Revert unintended changes. * Update arangod/RocksDBEngine/RocksDBTransactionMethods.cpp Co-authored-by: Tobias Gödderz <tobias@arangodb.com> Co-authored-by: Vadim <vadim@arangodb.com> Co-authored-by: Jan <jsteemann@users.noreply.github.com>
1 parent de9427c commit e028d82

File tree

118 files changed

+4140
-1755
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

118 files changed

+4140
-1755
lines changed

CHANGELOG

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,34 @@
11
v3.8.2 (XXXX-XX-XX)
22
-------------------
33

4+
* Fix internal iterator states after intermediate commits in write transactions.
5+
Iterators could point to invalid data after an intermediate commit, producing
6+
undefined behavior.
7+
8+
* Fix read-own-write behavior in different scenarios:
9+
- in some cases writes performed by an AQL query could be observed within
10+
the same query. This was not intended and is fixed now.
11+
- AQL queries in streaming transactions could observe their own writes in
12+
even more cases, which could potentially result in an endless loop when
13+
the query iterates over the same collection that it is inserting documents
14+
into.
15+
- UPSERT did not find documents inserted by a previous iteration if the
16+
subquery relied on a non-unique secondary index.
17+
- disabled intermediate commits for queries with UPSERTs, because
18+
intermediate commits can invalidate the internal read-own-write iterator
19+
required by UPSERT. Previously, UPSERTs that triggered intermediate
20+
commits could have produced unexpected results (e.g., previous inserts
21+
that have been committed might not be visible) or even crashes.
22+
To achieve the correct read-own-write behavior in streaming transactions, we
23+
sometimes have to copy the internal WriteBatch from the underlying RocksDB
24+
transaction. In particular, the copy is created whenever an AQL query with
25+
modification operations (INSERT/REMOVE/UPDATE/UPSERT/REPLACE) is executed in
26+
the streaming transaction. If there have not been any other modifications so
27+
far (queries/document operations), then the WriteBatch is empty and creating
28+
the copy is essentially a no-op. However, if the transaction already contains
29+
a lot of modifications, creating the WriteBatch copy might incur some
30+
overhead that can now lead to decreased performance.
31+
432
* Updated OpenSSL to 1.1.1l and OpenLDAP to 2.4.59.
533

634
* No runtime limits for shard move and server cleanout jobs, instead possibility

arangod/Aql/AqlTransaction.cpp

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
#include "Aql/Collection.h"
2727
#include "Aql/Collections.h"
2828
#include "StorageEngine/TransactionState.h"
29+
#include "Transaction/Context.h"
2930
#include "Utils/CollectionNameResolver.h"
3031
#include "VocBase/LogicalCollection.h"
3132

@@ -54,7 +55,9 @@ AqlTransaction::AqlTransaction(
5455
std::shared_ptr<transaction::Context> const& transactionContext,
5556
transaction::Options const& options)
5657
: transaction::Methods(transactionContext, options) {
57-
addHint(transaction::Hints::Hint::INTERMEDIATE_COMMITS);
58+
if (options.isIntermediateCommitEnabled()) {
59+
addHint(transaction::Hints::Hint::INTERMEDIATE_COMMITS);
60+
}
5861
}
5962

6063
/// protected so we can create different subclasses
@@ -63,7 +66,10 @@ AqlTransaction::AqlTransaction(
6366
aql::Collections const& collections,
6467
transaction::Options const& options)
6568
: transaction::Methods(transactionContext, options) {
66-
addHint(transaction::Hints::Hint::INTERMEDIATE_COMMITS);
69+
TRI_ASSERT(state() != nullptr);
70+
if (options.isIntermediateCommitEnabled()) {
71+
addHint(transaction::Hints::Hint::INTERMEDIATE_COMMITS);
72+
}
6773

6874
collections.visit([this](std::string const&, aql::Collection& collection) {
6975
Result res = processCollection(collection);

arangod/Aql/AqlTransaction.h

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -53,17 +53,14 @@ class AqlTransaction : public transaction::Methods {
5353
std::unordered_set<std::string> inaccessibleCollections =
5454
std::unordered_set<std::string>());
5555

56-
/// @brief end the transaction
57-
~AqlTransaction() override = default;
58-
5956
AqlTransaction(std::shared_ptr<transaction::Context> const& transactionContext,
6057
transaction::Options const& options);
6158

6259
/// protected so we can create different subclasses
6360
AqlTransaction(std::shared_ptr<transaction::Context> const& transactionContext,
6461
aql::Collections const& collections,
6562
transaction::Options const& options);
66-
63+
6764
protected:
6865
/// @brief add a collection to the transaction
6966
Result processCollection(aql::Collection&);

arangod/Aql/Ast.cpp

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -562,6 +562,8 @@ AstNode* Ast::createNodeUpsert(AstNodeType type, AstNode const* docVariable,
562562
node->addMember(createNodeReference(Variable::NAME_OLD));
563563
node->addMember(createNodeVariable(TRI_CHAR_LENGTH_PAIR(Variable::NAME_NEW), false));
564564

565+
this->setContainsUpsertNode();
566+
565567
return node;
566568
}
567569

@@ -4167,6 +4169,12 @@ void Ast::setContainsModificationNode() noexcept {
41674169
_containsModificationNode = true;
41684170
}
41694171

4172+
bool Ast::containsUpsertNode() const noexcept { return _containsUpsertNode; }
4173+
4174+
void Ast::setContainsUpsertNode() noexcept {
4175+
_containsUpsertNode = true;
4176+
}
4177+
41704178
void Ast::setContainsParallelNode() noexcept {
41714179
#ifdef USE_ENTERPRISE
41724180
_containsParallelNode = true;

arangod/Aql/Ast.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,8 @@ class Ast {
127127
/// @brief whether or not query contains any modification operations
128128
bool containsModificationNode() const noexcept;
129129
void setContainsModificationNode() noexcept;
130+
bool containsUpsertNode() const noexcept;
131+
void setContainsUpsertNode() noexcept;
130132
void setContainsParallelNode() noexcept;
131133
bool willUseV8() const noexcept;
132134
void setWillUseV8() noexcept;
@@ -640,6 +642,8 @@ class Ast {
640642
/// @brief contains INSERT / UPDATE / REPLACE / REMOVE
641643
bool _containsModificationNode;
642644

645+
bool _containsUpsertNode{false};
646+
643647
/// @brief contains a parallel traversal
644648
bool _containsParallelNode;
645649

arangod/Aql/ClusterQuery.cpp

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
#include "Aql/Timing.h"
3030
#include "Aql/QueryRegistry.h"
3131
#include "Aql/QueryProfile.h"
32+
#include "Basics/ScopeGuard.h"
3233
#include "Cluster/ServerState.h"
3334
#include "StorageEngine/TransactionState.h"
3435
#include "Transaction/Context.h"
@@ -142,6 +143,8 @@ void ClusterQuery::prepareClusterQuery(VPackSlice querySlice,
142143
TRI_ASSERT(_trx->state()->isCoordinator());
143144
QueryRegistryFeature::registry()->registerSnippets(_snippets);
144145
}
146+
147+
registerQueryInTransactionState();
145148
}
146149

147150
if (traverserSlice.isArray()) {
@@ -167,7 +170,7 @@ void ClusterQuery::prepareClusterQuery(VPackSlice querySlice,
167170
futures::Future<Result> ClusterQuery::finalizeClusterQuery(ErrorCode errorCode) {
168171
TRI_ASSERT(_trx);
169172
TRI_ASSERT(ServerState::instance()->isDBServer());
170-
173+
171174
// technically there is no need for this in DBServers, but it should
172175
// be good practice to prevent the other cleanup code from running
173176
ShutdownState exp = ShutdownState::None;
@@ -210,6 +213,8 @@ futures::Future<Result> ClusterQuery::finalizeClusterQuery(ErrorCode errorCode)
210213
_execStats.setExecutionTime(elapsedSince(_startTime));
211214
_shutdownState.store(ShutdownState::Done);
212215

216+
unregisterQueryInTransactionState();
217+
213218
LOG_TOPIC("5fde0", DEBUG, Logger::QUERIES)
214219
<< elapsedSince(_startTime)
215220
<< " ClusterQuery::finalizeClusterQuery: done"
@@ -218,4 +223,5 @@ futures::Future<Result> ClusterQuery::finalizeClusterQuery(ErrorCode errorCode)
218223
return res;
219224
});
220225
}
221-
226+
227+

arangod/Aql/DocumentProducingNode.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,13 +66,15 @@ DocumentProducingNode::DocumentProducingNode(ExecutionPlan* plan,
6666
}
6767

6868
_count = arangodb::basics::VelocyPackHelper::getBooleanValue(slice, "count", false);
69+
_readOwnWrites = arangodb::basics::VelocyPackHelper::getBooleanValue(slice, "readOwnWrites", false) ? ReadOwnWrites::yes : ReadOwnWrites::no;
6970
}
7071

7172
void DocumentProducingNode::cloneInto(ExecutionPlan* plan, DocumentProducingNode& c) const {
7273
if (_filter != nullptr) {
7374
c.setFilter(std::unique_ptr<Expression>(_filter->clone(plan->getAst())));
7475
}
7576
c.copyCountFlag(this);
77+
c.setCanReadOwnWrites(canReadOwnWrites());
7678
}
7779

7880
void DocumentProducingNode::toVelocyPack(arangodb::velocypack::Builder& builder,
@@ -95,6 +97,7 @@ void DocumentProducingNode::toVelocyPack(arangodb::velocypack::Builder& builder,
9597
} else {
9698
builder.add(::producesResultKey, VPackValue(_filter != nullptr || dynamic_cast<ExecutionNode const*>(this)->isVarUsedLater(_outVariable)));
9799
}
100+
builder.add("readOwnWrites", VPackValue(_readOwnWrites == ReadOwnWrites::yes));
98101
}
99102

100103
Variable const* DocumentProducingNode::outVariable() const {

arangod/Aql/DocumentProducingNode.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,10 @@ class DocumentProducingNode {
8181
/// @brief wheter or not the node can be used for counting
8282
bool doCount() const;
8383

84+
ReadOwnWrites canReadOwnWrites() const noexcept { return _readOwnWrites; }
85+
86+
void setCanReadOwnWrites(ReadOwnWrites v) noexcept { _readOwnWrites = v; }
87+
8488
protected:
8589
Variable const* _outVariable;
8690

@@ -91,6 +95,10 @@ class DocumentProducingNode {
9195
std::unique_ptr<Expression> _filter;
9296

9397
bool _count;
98+
99+
/// @brief Whether we should read our own writes performed by the current query.
100+
/// ATM this is only necessary for UPSERTS.
101+
ReadOwnWrites _readOwnWrites{ReadOwnWrites::no};
94102
};
95103

96104
} // namespace aql

arangod/Aql/EnumerateCollectionExecutor.cpp

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,6 @@
4040
#include "Aql/SingleRowFetcher.h"
4141
#include "Aql/Stats.h"
4242
#include "AqlCall.h"
43-
#include "Indexes/IndexIterator.h"
4443
#include "Transaction/Methods.h"
4544

4645
#include <Logger/LogMacros.h>
@@ -57,7 +56,7 @@ EnumerateCollectionExecutorInfos::EnumerateCollectionExecutorInfos(
5756
RegisterId outputRegister, aql::QueryContext& query,
5857
Collection const* collection, Variable const* outVariable, bool produceResult,
5958
Expression* filter, arangodb::aql::Projections projections,
60-
bool random, bool count)
59+
bool random, bool count, ReadOwnWrites readOwnWrites)
6160
: _query(query),
6261
_collection(collection),
6362
_outVariable(outVariable),
@@ -66,7 +65,8 @@ EnumerateCollectionExecutorInfos::EnumerateCollectionExecutorInfos(
6665
_outputRegisterId(outputRegister),
6766
_produceResult(produceResult),
6867
_random(random),
69-
_count(count) {}
68+
_count(count),
69+
_readOwnWrites(readOwnWrites) {}
7070

7171
Collection const* EnumerateCollectionExecutorInfos::getCollection() const {
7272
return _collection;
@@ -113,9 +113,10 @@ EnumerateCollectionExecutor::EnumerateCollectionExecutor(Fetcher& fetcher, Infos
113113
_currentRow(InputAqlItemRow{CreateInvalidInputRowHint{}}) {
114114
TRI_ASSERT(_trx.status() == transaction::Status::RUNNING);
115115
_cursor = _trx.indexScan(_infos.getCollection()->name(),
116-
(_infos.getRandom()
117-
? transaction::Methods::CursorType::ANY
118-
: transaction::Methods::CursorType::ALL));
116+
(_infos.getRandom()
117+
? transaction::Methods::CursorType::ANY
118+
: transaction::Methods::CursorType::ALL),
119+
infos.canReadOwnWrites());
119120

120121
if (_infos.getProduceResult()) {
121122
_documentProducer =

arangod/Aql/EnumerateCollectionExecutor.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ class EnumerateCollectionExecutorInfos {
6868
Collection const* collection, Variable const* outVariable,
6969
bool produceResult, Expression* filter,
7070
arangodb::aql::Projections projections,
71-
bool random, bool count);
71+
bool random, bool count, ReadOwnWrites readOwnWrites);
7272

7373
EnumerateCollectionExecutorInfos() = delete;
7474
EnumerateCollectionExecutorInfos(EnumerateCollectionExecutorInfos&&) = default;
@@ -85,6 +85,7 @@ class EnumerateCollectionExecutorInfos {
8585
bool getCount() const noexcept;
8686
RegisterId getOutputRegisterId() const;
8787

88+
ReadOwnWrites ca 70B7 nReadOwnWrites() const noexcept { return _readOwnWrites; }
8889
private:
8990
aql::QueryContext& _query;
9091
Collection const* _collection;
@@ -95,6 +96,7 @@ class EnumerateCollectionExecutorInfos {
9596
bool _produceResult;
9697
bool _random;
9798
bool _count;
99+
ReadOwnWrites const _readOwnWrites;
98100
};
99101

100102
/**

0 commit comments

Comments
 (0)
0