Merging staging/replication-2.0 into devel. · rnshah9/arangodb@30e4b84 · GitHub
[go: up one dir, main page]

Skip to content

Commit 30e4b84

Browse files
author
Lars Maier
committed
Merging staging/replication-2.0 into devel.
1 parent d2549d0 commit 30e4b84

36 files changed

+1507
-499
lines changed

arangod/Agency/Supervision.cpp

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1743,6 +1743,10 @@ bool Supervision::handleJobs() {
17431743
LOG_TOPIC("f7d05", TRACE, Logger::SUPERVISION) << "Begin checkReplicatedLogs";
17441744
checkReplicatedLogs();
17451745

1746+
LOG_TOPIC("83676", TRACE, Logger::SUPERVISION)
1747+
<< "Begin cleanupReplicatedLogs";
1748+
cleanupReplicatedLogs(); // TODO do this only every x seconds?
1749+
17461750
LOG_TOPIC("00aab", TRACE, Logger::SUPERVISION) << "Begin workJobs";
17471751
workJobs();
17481752

@@ -2855,6 +2859,54 @@ void Supervision::readyOrphanedIndexCreations() {
28552859
}
28562860
}
28572861

2862+
void Supervision::cleanupReplicatedLogs() {
2863+
_lock.assertLockedByCurrentThread();
2864+
2865+
using namespace replication2::agency;
2866+
2867+
// check if Plan has replicated logs
2868+
auto const& planNode = snapshot().hasAsNode(planRepLogPrefix);
2869+
if (!planNode) {
2870+
return;
2871+
}
2872+
2873+
auto const& targetNode = snapshot().hasAsNode(targetRepLogPrefix);
2874+
2875+
auto builder = std::make_shared<Builder>();
2876+
auto envelope = arangodb::agency::envelope::into_builder(*builder);
2877+
2878+
for (auto const& [dbName, db] : planNode->get().children()) {
2879+
for (auto const& [idString, node] : db->children()) {
2880+
// check if this node has an owner and the owner is 'target'
2881+
if (auto owner = node->hasAsString("owner");
2882+
!owner.has_value() || owner != "target") {
2883+
continue;
2884+
}
2885+
2886+
// now check if there is a replicated log in target with that id
2887+
if (targetNode.has_value() &&
2888+
targetNode->get().has(std::vector{dbName, idString})) {
2889+
continue;
2890+
}
2891+
2892+
// delete plan and target
2893+
auto logId = replication2::LogId{basics::StringUtils::uint64(idString)};
2894+
envelope =
2895+
methods::deleteReplicatedLogTrx(std::move(envelope), dbName, logId);
2896+
}
2897+
}
2898+
2899+
envelope.done();
2900+
if (builder->slice().length() > 0) {
2901+
write_ret_t res = _agent->write(builder);
2902+
if (!res.successful()) {
2903+
LOG_TOPIC("df4b1", WARN, Logger::SUPERVISION)
2904+
<< "failed to update term in agency. Will retry. "
2905+
<< builder->toJson();
2906+
}
2907+
}
2908+
}
2909+
28582910
// This is the functional version which actually does the work, it is
28592911
// called by the private method Supervision::enforceReplication and the
28602912
// unit tests:

arangod/Agency/Supervision.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,9 @@ class Supervision : public arangodb::Thread {
195195
/// @brief Check replicated logs
196196
void checkReplicatedStates();
197197

198+
/// @brief Delete replicated logs whose Plan entry is owned by 'target' but has no matching Target entry
199+
void cleanupReplicatedLogs();
200+
198201
struct ResourceCreatorLostEvent {
199202
std::shared_ptr<Node> const& resource;
200203
std::string const& coordinatorId;

arangod/Replication2/AgencyMethods.cpp

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -137,14 +137,20 @@ auto methods::updateTermSpecification(DatabaseID const& database, LogId id,
137137
auto methods::deleteReplicatedLogTrx(arangodb::agency::envelope envelope,
138138
DatabaseID const& database, LogId id)
139139
-> arangodb::agency::envelope {
140-
auto path =
140+
auto planPath =
141141
paths::plan()->replicatedLogs()->database(database)->log(id)->str();
142+
auto targetPath =
143+
paths::target()->replicatedLogs()->database(database)->log(id)->str();
144+
auto currentPath =
145+
paths::current()->replicatedLogs()->database(database)->log(id)->str();
142146

143147
return envelope.write()
144-
.remove(path)
148+
.remove(planPath)
145149
.inc(paths::plan()->version()->str())
146-
.precs()
147-
.isNotEmpty(path)
150+
.remove(targetPath)
151+
.inc(paths::target()->version()->str())
152+
.remove(currentPath)
153+
.inc(paths::current()->version()->str())
148154
.end();
149155
}
150156

arangod/Replication2/ReplicatedLog/AgencyLogSpecification.h

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333

3434
#include <optional>
3535
#include <type_traits>
36+
#include <utility>
3637

3738
namespace arangodb::replication2::agency {
3839

@@ -43,8 +44,8 @@ struct LogPlanTermSpecification {
4344
ParticipantId serverId;
4445
RebootId rebootId;
4546

46-
Leader(ParticipantId const& participant, RebootId const& rebootId)
47-
: serverId{participant}, rebootId{rebootId} {}
47+
Leader(ParticipantId participant, RebootId rebootId)
48+
: serverId{std::move(participant)}, rebootId{rebootId} {}
4849
Leader() : rebootId{RebootId{0}} {};
4950
auto toVelocyPack(VPackBuilder&) const -> void;
5051
friend auto operator==(Leader const&, Leader const&) noexcept
@@ -69,6 +70,8 @@ struct LogPlanSpecification {
6970

7071
ParticipantsConfig participantsConfig;
7172

73+
std::optional<std::string> owner;
74+
7275
auto toVelocyPack(velocypack::Builder&) const -> void;
7376
[[nodiscard]] static auto fromVelocyPack(velocypack::Slice)
7477
-> LogPlanSpecification;

arangod/Replication2/ReplicatedLog/AgencySpecificationInspectors.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ auto constexpr MaxActionsTraceLength =
5353
std::string_view{"maxActionsTraceLength"};
5454
auto constexpr Code = std::string_view{"code"};
5555
auto constexpr Message = std::string_view{"message"};
56+
auto constexpr Owner = std::string_view{"owner"};
5657
} // namespace static_strings
5758

5859
template<class Inspector>
@@ -73,6 +74,7 @@ auto inspect(Inspector& f, LogPlanSpecification& x) {
7374
return f.object(x).fields(
7475
f.field(StaticStrings::Id, x.id),
7576
f.field(StaticStrings::CurrentTerm, x.currentTerm),
77+
f.field(static_strings::Owner, x.owner),
7678
f.field(static_strings::ParticipantsConfig, x.participantsConfig));
7779
};
7880

arangod/Replication2/ReplicatedLog/SupervisionAction.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,9 @@ auto execute(Action const& action, DatabaseID const& dbName, LogId const& log,
9393
})
9494
.inc(paths::current()->version()->str());
9595
})
96+
.precs()
97+
.isNotEmpty(
98+
paths::target()->replicatedLogs()->database(dbName)->log(log)->str())
9699
.end();
97100
}
98101

arangod/Replication2/ReplicatedLog/SupervisionAction.h

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -177,9 +177,11 @@ struct AddLogToPlanAction {
177177
std::optional<LogPlanTermSpecification::Leader> _leader;
178178

179179
auto execute(ActionContext& ctx) const -> void {
180-
ctx.setPlan(LogPlanSpecification(
180+
auto newPlan = LogPlanSpecification(
181181
_id, LogPlanTermSpecification(LogTerm{1}, _config, _leader),
182-
ParticipantsConfig{.generation = 1, .participants = _participants}));
182+
ParticipantsConfig{.generation = 1, .participants = _participants});
183+
newPlan.owner = "target";
184+
ctx.setPlan(newPlan);
183185
}
184186
};
185187
template<typename Inspector>

arangod/Replication2/StateMachines/Prototype/PrototypeCore.cpp

Lines changed: 43 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ void PrototypeCore::loadStateFromDB() {
7979

8080
auto PrototypeCore::getSnapshot()
8181
-> std::unordered_map<std::string, std::string> {
82-
auto snapshot = _store;
82+
auto snapshot = getReadState();
8383
return std::unordered_map<std::string, std::string>{snapshot.begin(),
8484
snapshot.end()};
8585
}
@@ -101,7 +101,7 @@ auto PrototypeCore::getDump() -> PrototypeDump {
101101

102102
auto PrototypeCore::get(std::string const& key) noexcept
103103
-> std::optional<std::string> {
104-
if (auto it = _store.find(key); it != nullptr) {
104+
if (auto it = getReadState().find(key); it != nullptr) {
105105
return *it;
106106
}
107107
return std::nullopt;
@@ -110,15 +110,36 @@ auto PrototypeCore::get(std::string const& key) noexcept
110110
auto PrototypeCore::get(std::vector<std::string> const& keys)
111111
-> std::unordered_map<std::string, std::string> {
112112
std::unordered_map<std::string, std::string> result;
113-
auto snapshot = _store;
113+
auto snapshot = getReadState();
114114
for (auto const& it : keys) {
115-
if (auto found = _store.find(it); found != nullptr) {
115+
if (auto found = snapshot.find(it); found != nullptr) {
116116
result.emplace(it, *found);
117117
}
118118
}
119119
return result;
120120
}
121121

122+
bool PrototypeCore::compare(std::string const& key, std::string const& value) {
123+
if (auto it = _store.find(key); it != nullptr) {
124+
return *it == value;
125+
}
126+
return false;
127+
}
128+
129+
/// Returns a copy of the state that reads are served from: the state stored
/// at the front of `_ongoingStates` if there is one, otherwise `_store`.
auto PrototypeCore::getReadState() -> StorageType {
  // An empty deque can happen on followers or before any entries have been
  // applied; the local store is the read state then.
  return _ongoingStates.empty() ? _store : _ongoingStates.front().second;
}
136+
137+
/// Applies `entry` to the local store and then appends the resulting store
/// contents, tagged with `idx`, to the back of `_ongoingStates`.
void PrototypeCore::applyToOngoingState(LogIndex idx,
                                        PrototypeLogEntry const& entry) {
  // Order matters: the recorded state must already include `entry`.
  applyToLocalStore(entry);
  _ongoingStates.emplace_back(idx, _store);
}
142+
122143
auto PrototypeCore::getLastPersistedIndex() const noexcept -> LogIndex const& {
123144
return _lastPersistedIndex;
124145
}
@@ -127,6 +148,24 @@ auto PrototypeCore::getLogId() const noexcept -> GlobalLogIdentifier const& {
127148
return _logId;
128149
}
129150

151+
/// Applies a single log entry to the local store, dispatching on the kind of
/// operation held in `entry.op`.
void PrototypeCore::applyToLocalStore(PrototypeLogEntry const& entry) {
  // Insert: set every key/value pair of the operation's map.
  auto const applyInsert =
      [this](PrototypeLogEntry::InsertOperation const& insertOp) {
        for (auto const& [k, v] : insertOp.map) {
          _store = _store.set(k, v);
        }
      };

  // Delete: erase every listed key.
  auto const applyDelete =
      [this](PrototypeLogEntry::DeleteOperation const& deleteOp) {
        for (auto const& k : deleteOp.keys) {
          _store = _store.erase(k);
        }
      };

  // Compare-exchange: the new value is written unconditionally here; no
  // comparison is performed in this function.
  auto const applyCompareExchange =
      [this](PrototypeLogEntry::CompareExchangeOperation const& casOp) {
        _store = _store.set(casOp.key, casOp.newValue);
      };

  std::visit(overload{applyInsert, applyDelete, applyCompareExchange},
             entry.op);
}
168+
130169
void PrototypeDump::toVelocyPack(velocypack::Builder& b) {
131170
serialize<PrototypeDump>(b, *this);
132171
}

arangod/Replication2/StateMachines/Prototype/PrototypeCore.h

Lines changed: 27 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,9 @@ struct PrototypeCore {
8787
template<typename EntryIterator>
8888
void applyEntries(std::unique_ptr<EntryIterator> ptr);
8989

90+
template<typename EntryIterator>
91+
void update(std::unique_ptr<EntryIterator> ptr);
92+
9093
auto getSnapshot() -> std::unordered_map<std::string, std::string>;
9194
void applySnapshot(
9295
std::unordered_map<std::string, std::string> const& snapshot);
@@ -99,17 +102,26 @@ struct PrototypeCore {
99102
auto get(std::vector<std::string> const& keys)
100103
-> std::unordered_map<std::string, std::string>;
101104

105+
bool compare(std::string const& key, std::string const& value);
106+
107+
auto getReadState() -> StorageType;
108+
void applyToOngoingState(LogIndex, PrototypeLogEntry const&);
109+
102110
[[nodiscard]] auto getLastPersistedIndex() const noexcept -> LogIndex const&;
103111
[[nodiscard]] auto getLogId() const noexcept -> GlobalLogIdentifier const&;
104112

105113
LoggerContext const loggerContext;
106114

115+
private:
116+
void applyToLocalStore(PrototypeLogEntry const& entry);
117+
107118
private:
108119
GlobalLogIdentifier _logId;
109120
LogIndex _lastPersistedIndex;
110121
LogIndex _lastAppliedIndex;
111122
StorageType _store;
112123
std::shared_ptr<IPrototypeStorageInterface> _storage;
124+
std::deque<std::pair<LogIndex, StorageType>> _ongoingStates;
113125
};
114126

115127
/*
@@ -120,21 +132,22 @@ void PrototypeCore::applyEntries(std::unique_ptr<EntryIterator> ptr) {
120132
auto lastAppliedIndex = ptr->range().to.saturatedDecrement();
121133
while (auto entry = ptr->next()) {
122134
PrototypeLogEntry const& logEntry = entry->second;
123-
std::visit(overload{
124-
[&](PrototypeLogEntry::InsertOperation const& op) {
125-
for (auto const& [key, value] : op.map) {
126-
_store = _store.set(key, value);
127-
}
128-
},
129-
[&](PrototypeLogEntry::DeleteOperation const& op) {
130-
for (auto const& it : op.keys) {
131-
_store = _store.erase(it);
132-
}
133-
},
134-
},
135-
logEntry.op);
135+
applyToLocalStore(logEntry);
136+
}
137+
_lastAppliedIndex = std::move(lastAppliedIndex);
138+
}
139+
140+
/*
141+
* Advances through the deque.
142+
*/
143+
template<typename EntryIterator>
144+
void PrototypeCore::update(std::unique_ptr<EntryIterator> ptr) {
145+
auto lastAppliedIndex = ptr->range().to.saturatedDecrement();
146+
while (!_ongoingStates.empty() &&
147+
_ongoingStates.front().first < lastAppliedIndex) {
148+
_ongoingStates.pop_front();
136149
}
137150
_lastAppliedIndex = std::move(lastAppliedIndex);
138151
}
139152

140-
} // namespace arangodb::replication2::replicated_state::prototype
153+
} // namespace arangodb::replication2::replicated_state::prototype

0 commit comments

Comments
 (0)
0