8000 [CINFRA-314] Follower Snapshot Retry (#15941) · meta-foundation/arangodb@ede77c9 · GitHub
[go: up one dir, main page]

Skip to content

Commit ede77c9

Browse files
author
Lars Maier
authored
[CINFRA-314] Follower Snapshot Retry (arangodb#15941)
* Implemented follower retry when snapshot transfer failed. * Fixing mac compile. * Adding inline keyword.
1 parent c50c64d commit ede77c9

File tree

6 files changed

+127
-21
lines changed

6 files changed

+127
-21
lines changed

arangod/Replication2/ReplicatedState/FollowerStateManager.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,8 @@ struct FollowerStateManager
7777
void checkSnapshot(std::shared_ptr<IReplicatedFollowerState<S>>);
7878
void tryTransferSnapshot(std::shared_ptr<IReplicatedFollowerState<S>>);
7979
void startService(std::shared_ptr<IReplicatedFollowerState<S>>);
80+
void retryTransferSnapshot(std::shared_ptr<IReplicatedFollowerState<S>>,
81+
std::uint64_t retryCount);
8082

8183
void applyEntries(std::unique_ptr<Iterator> iter) noexcept;
8284

@@ -92,6 +94,8 @@ struct FollowerStateManager
9294
FollowerInternalState::kUninitializedState};
9395
std::chrono::system_clock::time_point lastInternalStateChange;
9496
std::optional<LogRange> ingestionRange;
97+
std::optional<Result> lastError;
98+
std::uint64_t errorCounter{0};
9599

96100
// core will be nullptr as soon as the FollowerState was created
97101
std::unique_ptr<CoreType> core;
@@ -104,6 +108,7 @@ struct FollowerStateManager
104108
std::unique_ptr<ReplicatedStateToken> token);
105109
void updateInternalState(FollowerInternalState newState,
106110
std::optional<LogRange> range = std::nullopt);
111+
void updateInternalState(FollowerInternalState newState, Result);
107112
auto updateNextIndex(LogIndex nextWaitForIndex) -> DeferredAction;
108113
};
109114

arangod/Replication2/ReplicatedState/FollowerStateManager.tpp

Lines changed: 92 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
#include "Basics/debugging.h"
2828
#include "Basics/voc-errors.h"
2929
#include "Basics/Exceptions.h"
30+
#include "Scheduler/SchedulerFeature.h"
3031

