@@ -89,10 +89,38 @@ std::string const TTL("ttl");

using namespace std::chrono;

+// Overview of the code in this file:
+// The main method being called is "first"; it does:
+// first:
+//  - wait until the leader has created the shard
+//  - look up the local shard
+//  - call `replicationSynchronize`
+//  - call `catchupWithReadLock`
+//  - call `catchupWithExclusiveLock`
+// replicationSynchronize:
+//  - set the local shard to follow the leader (without a following term id)
+//  - use a `DatabaseInitialSyncer` to synchronize to a certain state
+//    (configure leaderId for it to go through)
+// catchupWithReadLock:
+//  - start a read lock on the leader
+//  - keep the configuration for the shard to follow the leader without a term id
+//  - call `replicationSynchronizeCatchup` (WAL tailing, configure leaderId
+//    for it to go through)
+//  - cancel the read lock on the leader
+// catchupWithExclusiveLock:
+//  - start an exclusive lock on the leader, acquire a unique following term id
+//  - set the local shard to follow the leader (with the new following term id)
+//  - call `replicationSynchronizeFinalize` (WAL tailing)
+//  - do a final check by comparing counts on leader and follower
+//  - add us as an official follower on the leader
+//  - release the exclusive lock on the leader
+//
+
SynchronizeShard::SynchronizeShard(MaintenanceFeature& feature, ActionDescription const& desc)
    : ActionBase(feature, desc),
      ShardDefinition(desc.get(DATABASE), desc.get(SHARD)),
-      _leaderInfo(arangodb::replutils::LeaderInfo::createEmpty()) {
+      _leaderInfo(arangodb::replutils::LeaderInfo::createEmpty()),
+      _followingTermId(0) {
  std::stringstream error;

  if (!desc.has(COLLECTION)) {
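For illustration, a minimal standalone sketch of how the new `_followingTermId` ends up in the leader string stored on the follower. The "_" separator and the fallback to the plain leader id when the term is 0 are taken from the `catchupWithExclusiveLock` hunk further down; the helper name and the example server ids are hypothetical, not part of the patch:

#include <cstdint>
#include <iostream>
#include <string>

// Hypothetical helper mirroring the composition used in
// catchupWithExclusiveLock below: "<leader>" or "<leader>_<followingTermId>".
static std::string makeLeaderIdWithTerm(std::string const& leader,
                                        uint64_t followingTermId) {
  std::string result{leader};
  if (followingTermId != 0) {
    // A non-zero term id is appended after an underscore separator.
    result += "_";
    result += std::to_string(followingTermId);
  }
  // A term id of 0 means the leader predates this change; keep the plain id.
  return result;
}

int main() {
  std::cout << makeLeaderIdWithTerm("PRMR-1234", 0) << "\n";   // "PRMR-1234"
  std::cout << makeLeaderIdWithTerm("PRMR-1234", 42) << "\n";  // "PRMR-1234_42"
}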
@@ -416,7 +444,7 @@ arangodb::Result SynchronizeShard::getReadLock(
  // nullptr only happens during controlled shutdown
  if (pool == nullptr) {
    return arangodb::Result(TRI_ERROR_SHUTTING_DOWN,
-                           "cancelReadLockOnLeader: Shutting down");
+                           "getReadLock: Shutting down");
  }

  VPackBuilder body;
@@ -445,6 +473,16 @@ arangodb::Result SynchronizeShard::getReadLock(

  if (res.ok()) {
    // Habemus clausum, we have a lock
+   if (!soft) {
+     // Now store the random followingTermId:
+     VPackSlice body = response.response().slice();
+     if (body.isObject()) {
+       VPackSlice followingTermIdSlice = body.get(StaticStrings::FollowingTermId);
+       if (followingTermIdSlice.isNumber()) {
+         _followingTermId = followingTermIdSlice.getNumber<uint64_t>();
+       }
+     }
+   }
    return arangodb::Result();
  }

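As a minimal sketch of the extraction logic added above: build a stand-in for the leader's lock response and read the optional numeric attribute with the velocypack library. The literal key "followingTermId" is an assumption for the value behind `StaticStrings::FollowingTermId`, and the response contents are made up:

#include <cstdint>
#include <iostream>

#include <velocypack/Builder.h>
#include <velocypack/Slice.h>
#include <velocypack/Value.h>

namespace vp = arangodb::velocypack;

int main() {
  // Stand-in for the leader's response to the (hard) lock request.
  vp::Builder builder;
  builder.openObject();
  builder.add("followingTermId", vp::Value(uint64_t{42}));
  builder.close();

  uint64_t followingTermId = 0;  // default: leader did not send a term id
  vp::Slice body = builder.slice();
  if (body.isObject()) {
    vp::Slice slice = body.get("followingTermId");
    if (slice.isNumber()) {
      followingTermId = slice.getNumber<uint64_t>();
    }
  }
  std::cout << "followingTermId: " << followingTermId << "\n";
}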
@@ -524,6 +562,7 @@ static arangodb::ResultT<SyncerId> replicationSynchronize(
  auto syncer = DatabaseInitialSyncer::create(vocbase, configuration);

  if (!leaderId.empty()) {
+   // In this phase we use the normal leader ID without a following term id:
    syncer->setLeaderId(leaderId);
  }

@@ -599,6 +638,8 @@ static arangodb::Result replicationSynchronizeCatchup(
  auto syncer = DatabaseTailingSyncer::create(guard.database(), configuration, fromTick, /*useTick*/ true);

  if (!leaderId.empty()) {
+   // In this phase we still use the normal leaderId without a following
+   // term id:
    syncer->setLeaderId(leaderId);
  }

@@ -624,10 +665,10 @@ static arangodb::Result replicationSynchronizeCatchup(

static arangodb::Result replicationSynchronizeFinalize(SynchronizeShard const& job,
                                                        application_features::ApplicationServer& server,
-                                                       VPackSlice const& conf) {
+                                                       VPackSlice const& conf,
+                                                       std::string const& leaderId) {
  auto const database = conf.get(DATABASE).copyString();
  auto const collection = conf.get(COLLECTION).copyString();
-  auto const leaderId = conf.get(LEADER_ID).copyString();
  auto const fromTick = conf.get("from").getNumber<uint64_t>();

  ReplicationApplierConfiguration configuration =
@@ -935,6 +976,10 @@ bool SynchronizeShard::first() {

  auto details = std::make_shared<VPackBuilder>();

+  // Configure the shard to follow the leader without any following
+  // term id:
+  collection->followers()->setTheLeader(leader);
+
  ResultT<SyncerId> syncRes =
      replicationSynchronize(*this, collection, config.slice(), details);

@@ -1165,6 +1210,19 @@ Result SynchronizeShard::catchupWithExclusiveLock(
    }
  });

+  // Now we have got a unique id for this following term and have stored it
+  // in _followingTermId, so we can use it to set the leader:
+
+  // This is necessary to accept replications from the leader, which can
+  // happen as soon as we are in sync.
+  std::string leaderIdWithTerm{leader};
+  if (_followingTermId != 0) {
+    leaderIdWithTerm += "_";
+    leaderIdWithTerm += basics::StringUtils::itoa(_followingTermId);
+  }
+  // If _followingTermId is 0, then this is a leader from before the update;
+  // we tolerate this and simply use its ID without a term in this case.
+  collection.followers()->setTheLeader(leaderIdWithTerm);
  LOG_TOPIC("d76cb", DEBUG, Logger::MAINTENANCE) << "lockJobId: " << lockJobId;

  builder.clear();
@@ -1173,13 +1231,13 @@ Result SynchronizeShard::catchupWithExclusiveLock(
    builder.add(ENDPOINT, VPackValue(ep));
    builder.add(DATABASE, VPackValue(getDatabase()));
    builder.add(COLLECTION, VPackValue(getShard()));
-   builder.add(LEADER_ID, VPackValue(leader));
+   builder.add(LEADER_ID, VPackValue(leaderIdWithTerm));
    builder.add("from", VPackValue(lastLogTick));
    builder.add("requestTimeout", VPackValue(600.0));
    builder.add("connectTimeout", VPackValue(60.0));
  }

-  res = replicationSynchronizeFinalize(*this, feature().server(), builder.slice());
+  res = replicationSynchronizeFinalize(*this, feature().server(), builder.slice(), leaderIdWithTerm);

  if (!res.ok()) {
    std::string errorMessage(
@@ -1188,10 +1246,6 @@ Result SynchronizeShard::catchupWithExclusiveLock(
    return {TRI_ERROR_INTERNAL, errorMessage};
  }

-  // This is necessary to accept replications from the leader which can
-  // happen as soon as we are in sync.
-  collection.followers()->setTheLeader(leader);
-
  NetworkFeature& nf = _feature.server().getFeature<NetworkFeature>();
  network::ConnectionPool* pool = nf.pool();
  res = addShardFollower(pool, ep, getDatabase(), getShard(), lockJobId, clientId,