8000 [R2] Log Multiplexer (#14667) · arangodb/arangodb@7fe5b5d · GitHub
[go: up one dir, main page]

Skip to content

Commit 7fe5b5d

Browse files
author
Lars Maier
authored
[R2] Log Multiplexer (#14667)
* First version of LogMultiplexer. * Rewrote Demultiplexer. Now implementing multiplexer. * Added .tpp file. * More multiplexer stuff. * Working Multiplexer and Demultiplexer. * Code clean up. * More code clean up. * Code cleanup. * Separated MultiplexedValue serialization from Multiplexer. * More code clean up+ added multiplexed value deserializer. * More code cleanup. * Fixing recursive waitFor calls. * More tests for multiplexer. * More tests for demultiplexer. * Clean up testing code. * Added test that produces a StackOverflow!!! * Fixed one stack overflow, got another. * Fixed more tests. Still stack overflow. This is a LogLeader problem. * Use async mock log only when required. * Fix resolvePromise code wth bug in clang. * More tests. * Prepare for common code extraction. * Extracted common code. * Implemented releaseInternal for streams. * Renamed commitIndex to currentCommitIndex in waitForResult.
1 parent b2d1c7a commit 7fe5b5d

38 files changed

+2344
-311
lines changed

arangod/Agency/Supervision.cpp

Lines changed: 47 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,22 @@
2323

2424
#include "Supervision.h"
2525

26-
#include <Basics/StringUtils.h>
27-
#include <Basics/overload.h>
28-
#include <thread>
26+
#include <cstddef>
27+
#include <algorithm>
28+
#include <cstdint>
29+
#include <exception>
30+
#include <iosfwd>
31+
#include <optional>
32+
#include <type_traits>
33+
#include <unordered_map>
34+
#include <unordered_set>
35+
#include <utility>
36+
#include <variant>
37+
38+
#include <velocypack/Builder.h>
39+
#include <velocypack/Iterator.h>
40+
#include <velocypack/Slice.h>
41+
#include <velocypack/velocypack-common.h>
2942

3043
#include "Agency/ActiveFailoverJob.h"
3144
#include "Agency/AddFollower.h"
@@ -48,6 +61,22 @@
4861
#include "Replication2/ReplicatedLog/AgencyLogSpecification.h"
4962
#include "Replication2/ReplicatedLog/Algorithms.h"
5063
#include "StorageEngine/HealthData.h"
64+
#include "Agency/AgencyStrings.h"
65+
#include "Agency/AgentConfiguration.h"
66+
#include "Agency/AgentInterface.h"
67+
#include "Agency/PathComponent.h"
68+
#include "Agency/TimeString.h"
69+
#include "Agency/TransactionBuilder.h"
70+
#include "Basics/Result.h"
71+
#include "Basics/VelocyPackHelper.h"
72+
#include "Basics/debugging.h"
73+
#include "Basics/overload.h"
74+
#include "Logger/LogMacros.h"
75+
#include "Logger/Logger.h"
76+
#include "Logger/LoggerStream.h"
77+
#include "Replication2/ReplicatedLog/LogCommon.h"
78+
#include "RestServer/Metrics.h"
79+
#include "RestServer/MetricsFeature.h"
5180

5281
using namespace arangodb;
5382
using namespace arangodb::consensus;
@@ -2282,20 +2311,21 @@ void Supervision::checkReplicatedLogs() {
22822311
});
22832312
auto newTermSpec = checkReplicatedLog(dbName, spec, current, info);
22842313

2285-
envelope = std::visit(
2286-
overload{[&, &dbName = dbName](LogPlanTermSpecification const& newSpec) {
2287-
return arangodb::replication2::agency::methods::updateTermSpecificationTrx(
2288-
std::move(envelope), dbName, spec.id, newSpec,
2289-
spec.currentTerm->term);
2290-
},
2291-
[&, &dbName = dbName](LogCurrentSupervisionElection const& newElection) {
2292-
return arangodb::replication2::agency::methods::updateElectionResult(
2293-
std::move(envelope), dbName, spec.id, newElection);
2294-
},
2295-
[&](std::monostate const&) {
2296-
return std::move(envelope); // do nothing
2297-
}},
2298-
newTermSpec);
2314+
envelope =
2315+
std::visit(arangodb::overload{
2316+
[&, &dbName = dbName](LogPlanTermSpecification const& newSpec) {
2317+
return arangodb::replication2::agency::methods::updateTermSpecificationTrx(
2318+
std::move(envelope), dbName, spec.id, newSpec,
2319+
spec.currentTerm->term);
2320+
},
2321+
[&, &dbName = dbName](LogCurrentSupervisionElection const& newElection) {
2322+
return arangodb::replication2::agency::methods::updateElectionResult(
2323+
std::move(envelope), dbName, spec.id, newElection);
2324+
},
2325+
[&](std::monostate const&) {
2326+
return std::move(envelope); // do nothing
2327+
}},
2328+
newTermSpec);
22992329
}
23002330
}
23012331