3132
namespace arangodb::replication2::replicated_state {
3233

@@ -154,39 +155,91 @@ void FollowerStateManager<S>::tryTransferSnapshot(
154155
auto& leader = logFollower->getLeader();
155156
TRI_ASSERT(leader.has_value()) << "leader established it's leadership. There "
156157
"has to be a leader in the current term";
158+
159+
LOG_CTX("52a11", TRACE, loggerContext) << "try to acquire a new snapshot";
157160
auto f = hiddenState->acquireSnapshot(*leader, logFollower->getCommitIndex());
158161
std::move(f).thenFinal([weak = this->weak_from_this(), hiddenState](
159162
futures::Try<Result>&& tryResult) noexcept {
160163
auto self = weak.lock();
161164
if (self == nullptr) {
162165
return;
163166
}
164-
try {
165-
auto& result = tryResult.get();
166-
if (result.ok()) {
167-
LOG_CTX("44d58", DEBUG, self->loggerContext)
168-
<< "snapshot transfer successfully completed";
169-
170-
bool startService =
171-
self->_guardedData.doUnderLock([&](GuardedData& data) {
172-
if (data.token == nullptr) {
173-
return false;
174-
}
175-
data.token->snapshot.updateStatus(SnapshotStatus::kCompleted);
176-
return true;
177-
});
178-
if (startService) {
179-
self->startService(hiddenState);
180-
}
181-
return;
167+
168+
auto result = basics::catchToResult([&] { return tryResult.get(); });
169+
if (result.ok()) {
170+
LOG_CTX("44d58", DEBUG, self->loggerContext)
171+
<< "snapshot transfer successfully completed";
172+
173+
bool startService =
174+
self->_guardedData.doUnderLock([&](GuardedData& data) {
175+
if (data.token == nullptr) {
176+
return false;
177+
}
178+
data.token->snapshot.updateStatus(SnapshotStatus::kCompleted);
179+
return true;
180+
});
181+
if (startService) {
182+
self->startService(hiddenState);
182183
}
183-
} catch (...) {
184+
return;
185+
} else {
186+
LOG_CTX("9a68a", ERR, self->loggerContext)
187+
<< "failed to transfer snapshot: " << result.errorMessage()
188+
<< " - retry scheduled";
189+
190+
auto retryCount = self->_guardedData.doUnderLock([&](GuardedData& data) {
191+
data.updateInternalState(FollowerInternalState::kSnapshotTransferFailed,
192+
result);
193+
return data.errorCounter;
194+
});
195+
196+
self->retryTransferSnapshot(std::move(hiddenState), retryCount);
184197
}
185-
TRI_ASSERT(false) << "error handling not implemented";
186-
FATAL_ERROR_EXIT();
187198
});
188199
}
189200

201+
namespace {
202+
inline auto delayedFuture(std::chrono::steady_clock::duration duration)
203+
-> futures::Future<futures::Unit> {
204+
if (SchedulerFeature::SCHEDULER) {
205+
return SchedulerFeature::SCHEDULER->delay(duration);
206+
}
207+
208+
std::this_thread::sleep_for(duration);
209+
return futures::Future<futures::Unit>{std::in_place};
210+
}
211+
212+
inline auto calcRetryDuration(std::uint64_t retryCount)
213+
-> std::chrono::steady_clock::duration {
214+
// Capped exponential backoff. Wait for 100us, 200us, 400us, ...
215+
// until at most 100us * 2 ** 17 == 13.11s.
216+
auto executionDelay = std::chrono::microseconds{100} *
217+
(1u << std::min(retryCount, std::uint64_t{17}));
218+
return std::chrono::duration_cast<std::chrono::steady_clock::duration>(
219+
executionDelay);
220+
}
221+
} // namespace
222+
223+
template<typename S>
224+
void FollowerStateManager<S>::retryTransferSnapshot(
225+
std::shared_ptr<IReplicatedFollowerState<S>> hiddenState,
226+
std::uint64_t retryCount) {
227+
auto duration = calcRetryDuration(retryCount);
228+
LOG_CTX("2ea59", TRACE, loggerContext)
229+
<< "retry snapshot transfer after "
230+
<< std::chrono::duration_cast<std::chrono::milliseconds>(duration).count()
231+
<< "ms";
232+
delayedFuture(duration).thenFinal(
233+
[weak = this->weak_from_this(), hiddenState](auto&&) {
234+
auto self = weak.lock();
235+
if (self == nullptr) {
236+
return;
237+
}
238+
239+
self->tryTransferSnapshot(hiddenState);
240+
});
241+
}
242+
190243
template<typename S>
191244
void FollowerStateManager<S>::checkSnapshot(
192245
std::shared_ptr<IReplicatedFollowerState<S>> hiddenState) {
@@ -339,6 +392,12 @@ auto FollowerStateManager<S>::getStatus() const -> StateStatus {
339392
status.managerState.detail = std::nullopt;
340393
status.generation = data.token->generation;
341394
status.snapshot = data.token->snapshot;
395+
396+
if (data.lastError.has_value()) {
397+
status.managerState.detail = basics::StringUtils::concatT(
398+
"Last error was: ", data.lastError->errorMessage());
399+
}
400+
342401
return StateStatus{.variant = std::move(status)};
343402
});
344403
}
@@ -392,6 +451,18 @@ void FollowerStateManager<S>::GuardedData::updateInternalState(
392451
internalState = newState;
393452
lastInternalStateChange = std::chrono::system_clock::now();
394453
ingestionRange = range;
454+
lastError.reset();
455+
errorCounter = 0;
456+
}
457+
458+
template<typename S>
459+
void FollowerStateManager<S>::GuardedData::updateInternalState(
460+
FollowerInternalState newState, Result error) {
461+
internalState = newState;
462+
lastInternalStateChange = std::chrono::system_clock::now();
463+
ingestionRange.reset();
464+
lastError.emplace(std::move(error));
465+
errorCounter += 1;
395466
}
396467

397468
template<typename S>

arangod/Replication2/ReplicatedState/StateStatus.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,8 @@ inline constexpr std::string_view StringApplyRecentEntries =
5252
"ApplyRecentEntries";
5353
inline constexpr std::string_view StringUninitializedState =
5454
"UninitializedState";
55+
inline constexpr std::string_view StringSnapshotTransferFailed =
56+
"SnapshotTransferFailed";
5557

5658
inline constexpr auto StringRole = std::string_view{"role"};
5759
inline constexpr auto StringUnconfigured = std::string_view{"unconfigured"};
@@ -74,6 +76,8 @@ auto followerStateFromString(std::string_view str) -> FollowerInternalState {
7476
return FollowerInternalState::kNothingToApply;
7577
} else if (str == StringApplyRecentEntries) {
7678
return FollowerInternalState::kApplyRecentEntries;
79+
} else if (str == StringSnapshotTransferFailed) {
80+
return FollowerInternalState::kSnapshotTransferFailed;
7781
} else {
7882
THROW_ARANGO_EXCEPTION_FORMAT(TRI_ERROR_BAD_PARAMETER,
7983
"unknown follower internal state %*s",
@@ -131,6 +135,8 @@ auto replicated_state::to_string(FollowerInternalState state) noexcept
131135
return StringApplyRecentEntries;
132136
case FollowerInternalState::kUninitializedState:
133137
return StringUninitializedState;
138+
case FollowerInternalState::kSnapshotTransferFailed:
139+
return StringSnapshotTransferFailed;
134140
}
135141
TRI_ASSERT(false) << "invalid state value " << int(state);
136142
return "(unknown-internal-follower-state)";

arangod/Replication2/ReplicatedState/StateStatus.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ enum class FollowerInternalState {
7272
kTransferSnapshot,
7373
kNothingToApply,
7474
kApplyRecentEntries,
75+
kSnapshotTransferFailed,
7576
};
7677

7778
auto to_string(FollowerInternalState) noexcept -> std::string_view;

tests/Replication2/Mocks/FakeReplicatedState.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,14 @@ struct AsyncOperationMarker {
114114
promise->setValue(std::move(res));
115115
}
116116

117+
auto resolveWithAndReset(Result res) {
118+
TRI_ASSERT(triggered);
119+
TRI_ASSERT(!promise->isFulfilled());
120+
auto p = std::move(promise);
121+
reset();
122+
std::move(p)->setValue(std::move(res));
123+
}
124+
117125
auto inspectValue() const -> Input const& { return *in; }
118126

119127
[[nodiscard]] auto wasTriggered() const noexcept -> bool { return triggered; }

tests/Replication2/ReplicatedState/FollowerSnapshotTest.cpp

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,21 @@ TEST_F(FollowerSnapshotTest, basic_follower_manager_test) {
103103
ASSERT_EQ(nullptr, manager->getFollowerState())
104104
<< "follower state should not be available yet";
105105

106+
// first trigger an error
107+
state->acquire.resolveWithAndReset(
108+
Result{TRI_ERROR_HTTP_SERVICE_UNAVAILABLE});
109+
110+
// we expect a retry
111+
{
112+
ASSERT_TRUE(state->acquire.wasTriggered())
113+
<< "expect snapshot to be requested";
114+
auto& value = state->acquire.inspectValue();
115+
EXPECT_EQ(value.first, "leader");
116+
EXPECT_EQ(value.second, LogIndex{0});
117+
}
118+
ASSERT_EQ(nullptr, manager->getFollowerState())
119+
<< "follower state should not be available yet";
120+
106121
// notify the manager that the state transfer was successfully completed
107122
state->acquire.resolveWith(Result{});
108123

0 commit comments

Comments
 (0)
0