Following term id (#14405) · arangodb/arangodb@0f82b1b
Commit 0f82b1b

neunhoef and jsteemann authored and maierlars committed

Following term id (#14405)

* Implement following term id.
* Add test.

Co-authored-by: Jan <jsteemann@users.noreply.github.com>

1 parent abf7156 commit 0f82b1b

15 files changed, with 354 additions and 108 deletions

CHANGELOG

Lines changed: 6 additions & 0 deletions
@@ -1,6 +1,12 @@
 devel
 -----
 
+* Add following term ids, which prevent old synchronous replication requests
+  from being accepted after a follower was dropped and has gotten in sync
+  again. This makes the chaos tests which delay synchronous replication
+  requests more reliable and prevents inconsistent shard replicas under bad
+  network conditions.
+
 * Enable process metrics on agent instances by default. Previously, some
   metrics (including the metrics starting with `arangodb_process` prefix) were
   not returned by agent instances.
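
Reduced to its core, the mechanism behind this entry is: the follower remembers which "following term" it is currently in, every synchronous replication request carries the term it was issued under, and a mismatch causes the request to be refused. The following is a minimal standalone sketch of that check, not ArangoDB code; the type and function names are made up for illustration.

#include <cassert>
#include <cstdint>

// Hypothetical follower-side state for one shard (illustration only).
struct ShardFollowerState {
  uint64_t expectedTerm = 1;  // default when no term has been assigned yet

  // A replication request is accepted only if it carries the current term.
  bool acceptsRequestWithTerm(uint64_t requestTerm) const {
    return requestTerm == expectedTerm;
  }
};

int main() {
  ShardFollowerState shard;
  uint64_t oldTerm = shard.expectedTerm;

  // A request stamped with the old term is delayed in the network.
  // Meanwhile the follower is dropped and gets back in sync; the leader
  // hands out a fresh term id while holding the exclusive lock:
  shard.expectedTerm = 4711;  // stands in for newFollowingTermId()

  // The delayed request finally arrives and is refused ...
  assert(!shard.acceptsRequestWithTerm(oldTerm));
  // ... while requests from the new term go through.
  assert(shard.acceptsRequestWithTerm(4711));
  return 0;
}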

arangod/Cluster/CreateCollection.cpp

Lines changed: 1 addition & 1 deletion
@@ -168,7 +168,7 @@ bool CreateCollection::first() {
         std::vector<std::string> noFollowers;
         col->followers()->takeOverLeadership(noFollowers, nullptr);
       } else {
-        col->followers()->setTheLeader(leader);
+        col->followers()->setTheLeader(LEADER_NOT_YET_KNOWN);
       }
     }
 

arangod/Cluster/FollowerInfo.cpp

Lines changed: 34 additions & 0 deletions
@@ -33,6 +33,7 @@
 #include "Logger/LogMacros.h"
 #include "Logger/Logger.h"
 #include "Logger/LoggerStream.h"
+#include "Random/RandomGenerator.h"
 #include "VocBase/LogicalCollection.h"
 
 using namespace arangodb;
@@ -542,3 +543,36 @@ VPackBuilder FollowerInfo::newShardEntry(VPackSlice oldValue) const {
   }
   return newValue;
 }
+
+uint64_t FollowerInfo::newFollowingTermId(ServerID const& s) noexcept {
+  WRITE_LOCKER(guard, _dataLock);
+  uint64_t i = 0;
+  uint64_t prev = 0;
+  auto it = _followingTermId.find(s);
+  if (it != _followingTermId.end()) {
+    prev = it->second;
+  }
+  // We want the random number to be non-zero and different from a previous one:
+  do {
+    i = RandomGenerator::interval(UINT64_MAX);
+  } while (i == 0 || i == prev);
+  try {
+    _followingTermId[s] = i;
+  } catch (std::bad_alloc const&) {
+    i = 1;  // I assume here that I do not get bad_alloc if the key is
+            // already in the map, since it then only has to overwrite
+            // an integer, if the key is not in the map, we default to 1.
+  }
+  return i;
+}
+
+uint64_t FollowerInfo::getFollowingTermId(ServerID const& s) const noexcept {
+  READ_LOCKER(guard, _dataLock);
+  // Note that we assume that find() does not throw!
+  auto it = _followingTermId.find(s);
+  if (it == _followingTermId.end()) {
+    // If not found, we use the default from above:
+    return 1;
+  }
+  return it->second;
+}
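
A rough usage sketch for the two new methods: the leader mints a fresh id via newFollowingTermId() when the follower acquires the exclusive lock that completes the get-in-sync protocol, hands that id back in the lock response, and later stamps every synchronous replication request to that follower with getFollowingTermId(). The handler names, the request struct, and the include path below are assumptions for illustration; only the two FollowerInfo methods come from this diff.

#include <cstdint>

#include "Cluster/FollowerInfo.h"  // assuming the usual arangod include root

using arangodb::FollowerInfo;
using arangodb::ServerID;

// Hypothetical shape of an outgoing synchronous replication request.
struct OutgoingReplicationRequest {
  ServerID follower;
  uint64_t followingTermId;  // stamped onto the request by the leader
};

// Leader side, at the moment the follower acquires the exclusive lock to get
// in sync: mint a fresh term id and return it in the lock response, so the
// follower can record it (via setTheLeader("<leader>_<termId>")).
uint64_t onExclusiveLockGranted(FollowerInfo& followers, ServerID const& follower) {
  return followers.newFollowingTermId(follower);
}

// Leader side, for every later synchronous replication request to that
// follower: send along the currently valid term id. The follower compares it
// with the id it stored and refuses the request on a mismatch.
OutgoingReplicationRequest stampRequest(FollowerInfo const& followers,
                                        ServerID const& follower) {
  return {follower, followers.getFollowingTermId(follower)};
}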

arangod/Cluster/FollowerInfo.h

Lines changed: 36 additions & 0 deletions
@@ -54,6 +54,20 @@ class FollowerInfo {
   // soon as we can guarantee at least so many followers locally.
   std::shared_ptr<std::vector<ServerID>> _failoverCandidates;
 
+  // The following map holds a random number for each follower, this
+  // random number is given and sent to the follower when it gets in sync
+  // (actually, when it acquires the exclusive lock to get in sync), and is
+  // then subsequently sent alongside every synchronous replication
+  // request. If the number does not match, the follower will refuse the
+  // replication request. This is to ensure that replication requests cannot
+  // be delayed into the "next" leader/follower relationship.
+  // And here is the proof that this works: The exclusive lock divides the
+  // write operations between "before" and "after". The id is changed
+  // when the exclusive lock is established. Therefore, it is OK for the
+  // replication requests to send along the "current" id, directly
+  // from this map.
+  std::unordered_map<ServerID, uint64_t> _followingTermId;
+
   // The agencyMutex is used to synchronise access to the agency.
   // the _dataLock is used to sync the access to local data.
   // The _canWriteLock is used to protect flag if we do have enough followers
@@ -135,6 +149,28 @@ class FollowerInfo {
 
   Result remove(ServerID const& s);
 
+  //////////////////////////////////////////////////////////////////////////////
+  /// @brief for each run of the "get-in-sync" protocol we generate a
+  /// random number to identify this "following term". This is created
+  /// when the follower fetches the exclusive lock to finally get in sync
+  /// and is stored in _followingTermId, so that it can be forwarded with
+  /// each synchronous replication request. The follower can then decline
+  /// the replication in case it is not "in the same term".
+  //////////////////////////////////////////////////////////////////////////////
+
+  uint64_t newFollowingTermId(ServerID const& s) noexcept;
+
+  //////////////////////////////////////////////////////////////////////////////
+  /// @brief for each run of the "get-in-sync" protocol we generate a
+  /// random number to identify this "following term". This is created
+  /// when the follower fetches the exclusive lock to finally get in sync
+  /// and is stored in _followingTermId, so that it can be forwarded with
+  /// each synchronous replication request. The follower can then decline
+  /// the replication in case it is not "in the same term".
+  //////////////////////////////////////////////////////////////////////////////
+
+  uint64_t getFollowingTermId(ServerID const& s) const noexcept;
+
   //////////////////////////////////////////////////////////////////////////////
   /// @brief clear follower list, no changes in agency necesary
   //////////////////////////////////////////////////////////////////////////////

arangod/Cluster/Maintenance.cpp

Lines changed: 0 additions & 1 deletion
@@ -327,7 +327,6 @@ static void handlePlanShard(
            {DATABASE, dbname},
            {COLLECTION, colname},
            {SHARD, shname},
-           {THE_LEADER, std::string()},
            {LOCAL_LEADER, std::string(localLeader)},
            {OLD_CURRENT_COUNTER, "0"},  // legacy, no longer used
            {PLAN_RAFT_INDEX, std::to_string(planIndex)}},

arangod/Cluster/SynchronizeShard.cpp

Lines changed: 64 additions & 10 deletions
@@ -89,10 +89,38 @@ std::string const TTL("ttl");
 
 using namespace std::chrono;
 
+// Overview over the code in this file:
+// The main method being called is "first", it does:
+// first:
+//  - wait until leader has created shard
+//  - lookup local shard
+//  - call `replicationSynchronize`
+//  - call `catchupWithReadLock`
+//  - call `catchupWithExclusiveLock`
+// replicationSynchronize:
+//  - set local shard to follow leader (without a following term id)
+//  - use a `DatabaseInitialSyncer` to synchronize to a certain state,
+//    (configure leaderId for it to go through)
+// catchupWithReadLock:
+//  - start a read lock on leader
+//  - keep configuration for shard to follow the leader without term id
+//  - call `replicationSynchronizeCatchup` (WAL tailing, configure leaderId
+//    for it to go through)
+//  - cancel read lock on leader
+// catchupWithExclusiveLock:
+//  - start an exclusive lock on leader, acquire unique following term id
+//  - set local shard to follow leader (with new following term id)
+//  - call `replicationSynchronizeFinalize` (WAL tailing)
+//  - do a final check by comparing counts on leader and follower
+//  - add us as official follower on the leader
+//  - release exclusive lock on leader
+//
+
 SynchronizeShard::SynchronizeShard(MaintenanceFeature& feature, ActionDescription const& desc)
     : ActionBase(feature, desc),
       ShardDefinition(desc.get(DATABASE), desc.get(SHARD)),
-      _leaderInfo(arangodb::replutils::LeaderInfo::createEmpty()) {
+      _leaderInfo(arangodb::replutils::LeaderInfo::createEmpty()),
+      _followingTermId(0) {
   std::stringstream error;
 
   if (!desc.has(COLLECTION)) {
@@ -416,7 +444,7 @@ arangodb::Result SynchronizeShard::getReadLock(
   // nullptr only happens during controlled shutdown
   if (pool == nullptr) {
     return arangodb::Result(TRI_ERROR_SHUTTING_DOWN,
-                            "cancelReadLockOnLeader: Shutting down");
+                            "getReadLock: Shutting down");
   }
 
   VPackBuilder body;
@@ -445,6 +473,16 @@ arangodb::Result SynchronizeShard::getReadLock(
 
   if (res.ok()) {
     // Habemus clausum, we have a lock
+    if (!soft) {
+      // Now store the random followingTermId:
+      VPackSlice body = response.response().slice();
+      if (body.isObject()) {
+        VPackSlice followingTermIdSlice = body.get(StaticStrings::FollowingTermId);
+        if (followingTermIdSlice.isNumber()) {
+          _followingTermId = followingTermIdSlice.getNumber<uint64_t>();
+        }
+      }
+    }
     return arangodb::Result();
   }
 
@@ -524,6 +562,7 @@ static arangodb::ResultT<SyncerId> replicationSynchronize(
   auto syncer = DatabaseInitialSyncer::create(vocbase, configuration);
 
   if (!leaderId.empty()) {
+    // In this phase we use the normal leader ID without following term id:
    syncer->setLeaderId(leaderId);
   }
 
@@ -599,6 +638,8 @@ static arangodb::Result replicationSynchronizeCatchup(
   auto syncer = DatabaseTailingSyncer::create(guard.database(), configuration, fromTick, /*useTick*/true);
 
   if (!leaderId.empty()) {
+    // In this phase we still use the normal leaderId without a following
+    // term id:
    syncer->setLeaderId(leaderId);
   }
 
@@ -624,10 +665,10 @@ static arangodb::Result replicationSynchronizeCatchup(
 
 static arangodb::Result replicationSynchronizeFinalize(SynchronizeShard const& job,
                                                        application_features::ApplicationServer& server,
-                                                       VPackSlice const& conf) {
+                                                       VPackSlice const& conf,
+                                                       std::string const& leaderId) {
   auto const database = conf.get(DATABASE).copyString();
   auto const collection = conf.get(COLLECTION).copyString();
-  auto const leaderId = conf.get(LEADER_ID).copyString();
   auto const fromTick = conf.get("from").getNumber<uint64_t>();
 
   ReplicationApplierConfiguration configuration =
@@ -935,6 +976,10 @@ bool SynchronizeShard::first() {
 
   auto details = std::make_shared<VPackBuilder>();
 
+  // Configure the shard to follow the leader without any following
+  // term id:
+  collection->followers()->setTheLeader(leader);
+
   ResultT<SyncerId> syncRes =
       replicationSynchronize(*this, collection, config.slice(), details);
 
@@ -1165,6 +1210,19 @@ Result SynchronizeShard::catchupWithExclusiveLock(
     }
   });
 
+  // Now we have got a unique id for this following term and have stored it
+  // in _followingTermId, so we can use it to set the leader:
+
+  // This is necessary to accept replications from the leader which can
+  // happen as soon as we are in sync.
+  std::string leaderIdWithTerm{leader};
+  if (_followingTermId != 0) {
+    leaderIdWithTerm += "_";
+    leaderIdWithTerm += basics::StringUtils::itoa(_followingTermId);
+  }
+  // If _followingTermId is 0, then this is a leader before the update,
+  // we tolerate this and simply use its ID without a term in this case.
+  collection.followers()->setTheLeader(leaderIdWithTerm);
   LOG_TOPIC("d76cb", DEBUG, Logger::MAINTENANCE) << "lockJobId: " << lockJobId;
 
   builder.clear();
@@ -1173,13 +1231,13 @@ Result SynchronizeShard::catchupWithExclusiveLock(
     builder.add(ENDPOINT, VPackValue(ep));
     builder.add(DATABASE, VPackValue(getDatabase()));
     builder.add(COLLECTION, VPackValue(getShard()));
-    builder.add(LEADER_ID, VPackValue(leader));
+    builder.add(LEADER_ID, VPackValue(leaderIdWithTerm));
     builder.add("from", VPackValue(lastLogTick));
     builder.add("requestTimeout", VPackValue(600.0));
     builder.add("connectTimeout", VPackValue(60.0));
   }
 
-  res = replicationSynchronizeFinalize(*this, feature().server(), builder.slice());
+  res = replicationSynchronizeFinalize(*this, feature().server(), builder.slice(), leaderIdWithTerm);
 
   if (!res.ok()) {
     std::string errorMessage(
@@ -1188,10 +1246,6 @@ Result SynchronizeShard::catchupWithExclusiveLock(
     return {TRI_ERROR_INTERNAL, errorMessage};
   }
 
-  // This is necessary to accept replications from the leader which can
-  // happen as soon as we are in sync.
-  collection.followers()->setTheLeader(leader);
-
   NetworkFeature& nf = _feature.server().getFeature<NetworkFeature>();
   network::ConnectionPool* pool = nf.pool();
   res = addShardFollower(pool, ep, getDatabase(), getShard(), lockJobId, clientId,
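
The exclusive-lock phase above encodes the following term id into the leader string as "<leaderId>_<termId>"; that combined string is both stored via setTheLeader() and passed on to replicationSynchronizeFinalize(). Below is a small self-contained sketch of composing and decomposing such a string. How the receiving side actually parses it is not part of this diff, so the split helper is an assumption for illustration only.

#include <cstdint>
#include <string>
#include <utility>

// Compose "<leaderId>_<termId>"; a term of 0 means "no term" (a leader from
// before the update), in which case the plain leader id is used, mirroring
// catchupWithExclusiveLock() above.
std::string composeLeaderIdWithTerm(std::string const& leader, uint64_t term) {
  if (term == 0) {
    return leader;
  }
  return leader + "_" + std::to_string(term);
}

// Illustrative counterpart (assumption, not taken from this diff): split the
// combined string back into the plain leader id and the term, returning 0
// when no term suffix is present.
std::pair<std::string, uint64_t> splitLeaderIdWithTerm(std::string const& combined) {
  auto pos = combined.rfind('_');
  if (pos == std::string::npos) {
    return {combined, 0};
  }
  try {
    uint64_t term = std::stoull(combined.substr(pos + 1));
    return {combined.substr(0, pos), term};
  } catch (...) {
    return {combined, 0};  // suffix was not a number, treat as plain id
  }
}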

arangod/Cluster/SynchronizeShard.h

Lines changed: 1 addition & 0 deletions
@@ -91,6 +91,7 @@ class SynchronizeShard : public ActionBase, public ShardDefinition {
 
   /// @brief information about the leader, reused across multiple replication steps
   arangodb::replutils::LeaderInfo _leaderInfo;
+  uint64_t _followingTermId;
 };
 
 }  // namespace maintenance
