arangodb/arangodb · Commit 0a97c82
Delay MoveShard job until old and new leader ready - BTS-1110 (#17554)
* Delay start of MoveShard if supposed leader not in Current.
* Also delay, if supposed follower is not in sync.
* CHANGELOG.
* Add unit tests for new behaviour.
* Typo fixes suggested by reviewer.
* Refactor for shorter MoveShard::start function.
1 parent 2182dd3 commit 0a97c82

File tree

4 files changed: +187 -25 lines changed

  CHANGELOG
  arangod/Agency/MoveShard.cpp
  arangod/Agency/MoveShard.h
  tests/Agency/MoveShardTest.cpp

CHANGELOG

Lines changed: 5 additions & 0 deletions

@@ -1,6 +1,11 @@
 devel
 -----
 
+* Delay a MoveShard operation for leader change, until the old leader has
+  actually assumed its leadership and until the new leader is actually in
+  sync. This fixes a bug which could block a shard under certain circumstances.
+  This fixes BTS-1110.
+
 * Fixed issue #17367: FILTER fails when using negation (!) on variable whose
   name starts with "in". Add trailing context to NOT IN token.
 

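The readiness conditions introduced here can be read directly off the Current
entry of each shard: the list under
Current/Collections/<db>/<collection>/<shard>/servers names the current leader
first, followed by the in-sync followers. Below is a minimal standalone sketch
of the two conditions (the helper name and the plain std::vector stand-in are
made up for illustration; the actual implementation in
arangod/Agency/MoveShard.cpp below works on the agency snapshot via velocypack):

    #include <algorithm>
    #include <cassert>
    #include <string>
    #include <vector>

    // Simplified, hypothetical model of the check added in this commit:
    // "from" is the old leader asked to resign, "to" is the designated new
    // leader.
    bool readyForLeaderChange(std::vector<std::string> const& currentServers,
                              std::string const& from, std::string const& to) {
      if (currentServers.empty()) {
        return false;  // no server list reported in Current yet
      }
      // 1) the old leader must actually have assumed its leadership
      if (currentServers.front() != from) {
        return false;
      }
      // 2) the designated new leader must already be an in-sync follower
      return std::find(currentServers.begin(), currentServers.end(), to) !=
             currentServers.end();
    }

    int main() {
      // old leader has not (yet) taken over -> delay the job
      assert(!readyForLeaderChange({"follower1", "leader"}, "leader", "follower1"));
      // new leader is not yet in sync -> delay the job
      assert(!readyForLeaderChange({"leader"}, "leader", "follower1"));
      // both conditions hold -> the job may start
      assert(readyForLeaderChange({"leader", "follower1"}, "leader", "follower1"));
      return 0;
    }
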
arangod/Agency/MoveShard.cpp

Lines changed: 83 additions & 25 deletions
@@ -204,6 +204,50 @@ bool MoveShard::create(std::shared_ptr<VPackBuilder> envelope) {
   return false;
 }
 
+bool MoveShard::checkLeaderFollowerCurrent(
+    std::vector<Job::shard_t> const& shardsLikeMe) {
+  bool ok = true;
+  for (auto const& s : shardsLikeMe) {
+    auto sharedPath = _database + "/" + s.collection + "/";
+    auto currentServersPath = curColPrefix + sharedPath + s.shard + "/servers";
+    auto serverList = _snapshot.hasAsArray(currentServersPath);
+    if (serverList && (*serverList).length() > 0) {
+      if (_from != (*serverList)[0].stringView()) {
+        LOG_TOPIC("55261", DEBUG, Logger::SUPERVISION)
+            << "MoveShard: From server " << _from
+            << " has not yet assumed leadership for collection " << s.collection
+            << " shard " << s.shard
+            << ", delaying start of MoveShard job for shard " << _shard;
+        ok = false;
+        break;
+      }
+      bool toFound = false;
+      for (auto server : VPackArrayIterator(*serverList)) {
+        if (_to == server.stringView()) {
+          toFound = true;
+          break;
+        }
+      }
+      if (!toFound) {
+        LOG_TOPIC("55262", DEBUG, Logger::SUPERVISION)
+            << "MoveShard: To server " << _to
+            << " is not in sync for collection " << s.collection << " shard "
+            << s.shard << ", delaying start of MoveShard job for shard "
+            << _shard;
+        ok = false;
+        break;
+      }
+    } else {
+      LOG_TOPIC("55263", INFO, Logger::SUPERVISION)
+          << "MoveShard: Did not find a non-empty server list in Current "
+             "for collection "
+          << s.collection << " and shard " << s.shard;
+      ok = false;  // not even a server list found
+    }
+  }
+  return ok;
+}
+
 bool MoveShard::start(bool&) {
   if (considerCancellation()) {
     return false;
@@ -391,6 +435,19 @@ bool MoveShard::start(bool&) {
     }
   }
 
+  if (_isLeader && _toServerIsFollower) {
+    // Further checks, before we can begin, we must make sure that the
+    // _fromServer has accepted its leadership already for all shards in the
+    // shard group and that the _toServer is actually in sync. Otherwise,
+    // if this job here asks the leader to resign, we would be stuck.
+    // If the _toServer is not in sync, the job would take overly long.
+    bool ok = checkLeaderFollowerCurrent(shardsLikeMe);
+    if (!ok) {
+      return false;  // Do not start job, but leave it in Todo.
+                     // Log messages already written.
+    }
+  }
+
   // Copy todo to pending
   Builder todo, pending;
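
If either check fails, start() returns false without writing anything to the
agency, so the job object stays in Target/ToDo and is re-evaluated with a fresh
snapshot in a later supervision round. The loop below is a deliberately
simplified, hypothetical model of that retry behaviour (it is not the real
Supervision code):

    #include <functional>
    #include <iostream>
    #include <string>
    #include <vector>

    // Hypothetical stand-in for a job sitting in ToDo; start() returns true
    // once the job could move to Pending.
    struct TodoJob {
      std::string id;
      std::function<bool(int)> start;  // parameter: current round number
    };

    int main() {
      std::vector<TodoJob> todo{
          {"1-moveshard", [](int round) {
             // stand-in for checkLeaderFollowerCurrent(): pretend the shard
             // group only becomes ready in round 3
             return round >= 3;
           }}};
      for (int round = 1; !todo.empty(); ++round) {
        if (todo.front().start(round)) {
          std::cout << "round " << round << ": job started, moved to Pending\n";
          todo.erase(todo.begin());
        } else {
          std::cout << "round " << round << ": job left in ToDo, retried later\n";
        }
      }
      return 0;
    }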

@@ -732,7 +789,8 @@ JOB_STATUS MoveShard::pendingLeader() {
   if (plan.isArray() &&
       Job::countGoodOrBadServersInList(_snapshot, plan) < plan.length()) {
     LOG_TOPIC("de056", DEBUG, Logger::SUPERVISION)
-        << "MoveShard (leader): found FAILED server in Plan, aborting job, db: "
+        << "MoveShard (leader): found FAILED server in Plan, aborting job, "
+           "db: "
         << _database << " coll: " << _collection << " shard: " << _shard;
     abort("failed server in Plan");
     return FAILED;
@@ -839,12 +897,12 @@ JOB_STATUS MoveShard::pendingLeader() {
 
   // We need to switch leaders:
   {
-    // First make sure that the server we want to go to is still in Current
-    // for all shards. This is important, since some transaction which the
-    // leader has still executed before its resignation might have dropped a
-    // follower for some shard, and this could have been our new leader. In
-    // this case we must abort and go back to the original leader, which is
-    // still perfectly safe.
+    // First make sure that the server we want to go to is still in
+    // Current for all shards. This is important, since some transaction
+    // which the leader has still executed before its resignation might
+    // have dropped a follower for some shard, and this could have been
+    // our new leader. In this case we must abort and go back to the
+    // original leader, which is still perfectly safe.
     for (auto const& sh : shardsLikeMe) {
       auto const shardPath =
           curColPrefix + _database + "/" + sh.collection + "/" + sh.shard;
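
The comment above describes a fallback decision: before the resigned leader
hands over, the supervision re-checks that the designated new leader is still
listed in Current for every shard of the group, and otherwise aborts back to
the old leader. A hypothetical, self-contained sketch of that decision (all
names here are invented for illustration, not taken from MoveShard.cpp):

    #include <algorithm>
    #include <iostream>
    #include <string>
    #include <vector>

    enum class Decision { SwitchToNewLeader, AbortBackToOldLeader };

    // For each shard of the group, "servers" is the Current server list;
    // "to" is the designated new leader.
    Decision decide(std::vector<std::vector<std::string>> const& currentPerShard,
                    std::string const& to) {
      for (auto const& servers : currentPerShard) {
        if (std::find(servers.begin(), servers.end(), to) == servers.end()) {
          return Decision::AbortBackToOldLeader;  // to-server was dropped
        }
      }
      return Decision::SwitchToNewLeader;
    }

    int main() {
      // one clone shard lost "follower1" shortly before the leader resigned:
      auto d = decide({{"leader", "follower1"}, {"leader"}}, "follower1");
      std::cout << (d == Decision::AbortBackToOldLeader ? "abort" : "switch")
                << "\n";  // prints "abort"
      return 0;
    }
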
@@ -953,8 +1011,8 @@ JOB_STATUS MoveShard::pendingLeader() {
           }
         } else {
           LOG_TOPIC("3294e", WARN, Logger::SUPERVISION)
-              << "failed to iterate through current shard servers for "
-                 "shard "
+              << "failed to iterate through current shard servers "
+                 "for shard "
               << _shard << " or one of its clones";
           TRI_ASSERT(false);
           return;  // we don't increment done and remain PENDING
@@ -1063,8 +1121,8 @@ JOB_STATUS MoveShard::pendingFollower() {
   if (plan.isArray() &&
       Job::countGoodOrBadServersInList(_snapshot, plan) < plan.length()) {
     LOG_TOPIC("f8c22", DEBUG, Logger::SUPERVISION)
-        << "MoveShard (follower): found FAILED server in Plan, aborting job, "
-           "db: "
+        << "MoveShard (follower): found FAILED server in Plan, aborting "
+           "job, db: "
         << _database << " coll: " << _collection << " shard: " << _shard;
     abort("failed server in Plan");
     return FAILED;
@@ -1242,8 +1300,8 @@ arangodb::Result MoveShard::abort(std::string const& reason) {
           trx.add(VPackValue(_from));
           if (plan.isArray()) {
             for (VPackSlice srv : VPackArrayIterator(plan)) {
-              // from could be in plan as <from> or <_from>. Exclude to
-              // server always.
+              // from could be in plan as <from> or <_from>. Exclude
+              // to server always.
               if (srv.isEqualString(_from) ||
                   srv.isEqualString("_" + _from) ||
                   srv.isEqualString(_to)) {
@@ -1259,8 +1317,8 @@ arangodb::Result MoveShard::abort(std::string const& reason) {
               TRI_ASSERT(false);
               return;
             }
-            // Add to server last. Will be removed by removeFollower if to
-            // much
+            // Add to server last. Will be removed by removeFollower if
+            // too many.
             trx.add(VPackValue(_to));
           }
         });
@@ -1312,8 +1370,8 @@ arangodb::Result MoveShard::abort(std::string const& reason) {
         VPackObjectBuilder preconditionObj(&trx);
         addMoveShardToServerCanUnLock(trx);
         addMoveShardFromServerCanUnLock(trx);
-        // If the collection is gone in the meantime, we do nothing here, but the
-        // round will move the job to Finished anyway:
+        // If the collection is gone in the meantime, we do nothing here, but
+        // the round will move the job to Finished anyway:
         addPreconditionCollectionStillThere(trx, _database, _collection);
       }
     }
@@ -1331,19 +1389,19 @@ arangodb::Result MoveShard::abort(std::string const& reason) {
     LOG_TOPIC("513e6", INFO, Logger::SUPERVISION)
         << "Precondition failed on MoveShard::abort() for shard " << _shard
         << " of collection " << _collection
-        << ", if the collection has been deleted in the meantime, the job "
-           "will be finished soon, if this message repeats, tell us.";
+        << ", if the collection has been deleted in the meantime, the "
+           "job will be finished soon, if this message repeats, tell us.";
     result = Result(
         TRI_ERROR_SUPERVISION_GENERAL_FAILURE,
         std::string("Precondition failed while aborting moveShard job ") +
             _jobId);
     return result;
-    // We intentionally do not move the job object to Failed or Finished here!
-    // The failed precondition can either be one of the read locks, which
-    // suggests a fundamental problem, and in which case we will log this
-    // message in every round of the supervision. Or the collection has been
-    // dropped since we took the snapshot, in this case we will move the job
-    // to Finished in the next round.
+    // We intentionally do not move the job object to Failed or Finished
+    // here! The failed precondition can either be one of the read locks,
+    // which suggests a fundamental problem, and in which case we will log
+    // this message in every round of the supervision. Or the collection
+    // has been dropped since we took the snapshot, in this case we will
+    // move the job to Finished in the next round.
   }
   result = Result(
       TRI_ERROR_SUPERVISION_GENERAL_FAILURE,

arangod/Agency/MoveShard.h

Lines changed: 2 additions & 0 deletions
@@ -88,5 +88,7 @@ struct MoveShard : public Job {
   void addMoveShardFromServerCanUnLock(Builder& ops) const;
 
   bool moveShardFinish(bool unlock, bool success, std::string const& msg);
+  bool checkLeaderFollowerCurrent(
+      std::vector<Job::shard_t> const& shardsLikeMe);
 };
 }  // namespace arangodb::consensus

tests/Agency/MoveShardTest.cpp

Lines changed: 97 additions & 0 deletions
@@ -621,6 +621,103 @@ TEST_F(MoveShardTest, the_job_should_wait_until_the_target_server_is_good) {
   moveShard.start(aborts);
 }
 
+TEST_F(MoveShardTest, the_job_should_wait_until_the_from_server_is_in_current) {
+  std::function<std::unique_ptr<VPackBuilder>(velocypack::Slice,
+                                              std::string const&)>
+      createTestStructure = [&](velocypack::Slice s, std::string const& path) {
+        auto builder = std::make_unique<velocypack::Builder>();
+        if (s.isObject()) {
+          builder->add(VPackValue(VPackValueType::Object));
+          for (auto it : VPackObjectIterator(s)) {
+            auto childBuilder =
+                createTestStructure(it.value, path + "/" + it.key.copyString());
+            if (childBuilder) {
+              builder->add(it.key.copyString(), childBuilder->slice());
+            }
+          }
+
+          if (path == "/arango/Target/ToDo") {
+            builder->add(
+                jobId,
+                createJob(COLLECTION, SHARD_LEADER, SHARD_FOLLOWER1).slice());
+          }
+          builder->close();
+        } else {
+          // Simulate a new leader which has not yet assumed its leadership:
+          if (path == "/arango/Current/Collections/" + DATABASE + "/" +
+                          COLLECTION + "/" + SHARD + "/servers") {
+            {
+              VPackArrayBuilder guard(builder.get());
+              builder->add(VPackValue("follower1"));
+              builder->add(VPackValue("leader"));
+            }
+          } else {
+            builder->add(s);
+          }
+        }
+        return builder;
+      };
+
+  Mock<AgentInterface> mockAgent;
+  When(Method(mockAgent, waitFor)).AlwaysReturn();
+  AgentInterface& agent = mockAgent.get();
+
+  auto builder = createTestStructure(baseStructure.toBuilder().slice(), "");
+  ASSERT_TRUE(builder);
+  Node agency = createAgencyFromBuilder(*builder);
+
+  auto moveShard = MoveShard(agency, &agent, TODO, jobId);
+  moveShard.start(aborts);
+}
+
+TEST_F(MoveShardTest, the_job_should_wait_until_the_to_server_is_in_sync) {
+  std::function<std::unique_ptr<VPackBuilder>(velocypack::Slice,
+                                              std::string const&)>
+      createTestStructure = [&](velocypack::Slice s, std::string const& path) {
+        auto builder = std::make_unique<velocypack::Builder>();
+        if (s.isObject()) {
+          builder->add(VPackValue(VPackValueType::Object));
+          for (auto it : VPackObjectIterator(s)) {
+            auto childBuilder =
+                createTestStructure(it.value, path + "/" + it.key.copyString());
+            if (childBuilder) {
+              builder->add(it.key.copyString(), childBuilder->slice());
+            }
+          }
+
+          if (path == "/arango/Target/ToDo") {
+            builder->add(
+                jobId,
+                createJob(COLLECTION, SHARD_LEADER, SHARD_FOLLOWER1).slice());
+          }
+          builder->close();
+        } else {
+          // Simulate a to server which is not yet in sync:
+          if (path == "/arango/Current/Collections/" + DATABASE + "/" +
+                          COLLECTION + "/" + SHARD + "/servers") {
+            {
+              VPackArrayBuilder guard(builder.get());
+              builder->add(VPackValue("leader"));
+            }
+          } else {
+            builder->add(s);
+          }
+        }
+        return builder;
+      };
+
+  Mock<AgentInterface> mockAgent;
+  When(Method(mockAgent, waitFor)).AlwaysReturn();
+  AgentInterface& agent = mockAgent.get();
+
+  auto builder = createTestStructure(baseStructure.toBuilder().slice(), "");
+  ASSERT_TRUE(builder);
+  Node agency = createAgencyFromBuilder(*builder);
+
+  auto moveShard = MoveShard(agency, &agent, TODO, jobId);
+  moveShard.start(aborts);
+}
+
 TEST_F(
     MoveShardTest,
     the_job_should_fail_if_the_shard_distributes_its_shards_like_some_other) {
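
The two server lists injected into Current by these tests map directly onto the
new delay conditions: ["follower1", "leader"] leaves the from server
(presumably SHARD_LEADER, i.e. "leader") out of the leading first position,
while ["leader"] omits the to server (presumably SHARD_FOLLOWER1, i.e.
"follower1") entirely. A short recap, not part of the test fixture:

    #include <cassert>
    #include <string>
    #include <vector>

    // Illustration only: the fixture constants are assumed to resolve to
    // "leader" (_from) and "follower1" (_to), matching the arrays above.
    int main() {
      std::vector<std::string> notYetLeading{"follower1", "leader"};
      std::vector<std::string> followerNotInSync{"leader"};

      // first test: "leader" is not in front, so it has not assumed leadership
      assert(notYetLeading.front() != "leader");
      // second test: "follower1" does not appear at all, so it is not in sync
      assert(followerNotInSync.size() == 1 &&
             followerNotInSync.front() != "follower1");
      return 0;
    }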
