8000 Merging replication 2 staging into devel. · rnshah9/arangodb@d970c99 · GitHub
[go: up one dir, main page]

Skip to content

Commit d970c99

Browse files
author
Lars Maier
committed
Merging replication 2 staging into devel.
1 parent 2cb5f92 commit d970c99

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

51 files changed

+2608
-1926
lines changed

arangod/Agency/Supervision.cpp

Lines changed: 31 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -2521,7 +2521,7 @@ namespace {
25212521
template<typename T>
25222522
auto parseSomethingFromNode(Node const& n) -> T {
25232523
auto builder = n.toBuilder();
2524-
return T::fromVelocyPack(builder.slice());
2524+
return velocypack::deserialize<T>(builder.slice());
25252525
}
25262526

25272527
template<typename T>
@@ -2584,6 +2584,33 @@ auto parseReplicatedStateAgency(Node const& root, Node const& targetNode,
25842584
}
25852585
} // namespace
25862586

2587+
namespace {
2588+
using namespace replication2::replicated_log;
2589+
auto handleReplicatedLog(Node const& snapshot, Node const& targetNode,
2590+
std::string const& dbName, std::string const& idString,
2591+
ParticipantsHealth const& health,
2592+
arangodb::agency::envelope envelope)
2593+
-> arangodb::agency::envelope try {
2594+
auto maybeLog = parseReplicatedLogAgency(snapshot, dbName, idString);
2595+
2596+
if (maybeLog.has_value()) {
2597+
auto const& log = *maybeLog;
2598+
return replication2::replicated_log::executeCheckReplicatedLog(
2599+
dbName, idString, std::move(log), health, std::move(envelope));
2600+
} else {
2601+
LOG_TOPIC("56a0c", ERR, Log 10000 ger::REPLICATION2)
2602+
<< "Supervision could not parse Target node for replicated log "
2603+
<< dbName << "/" << idString;
2604+
return envelope;
2605+
}
2606+
} catch (std::exception const& err) {
2607+
LOG_TOPIC("9f7fb", ERR, Logger::REPLICATION2)
2608+
<< "Supervision caught exception while parsing replicated log" << dbName
2609+
<< "/" << idString << ": " << err.what();
2610+
return envelope;
2611+
}
2612+
} // namespace
2613+
25872614
void Supervision::checkReplicatedLogs() {
25882615
_lock.assertLockedByCurrentThread();
25892616

@@ -2597,7 +2624,7 @@ void Supervision::checkReplicatedLogs() {
25972624

25982625
using namespace replication2::replicated_log;
25992626

2600-
ParticipantsHealth info = std::invoke([&] {
2627+
ParticipantsHealth participantsHealth = std::invoke([&] {
26012628
std::unordered_map<replication2::ParticipantId, ParticipantHealth> info;
26022629
auto& dbservers = snapshot().hasAsChildren(plannedServers).value().get();
26032630
for (auto const& [serverId, node] : dbservers) {
@@ -2619,61 +2646,8 @@ void Supervision::checkReplicatedLogs() {
26192646

26202647
for (auto const& [dbName, db] : targetNode->get().children()) {
26212648
for (auto const& [idString, node] : db->children()) {
2622-
auto target = parseSomethingFromNode<LogTarget>(*node);
2623-
auto plan = parseIfExists<LogPlanSpecification>(
2624-
snapshot(), aliases::plan()
2625-
->replicatedLogs()
2626-
->database(dbName)
2627-
->log(idString)
2628-
->str(SkipComponents(1)));
2629-
auto current =
2630-
parseIfExists<LogCurrent>(snapshot(), aliases::current()
2631-
->replicatedLogs()
2632-
->database(dbName)
2633-
->log(idString)
2634-
->str(SkipComponents(1)));
2635-
2636-
auto maybeAction =
2637-
std::invoke([&, &dbName = dbName]() -> std::optional<Action> {
2638-
try {
2639-
return checkReplicatedLog(target, plan, current, info);
2640-
} catch (std::exception const& err) {
2641-
LOG_TOPIC("576c1", ERR, Logger::REPLICATION2)
2642-
<< "Supervision caught exception in checkReplicatedLog for "
2643-
"replicated log "
2644-
<< dbName << "/" << target.id << ": " << err.what();
2645-
return std::nullopt;
2646-
}
2647-
});
2648-
2649-
if (maybeAction) {
2650-
auto const& action = *maybeAction;
2651-
2652-
if (target.supervision.has_value() &&
2653-
target.supervision->maxActionsTraceLength > 0) {
2654-
envelope =
2655-
envelope.write()
2656-
.push_queue_emplace(
2657-
aliases::current()
2658-
->replicatedLogs()
2659-
->database(dbName)
2660-
->log(idString)
2661-
->actions()
2662-
->str(),
2663-
[&](velocypack::Builder& b) {
2664-
VPackObjectBuilder ob(&b);
2665-
b.add("time", VPackValue(timepointToString(
2666-
std::chrono::system_clock::now())));
2667-
b.add(VPackValue("desc"));
2668-
arangodb::replication2::replicated_log::toVelocyPack(
2669-
action, b);
2670-
},
2671-
target.supervision->maxActionsTraceLength)
2672-
.end();
2673-
}
2674-
envelope = arangodb::replication2::replicated_log::execute(
2675-
action, dbName, target.id, plan, current, std::move(envelope));
2676-
}
2649+
envelope = handleReplicatedLog(snapshot(), *node, dbName, idString,
2650+
participantsHealth, std::move(envelope));
26772651
}
26782652
}
26792653

arangod/CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -591,6 +591,7 @@ set(LIB_ARANGO_REPLICATION2_SOURCES
591591
Replication2/Exceptions/ParticipantResignedException.h
592592
Replication2/Methods.cpp
593593
Replication2/ReplicatedLog/AgencyLogSpecification.cpp
594+
Replication2/ReplicatedLog/AgencySpecificationInspectors.cpp
594595
Replication2/ReplicatedLog/Algorithms.cpp
595596
Replication2/ReplicatedLog/ILogInterfaces.cpp
596597
Replication2/ReplicatedLog/InMemoryLog.cpp
@@ -1126,6 +1127,7 @@ target_link_libraries(arango_replication arango_vocbase)
11261127

11271128
target_link_libraries(arango_replication2 arango)
11281129
target_link_libraries(arango_replication2 immer)
1130+
target_link_libraries(arango_replication2 fmt)
11291131
target_link_libraries(arango_replication2 arango_network)
11301132

11311133
target_link_libraries(arango_rocksdb rocksdb)

arangod/Cluster/ClusterInfo.cpp

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -52,17 +52,19 @@
5252
#include "Cluster/RebootTracker.h"
5353
#include "Cluster/ServerState.h"
5454
#include "Indexes/Index.h"
55+
#include "Inspection/VPack.h"
5556
#include "Logger/Logger.h"
57+
#include "Metrics/CounterBuilder.h"
58+
#include "Metrics/HistogramBuilder.h"
59+
#include "Metrics/LogScale.h"
60+
#include "Metrics/MetricsFeature.h"
5661
#include "Random/RandomGenerator.h"
5762
#include "Replication2/AgencyCollectionSpecification.h"
5863
#include "Replication2/ReplicatedLog/AgencyLogSpecification.h"
64+
#include "Replication2/ReplicatedLog/AgencySpecificationInspectors.h"
5965
#include "Replication2/ReplicatedLog/LogCommon.h"
6066
#include "Rest/CommonDefines.h"
6167
#include "RestServer/DatabaseFeature.h"
62-
#include "Metrics/CounterBuilder.h"
63-
#include "Metrics/HistogramBuilder.h"
64-
#include "Metrics/LogScale.h"
65-
#include "Metrics/MetricsFeature.h"
6668
#include "RestServer/SystemDatabaseFeature.h"
6769
#include "Scheduler/SchedulerFeature.h"
6870
#include "Sharding/ShardingInfo.h"
@@ -1449,8 +1451,8 @@ void ClusterInfo::loadPlan() {
14491451
VPackObjectIterator(logsSlice)) {
14501452
auto spec =
14511453
std::make_shared<replication2::agency::LogPlanSpecification>(
1452-
replication2::agency::LogPlanSpecification::fromVelocyPack(
1453-
logSlice));
1454+
velocypack::deserialize<
1455+
replication2::agency::LogPlanSpecification>(logSlice));
14541456
newLogs.emplace(spec->id, spec);
14551457
}
14561458
stuff->replicatedLogs = std::move(newLogs);

arangod/Cluster/Maintenance.cpp

Lines changed: 19 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -35,18 +35,20 @@
3535
#include "Cluster/FollowerInfo.h"
3636
#include "Cluster/ResignShardLeadership.h"
3737
#include "Indexes/Index.h"
38+
#include "Inspection/VPack.h"
3839
#include "Logger/LogContextKeys.h"
3940
#include "Logger/LogMacros.h"
4041
#include "Logger/Logger.h"
4142
#include "Logger/LoggerStream.h"
42-
#include "Replication2/LoggerContext.h"
43-
#include "Replication2/ReplicatedLog/AgencyLogSpecification.h"
44-
#include "Replication2/ReplicatedLog/LogStatus.h"
45-
#include "Replication2/ReplicatedState/StateStatus.h"
4643
#include "Metrics/Counter.h"
4744
#include "Metrics/Gauge.h"
4845
#include "Metrics/Histogram.h"
4946
#include "Metrics/LogScale.h"
47+
#include "Replication2/LoggerContext.h"
48+
#include "Replication2/ReplicatedLog/AgencyLogSpecification.h"
49+
#include "Replication2/ReplicatedLog/AgencySpecificationInspectors.h"
50+
#include "Replication2/ReplicatedLog/LogStatus.h"
51+
#include "Replication2/ReplicatedState/StateStatus.h"
5052
#include "RestServer/DatabaseFeature.h"
5153
#include "Utils/DatabaseGuard.h"
5254
#include "VocBase/LogicalCollection.h"
@@ -546,7 +548,7 @@ void arangodb::maintenance::diffReplicatedLogs(
546548
VPackBuilder builder;
547549
auto slice = VPackSlice::noneSlice();
548550
if (spec != nullptr) {
549-
spec->toVelocyPack(builder);
551+
velocypack::serialize(builder, *spec);
550552
slice = builder.slice();
551553
}
552554
return StringUtils::encodeBase64(slice.startAs<char>(),
@@ -642,7 +644,7 @@ void arangodb::maintenance::diffReplicatedStates(
642644
VPackBuilder builder;
643645
auto slice = VPackSlice::noneSlice();
644646
if (obj != nullptr) {
645-
obj->toVelocyPack(builder);
647+
velocypack::serialize(builder, *obj);
646648
slice = builder.slice();
647649
}
648650
return StringUtils::encodeBase64(slice.startAs<char>(), slice.byteSize());
@@ -921,7 +923,8 @@ arangodb::Result arangodb::maintenance::diffPlanLocal(
921923
->vec());
922924
if (planLogInDatabaseSlice.isObject()) {
923925
for (auto [key, value] : VPackObjectIterator(planLogInDatabaseSlice)) {
924-
auto spec = agency::LogPlanSpecification::fromVelocyPack(value);
926+
auto spec =
927+
velocypack::deserialize<agency::LogPlanSpecification>(value);
925928
planLogsInDatabase.emplace(spec.id, std::move(spec));
926929
}
927930
}
@@ -950,7 +953,8 @@ arangodb::Result arangodb::maintenance::diffPlanLocal(
950953
if (planStatesInDatabaseSlice.isObject()) {
951954
for (auto [key, value] :
952955
VPackObjectIterator(planStatesInDatabaseSlice)) {
953-
auto spec = replicated_state::agency::Plan::fromVelocyPack(value);
956+
auto spec =
957+
velocypack::deserialize<replicated_state::agency::Plan>(value);
954958

955959
auto id = spec.id;
956960
planStatesInDatabase.emplace(id, std::move(spec));
@@ -959,7 +963,7 @@ arangodb::Result arangodb::maintenance::diffPlanLocal(
959963
currentStatesInDatabaseSlice.get(key.stringView());
960964
!currentSlice.isNone()) {
961965
auto currentObj =
962-
replicated_state::agency::Current::fromVelocyPack(
966+
velocypack::deserialize<replicated_state::agency::Current>(
963967
currentSlice);
964968
currentStatesInDatabase.emplace(id, std::move(currentObj));
965969
}
@@ -1545,7 +1549,7 @@ static void writeUpdateReplicatedLogLeader(
15451549
VPackObjectBuilder o(&report);
15461550
report.add(OP, VP_SET);
15471551
report.add(VPackValue("payload"));
1548-
leader.toVelocyPack(report);
1552+
velocypack::serialize(report, leader);
15491553
{
15501554
VPackObjectBuilder preconditionBuilder(&report, "precondition");
15511555
report.add(preconditionPath, VPackValue(localTerm));
@@ -1589,7 +1593,7 @@ static void writeUpdateReplicatedLogLocal(
15891593
VPackObjectBuilder o(&report);
15901594
report.add(OP, VP_SET);
15911595
report.add(VPackValue("payload"));
1592-
local.toVelocyPack(report);
1596+
velocypack::serialize(report, local);
15931597
{
15941598
VPackObjectBuilder preconditionBuilder(&report, "precondition");
15951599
report.add(preconditionPath, VPackValue(localTerm));
@@ -1624,7 +1628,7 @@ static void reportCurrentReplicatedLog(
16241628
if (currentSlice.isNone()) {
16251629
return std::nullopt;
16261630
}
1627-
return LogCurrent::fromVelocyPack(currentSlice);
1631+
return velocypack::deserialize<LogCurrent>(currentSlice);
16281632
});
16291633

16301634
{
@@ -1681,8 +1685,8 @@ static void reportCurrentReplicatedState(
16811685
if (currentSlice.isNone()) {
16821686
return std::nullopt;
16831687
}
1684-
return replication2::replicated_state::agency::Current::fromVelocyPack(
1685-
currentSlice);
1688+
return velocypack::deserialize<
1689+
replication2::replicated_state::agency::Current>(currentSlice);
16861690
});
16871691

16881692
bool const updateCurrent = std::invoke([&] {
@@ -1728,7 +1732,7 @@ static void reportCurrentReplicatedState(
17281732
VPackObjectBuilder o(&report);
17291733
report.add(OP, VP_SET);
17301734
report.add(VPackValue("payload"));
1731-
update.toVelocyPack(report);
1735+
velocypack::serialize(report, update);
17321736
}
17331737
}
17341738

arangod/Cluster/UpdateReplicatedLogAction.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,11 @@
2929
#include "Cluster/FailureOracleFeature.h"
3030
#include "Cluster/MaintenanceFeature.h"
3131
#include "Cluster/ServerState.h"
32+
#include "Inspection/VPack.h"
3233
#include "Network/NetworkFeature.h"
3334
#include "Replication2/Exceptions/ParticipantResignedException.h"
3435
#include "Replication2/ReplicatedLog/AgencyLogSpecification.h"
36+
#include "Replication2/ReplicatedLog/AgencySpecificationInspectors.h"
3537
#include "Replication2/ReplicatedLog/Algorithms.h"
3638
#include "Replication2/ReplicatedLog/NetworkAttachedFollower.h"
3739
#include "Replication2/ReplicatedLog/ReplicatedLog.h"
@@ -73,7 +75,7 @@ bool arangodb::maintenance::UpdateReplicatedLogAction::first() {
7375
StringUtils::decodeBase64(_description.get(REPLICATED_LOG_SPEC));
7476
auto slice = VPackSlice(reinterpret_cast<uint8_t const*>(buffer.c_str()));
7577
if (!slice.isNone()) {
76-
return agency::LogPlanSpecification::fromVelocyPack(slice);
78+
return velocypack::deserialize<agency::LogPlanSpecification>(slice);
7779
}
7880

7981
return std::nullopt;

arangod/Cluster/UpdateReplicatedStateAction.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ bool arangodb::maintenance::UpdateReplicatedStateAction::first() {
7272
auto buffer = StringUtils::decodeBase64(_description.get(std::string{key}));
7373
auto slice = VPackSlice(reinterpret_cast<uint8_t const*>(buffer.c_str()));
7474
if (!slice.isNone()) {
75-
return T::fromVelocyPack(slice);
75+
return velocypack::deserialize<T>(slice);
7676
}
7777

7878
return std::nullopt;

arangod/Replication2/AgencyMethods.cpp

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,11 @@
4141
#include "Cluster/AgencyCache.h"
4242
#include "Cluster/ClusterTypes.h"
4343
#include "Replication2/ReplicatedLog/AgencyLogSpecification.h"
44+
#include "Replication2/ReplicatedLog/AgencySpecificationInspectors.h"
4445
#include "Replication2/ReplicatedLog/LogCommon.h"
4546
#include "VocBase/voc-types.h"
4647
#include "VocBase/vocbase.h"
48+
#include "Inspection/VPack.h"
4749

4850
namespace arangodb {
4951
class Result;
@@ -88,7 +90,8 @@ auto methods::updateTermSpecificationTrx(arangodb::agency::envelope envelope,
8890

8991
return envelope.write()
9092
.emplace_object(
91-
termPath, [&](VPackBuilder& builder) { spec.toVelocyPack(builder); })
93+
termPath,
94+
[&](VPackBuilder& builder) { velocypack::serialize(builder, spec); })
9295
.inc(paths::plan()->version()->str())
9396
.precs()
9497
.isNotEmpty(logPath)
@@ -179,7 +182,8 @@ auto methods::createReplicatedLogTrx(arangodb::agency::envelope envelope,
179182

180183
return envelope.write()
181184
.emplace_object(
182-
path, [&](VPackBuilder& builder) { spec.toVelocyPack(builder); })
185+
path,
186+
[&](VPackBuilder& builder) { velocypack::serialize(builder, spec); })
183187
.inc(paths::target()->version()->str())
184188
.precs()
185189
.isEmpty(path)
@@ -225,9 +229,10 @@ auto methods::updateElectionResult(arangodb::agency::envelope envelope,
225229
->str();
226230

227231
return envelope.write()
228-
.emplace_object(
229-
path + "/supervision/election",
230-
[&](VPackBuilder& builder) { result.toVelocyPack(builder); })
232+
.emplace_object(path + "/supervision/election",
233+
[&](VPackBuilder& builder) {
234+
velocypack::serialize(builder, result);
235+
})
231236
.inc(paths::current()->version()->str())
232237
.end();
233238
}
@@ -240,7 +245,7 @@ auto methods::getCurrentSupervision(TRI_vocbase_t& vocbase, LogId id)
240245
agencyCache.get(builder, basics::StringUtils::concatT(
241246
"Current/ReplicatedLogs/", vocbase.name(), "/",
242247
id, "/supervision"));
243-
return LogCurrentSupervision::fromVelocyPack(builder.slice());
248+
return velocypack::deserialize<LogCurrentSupervision>(builder.slice());
244249
}
245250

246251
namespace {
@@ -256,7 +261,8 @@ auto createReplicatedStateTrx(arangodb::agency::envelope envelope,
256261

257262
return envelope.write()
258263
.emplace_object(
259-
path, [&](VPackBuilder& builder) { spec.toVelocyPack(builder); })
264+
path,
265+
[&](VPackBuilder& builder) { velocypack::serialize(builder, spec); })
260266
.inc(paths::target()->version()->str())
261267
.precs()
262268
.isEmpty(path)

0 commit comments

Comments
 (0)
0