File tree Expand file tree Collapse file tree 6 files changed +36
-0
lines changed Expand file tree Collapse file tree 6 files changed +36
-0
lines changed Original file line number Diff line number Diff line change @@ -432,6 +432,20 @@ arangodb::AsyncAgencyComm::FutureResult agencyAsyncSend(
432
432
433
433
namespace arangodb {
434
434
435
+ futures::Future<consensus::index_t > AsyncAgencyComm::getCurrentCommitIndex ()
436
+ const {
437
+ auto future = sendWithFailover (fuerte::RestVerb::Get, " /_api/agency/config" ,
438
+ 120s, RequestType::READ, {});
439
+ return std::move (future).thenValue ([](AsyncAgencyCommResult&& response) {
440
+ if (auto result = response.asResult (); result.fail ()) {
441
+ THROW_ARANGO_EXCEPTION (result);
442
+ }
443
+
444
+ auto slice = response.slice ();
445
+ return slice.get (" commitIndex" ).extract <consensus::index_t >();
446
+ });
447
+ }
448
+
435
449
AsyncAgencyComm::FutureResult AsyncAgencyComm::sendWithFailover (
436
450
arangodb::fuerte::RestVerb method, std::string const & url,
437
451
network::Timeout timeout, RequestType type, uint64_t index) const {
Original file line number Diff line number Diff line change 40
40
#include " Futures/Future.h"
41
41
#include " Network/Methods.h"
42
42
#include " Network/Utils.h"
43
+ #include " AgencyCommon.h"
43
44
44
45
namespace arangodb {
45
46
@@ -201,6 +202,9 @@ class AsyncAgencyComm final {
201
202
[[nodiscard]] FutureResult poll (network::Timeout timeout,
202
203
uint64_t index) const ;
203
204
205
+ [[nodiscard]] futures::Future<consensus::index_t > getCurrentCommitIndex ()
206
+ const ;
207
+
204
208
template <typename T>
205
209
[[nodiscard]] FutureResult setValue (
206
210
network::Timeout timeout,
Original file line number Diff line number Diff line change @@ -163,6 +163,12 @@ futures::Future<arangodb::Result> AgencyCache::waitFor(index_t index) {
163
163
->second .getFuture ();
164
164
}
165
165
166
+ futures::Future<Result> AgencyCache::waitForLatestCommitIndex () {
167
+ AsyncAgencyComm ac;
168
+ return ac.getCurrentCommitIndex ().thenValue (
169
+ [this ](consensus::index_t idx) { return this ->waitFor (idx); });
170
+ }
171
+
166
172
index_t AgencyCache::index () const {
167
173
std::shared_lock g (_storeLock);
168
174
return _commitIndex;
Original file line number Diff line number Diff line change @@ -117,6 +117,10 @@ class AgencyCache final : public ServerThread<ArangodServer> {
117
117
// / @brief Wait to be notified, when a Raft index has arrived.
118
118
[[nodiscard]] futures::Future<Result> waitFor (consensus::index_t index);
119
119
120
+ // / @brief Queries the agency for the latest commit index and waits for the
121
+ // / local cache to reach this index.
122
+ [[nodiscard]] futures::Future<Result> waitForLatestCommitIndex ();
123
+
120
124
// / @brief Cache has these path? AgencyCommHelper::path is prepended
121
125
bool has (std::string const & path) const ;
122
126
Original file line number Diff line number Diff line change @@ -289,6 +289,10 @@ auto methods::replaceReplicatedStateParticipant(
289
289
return std::move (precs).isEqual (*path->leader (),
290
290
*currentLeader);
291
291
})
292
+ .cond (not currentLeader.has_value (),
293
+ [&](auto && precs) {
294
+ return std::move (precs).isEmpty (*path->leader ());
295
+ })
292
296
.end ()
293
297
.done ();
294
298
}
Original file line number Diff line number Diff line change @@ -106,6 +106,10 @@ RestStatus RestLogHandler::handlePostRequest(
106
106
namespace paths = ::arangodb::cluster::paths;
107
107
auto & agencyCache =
108
108
_vocbase.server ().getFeature <ClusterFeature>().agencyCache ();
109
+ if (auto result = agencyCache.waitForLatestCommitIndex ().get ();
110
+ result.fail ()) {
111
+ THROW_ARANGO_EXCEPTION (result);
112
+ }
109
113
auto path = paths::aliases::target ()
110
114
->replicatedLogs ()
111
115
->database (_vocbase.name ())
You can’t perform that action at this time.
0 commit comments