arangod/Agency/Supervision.h

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,17 +23,36 @@
2323

2424
#pragma once
2525

26+
#include <atomic>
27+
#include <chrono>
28+
#include <cstdint>
29+
#include <functional>
30+
#include <memory>
31+
#include <string>
32+
#include <vector>
33+
2634
#include "Agency/AgencyCommon.h"
2735
#include "Agency/AgentInterface.h"
36+
#include "Agency/Node.h"
2837
#include "Agency/Store.h"
2938
#include "Agency/TimeString.h"
3039
#include "Basics/ConditionVariable.h"
3140
#include "Basics/Mutex.h"
3241
#include "Basics/Thread.h"
42+
#include "Cluster/ClusterTypes.h"
3343
#include "RestServer/MetricsFeature.h"
3444

35-
namespace arangodb {
36-
namespace consensus {
45+
class Counter;
46+
namespace arangodb::application_features {
47+
class ApplicationServer;
48+
} // namespace arangodb::application_features
49+
namespace arangodb::consensus {
50+
class AgentInterface;
51+
} // namespace arangodb::consensus
52+
template <typename Scale> class Histogram;
53+
template <typename T> struct log_scale_t;
54+
55+
namespace arangodb::consensus {
3756

3857
class Agent;
3958

@@ -328,6 +347,5 @@ class Supervision : public arangodb::Thread {
328347
*/
329348
query_t removeTransactionBuilder(std::vector<std::string> const&);
330349

331-
} // namespace consensus
332350
} // namespace arangodb
333351

arangod/CMakeLists.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -566,7 +566,7 @@ set(LIB_ARANGO_REPLICATION2_SOURCES
566566
Replication2/ReplicatedLog/types.cpp
567567
Replication2/Version.cpp
568568
RestHandler/RestLogHandler.cpp
569-
Replication2/ReplicatedState/AbstractStateMachine.tpp Replication2/ReplicatedState/AbstractStateMachine.h)
569+
Replication2/ReplicatedState/AbstractStateMachine.tpp Replication2/ReplicatedState/AbstractStateMachine.h Replication2/Streams/LogMultiplexer.tpp Replication2/Streams/LogMultiplexer.h Replication2/Streams/StreamSpecification.h Replication2/Streams/Streams.h Replication2/Streams/MultiplexedValues.h Replication2/Streams/StreamInformationBlock.h)
570570

571571
set (LIB_ARANGO_METRICS_SOURCES
572572
RestServer/Metrics.cpp

arangod/Replication2/AgencyMethods.cpp

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,28 @@
2020
/// @author Lars Maier
2121
////////////////////////////////////////////////////////////////////////////////
2222

23-
#include <chrono>
24-
2523
#include "AgencyMethods.h"
2624

27-
#include <Agency/AsyncAgencyComm.h>
28-
#include <Agency/TransactionBuilder.h>
29-
#include <Cluster/ClusterFeature.h>
25+
#include <cstdint>
26+
#include <chrono>
27+
#include <functional>
28+
#include <memory>
29+
#include <string>
30+
#include <utility>
31+
32+
#include <velocypack/velocypack-aliases.h>
33+
#include <velocypack/velocypack-common.h>
34+
35+
#include "Agency/AsyncAgencyComm.h"
36+
#include "Agency/TransactionBuilder.h"
37+
#include "Agency/AgencyPaths.h"
38+
#include "Cluster/ClusterTypes.h"
39+
#include "Replication2/ReplicatedLog/AgencyLogSpecification.h"
40+
#include "Replication2/ReplicatedLog/LogCommon.h"
41+
42+
namespace arangodb {
43+
class Result;
44+
} // namespace arangodb
3045

3146
using namespace std::chrono_literals;
3247

arangod/Replication2/AgencyMethods.h

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,26 @@
2121
////////////////////////////////////////////////////////////////////////////////
2222

2323
#pragma once
24-
#include <optional>
25-
2624
#include <Agency/TransactionBuilder.h>
2725
#include <Basics/ResultT.h>
2826
#include <Cluster/ClusterTypes.h>
29-
3027
#include <Replication2/ReplicatedLog/AgencyLogSpecification.h>
28+
#include <optional>
29+
30+
#include "Futures/Future.h"
31+
32+
namespace arangodb {
33+
class Result;
34+
} // namespace arangodb
35+
namespace arangodb::replication2 {
36+
class LogId;
37+
struct LogTerm;
38+
} // namespace arangodb::replication2
39+
namespace arangodb::replication2::agency {
40+
struct LogCurrentSupervisionElection;
41+
struct LogPlanSpecification;
42+
struct LogPlanTermSpecification;
43+
} // namespace arangodb::replication2::agency
3144

3245
namespace arangodb::replication2::agency::methods {
3346

arangod/Replication2/ReplicatedLog/AgencyLogSpecification.cpp

Lines changed: 19 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -57,9 +57,9 @@ auto LogPlanTermSpecification::toVelocyPack(VPackBuilder& builder) const -> void
5757
}
5858
}
5959

60-
LogPlanTermSpecification::LogPlanTermSpecification(from_velocypack_t, VPackSlice slice) {
61-
term = slice.get(StaticStrings::Term).extract<LogTerm>();
62-
config = LogConfig(slice.get(StaticStrings::Config));
60+
LogPlanTermSpecification::LogPlanTermSpecification(from_velocypack_t, VPackSlice slice)
61+
: term(slice.get(StaticStrings::Term).extract<LogTerm>()),
62+
config(slice.get(StaticStrings::Config)) {
6363
for (auto const& [key, value] :
6464
VPackObjectIterator(slice.get(StaticStrings::Participants))) {
6565
TRI_ASSERT(value.isEmptyObject());
@@ -82,9 +82,9 @@ auto LogPlanSpecification::toVelocyPack(VPackBuilder& builder) const -> void {
8282
}
8383
}
8484

85-
LogPlanSpecification::LogPlanSpecification(from_velocypack_t, VPackSlice slice) {
86-
id = slice.get(StaticStrings::Id).extract<LogId>();
87-
targetConfig = LogConfig(slice.get(StaticStrings::TargetConfig));
85+
LogPlanSpecification::LogPlanSpecification(from_velocypack_t, VPackSlice slice)
86+
: id(slice.get(StaticStrings::Id).extract<LogId>()),
87+
targetConfig(slice.get(StaticStrings::TargetConfig)) {
8888
if (auto term = slice.get(StaticStrings::CurrentTerm); !term.isNone()) {
8989
currentTerm = LogPlanTermSpecification{from_velocypack, term};
9090
}
@@ -99,7 +99,8 @@ LogPlanTermSpecification::LogPlanTermSpecification(LogTerm term, LogConfig confi
9999
participants(std::move(participants)) {}
100100

101101
LogPlanSpecification::LogPlanSpecification(LogId id, std::optional<LogPlanTermSpecification> term,
102-
LogConfig config) : id(id), currentTerm(std::move(term)), targetConfig(config) {}
102+
LogConfig config)
103+
: id(id), currentTerm(std::move(term)), targetConfig(config) {}
103104

104105
LogCurrentLocalState::LogCurrentLocalState(from_velocypack_t, VPackSlice slice) {
105106
auto spearheadSlice = slice.get(StaticStrings::Spearhead);
@@ -135,10 +136,11 @@ LogCurrentSupervision::LogCurrentSupervision(from_velocypack_t, VPackSlice slice
135136
}
136137
}
137138

138-
LogCurrentSupervisionElection::LogCurrentSupervisionElection(from_velocypack_t, VPackSlice slice) {
139-
term = slice.get(StaticStrings::Term).extract<LogTerm>();
140-
participantsRequired = slice.get("participantsRequired").getNumericValue<std::size_t>();
141-
participantsAvailable = slice.get("participantsAvailable").getNumericValue<std::size_t>();
139+
LogCurrentSupervisionElection::LogCurrentSupervisionElection(from_velocypack_t, VPackSlice slice)
140+
: term(slice.get(StaticStrings::Term).extract<LogTerm>()),
141+
participantsRequired(slice.get("participantsRequired").getNumericValue<std::size_t>()),
142+
participantsAvailable(
143+
slice.get("participantsAvailable").getNumericValue<std::size_t>()) {
142144
for (auto [key, value] : VPackObjectIterator(slice.get("details"))) {
143145
detail.emplace(key.copyString(), value.get("code").getNumericValue<ErrorCode>());
144146
}
@@ -172,21 +174,23 @@ auto LogCurrentSupervisionElection::toVelocyPack(VPackBuilder& builder) const ->
172174
builder.add("participantsAvailable", VPackValue(participantsAvailable));
173175
{
174176
VPackObjectBuilder db(&builder, "details");
175-
for (auto const&[server, error] : detail) {
177+
for (auto const& [server, error] : detail) {
176178
builder.add(VPackValue(server));
177179
::toVelocyPack(error, builder);
178180
}
179181
}
180182
}
181183

182-
auto agency::toVelocyPack(LogCurrentSupervisionElection::ErrorCode ec, VPackBuilder& builder) -> void {
184+
auto agency::toVelocyPack(LogCurrentSupervisionElection::ErrorCode ec,
185+
VPackBuilder& builder) -> void {
183186
VPackObjectBuilder ob(&builder);
184187
builder.add("code", VPackValue(static_cast<int>(ec)));
185188
builder.add("message", VPackValue(to_string(ec)));
186189
}
187190

188-
auto agency::to_string(LogCurrentSupervisionElection::ErrorCode ec) noexcept -> std::string_view {
189-
switch(ec) {
191+
auto agency::to_string(LogCurrentSupervisionElection::ErrorCode ec) noexcept
192+
-> std::string_view {
193+
switch (ec) {
190194
case LogCurrentSupervisionElection::ErrorCode::OK:
191195
return "the server is ok";
192196
case LogCurrentSupervisionElection::ErrorCode::SERVER_NOT_GOOD:

arangod/Replication2/ReplicatedLog/ILogParticipant.cpp

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
#include "RestServer/Metrics.h"
2929

3030
#include <Basics/Exceptions.h>
31+
#include <Basics/StaticStrings.h>
3132

3233
using namespace arangodb;
3334
using namespace arangodb::replication2;
@@ -68,3 +69,19 @@ auto replicated_log::ILogParticipant::getTerm() const noexcept -> std::optional<
6869
auto replicated_log::LogUnconfiguredParticipant::release(LogIndex doneWithIdx) -> Result {
6970
THROW_ARANGO_EXCEPTION(TRI_ERROR_NOT_IMPLEMENTED);
7071
}
72+
73+
replicated_log::WaitForResult::WaitForResult(LogIndex index,
74+
std::shared_ptr<QuorumData const> quorum)
75+
: currentCommitIndex(index), quorum(std::move(quorum)) {}
76+
77+
void replicated_log::WaitForResult::toVelocyPack(velocypack::Builder& builder) const {
78+
VPackObjectBuilder ob(&builder);
79+
builder.add(StaticStrings::CommitIndex, VPackValue(currentCommitIndex));
80+
builder.add(VPackValue("quorum"));
81+
quorum->toVelocyPack(builder);
82+
}
83+
84+
replicated_log::WaitForResult::WaitForResult(velocypack::Slice s) {
85+
currentCommitIndex = s.get(StaticStrings::CommitIndex).extract<LogIndex>();
86+
quorum = std::make_shared<QuorumData>(s.get("quorum"));
87+
}

arangod/Replicat F65E ion2/ReplicatedLog/ILogParticipant.h

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,19 @@ namespace arangodb::replication2::replicated_log {
4242
struct LogCore;
4343
struct LogStatus;
4444

45+
struct WaitForResult {
46+
/// @brief contains the _current_ commit index. (Not the index waited for)
47+
LogIndex currentCommitIndex;
48+
/// @brief Quorum information
49+
std::shared_ptr<QuorumData const> quorum;
50+
51+
WaitForResult(LogIndex index, std::shared_ptr<QuorumData const> quorum);
52+
WaitForResult() = default;
53+
WaitForResult(velocypack::Slice);
54+
55+
void toVelocyPack(velocypack::Builder&) const;
56+
};
57+
4558
/**
4659
* @brief Interface for a log participant: That is, usually either a leader or a
4760
* follower (LogLeader and LogFollower). Can also be a LogUnconfiguredParticipant,
@@ -55,8 +68,8 @@ struct ILogParticipant {
5568
[[nodiscard]] virtual auto resign() &&
5669
-> std::tuple<std::unique_ptr<LogCore>, DeferredAction> = 0;
5770

58-
using WaitForPromise = futures::Promise<std::shared_ptr<QuorumData const>>;
59-
using WaitForFuture = futures::Future<std::shared_ptr<QuorumData const>>;
71+
using WaitForPromise = futures::Promise<WaitForResult>;
72+
using WaitForFuture = futures::Future<WaitForResult>;
6073
using WaitForIteratorFuture = futures::Future<std::unique_ptr<LogRangeIterator>>;
6174
using WaitForQueue = std::multimap<LogIndex, WaitForPromise>;
6275

0 commit comments

Comments
 (0)
0