10000 AddFollower to handle multiple followers at the same time · rowhit/arangodb@c6ef45b · GitHub
[go: up one dir, main page]

Skip to content

Commit c6ef45b

Browse files
committed
AddFollower to handle multiple followers at the same time
1 parent 9245a09 commit c6ef45b

File tree

5 files changed

+85
-29
lines changed

5 files changed

+85
-29
lines changed

arangod/Agency/AddFollower.cpp

Lines changed: 48 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,34 @@ AddFollower::AddFollower(Node const& snapshot, Agent* agent,
3232
std::string const& prefix, std::string const& database,
3333
std::string const& collection,
3434
std::string const& shard,
35-
std::string const& newFollower)
35+
std::initializer_list<std::string> const& newFollower)
36+
: Job(snapshot, agent, jobId, creator, prefix),
37+
_database(database),
38+
_collection(collection),
39+
_shard(shard),
40+
_newFollower(newFollower) {
41+
try {
42+
JOB_STATUS js = status();
43+
44+
if (js == TODO) {
45+
start();
46+
} else if (js == NOTFOUND) {
47+
if (create()) {
48+
start();
49+
}
50+
}
51+
} catch (std::exception const& e) {
52+
LOG_TOPIC(WARN, Logger::AGENCY) << e.what() << __FILE__ << __LINE__;
53+
finish("Shards/" + _shard, false, e.what());
54+
}
55+
}
56+
57+
AddFollower::AddFollower(Node const& snapshot, Agent* agent,
58+
std::string const& jobId, std::string const& creator,
59+
std::string const& prefix, std::string const& database,
60+
std::string const& collection,
61+
std::string const& shard,
62+
std::vector<std::string> const& newFollower)
3663
: Job(snapshot, agent, jobId, creator, prefix),
3764
_database(database),
3865
_collection(collection),
@@ -109,7 +136,13 @@ bool AddFollower::create() {
109136
_jb->add("database", VPackValue(_database));
110137
_jb->add("collection", VPackValue(_collection));
111138
_jb->add("shard", VPackValue(_shard));
112-
_jb->add("newFollower", VPackValue(_newFollower));
139+
_jb->add(VPackValue("newFollower"));
140+
{
141+
VPackArrayBuilder b(_jb.get());
142+
for (auto const& i : _newFollower) {
143+
_jb->add(VPackValue(i));
144+
}
145+
}
113146
_jb->add("jobId", VPackValue(_jobId));
114147
_jb->add("timeCreated", VPackValue(now));
115148

@@ -142,15 +175,15 @@ bool AddFollower::start() {
142175

143176
for (auto const& srv : VPackArrayIterator(current)) {
144177
TRI_ASSERT(srv.isString());
145-
if (srv.copyString() == _newFollower) {
178+
if (srv.copyString() == _newFollower.front()) {
146179
finish("Shards/" + _shard, false,
147180
"newFollower must not be already holding the shard.");
148181
return false;
149182
}
150183
}
151184
for (auto const& srv : VPackArrayIterator(planned)) {
152185
TRI_ASSERT(srv.isString());
153-
if (srv.copyString() == _newFollower) {
186+
if (srv.copyString() == _newFollower.front()) {
154187
finish("Shards/" + _shard, false,
155188
"newFollower must not be planned for shard already.");
156189
return false;
@@ -206,7 +239,9 @@ bool AddFollower::start() {
206239
for (auto const& srv : VPackArrayIterator(planned)) {
207240
pending.add(srv);
208241
}
209-
pending.add(VPackValue(_newFollower));
242+
for (auto const& i : _newFollower) {
243+
pending.add(VPackValue(i));
244+
}
210245
pending.close();
211246

212247
// --- Increment Plan/Version
@@ -237,7 +272,7 @@ bool AddFollower::start() {
237272

238273
if (res.accepted && res.indices.size() == 1 && res.indices[0]) {
239274
LOG_TOPIC(INFO, Logger::AGENCY)
240-
<< "Pending: Addfollower " + _newFollower + " to shard " + _shard;
275+
<< "Pending: Addfollower " << _newFollower << " to shard " << _shard;
241276
return true;
242277
}
243278

@@ -253,8 +288,12 @@ JOB_STATUS AddFollower::status() {
253288
try {
254289
_database = _snapshot(pos[status] + _jobId + "/database").getString();
255290
_collection = _snapshot(pos[status] + _jobId + "/collection").getString();
256-
_newFollower =
257-
_snapshot(pos[status] + _jobId + "/newFollower").getString();
291+
for (auto const& i :
292+
VPackArrayIterator(
293+
_snapshot(pos[status] + _jobId + "/newFollower").getArray())) {
294+
_newFollower.push_back(i.copyString());
295+
}
296+
_snapshot(pos[status] + _jobId + "/newFollower").getArray();
258297
_shard = _snapshot(pos[status] + _jobId + "/shard").getString();
259298
} catch (std::exception const& e) {
260299
std::stringstream err;
@@ -271,7 +310,7 @@ JOB_STATUS AddFollower::status() {
271310

272311
Slice current = _snapshot(curPath).slice();
273312
for (auto const& srv : VPackArrayIterator(current)) {
274-
if (srv.copyString() == _newFollower) {
313+
if (srv.copyString() == _newFollower.front()) {
275314
if (finish("Shards/" + _shard)) {
276315
return FINISHED;
277316
}

arangod/Agency/AddFollower.h

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -33,14 +33,26 @@ namespace consensus {
3333
struct AddFollower : public Job {
3434

3535
AddFollower (Node const& snapshot,
36-
Agent* agent,
37-
std::string const& jobId,
38-
std::string const& creator,
39-
std::string const& prefix,
40-
std::string const& database = std::string(),
41-
std::string const& collection = std::string(),
42-
std::string const& shard = std::string(),
43-
std::string const& newFollower = std::string());
36+
Agent* agent,
37+
std::string const& jobId,
38+
std::string const& creator,
39+
std::string const& prefix,
40+
std::string const& database,
41+
std::string const& collection,
42+
std::string const& shard,
43+
std::initializer_list<std::string> const&);
44+
45+
46+
AddFollower (Node const& snapshot,
47+
Agent* agent,
48+
std::string const& jobId,
49+
std::string const& creator,
50+
std::string const& prefix,
51+
std::string const& database = std::string(),
52+
std::string const& collection = std::string(),
53+
std::string const& shard = std::string(),
54+
std::vector<std::string> const& newFollowers = {});
55+
4456

4557
virtual ~AddFollower ();
4658

@@ -51,7 +63,7 @@ struct AddFollower : public Job {
5163
std::string _database;
5264
std::string _collection;
5365
std::string _shard;
54-
std::string _newFollower;
66+
std::vector<std::string> _newFollower;
5567

5668
};
5769

arangod/Agency/Inception.h

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -69,15 +69,9 @@ class Inception : public Thread {
6969

7070
private:
7171

72-
/// @brief Find active agency from persisted
73-
bool activeAgencyFromPersistence();
74-
7572
/// @brief We are a restarting active RAFT agent
7673
bool restartingActiveAgent();
7774

78-
/// @brief Find active agency from command line
79-
bool activeAgencyFromCommandLine();
80-
8175
/// @brief Try to estimate good RAFT min/max timeouts
8276
bool estimateRAFTInterval();
8377

arangod/Agency/RemoveServer.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -355,7 +355,7 @@ bool RemoveServer::scheduleAddFollowers() {
355355

356356
AddFollower(_snapshot, _agent, _jobId + "-" + std::to_string(sub++),
357357
_jobId, _agencyPrefix, database.first, collptr.first,
358-
shard.first, newServer);
358+
shard.first, {newServer});
359359
}
360360
}
361361
}

arangod/Agency/Supervision.cpp

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -634,18 +634,29 @@ void Supervision::enforceReplication() {
634634

635635
// Enough DBServer to
636636
if (replicationFactor > shard.slice().length() &&
637-
available.size() >= replicationFactor) {
637+
available.size() > shard.slice().length()) {
638638
for (auto const& i : VPackArrayIterator(shard.slice())) {
639639
available.erase(
640640
std::remove(
641641
available.begin(), available.end(), i.copyString()),
642642
available.end());
643643
}
644-
auto randIt = available.begin();
645-
std::advance(randIt, std::rand() % available.size());
644+
645+
size_t optimal = replicationFactor - shard.slice().length();
646+
std::vector<std::string> newFollowers;
647+
for (size_t i = 0; i < optimal; ++i) {
648+
auto randIt = available.begin();
649+
std::advance(randIt, std::rand() % available.size());
650+
newFollowers.push_back(*randIt);
651+
available.erase(randIt);
652+
if (available.empty()) {
653+
break;
654+
}
655+
}
656+
646657
AddFollower(
647658
_snapshot, _agent, std::to_string(_jobId++), "supervision",
648-
_agencyPrefix, db_.first, col_.first, shard_.first, *randIt);
659+
_agencyPrefix, db_.first, col_.first, shard_.first, newFollowers);
649660
}
650661
}
651662
}

0 commit comments

Comments
 (0)
0