8000 [CINFRA] Replace Participant Race with Agency Cache (#17920) · rohankumardubey/arangodb@8491f69 · GitHub
[go: up one dir, main page]

Skip to content

Commit 8491f69

Browse files
author
Lars Maier
authored
[CINFRA] Replace Participant Race with Agency Cache (arangodb#17920)
* Wait for agency cache to be upto date, before replacing participant. * Applied review comments.
1 parent b306bbe commit 8491f69

File tree

6 files changed

+36
-0
lines changed

6 files changed

+36
-0
lines changed

arangod/Agency/AsyncAgencyComm.cpp

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -432,6 +432,20 @@ arangodb::AsyncAgencyComm::FutureResult agencyAsyncSend(
432432

433433
namespace arangodb {
434434

435+
futures::Future<consensus::index_t> AsyncAgencyComm::getCurrentCommitIndex()
436+
const {
437+
auto future = sendWithFailover(fuerte::RestVerb::Get, "/_api/agency/config",
438+
120s, RequestType::READ, {});
439+
return std::move(future).thenValue([](AsyncAgencyCommResult&& response) {
440+
if (auto result = response.asResult(); result.fail()) {
441+
THROW_ARANGO_EXCEPTION(result);
442+
}
443+
444+
auto slice = response.slice();
445+
return slice.get("commitIndex").extract<consensus::index_t>();
446+
});
447+
}
448+
435449
AsyncAgencyComm::FutureResult AsyncAgencyComm::sendWithFailover(
436450
arangodb::fuerte::RestVerb method, std::string const& url,
437451
network::Timeout timeout, RequestType type, uint64_t index) const {

arangod/Agency/AsyncAgencyComm.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
#include "Futures/Future.h"
4141
#include "Network/Methods.h"
4242
#include "Network/Utils.h"
43+
#include "AgencyCommon.h"
4344

4445
namespace arangodb {
4546

@@ -201,6 +202,9 @@ class AsyncAgencyComm final {
201202
[[nodiscard]] FutureResult poll(network::Timeout timeout,
202203
uint64_t index) const;
203204

205+
[[nodiscard]] futures::Future<consensus::index_t> getCurrentCommitIndex()
206+
const;
207+
204208
template<typename T>
205209
[[nodiscard]] FutureResult setValue(
206210
network::Timeout timeout,

arangod/Cluster/AgencyCache.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,12 @@ futures::Future<arangodb::Result> AgencyCache::waitFor(index_t index) {
163163
->second.getFuture();
164164
}
165165

166+
futures::Future<Result> AgencyCache::waitForLatestCommitIndex() {
167+
AsyncAgencyComm ac;
168+
return ac.getCurrentCommitIndex().thenValue(
169+
[this](consensus::index_t idx) { return this->waitFor(idx); });
170+
}
171+
166172
index_t AgencyCache::index() const {
167173
std::shared_lock g(_storeLock);
168174
return _commitIndex;

arangod/Cluster/AgencyCache.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,10 @@ class AgencyCache final : public ServerThread<ArangodServer> {
117117
/// @brief Wait to be notified, when a Raft index has arrived.
118118
[[nodiscard]] futures::Future<Result> waitFor(consensus::index_t index);
119119

120+
/// @brief Queries the agency for the latest commit index and waits for the
121+
/// local cache to reach this index.
122+
[[nodiscard]] futures::Future<Result> waitForLatestCommitIndex();
123+
120124
/// @brief Cache has these path? AgencyCommHelper::path is prepended
121125
bool has(std::string const& path) const;
122126

arangod/Replication2/AgencyMethods.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -289,6 +289,10 @@ auto methods::replaceReplicatedStateParticipant(
289289
return std::move(precs).isEqual(*path->leader(),
290290
*currentLeader);
291291
})
292+
.cond(not currentLeader.has_value(),
293+
[&](auto&& precs) {
294+
return std::move(precs).isEmpty(*path->leader());
295+
})
292296
.end()
293297
.done();
294298
}

arangod/RestHandler/RestLogHandler.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,10 @@ RestStatus RestLogHandler::handlePostRequest(
106106
namespace paths = ::arangodb::cluster::paths;
107107
auto& agencyCache =
108108
_vocbase.server().getFeature<ClusterFeature>().agencyCache();
109+
if (auto result = agencyCache.waitForLatestCommitIndex().get();
110+
result.fail()) {
111+
THROW_ARANGO_EXCEPTION(result);
112+
}
109113
auto path = paths::aliases::target()
110114
->replicatedLogs()
111115
->database(_vocbase.name())

0 commit comments

Comments
 (0)
0