8000 ReplicatedLog 2.0.1 Follow up PR (#14728) · arangodb/arangodb@b12ef28 · GitHub
[go: up one dir, main page]

Skip to content

Commit b12ef28

Browse files
author
Lars Maier
authored
ReplicatedLog 2.0.1 Follow up PR (#14728)
1 parent 51c71fe commit b12ef28

File tree

77 files changed

+4242
-695
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

77 files changed

+4242
-695
lines changed

3rdParty/immer/v0.6.2/immer/transience/no_transience_policy.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ struct no_transience_policy
2626
struct owner
2727
{
2828
operator edit () const { return {}; }
29-
owner& operator=(const owner&) { return *this; };
29+
//owner& operator=(const owner&) { return *this; }; -- already fixed upstream
3030
};
3131

3232
struct ownee

arangod/Agency/Supervision.cpp

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2244,8 +2244,8 @@ void Supervision::checkReplicatedLogs() {
22442244
};
22452245

22462246
// check if Plan has replicated logs
2247-
auto const& node = snapshot().hasAsNode(planRepLogPrefix);
2248-
if (!node) {
2247+
auto const& planNode = snapshot().hasAsNode(planRepLogPrefix);
2248+
if (!planNode) {
22492249
return;
22502250
}
22512251

@@ -2267,20 +2267,29 @@ void Supervision::checkReplicatedLogs() {
22672267

22682268
auto builder = std::make_shared<Builder>();
22692269
auto envelope = arangodb::agency::envelope::into_builder(*builder);
2270-
for (auto const& [dbName, db] : node->get().children()) {
2270+
for (auto const& [dbName, db] : planNode->get().children()) {
22712271
for (auto const& [idString, node] : db->children()) {
22722272
auto spec = readPlanSpecification(*node);
2273-
auto current = std::invoke([&, &dbName = dbName, &idString = idString]() -> LogCurrent {
2273+
auto current = std::invoke([&, &dbName = dbName, &idString = idString]() -> std::optional<LogCurrent> {
22742274
using namespace cluster::paths;
22752275
auto currentPath =
22762276
aliases::current()
22772277
->replicatedLogs()
22782278
->database(dbName)
22792279
->log(idString)
22802280
->str(SkipComponents(1) /* skip first path component, i.e. 'arango' */);
2281-
return readLogCurrent(snapshot().get(currentPath)->get());
2281+
2282+
auto cnode = snapshot().get(currentPath);
2283+
if (cnode.has_value()) {
2284+
return readLogCurrent(cnode->get());
2285+
}
2286+
return std::nullopt;
22822287
});
2283-
auto newTermSpec = checkReplicatedLog(dbName, spec, current, info);
2288+
if (!current.has_value()) {
2289+
continue;
2290+
}
2291+
2292+
auto newTermSpec = checkReplicatedLog(dbName, spec, *current, info);
22842293

22852294
envelope = std::visit(
22862295
overload{[&, &dbName = dbName](LogPlanTermSpecification const& newSpec) {

arangod/CMakeLists.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -566,7 +566,7 @@ set(LIB_ARANGO_REPLICATION2_SOURCES
566566
Replication2/ReplicatedLog/types.cpp
567567
Replication2/Version.cpp
568568
RestHandler/RestLogHandler.cpp
569-
)
569+
)
570570

571571
set (LIB_ARANGO_METRICS_SOURCES
572572
RestServer/Metrics.cpp

arangod/Replication2/AgencyMethods.cpp

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,28 @@
2020
/// @author Lars Maier
2121
////////////////////////////////////////////////////////////////////////////////
2222

23-
#include <chrono>
24-
2523
#include "AgencyMethods.h"
2624

27-
#include <Agency/AsyncAgencyComm.h>
28-
#include <Agency/TransactionBuilder.h>
29-
#include <Cluster/ClusterFeature.h>
25+
#include <cstdint>
26+
#include <chrono>
27+
#include <functional>
28+
#include <memory>
29+
#include <string>
30+
#include <utility>
31+
32+
#include <velocypack/velocypack-aliases.h>
33+
#include <velocypack/velocypack-common.h>
34+
35+
#include "Agency/AsyncAgencyComm.h"
36+
#include "Agency/TransactionBuilder.h"
37+
#include "Agency/AgencyPaths.h"
38+
#include "Cluster/ClusterTypes.h"
39+
#include "Replication2/ReplicatedLog/AgencyLogSpecification.h"
40+
#include "Replication2/ReplicatedLog/LogCommon.h"
41+
42+
namespace arangodb {
43+
class Result;
44+
} // namespace arangodb
3045

3146
using namespace std::chrono_literals;
3247

arangod/Replication2/AgencyMethods.h

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,26 @@
2121
////////////////////////////////////////////////////////////////////////////////
2222

2323
#pragma once
24-
#include <optional>
25-
2624
#include <Agency/TransactionBuilder.h>
2725
#include <Basics/ResultT.h>
2826
#include <Cluster/ClusterTypes.h>
29-
3027
#include <Replication2/ReplicatedLog/AgencyLogSpecification.h>
28+
#include <optional>
29+
30+
#include "Futures/Future.h"
31+
32+
namespace arangodb {
33+
class Result;
34+
} // namespace arangodb
35+
namespace arangodb::replication2 {
36+
class LogId;
37+
struct LogTerm;
38+
} // namespace arangodb::replication2
39+
namespace arangodb::replication2::agency {
40+
struct LogCurrentSupervisionElection;
41+
struct LogPlanSpecification;
42+
struct LogPlanTermSpecification;
43+
} // namespace arangodb::replication2::agency
3144

3245
namespace arangodb::replication2::agency::methods {
3346

arangod/Replication2/LoggerContext.h

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ struct LogNameValuePair : LoggableValue {
5858
};
5959

6060
struct LoggerContext {
61-
explicit LoggerContext(LogTopic topic) : topic(std::move(topic)) {}
61+
explicit LoggerContext(LogTopic const& topic) : topic(topic) {}
6262

6363
template<const char N[], typename T>
6464
auto with(T&& t) const -> LoggerContext {
@@ -67,8 +67,8 @@ struct LoggerContext {
6767
return LoggerContext(values.push_back(std::move(pair)), topic);
6868
}
6969

70-
auto withTopic(LogTopic newTopic) const {
71-
return LoggerContext(values, std::move(newTopic));
70+
auto withTopic(LogTopic const& newTopic) const {
71+
return LoggerContext(values, newTopic);
7272
}
7373

7474
friend auto operator<<(std::ostream& os, LoggerContext const& ctx) -> std::ostream& {
@@ -85,12 +85,13 @@ struct LoggerContext {
8585
return os;
8686
}
8787

88-
LogTopic const topic;
89-
::immer::flex_vector<std::shared_ptr<LoggableValue>, arangodb::immer::arango_memory_policy> const values = {};
88+
using Container = ::immer::flex_vector<std::shared_ptr<LoggableValue>, arangodb::immer::arango_memory_policy>;
89+
LogTopic const& topic;
90+
Container const values = {};
9091

9192
private:
92-
LoggerContext(decltype(values) values, LogTopic topic)
93-
: topic(std::move(topic)), values(std::move(values)) {}
93+
LoggerContext(Container values, LogTopic const& topic)
94+
: topic(topic), values(std::move(values)) {}
9495
};
9596
}
9697

arangod/Replication2/ReplicatedLog/AgencyLogSpecification.cpp

Lines changed: 19 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -57,9 +57,9 @@ auto LogPlanTermSpecification::toVelocyPack(VPackBuilder& builder) const -> void
5757
}
5858
}
5959

60-
LogPlanTermSpecification::LogPlanTermSpecification(from_velocypack_t, VPackSlice slice) {
61-
term = slice.get(StaticStrings::Term).extract<LogTerm>();
62-
config = LogConfig(slice.get(StaticStrings::Config));
60+
LogPlanTermSpecification::LogPlanTermSpecification(from_velocypack_t, VPackSlice slice)
61+
: term(slice.get(StaticStrings::Term).extract<LogTerm>()),
62+
config(slice.get(StaticStrings::Config)) {
6363
for (auto const& [key, value] :
6464
VPackObjectIterator(slice.get(StaticStrings::Participants))) {
6565
TRI_ASSERT(value.isEmptyObject());
@@ -82,9 +82,9 @@ auto LogPlanSpecification::toVelocyPack(VPackBuilder& builder) const -> void {
8282
}
8383
}
8484

85-
LogPlanSpecification::LogPlanSpecification(from_velocypack_t, VPackSlice slice) {
86-
id = slice.get(StaticStrings::Id).extract<LogId>();
87-
targetConfig = LogConfig(slice.get(StaticStrings::TargetConfig));
85+
LogPlanSpecification::LogPlanSpecification(from_velocypack_t, VPackSlice slice)
86+
: id(slice.get(StaticStrings::Id).extract<LogId>()),
87+
targetConfig(slice.get(StaticStrings::TargetConfig)) {
8888
if (auto term = slice.get(StaticStrings::CurrentTerm); !term.isNone()) {
8989
currentTerm = LogPlanTermSpecification{from_velocypack, term};
9090
}
@@ -99,7 +99,8 @@ LogPlanTermSpecification::LogPlanTermSpecification(LogTerm term, LogConfig confi
9999
participants(std::move(participants)) {}
100100

101101
LogPlanSpecification::LogPlanSpecification(LogId id, std::optional<LogPlanTermSpecification> term,
102-
LogConfig config) : id(id), currentTerm(std::move(term)), targetConfig(config) {}
102+
LogConfig config)
103+
: id(id), currentTerm(std::move(term)), targetConfig(config) {}
103104

104105
LogCurrentLocalState::LogCurrentLocalState(from_velocypack_t, VPackSlice slice) {
105106
auto spearheadSlice = slice.get(StaticStrings::Spearhead);
@@ -135,10 +136,11 @@ LogCurrentSupervision::LogCurrentSupervision(from_velocypack_t, VPackSlice slice
135136
}
136137
}
137138

138-
LogCurrentSupervisionElection::LogCurrentSupervisionElection(from_velocypack_t, VPackSlice slice) {
139-
term = slice.get(StaticStrings::Term).extract<LogTerm>();
140-
participantsRequired = slice.get("participantsRequired").getNumericValue<std::size_t>();
141-
participantsAvailable = slice.get("participantsAvailable").getNumericValue<std::size_t>();
139+
LogCurrentSupervisionElection::LogCurrentSupervisionElection(from_velocypack_t, VPackSlice slice)
140+
: term(slice.get(StaticStrings::Term).extract<LogTerm>()),
141+
participantsRequired(slice.get("participantsRequired").getNumericValue<std::size_t>()),
142+
participantsAvailable(
143+
slice.get("participantsAvailable").getNumericValue<std::size_t>()) {
142144
for (auto [key, value] : VPackObjectIterator(slice.get("details"))) {
143145
detail.emplace(key.copyString(), value.get("code").getNumericValue<ErrorCode>());
144146
}
@@ -172,21 +174,23 @@ auto LogCurrentSupervisionElection::toVelocyPack(VPackBuilder& builder) const ->
172174
builder.add("participantsAvailable", VPackValue(participantsAvailable));
173175
{
174176
VPackObjectBuilder db(&builder, "details");
175-
for (auto const&[server, error] : detail) {
177+
for (auto const& [server, error] : detail) {
176178
builder.add(VPackValue(server));
177179
::toVelocyPack(error, builder);
178180
}
179181
}
180182
}
181183

182-
auto agency::toVelocyPack(LogCurrentSupervisionElection::ErrorCode ec, VPackBuilder& builder) -> void {
184+
auto agency::toVelocyPack(LogCurrentSupervisionElection::ErrorCode ec,
185+
VPackBuilder& builder) -> void {
183186
VPackObjectBuilder ob(&builder);
184187
builder.add("code", VPackValue(static_cast<int>(ec)));
185188
builder.add("message", VPackValue(to_string(ec)));
186189
}
187190

188-
auto agency::to_string(LogCurrentSupervisionElection::ErrorCode ec) noexcept -> std::string_view {
189-
switch(ec) {
191+
auto agency::to_string(LogCurrentSupervisionElection::ErrorCode ec) noexcept
192+
-> std::string_view {
193+
switch (ec) {
190194
case LogCurrentSupervisionElection::ErrorCode::OK:
191195
return "the server is ok";
192196
case LogCurrentSupervisionElection::ErrorCode::SERVER_NOT_GOOD:

arangod/Replication2/ReplicatedLog/Algorithms.cpp

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -223,8 +223,6 @@ auto algorithms::detectConflict(replicated_log::InMemoryLog const& log, TermInde
223223
TermIndexPair{lastEntry->entry().logTerm(),
224224
lastEntry->entry().logIndex() + 1});
225225
} else {
226-
// this can only happen if we drop log entries, check the code below before removing the assert
227-
TRI_ASSERT(false);
228226
TRI_ASSERT(prevLog.index < lastEntry->entry().logIndex());
229227
TRI_ASSERT(prevLog.index < log.getFirstEntry()->entry().logIndex());
230228
// the given index too old, reset to (0, 0)

arangod/Replication2/ReplicatedLog/Algorithms.h

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,7 @@
2727
#include <unordered_map>
2828

2929
#include "Cluster/ClusterTypes.h"
30-
#include "InMemoryLog.h"
31-
#include "ReplicatedLog.h"
30+
#include "Replication2/ReplicatedLog/ReplicatedLog.h"
3231
#include "Replication2/ReplicatedLog/AgencyLogSpecification.h"
3332

3433
namespace arangodb::replication2::algorithms {

arangod/Replication2/ReplicatedLog/ILogParticipant.cpp

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
#include "RestServer/Metrics.h"
2929

3030
#include <Basics/Exceptions.h>
31+
#include <Basics/StaticStrings.h>
3132

3233
using namespace arangodb;
3334
using namespace arangodb::replication2;
@@ -64,3 +65,23 @@ auto replicated_log::ILogParticipant::waitForIterator(LogIndex index)
6465
auto replicated_log::ILogParticipant::getTerm() const noexcept -> std::optional<LogTerm> {
6566
return getStatus().getCurrentTerm();
6667
}
68+
69+
auto replicated_log::LogUnconfiguredParticipant::release(LogIndex doneWithIdx) -> Result {
70+
THROW_ARANGO_EXCEPTION(TRI_ERROR_NOT_IMPLEMENTED);
71+
}
72+
73+
replicated_log::WaitForResult::WaitForResult(LogIndex index,
74+
std::shared_ptr<QuorumData const> quorum)
75+
: currentCommitIndex(index), quorum(std::move(quorum)) {}
76+
77+
void replicated_log::WaitForResult::toVelocyPack(velocypack::Builder& builder) const {
78+
VPackObjectBuilder ob(&builder);
79+
builder.add(StaticStrings::CommitIndex, VPackValue(currentCommitIndex));
80+
builder.add(VPackValue("quorum"));
81+
quorum->toVelocyPack(builder);
82+
}
83+
84+
replicated_log::WaitForResult::WaitForResult(velocypack::Slice s) {
85+
currentCommitIndex = s.get(StaticStrings::CommitIndex).extract<LogIndex>();
86+
quorum = std::make_shared<QuorumData>(s.get("quorum"));
87+
}

arangod/Replication2/ReplicatedLog/ILogParticipant.h

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,11 +33,28 @@
3333
#include <map>
3434
#include <memory>
3535

36+
namespace arangodb {
37+
class Result;
38+
}
39+
3640
namespace arangodb::replication2::replicated_log {
3741

3842
struct LogCore;
3943
struct LogStatus;
4044

45+
struct WaitForResult {
46+
/// @brief contains the _current_ commit index. (Not the index waited for)
47+
LogIndex currentCommitIndex;
48+
/// @brief Quorum information
49+
std::shared_ptr<QuorumData const> quorum;
50+
51+
WaitForResult(LogIndex index, std::shared_ptr<QuorumData const> quorum);
52+
WaitForResult() = default;
53+
WaitForResult(velocypack::Slice);
54+
55+
void toVelocyPack(velocypack::Builder&) const;
56+
};
57+
4158
/**
4259
* @brief Interface for a log participant: That is, usually either a leader or a
4360
* follower (LogLeader and LogFollower). Can also be a LogUnconfiguredParticipant,
@@ -51,21 +68,23 @@ struct ILogParticipant {
5168
[[nodiscard]] virtual auto resign() &&
5269
-> std::tuple<std::unique_ptr<LogCore>, DeferredAction> = 0;
5370

54-
using WaitForPromise = futures::Promise<std::shared_ptr<QuorumData const>>;
55-
using WaitForFuture = futures::Future<std::shared_ptr<QuorumData const>>;
56-
using WaitForIteratorFuture = futures::Future<std::unique_ptr<LogIterator>>;
71+
using WaitForPromise = futures::Promise<WaitForResult>;
72+
using WaitForFuture = futures::Future<WaitForResult>;
73+
using WaitForIteratorFuture = futures::Future<std::unique_ptr<LogRangeIterator>>;
5774
using WaitForQueue = std::multimap<LogIndex, WaitForPromise>;
5875

5976
[[nodiscard]] virtual auto waitFor(LogIndex index) -> WaitForFuture = 0;
6077
[[nodiscard]] virtual auto waitForIterator(LogIndex index) -> WaitForIteratorFuture;
6178
[[nodiscard]] virtual auto getTerm() const noexcept -> std::optional<LogTerm>;
79+
80+
[[nodiscard]] virtual auto release(LogIndex doneWithIdx) -> Result = 0;
6281
};
6382

6483
/**
6584
* @brief Unconfigured log participant, i.e. currently neither a leader nor
6685
* follower. Holds a LogCore, does nothing else.
6786
*/
68-
struct LogUnconfiguredParticipant
87+
struct LogUnconfiguredParticipant final
6988
: std::enable_shared_from_this<LogUnconfiguredParticipant>,
7089
ILogParticipant {
7190
~LogUnconfiguredParticipant() override;
@@ -76,6 +95,7 @@ struct LogUnconfiguredParticipant
7695
auto resign() &&
7796
-> std::tuple<std::unique_ptr<LogCore>, DeferredAction> override;
7897
[[nodiscard]] auto waitFor(LogIndex) -> WaitForFuture override;
98+
[[nodiscard]] auto release(LogIndex doneWithIdx) -> Result override;
7999
private:
80100
std::unique_ptr<LogCore> _logCore;
81101
std::shared_ptr<ReplicatedLogMetrics> const _logMetrics;

0 commit comments

Comments
 (0)
0