8000 Try to avoid synchronous waiting in AQL, and fix transaction lifetime… · arangodb/arangodb@c787f51 · GitHub
[go: up one dir, main page]

Skip to content

Commit c787f51

Browse files
committed
Try to avoid synchronous waiting in AQL, and fix transaction lifetime issues
1 parent a7f69dd commit c787f51

File tree

4 files changed

+15
-15
lines changed

4 files changed

+15
-15
lines changed

arangod/Aql/SimpleModifier.cpp

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -186,13 +186,6 @@ ExecutionState SimpleModifier<ModifierCompletion, Enable>::transact(
186186

187187
auto result = _completion.transact(trx, _accumulator.closeAndGetContents());
188188

189-
// we are currently waiting here for the `result` future to get
190-
// ready before we continue. this makes the AQL modification
191-
// operations blocking as in previous versions of ArangoDB.
192-
// TODO: fix this and make it truly non-blocking (requires to
193-
// fix some lifecycle issues for AQL queries first).
194-
result.wait();
195-
196189
if (result.isReady()) {
197190
_results = std::move(result.get());
198191
return ExecutionState::DONE;

arangod/Transaction/Methods.cpp

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3600,11 +3600,13 @@ Future<Result> Methods::replicateOperations(
36003600
// we continue with the operation, since most likely, the follower was
36013601
// simply dropped in the meantime.
36023602
// In any case, we drop the follower here (just in case).
3603-
auto cb = [=, this](std::vector<futures::Try<network::Response>>&& responses)
3603+
auto cb = [followerList, startTimeReplication, opName, collection, count,
3604+
vocbase = vocbase().getSharedPtr(), state = _state](
3605+
std::vector<futures::Try<network::Response>>&& responses)
36043606
-> futures::Future<Result> {
36053607
auto duration = std::chrono::steady_clock::now() - startTimeReplication;
36063608
auto& replMetrics =
3607-
vocbase().server().getFeature<ReplicationMetricsFeature>();
3609+
vocbase->server().getFeature<ReplicationMetricsFeature>();
36083610
replMetrics.synchronousOpsTotal() += 1;
36093611
replMetrics.synchronousTimeTotal() +=
36103612
std::chrono::nanoseconds(duration).count();
@@ -3642,8 +3644,7 @@ Future<Result> Methods::replicateOperations(
36423644
// follower, but simply return the error and abort our local
36433645
// transaction.
36443646
if (r.is(TRI_ERROR_TRANSACTION_ABORTED) &&
3645-
this->state()->hasHint(
3646-
transaction::Hints::Hint::FROM_TOPLEVEL_AQL)) {
3647+
state->hasHint(transaction::Hints::Hint::FROM_TOPLEVEL_AQL)) {
36473648
return r;
36483649
}
36493650

@@ -3655,8 +3656,7 @@ Future<Result> Methods::replicateOperations(
36553656
absl::StrCat("got error from follower: ", r.errorMessage());
36563657

36573658
if (followerRefused) {
3658-
++vocbase()
3659-
.server()
3659+
++vocbase->server()
36603660
.getFeature<arangodb::ClusterFeature>()
36613661
.followersRefusedCounter();
36623662

@@ -3676,7 +3676,7 @@ Future<Result> Methods::replicateOperations(
36763676
}
36773677

36783678
if (!replicationFailureReason.empty()) {
3679-
if (!vocbase().server().isStopping()) {
3679+
if (!vocbase->server().isStopping()) {
36803680
LOG_TOPIC("12d8c", WARN, Logger::REPLICATION)
36813681
<< "synchronous replication of " << opName << " operation "
36823682
<< "(" << count << " doc(s)): "
@@ -3738,7 +3738,7 @@ Future<Result> Methods::replicateOperations(
37383738
return Result{TRI_ERROR_CLUSTER_SHARD_LEADER_RESIGNED};
37393739
} else {
37403740
// execute a deferred intermediate commit, if required.
3741-
return performIntermediateCommitIfRequired(collection->id());
3741+
return state->performIntermediateCommitIfRequired(collection->id());
37423742
}
37433743
};
37443744
return futures::collectAll(std::move(futures)).thenValue(std::move(cb));

arangod/VocBase/vocbase.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,10 @@ void TRI_vocbase_t::release() noexcept {
129129
TRI_ASSERT(v >= 2);
130130
}
131131

132+
arangodb::VocbasePtr TRI_vocbase_t::getSharedPtr() noexcept {
133+
return VocbasePtr{use() ? this : nullptr};
134+
}
135+
132136
bool TRI_vocbase_t::isDangling() const noexcept {
133137
auto const v = _refCount.load(std::memory_order_acquire);
134138
TRI_ASSERT((v & 1) == 0 || !isSystem());

arangod/VocBase/vocbase.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
#include "Containers/FlatHashMap.h"
4242
#include "Replication2/Version.h"
4343
#include "RestServer/arangod.h"
44+
#include "Utils/DatabaseGuard.h"
4445
#include "Utils/VersionTracker.h"
4546
#include "VocBase/Identifiers/DataSourceId.h"
4647
#include "VocBase/Identifiers/TransactionId.h"
@@ -293,6 +294,8 @@ struct TRI_vocbase_t {
293294
/// @brief decrease the reference counter for a database
294295
void release() noexcept;
295296

297+
arangodb::VocbasePtr getSharedPtr() noexcept;
298+
296299
/// @brief returns whether the database is dangling
297300
bool isDangling() const noexcept;
298301

0 commit comments

Comments
 (0)
0