Fix issues with following term ids during rolling upgrade (#15101) · open-bigdata/arangodb@457a2e1 · GitHub
[go: up one dir, main page]

Skip to content

Commit 457a2e1

Browse files
authored
Fix issues with following term ids during rolling upgrade (arangodb#15101)
1 parent f137c0e commit 457a2e1

File tree

7 files changed

+238
-19
lines changed

7 files changed

+238
-19
lines changed

CHANGELOG

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,11 @@
11
devel
22
-----
33

4+
* Fix issues during rolling upgrades from 3.8.0 to 3.8.x (x >= 1) and from
5+
3.7.x (x <= 12) to 3.8.3. The problem was that older versions did not handle
6+
following term ids that are sent from newer versions during synchronous
7+
replication operations.
8+
49
* Close a potential gap during shard synchronization when moving from the
510
initial sync step to the WAL tailing step. In this small gap the leader
611
could purge some of the WAL files that would be required by the following

arangod/Cluster/FollowerInfo.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -551,6 +551,11 @@ VPackBuilder FollowerInfo::newShardEntry(VPackSlice oldValue) const {
551551
return newValue;
552552
}
553553

554+
/// @brief Explicitly stores `value` as the following term id for follower
/// `s`, replacing whatever was recorded for that server before.
/// @param s      id of the follower whose following term id is set
/// @param value  the following term id to store; the callers visible in this
///               change pass 0 for followers that did not send
///               "wantFollowingTerm"
void FollowerInfo::setFollowingTermId(ServerID const& s, uint64_t value) {
555+
// the update is done while holding WRITE_LOCKER on _dataLock
WRITE_LOCKER(guard, _dataLock);
556+
_followingTermId[s] = value;
557+
}
558+
554559
uint64_t FollowerInfo::newFollowingTermId(ServerID const& s) noexcept {
555560
WRITE_LOCKER(guard, _dataLock);
556561
uint64_t i = 0;
</div>

arangod/Cluster/FollowerInfo.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,12 @@ class FollowerInfo {
149149

150150
Result remove(ServerID const& s);
151151

152+
//////////////////////////////////////////////////////////////////////////////
153+
/// @brief explicitly set the following term id for a follower.
154+
/// this should only be used for special cases during upgrading or testing.
155+
//////////////////////////////////////////////////////////////////////////////
156+
void setFollowingTermId(ServerID const& s, uint64_t value);
157+
152158
//////////////////////////////////////////////////////////////////////////////
153159
/// @brief for each run of the "get-in-sync" protocol we generate a
154160
/// random number to identify this "following term". This is created

arangod/Cluster/SynchronizeShard.cpp

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -449,7 +449,17 @@ arangodb::Result SynchronizeShard::getReadLock(
449449
body.add(TTL, VPackValue(timeout));
450450
body.add("serverId", VPackValue(arangodb::ServerState::instance()->getId()));
451451
body.add(StaticStrings::RebootId, VPackValue(ServerState::instance()->getRebootId().value()));
452-
body.add(StaticStrings::ReplicationSoftLockOnly, VPackValue(soft));
452+
body.add(StaticStrings::ReplicationSoftLockOnly, VPackValue(soft));
453+
// the following attribute was added in 3.8.3:
454+
// with this, the follower indicates to the leader that it is
455+
// capable of handling following term ids correctly.
456+
bool sendWantFollowingTerm = true;
457+
TRI_IF_FAILURE("SynchronizeShard::dontSendWantFollowingTerm") {
458+
sendWantFollowingTerm = false;
459+
}
460+
if (sendWantFollowingTerm) {
461+
body.add("wantFollowingTerm", VPackValue(true));
462+
}
453463
}
454464
auto buf = body.steal();
455465

arangod/RestHandler/RestReplicationHandler.cpp

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2734,8 +2734,20 @@ void RestReplicationHandler::handleCommandHoldReadLockCollection() {
27342734
VPackObjectBuilder bb(&b);
27352735
b.add(StaticStrings::Error, VPackValue(false));
27362736
if (!serverId.empty()) {
2737-
b.add(StaticStrings::FollowingTermId,
2738-
VPackValue(col->followers()->newFollowingTermId(serverId)));
2737+
// check if the follower sent us the "wantFollowingTerm" attribute.
2738+
// followers >= 3.8.3 will send this to indicate that they know how
2739+
// to handle following term ids safely
2740+
bool wantFollowingTerm = VelocyPackHelper::getBooleanValue(body, "wantFollowingTerm", false);
2741+
if (wantFollowingTerm) {
2742+
b.add(StaticStrings::FollowingTermId,
2743+
VPackValue(col->followers()->newFollowingTermId(serverId)));
2744+
} else {
2745+
// a client < 3.8.3 does not know how to handle following term ids
2746+
// safely. in this case we set the follower's term id to 0, so it
2747+
// will be ignored on followers < 3.8.3
2748+
col->followers()->setFollowingTermId(serverId, 0);
2749+
b.add(StaticStrings::FollowingTermId, VPackValue(0));
2750+
}
27392751
}
27402752

27412753
// also return the _current_ last log sequence number. this may be higher

arangod/Transaction/Methods.cpp

Lines changed: 74 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1006,7 +1006,17 @@ Future<OperationResult> transaction::Methods::insertLocal(std::string const& cna
10061006
if (options.isSynchronousReplicationFrom.empty()) {
10071007
return OperationResult(TRI_ERROR_CLUSTER_SHARD_LEADER_RESIGNED, options);
10081008
}
1009-
if (options.isSynchronousReplicationFrom != theLeader) {
1009+
bool sendRefusal = (options.isSynchronousReplicationFrom != theLeader);
1010+
TRI_IF_FAILURE("synchronousReplication::refuseOnFollower") {
1011+
sendRefusal = true;
1012+
}
1013+
TRI_IF_FAILURE("synchronousReplication::expectFollowingTerm") {
1014+
// expect a following term id or send a refusal
1015+
if (!options.isRestore) {
1016+
sendRefusal |= (options.isSynchronousReplicationFrom.find('_') == std::string::npos);
1017+
}
1018+
}
1019+
if (sendRefusal) {
10101020
return OperationResult(
10111021
::buildRefusalResult(*collection, "insert", options, theLeader),
10121022
options);
@@ -1298,7 +1308,17 @@ Future<OperationResult> transaction::Methods::modifyLocal(std::string const& col
12981308
if (options.isSynchronousReplicationFrom.empty()) {
12991309
return OperationResult(TRI_ERROR_CLUSTER_SHARD_LEADER_RESIGNED, options);
13001310
}
1301-
if (options.isSynchronousReplicationFrom != theLeader) {
1311+
bool sendRefusal = (options.isSynchronousReplicationFrom != theLeader);
1312+
TRI_IF_FAILURE("synchronousReplication::refuseOnFollower") {
1313+
sendRefusal = true;
1314+
}
1315+
TRI_IF_FAILURE("synchronousReplication::expectFollowingTerm") {
1316+
// expect a following term id or send a refusal
1317+
if (!options.isRestore) {
1318+
sendRefusal |= (options.isSynchronousReplicationFrom.find('_') == std::string::npos);
1319+
}
1320+
}
1321+
if (sendRefusal) {
13021322
return OperationResult(
13031323
::buildRefusalResult(*collection, (operation == TRI_VOC_DOCUMENT_OPERATION_REPLACE ? "replace" : "update"), options, theLeader),
13041324
options);
@@ -1509,7 +1529,17 @@ Future<OperationResult> transaction::Methods::removeLocal(std::string const& col
15091529
if (options.isSynchronousReplicationFrom.empty()) {
15101530
return OperationResult(TRI_ERROR_CLUSTER_SHARD_LEADER_RESIGNED, options);
15111531
}
1512-
if (options.isSynchronousReplicationFrom != theLeader) {
1532+
bool sendRefusal = (options.isSynchronousReplicationFrom != theLeader);
1533+
TRI_IF_FAILURE("synchronousReplication::refuseOnFollower") {
1534+
sendRefusal = true;
1535+
}
1536+
TRI_IF_FAILURE("synchronousReplication::expectFollowingTerm") {
1537+
// expect a following term id or send a refusal
1538+
if (!options.isRestore) {
1539+
sendRefusal |= (options.isSynchronousReplicationFrom.find('_') == std::string::npos);
1540+
}
1541+
}
1542+
if (sendRefusal) {
15131543
return OperationResult(
15141544
::buildRefusalResult(*collection, "remove", options, theLeader),
15151545
options);
@@ -1742,7 +1772,17 @@ Future<OperationResult> transaction::Methods::truncateLocal(std::string const& c
17421772
return futures::makeFuture(
17431773
OperationResult(TRI_ERROR_CLUSTER_SHARD_LEADER_RESIGNED, options));
17441774
}
1745-
if (options.isSynchronousReplicationFrom != theLeader) {
1775+
bool sendRefusal = (options.isSynchronousReplicationFrom != theLeader);
1776+
TRI_IF_FAILURE("synchronousReplication::refuseOnFollower") {
1777+
sendRefusal = true;
1778+
}
1779+
TRI_IF_FAILURE("synchronousReplication::expectFollowingTerm") {
1780+
// expect a following term id or send a refusal
1781+
if (!options.isRestore) {
1782+
sendRefusal |= (options.isSynchronousReplicationFrom.find('_') == std::string::npos);
1783+
}
1784+
}
1785+
if (sendRefusal) {
17461786
return futures::makeFuture(
17471787
OperationResult(
17481788
::buildRefusalResult(*collection, "truncate", options, theLeader),
@@ -1784,10 +1824,21 @@ Future<OperationResult> transaction::Methods::truncateLocal(std::string const& c
17841824
reqOpts.param(StaticStrings::Compact, (options.truncateCompact ? "true" : "false"));
17851825

17861826
for (auto const& f : *followers) {
1787-
reqOpts.param(StaticStrings::IsSynchronousReplicationString,
1788-
ServerState::instance()->getId() + "_" +
1789-
basics::StringUtils::itoa(
1790-
collection->followers()->getFollowingTermId(f)));
1827+
// check following term id for the follower:
1828+
// if it is 0, it means that the follower cannot handle following
1829+
// term ids safely, so we only pass the leader id string to it but
1830+
// no following term. this happens for followers < 3.8.3
1831+
// if the following term id is != 0, we will pass it on along with
1832+
// the leader id string, in format "LEADER_FOLLOWINGTERMID"
1833+
uint64_t followingTermId = collection->followers()->getFollowingTermId(f);
1834+
if (followingTermId == 0) {
1835+
reqOpts.param(StaticStrings::IsSynchronousReplicationString,
1836+
ServerState::instance()->getId());
1837+
} else {
1838+
reqOpts.param(StaticStrings::IsSynchronousReplicationString,
1839+
ServerState::instance()->getId() + "_" +
1840+
basics::StringUtils::itoa(followingTermId));
1841+
}
17911842
// reqOpts is copied deep in sendRequestRetry, so we are OK to
17921843
// change it in the loop!
17931844
network::Headers headers;
@@ -2326,10 +2377,21 @@ Future<Result> Methods::replicateOperations(
23262377

23272378
auto* pool = vocbase().server().getFeature<NetworkFeature>().pool();
23282379
for (auto const& f : *followerList) {
2329-
reqOpts.param(StaticStrings::IsSynchronousReplicationString,
2330-
ServerState::instance()->getId() + "_" +
2331-
basics::StringUtils::itoa(
2332-
collection->followers()->getFollowingTermId(f)));
2380+
// check following term id for the follower:
2381+
// if it is 0, it means that the follower cannot handle following
2382+
// term ids safely, so we only pass the leader id string to it but
2383+
// no following term. this happens for followers < 3.8.3
2384+
// if the following term id is != 0, we will pass it on along with
2385+
// the leader id string, in format "LEADER_FOLLOWINGTERMID"
2386+
uint64_t followingTermId = collection->followers()->getFollowingTermId(f);
2387+
if (followingTermId == 0) {
2388+
reqOpts.param(StaticStrings::IsSynchronousReplicationString,
2389+
ServerState::instance()->getId());
2390+
} else {
2391+
reqOpts.param(StaticStrings::IsSynchronousReplicationString,
2392+
ServerState::instance()->getId() + "_" +
2393+
basics::StringUtils::itoa(followingTermId));
2394+
}
23332395
// reqOpts is copied deep in sendRequestRetry, so we are OK to
23342396
// change it in the loop!
23352397
network::Headers headers;

tests/js/client/shell/shell-following-term-id-cluster.js

Lines changed: 123 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ let jsunity = require('jsunity');
3030
let internal = require('internal');
3131
let arangodb = require('@arangodb');
3232
let db = arangodb.db;
33+
let { getMetric, debugCanUseFailAt, debugRemoveFailAt, debugSetFailAt, debugClearFailAt, waitForShardsInSync } = require('@arangodb/test-helper');
3334

3435
function createCollectionWithKnownLeaderAndFollower(cn) {
3536
db._create(cn, {numberOfShards:1, replicationFactor:2});
@@ -64,20 +65,19 @@ function switchConnectionToFollower(collInfo) {
6465
function followingTermIdSuite() {
6566
'use strict';
6667
const cn = 'UnitTestsFollowingTermId';
67-
let collInfo = {};
6868

6969
return {
7070

7171
setUp: function () {
7272
db._drop(cn);
73-
collInfo = createCollectionWithKnownLeaderAndFollower(cn);
7473
},
7574

7675
tearDown: function () {
7776
db._drop(cn);
7877
},
7978

80-
testFollowingTermIdSuite: function() {
79+
testFollowingTermIdHandling: function() {
80+
let collInfo = createCollectionWithKnownLeaderAndFollower(cn);
8181
// We have a shard whose leader and follower is known to us.
8282

8383
// Let's insert some documents:
@@ -92,7 +92,7 @@ function followingTermIdSuite() {
9292
switchConnectionToFollower(collInfo);
9393
assertEqual(100, db._collection(collInfo.shard).count());
9494

95-
// Try to insert a document with the leaderId:
95+
// Try to insert a document with only the leaderId:
9696
let res = arango.POST(`/_api/document/${collInfo.shard}?isSynchronousReplication=${collInfo.idMap[collInfo.leader]}`, {Hallo:101});
9797
assertTrue(res.error);
9898
assertEqual(406, res.code);
@@ -112,6 +112,125 @@ function followingTermIdSuite() {
112112
switchConnectionToCoordinator(collInfo);
113113
},
114114

115+
testFollowingTermIdHandlingMixedMode: function() {
116+
let collInfo = createCollectionWithKnownLeaderAndFollower(cn);
117+
118+
let followerEndpoint = collInfo.endpointMap[collInfo.follower];
119+
let leaderEndpoint = collInfo.endpointMap[collInfo.leader];
120+
121+
if (!debugCanUseFailAt(followerEndpoint)) {
122+
// test only works if we can use failure points
123+
return;
124+
}
125+
126+
// We have a shard whose leader and follower is known to us.
127+
// now set failure points on the follower to get the shard out
128+
// of sync
129+
try {
130+
switchConnectionToFollower(collInfo);
131+
// this failure point makes a follower refuse every operation sent by the leader
132+
// via synchronous replication
133+
debugSetFailAt(followerEndpoint, "synchronousReplication::refuseOnFollower");
134+
// this failure point makes the follower not send the "wantFollowingTermId" as part
135+
// of the synchronization protocol
136+
debugSetFailAt(followerEndpoint, "synchronousReplication::dontSendWantFollowingTerm");
137+
138+
let droppedFollowersBefore = getMetric(leaderEndpoint, "arangodb_dropped_followers_total");
139+
140+
switchConnectionToCoordinator(collInfo);
141+
let c = db._collection(cn);
142+
// this will drop the follower
143+
c.insert({});
144+
145+
let droppedFollowersAfter = getMetric(leaderEndpoint, "arangodb_dropped_followers_total");
146+
assertTrue(droppedFollowersAfter > droppedFollowersBefore, { droppedFollowersBefore, droppedFollowersAfter });
147+
148+
switchConnectionToFollower(collInfo);
149+
debugRemoveFailAt(followerEndpoint, "synchronousReplication::refuseOnFollower");
150+
151+
// wait for everything to get back into sync
152+
switchConnectionToCoordinator(collInfo);
153+
assertEqual(1, db._collection(cn).count());
154+
waitForShardsInSync(cn, 120);
155+
156+
switchConnectionToFollower(collInfo);
157+
assertEqual(1, db._collection(collInfo.shard).count());
158+
159+
switchConnectionToLeader(collInfo);
160+
assertEqual(1, db._collection(collInfo.shard).count());
161+
162+
} finally {
163+
switchConnectionToFollower(collInfo);
164+
debugClearFailAt(followerEndpoint);
165+
166+
switchConnectionToCoordinator(collInfo);
167+
}
168+
},
169+
170+
// Checks that the leader sends a following term id to the follower:
// the follower is first forced out of sync, then brought back in sync while
// the "expectFollowingTerm" failure point is still set, and a further insert
// must not increase the leader's dropped-followers metric.
// The test is a no-op if failure points cannot be used on the follower.
testFollowingTermIdIsSet: function() {
171+
let collInfo = createCollectionWithKnownLeaderAndFollower(cn);
172+
173+
let followerEndpoint = collInfo.endpointMap[collInfo.follower];
174+
let leaderEndpoint = collInfo.endpointMap[collInfo.leader];
175+
176+
if (!debugCanUseFailAt(followerEndpoint)) {
177+
// test only works if we can use failure points
178+
return;
179+
}
180+
181+
// We have a shard whose leader and follower is known to us.
182+
// now set failure points on the follower to get the shard out
183+
// of sync
184+
try {
185+
switchConnectionToFollower(collInfo);
186+
187+
// this failure point makes a follower refuse every operation sent by the leader
188+
// via synchronous replication
189+
debugSetFailAt(followerEndpoint, "synchronousReplication::refuseOnFollower");
190+
// this failure point makes the follower reject all synchronous replication requests
191+
// that do not have a following term id
192+
debugSetFailAt(followerEndpoint, "synchronousReplication::expectFollowingTerm");
193+
194+
// remember the leader's drop counter so the effect of the insert can be measured
let droppedFollowersBefore = getMetric(leaderEndpoint, "arangodb_dropped_followers_total");
195+
196+
switchConnectionToCoordinator(collInfo);
197+
let c = db._collection(cn);
198+
// this will drop the follower
199+
c.insert({});
200+
201+
let droppedFollowersAfter = getMetric(leaderEndpoint, "arangodb_dropped_followers_total");
202+
assertTrue(droppedFollowersAfter > droppedFollowersBefore, { droppedFollowersBefore, droppedFollowersAfter });
203+
204+
// only the unconditional refusal is removed; "expectFollowingTerm" stays set
// until the finally block clears all failure points
switchConnectionToFollower(collInfo);
205+
debugRemoveFailAt(followerEndpoint, "synchronousReplication::refuseOnFollower");
206+
207+
// wait for everything to get back into sync
208+
switchConnectionToCoordinator(collInfo);
209+
assertEqual(1, db._collection(cn).count());
// NOTE(review): 120 is presumably a timeout in seconds - confirm in test-helper
210+
waitForShardsInSync(cn, 120);
211+
212+
switchConnectionToFollower(collInfo);
213+
assertEqual(1, db._collection(collInfo.shard).count());
214+
215+
switchConnectionToLeader(collInfo);
216+
assertEqual(1, db._collection(collInfo.shard).count());
217+
218+
219+
// take a fresh baseline of the drop counter for the second insert
droppedFollowersBefore = getMetric(leaderEndpoint, "arangodb_dropped_followers_total");
220+
// the following insert should not drop any followers
221+
switchConnectionToCoordinator(collInfo);
222+
c.insert({});
223+
224+
// an unchanged counter means the follower accepted the replicated insert
// even though it still insists on a following term id
droppedFollowersAfter = getMetric(leaderEndpoint, "arangodb_dropped_followers_total");
225+
assertEqual(droppedFollowersAfter, droppedFollowersBefore);
226+
227+
} finally {
228+
// always clear all failure points on the follower and go back to the coordinator
switchConnectionToFollower(collInfo);
229+
debugClearFailAt(followerEndpoint);
230+
switchConnectionToCoordinator(collInfo);
231+
}
232+
},
233+
115234
};
116235
}
117236

0 commit comments

Comments
 (0)
0