[CINFRA-161] Follower State Transfer (#15352) · strogo/arangodb@f9c484b · GitHub
[go: up one dir, main page]

Skip to content

Commit f9c484b

Browse files
Lars Maier, apetenchea, goedderz
authored
[CINFRA-161] Follower State Transfer (arangodb#15352)
* Added log core to leader and follower.
* Rough sketch of snapshot transfer on followers.
* Added ILogLeader and ILogFollower interfaces.
* Optimized LogMultiplexer. Fewer copies of shared_ptrs.
* Added FakeReplicatedState.
* Fixing tests.
* Preparations for FollowerState tests.
* Renamed FakeFollower to FakeAbstractFollower.
* Added new FakeFollower.
* Moved interfaces to a separate header file.
* Moved LeaderStateManager and FollowerStateManager into separate files.
* Completed first test run.
* Fixing use of temporary after expression.
* Code cleanup.
* Added missing include for mac.
* One more try for apple clang.
* Dropping std::totally_ordered because it is not supported by Jenkins apple-clang, using typename instead.
* Minor formatting
* Applied clang-format
* Reordered sources
* Fixed merge conflict
* Reformatted *.tpp files
* Switched to ParticipantResignedException
* Update arangod/Replication2/ReplicatedState/StateInterfaces.h
  Co-authored-by: Alex Petenchea <alexandru.petenchea@arangodb.com>
* Use less special pointer.

Co-authored-by: apetenchea <alexandru.petenchea@arangodb.com>
Co-authored-by: Tobias Gödderz <tobias@arangodb.com>
1 parent ee773ca commit f9c484b

39 files changed

+1873
-628
lines changed

arangod/CMakeLists.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -559,6 +559,9 @@ set(LIB_ARANGO_REPLICATION2_SOURCES
559559
Replication2/ReplicatedLog/ReplicatedLogMetrics.cpp
560560
Replication2/ReplicatedLog/types.cpp
561561
Replication2/ReplicatedState/AgencySpecification.cpp
562+
Replication2/ReplicatedState/FollowerStateManager.tpp Replication2/ReplicatedState/FollowerStateManager.h
563+
Replication2/ReplicatedState/LeaderStateManager.tpp Replication2/ReplicatedState/LeaderStateManager.h
564+
Replication2/ReplicatedState/ReplicatedStateCore.cpp
562565
Replication2/ReplicatedState/ReplicatedStateFeature.cpp
563566
Replication2/ReplicatedState/StateCommon.cpp
564567
Replication2/ReplicatedState/StateStatus.cpp

arangod/Replication2/ReplicatedLog/ILogInterfaces.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,8 @@ struct ILogParticipant {
9494
*/
9595
struct ILogFollower : ILogParticipant, AbstractFollower {
9696
virtual auto waitForLeaderAcked() -> WaitForFuture = 0;
97+
/// Accessor for the leader's participant id as seen by this follower.
/// The optional is handed out by const reference, so an implementation has
/// to return a reference to storage that outlives the call (never a
/// temporary). Declared noexcept: implementations must not throw.
/// NOTE(review): presumably std::nullopt means "no leader known" - confirm.
[[nodiscard]] virtual auto getLeader() const noexcept
98+
-> std::optional<ParticipantId> const& = 0;
9799
};
98100

99101
/**

arangod/Replication2/ReplicatedLog/InMemoryLog.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,10 +66,10 @@ struct InMemoryLog {
6666

6767
private:
6868
log_type _log{};
69-
LogIndex _first{0};
69+
LogIndex _first{1};
7070

7171
public:
72-
InMemoryLog() = delete;
72+
InMemoryLog() = default;
7373
explicit InMemoryLog(log_type log);
7474

7575
InMemoryLog(InMemoryLog&& other) noexcept;

arangod/Replication2/ReplicatedLog/LogFollower.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -559,6 +559,11 @@ auto LogFollower::waitForLeaderAcked() -> WaitForFuture {
559559
return waitFor(LogIndex{1});
560560
}
561561

562+
/// Implements ILogFollower::getLeader by returning a const reference to the
/// member _leaderId. No lock is acquired in this function, and the returned
/// reference points into this LogFollower object.
auto LogFollower::getLeader() const noexcept
563+
-> std::optional<ParticipantId> const& {
564+
return _leaderId;
565+
}
566+
562567
/// Returns the follower's _commitIndex by value. The field is read through
/// the guard object obtained from _guardedFollowerData.getLockedGuard().
auto LogFollower::getCommitIndex() const noexcept -> LogIndex {
563568
return _guardedFollowerData.getLockedGuard()->_commitIndex;
564569
}

arangod/Replication2/ReplicatedLog/LogFollower.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,8 @@ class LogFollower final : public ILogFollower,
6161
[[nodiscard]] auto getQuickStatus() const -> QuickLogStatus override;
6262
[[nodiscard]] auto
6363
resign() && -> std::tuple<std::unique_ptr<LogCore>, DeferredAction> override;
64+
[[nodiscard]] auto getLeader() const noexcept
65+
-> std::optional<ParticipantId> const& override;
6466

6567
[[nodiscard]] auto waitFor(LogIndex) -> WaitForFuture override;
6668
[[nodiscard]] auto waitForIterator(LogIndex index)
@@ -71,8 +73,8 @@ class LogFollower final : public ILogFollower,
7173
-> std::unique_ptr<LogIterator>;
7274
[[nodiscard]] auto getCommittedLogIterator(LogIndex firstIndex) const
7375
-> std::unique_ptr<LogIterator>;
74-
7576
[[nodiscard]] auto getCommitIndex() const noexcept -> LogIndex override;
77+
7678
[[nodiscard]] auto release(LogIndex doneWithIdx) -> Result override;
7779

7880
/// @brief Resolved when the leader has committed at least one entry.

arangod/Replication2/ReplicatedState/AbstractStateMachine.tpp

Lines changed: 25 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -32,19 +32,19 @@
3232
using namespace arangodb;
3333
using namespace arangodb::replication2;
3434

35-
/// Placeholder implementation: unconditionally raises
/// TRI_ERROR_NOT_IMPLEMENTED via THROW_ARANGO_EXCEPTION. The `first` argument
/// is not used and no iterator is ever returned.
template <typename T>
35+
template<typename T>
3636
auto replicated_state::AbstractStateMachine<T>::getIterator(LogIndex first)
3737
-> std::unique_ptr<LogIterator> {
3838
THROW_ARANGO_EXCEPTION(TRI_ERROR_NOT_IMPLEMENTED);
3939
}
4040

41-
/// Placeholder implementation: unconditionally raises
/// TRI_ERROR_NOT_IMPLEMENTED via THROW_ARANGO_EXCEPTION. The (unnamed)
/// LogIndex argument is ignored and no entry is ever returned.
template <typename T>
41+
template<typename T>
4242
auto replicated_state::AbstractStateMachine<T>::getEntry(LogIndex)
4343
-> std::optional<T> {
4444
THROW_ARANGO_EXCEPTION(TRI_ERROR_NOT_IMPLEMENTED);
4545
}
4646

47-
template <typename T>
47+
template<typename T>
4848
auto replicated_state::AbstractStateMachine<T>::insert(T const& v) -> LogIndex {
4949
velocypack::UInt8Buffer payload;
5050
{
@@ -54,21 +54,22 @@ auto replicated_state::AbstractStateMachine<T>::insert(T const& v) -> LogIndex {
5454
return log->getLeader()->insert(LogPayload(std::move(payload)));
5555
}
5656

57-
/// Thin forwarder: fetches the participant from the `log` member via
/// getParticipant() and returns the future produced by its waitFor(idx).
/// No additional state of the state machine is touched here.
template <typename T>
58-
auto replication2::replicated_state::AbstractStateMachine<T>::waitFor(LogIndex idx)
59-
-> futures::Future<replicated_log::WaitForResult> {
57+
template<typename T>
58+
auto replication2::replicated_state::AbstractStateMachine<T>::waitFor(
59+
LogIndex idx) -> futures::Future<replicated_log::WaitForResult> {
6060
return log->getParticipant()->waitFor(idx);
6161
}
6262

63-
/// Placeholder implementation: unconditionally raises
/// TRI_ERROR_NOT_IMPLEMENTED via THROW_ARANGO_EXCEPTION. The (unnamed)
/// LogIndex argument is ignored.
template <typename T>
63+
template<typename T>
6464
void replicated_state::AbstractStateMachine<T>::releaseIndex(LogIndex) {
6565
THROW_ARANGO_EXCEPTION(TRI_ERROR_NOT_IMPLEMENTED);
6666
}
6767

6868
namespace {
69-
template <typename T>
69+
template<typename T>
7070
struct DeserializeLogIterator : TypedLogRangeIterator<T> {
71-
explicit DeserializeLogIterator(std::unique_ptr<replication2::LogRangeIterator> base)
71+
explicit DeserializeLogIterator(
72+
std::unique_ptr<replication2::LogRangeIterator> base)
7273
: base(std::move(base)) {}
7374

7475
auto next() -> std::optional<T> override {
@@ -79,19 +80,17 @@ struct DeserializeLogIterator : TypedLogRangeIterator<T> {
7980
return std::nullopt;
8081
}
8182

82-
auto range() const noexcept -> LogRange override {
83-
return base->range();
84-
}
83+
auto range() const noexcept -> LogRange override { return base->range(); }
8584

8685
std::unique_ptr<replication2::LogRangeIterator> base;
8786
};
8887
} // namespace
8988

90-
template <typename T>
89+
template<typename T>
9190
auto replicated_state::AbstractStateMachine<T>::triggerPollEntries()
9291
-> futures::Future<Result> {
93-
auto nextIndex =
94-
_guardedData.doUnderLock([&](GuardedData& guard) -> std::optional<LogIndex> {
92+
auto nextIndex = _guardedData.doUnderLock(
93+
[&](GuardedData& guard) -> std::optional<LogIndex> {
9594
if (guard.pollOnGoing) {
9695
return std::nullopt;
9796
}
@@ -109,14 +108,16 @@ auto replicated_state::AbstractStateMachine<T>::triggerPollEntries()
109108
auto [from, to] = res->range(); // [from, to)
110109
TRI_ASSERT(from != to);
111110

112-
auto iter = std::make_unique<DeserializeLogIterator<T>>(std::move(res));
113-
return self->applyEntries(std::move(iter)).thenValue([self, to = to](Result&& result) {
114-
auto guard = self->_guardedData.getLockedGuard();
115-
guard->pollOnGoing = false;
116-
TRI_ASSERT(to > guard->nextIndex);
117-
guard->nextIndex = to;
118-
return std::move(result);
119-
});
111+
auto iter =
112+
std::make_unique<DeserializeLogIterator<T>>(std::move(res));
113+
return self->applyEntries(std::move(iter))
114+
.thenValue([self, to = to](Result&& result) {
115+
auto guard = self->_guardedData.getLockedGuard();
116+
guard->pollOnGoing = false;
117+
TRI_ASSERT(to > guard->nextIndex);
118+
guard->nextIndex = to;
119+
return std::move(result);
120+
});
120121
}
121122

122123
return futures::Future<Result>{TRI_ERROR_NO_ERROR};
@@ -126,7 +127,7 @@ auto replicated_state::AbstractStateMachine<T>::triggerPollEntries()
126127
return futures::Future<Result>{TRI_ERROR_NO_ERROR};
127128
}
128129

129-
/// Constructs the state machine from a shared handle to a replicated log.
/// The handle is taken by value and moved into the `log` member; the
/// constructor body itself is empty.
template <typename T>
130+
template<typename T>
130131
replicated_state::AbstractStateMachine<T>::AbstractStateMachine(
131132
std::shared_ptr<replicated_log::ReplicatedLog> log)
132133
: log(std::move(log)) {}
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
////////////////////////////////////////////////////////////////////////////////
2+
/// DISCLAIMER
3+
///
4+
/// Copyright 2021-2021 ArangoDB GmbH, Cologne, Germany
5+
///
6+
/// Licensed under the Apache License, Version 2.0 (the "License");
7+
/// you may not use this file except in compliance with the License.
8+
/// You may obtain a copy of the License at
9+
///
10+
/// http://www.apache.org/licenses/LICENSE-2.0
11+
///
12+
/// Unless required by applicable law or agreed to in writing, software
13+
/// distributed under the License is distributed on an "AS IS" BASIS,
14+
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
/// See the License for the specific language governing permissions and
16+
/// limitations under the License.
17+
///
18+
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
19+
///
20+
/// @author Lars Maier
21+
////////////////////////////////////////////////////////////////////////////////
22+
23+
#pragma once
24+
25+
#include "Replication2/ReplicatedState/ReplicatedState.h"
26+
#include "Replication2/ReplicatedState/StateInterfaces.h"
27+
#include "Replication2/Streams/Streams.h"
28+
#include "Replication2/Streams/LogMultiplexer.h"
29+
30+
namespace arangodb::replication2::replicated_state {
31+
/// State manager for the follower side of a replicated state of type S.
/// Derives from ReplicatedState<S>::StateManagerBase and from
/// std::enable_shared_from_this. Only declarations are visible in this
/// header; the member function definitions live elsewhere (the commit adds a
/// FollowerStateManager.tpp next to this file).
template<typename S>
32+
struct FollowerStateManager
33+
: ReplicatedState<S>::StateManagerBase,
34+
std::enable_shared_from_this<FollowerStateManager<S>> {
35+
// Types associated with S, looked up through ReplicatedStateTraits<S>.
using Factory = typename ReplicatedStateTraits<S>::FactoryType;
36+
using EntryType = typename ReplicatedStateTraits<S>::EntryType;
37+
using FollowerType = typename ReplicatedStateTraits<S>::FollowerType;
38+
using LeaderType = typename ReplicatedStateTraits<S>::LeaderType;
39+
40+
// Stream of EntryType values and the iterator type that stream exposes.
using Stream = streams::Stream<EntryType>;
41+
using Iterator = typename Stream::Iterator;
42+
43+
/// Takes the owning replicated state (`parent`), the log follower to read
/// from, the state core (unique ownership is transferred in) and the factory
/// for S. Declared noexcept.
/// NOTE(review): `parent` is passed as shared_ptr but the matching member
/// below is a weak_ptr - presumably to avoid an ownership cycle; confirm.
FollowerStateManager(
44+
std::shared_ptr<ReplicatedStateBase> parent,
45+
std::shared_ptr<replicated_log::ILogFollower> logFollower,
46+
std::unique_ptr<ReplicatedStateCore> core,
47+
std::shared_ptr<Factory> factory) noexcept;
48+
49+
// NOTE(review): presumably the entry point that starts the follower's
// processing; behaviour is defined outside this header - confirm there.
void run();
50+
// `final` overriders reporting the manager's status and snapshot status.
auto getStatus() const -> StateStatus final;
51+
auto getSnapshotStatus() const -> SnapshotStatus final;
52+
53+
/// Returns a shared handle typed as IReplicatedFollowerState<S>.
/// NOTE(review): likely hands out the `state` member below - confirm.
auto getFollowerState() -> std::shared_ptr<IReplicatedFollowerState<S>>;
54+
55+
private:
56+
// Internal processing steps. Only the signatures are visible here; ordering
// and semantics must be read from the definitions.
void awaitLeaderShip();
57+
void ingestLogData();
58+
void pollNewEntries();
59+
void checkSnapshot(std::shared_ptr<IReplicatedFollowerState<S>>);
60+
void tryTransferSnapshot(std::shared_ptr<IReplicatedFollowerState<S>>);
61+
void startService(std::shared_ptr<IReplicatedFollowerState<S>>);
62+
63+
// Takes ownership of the iterator; declared noexcept.
void applyEntries(std::unique_ptr<Iterator> iter) noexcept;
64+
65+
using Demultiplexer = streams::LogDemultiplexer<ReplicatedStateStreamSpec<S>>;
66+
// Initialised to log index 1.
// NOTE(review): presumably the index of the next entry to apply - confirm.
LogIndex nextEntry{1};
67+
68+
// TODO locking
69+
70+
std::shared_ptr<Stream> stream;
71+
std::shared_ptr<IReplicatedFollowerState<S>> state;
72+
// Non-owning reference to the parent replicated state.
std::weak_ptr<ReplicatedStateBase> parent;
73+
std::shared_ptr<replicated_log::ILogFollower> logFollower;
74+
75+
// Bookkeeping written by updateInternalState(): the current internal state
// (starts as kUninitializedState), the time of the last change, and an
// optional log range associated with that state.
FollowerInternalState internalState{
76+
FollowerInternalState::kUninitializedState};
77+
std::chrono::system_clock::time_point lastInternalStateChange;
78+
std::optional<LogRange> ingestionRange;
79+
80+
std::unique_ptr<ReplicatedStateCore> core;
81+
// The pointer itself is const; it cannot be reseated after construction.
std::shared_ptr<Factory> const factory;
82+
83+
// Note: this access specifier repeats the `private:` above; the section was
// already private.
private:
84+
/// Records a state transition: stores `newState`, stamps
/// lastInternalStateChange with the current system_clock time, and
/// overwrites ingestionRange with `range`. Because `range` defaults to
/// std::nullopt, calling this without a range clears any previous one.
/// No lock is taken here (see the locking TODO above).
void updateInternalState(FollowerInternalState newState,
85+
std::optional<LogRange> range = std::nullopt) {
86+
internalState = newState;
87+
lastInternalStateChange = std::chrono::system_clock::now();
88+
ingestionRange = range;
89+
}
90+
};
91+
} // namespace arangodb::replication2::replicated_state

0 commit comments

Comments
 (0)
0