|
27 | 27 | #include "Basics/debugging.h"
|
28 | 28 | #include "Basics/voc-errors.h"
|
29 | 29 | #include "Basics/Exceptions.h"
|
| 30 | +#include "Scheduler/SchedulerFeature.h" |
30 | 31 |
|
31 | 32 | namespace arangodb::replication2::replicated_state {
|
32 | 33 |
|
@@ -154,39 +155,91 @@ void FollowerStateManager<S>::tryTransferSnapshot(
|
154 | 155 | auto& leader = logFollower->getLeader();
|
155 | 156 | TRI_ASSERT(leader.has_value()) << "leader established it's leadership. There "
|
156 | 157 | "has to be a leader in the current term";
|
| 158 | + |
| 159 | + LOG_CTX("52a11", TRACE, loggerContext) << "try to acquire a new snapshot"; |
157 | 160 | auto f = hiddenState->acquireSnapshot(*leader, logFollower->getCommitIndex());
|
158 | 161 | std::move(f).thenFinal([weak = this->weak_from_this(), hiddenState](
|
159 | 162 | futures::Try<Result>&& tryResult) noexcept {
|
160 | 163 | auto self = weak.lock();
|
161 | 164 | if (self == nullptr) {
|
162 | 165 | return;
|
163 | 166 | }
|
164 |
| - try { |
165 |
| - auto& result = tryResult.get(); |
166 |
| - if (result.ok()) { |
167 |
| - LOG_CTX("44d58", DEBUG, self->loggerContext) |
168 |
| - << "snapshot transfer successfully completed"; |
169 |
| - |
170 |
| - bool startService = |
171 |
| - self->_guardedData.doUnderLock([&](GuardedData& data) { |
172 |
| - if (data.token == nullptr) { |
173 |
| - return false; |
174 |
| - } |
175 |
| - data.token->snapshot.updateStatus(SnapshotStatus::kCompleted); |
176 |
| - return true; |
177 |
| - }); |
178 |
| - if (startService) { |
179 |
| - self->startService(hiddenState); |
180 |
| - } |
181 |
| - return; |
| 167 | + |
| 168 | + auto result = basics::catchToResult([&] { return tryResult.get(); }); |
| 169 | + if (result.ok()) { |
| 170 | + LOG_CTX("44d58", DEBUG, self->loggerContext) |
| 171 | + << "snapshot transfer successfully completed"; |
| 172 | + |
| 173 | + bool startService = |
| 174 | + self->_guardedData.doUnderLock([&](GuardedData& data) { |
| 175 | + if (data.token == nullptr) { |
| 176 | + return false; |
| 177 | + } |
| 178 | + data.token->snapshot.updateStatus(SnapshotStatus::kCompleted); |
| 179 | + return true; |
| 180 | + }); |
| 181 | + if (startService) { |
| 182 | + self->startService(hiddenState); |
182 | 183 | }
|
183 |
| - } catch (...) { |
| 184 | + return; |
| 185 | + } else { |
| 186 | + LOG_CTX("9a68a", ERR, self->loggerContext) |
| 187 | + << "failed to transfer snapshot: " << result.errorMessage() |
| 188 | + << " - retry scheduled"; |
| 189 | + |
| 190 | + auto retryCount = self->_guardedData.doUnderLock([&](GuardedData& data) { |
| 191 | + data.updateInternalState(FollowerInternalState::kSnapshotTransferFailed, |
| 192 | + result); |
| 193 | + return data.errorCounter; |
| 194 | + }); |
| 195 | + |
| 196 | + self->retryTransferSnapshot(std::move(hiddenState), retryCount); |
184 | 197 | }
|
185 |
| - TRI_ASSERT(false) << "error handling not implemented"; |
186 |
| - FATAL_ERROR_EXIT(); |
187 | 198 | });
|
188 | 199 | }
|
189 | 200 |
|
| 201 | +namespace { |
| 202 | +inline auto delayedFuture(std::chrono::steady_clock::duration duration) |
| 203 | + -> futures::Future<futures::Unit> { |
| 204 | + if (SchedulerFeature::SCHEDULER) { |
| 205 | + return SchedulerFeature::SCHEDULER->delay(duration); |
| 206 | + } |
| 207 | + |
| 208 | + std::this_thread::sleep_for(duration); |
| 209 | + return futures::Future<futures::Unit>{std::in_place}; |
| 210 | +} |
| 211 | + |
/// Computes the waiting time before the next retry (capped exponential
/// backoff).
///
/// The delay is 100us * 2 ** retryCount, i.e. 100us, 200us, 400us, ... for
/// retryCount 0, 1, 2, ... The exponent is capped at 17, so the result never
/// exceeds 100us * 2 ** 17 == 13.11s.
///
/// @param retryCount selects the backoff step; values above 17 behave like 17
/// @return the delay converted to std::chrono::steady_clock::duration
inline auto calcRetryDuration(std::uint64_t retryCount)
    -> std::chrono::steady_clock::duration {
  constexpr std::uint64_t kMaxExponent = 17;
  constexpr std::chrono::microseconds kBaseDelay{100};

  // Clamp first, so the shift below always stays within 32 bits.
  auto const exponent = std::min(retryCount, kMaxExponent);
  auto const factor = 1u << exponent;
  std::chrono::microseconds const delay = kBaseDelay * factor;

  return std::chrono::duration_cast<std::chrono::steady_clock::duration>(
      delay);
}
| 221 | +} // namespace |
| 222 | + |
| 223 | +template<typename S> |
| 224 | +void FollowerStateManager<S>::retryTransferSnapshot( |
| 225 | + std::shared_ptr<IReplicatedFollowerState<S>> hiddenState, |
| 226 | + std::uint64_t retryCount) { |
| 227 | + auto duration = calcRetryDuration(retryCount); |
| 228 | + LOG_CTX("2ea59", TRACE, loggerContext) |
| 229 | + << "retry snapshot transfer after " |
| 230 | + << std::chrono::duration_cast<std::chrono::milliseconds>(duration).count() |
| 231 | + << "ms"; |
| 232 | + delayedFuture(duration).thenFinal( |
| 233 | + [weak = this->weak_from_this(), hiddenState](auto&&) { |
| 234 | + auto self = weak.lock(); |
| 235 | + if (self == nullptr) { |
| 236 | + return; |
| 237 | + } |
| 238 | + |
| 239 | + self->tryTransferSnapshot(hiddenState); |
| 240 | + }); |
| 241 | +} |
| 242 | + |
190 | 243 | template<typename S>
|
191 | 244 | void FollowerStateManager<S>::checkSnapshot(
|
192 | 245 | std::shared_ptr<IReplicatedFollowerState<S>> hiddenState) {
|
@@ -339,6 +392,12 @@ auto FollowerStateManager<S>::getStatus() const -> StateStatus {
|
339 | 392 | status.managerState.detail = std::nullopt;
|
340 | 393 | status.generation = data.token->generation;
|
341 | 394 | status.snapshot = data.token->snapshot;
|
| 395 | + |
| 396 | + if (data.lastError.has_value()) { |
| 397 | + status.managerState.detail = basics::StringUtils::concatT( |
| 398 | + "Last error was: ", data.lastError->errorMessage()); |
| 399 | + } |
| 400 | + |
342 | 401 | return StateStatus{.variant = std::move(status)};
|
343 | 402 | });
|
344 | 403 | }
|
@@ -392,6 +451,18 @@ void FollowerStateManager<S>::GuardedData::updateInternalState(
|
392 | 451 | internalState = newState;
|
393 | 452 | lastInternalStateChange = std::chrono::system_clock::now();
|
394 | 453 | ingestionRange = range;
|
| 454 | + lastError.reset(); |
| 455 | + errorCounter = 0; |
| 456 | +} |
| 457 | + |
| 458 | +template<typename S> |
| 459 | +void FollowerStateManager<S>::GuardedData::updateInternalState( |
| 460 | + FollowerInternalState newState, Result error) { |
| 461 | + internalState = newState; |
| 462 | + lastInternalStateChange = std::chrono::system_clock::now(); |
| 463 | + ingestionRange.reset(); |
| 464 | + lastError.emplace(std::move(error)); |
| 465 | + errorCounter += 1; |
395 | 466 | }
|
396 | 467 |
|
397 | 468 | template<typename S>
|
|
0 commit comments