8000 cleanoutServer Bug Fix by maierlars · Pull Request #6537 · arangodb/arangodb · GitHub
[go: up one dir, main page]

Skip to content

cleanoutServer Bug Fix #6537

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Sep 21, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion arangod/Agency/CleanOutServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,8 @@ bool CleanOutServer::scheduleMoveShards(std::shared_ptr<Builder>& trx) {
serversCopy.end());
}

bool isLeader = (found == 0);

// Among those a random destination:
std::string toServer;
if (serversCopy.empty()) {
Expand All @@ -403,7 +405,7 @@ bool CleanOutServer::scheduleMoveShards(std::shared_ptr<Builder>& trx) {
// Schedule move into trx:
MoveShard(_snapshot, _agent, _jobId + "-" + std::to_string(sub++),
_jobId, database.first, collptr.first,
shard.first, _server, toServer, found == 0)
shard.first, _server, toServer, isLeader, false)
.create(trx);
}
}
Expand Down
44 changes: 41 additions & 3 deletions arangod/Agency/MoveShard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,22 @@
using namespace arangodb;
using namespace arangodb::consensus;

MoveShard::MoveShard(Node const& snapshot, AgentInterface* agent,
std::string const& jobId, std::string const& creator,
std::string const& database,
std::string const& collection, std::string const& shard,
std::string const& from, std::string const& to,
bool isLeader, bool remainsFollower)
: Job(NOTFOUND, snapshot, agent, jobId, creator),
_database(database),
_collection(collection),
_shard(shard),
_from(id(from)),
_to(id(to)),
_isLeader(isLeader), // will be initialized properly when information known
_remainsFollower(remainsFollower)
{ }

MoveShard::MoveShard(Node const& snapshot, AgentInterface* agent,
std::string const& jobId, std::string const& creator,
std::string const& database,
Expand All @@ -42,7 +58,8 @@ MoveShard::MoveShard(Node const& snapshot, AgentInterface* agent,
_shard(shard),
_from(id(from)),
_to(id(to)),
_isLeader(isLeader) // will be initialized properly when information known
_isLeader(isLeader), // will be initialized properly when information known
_remainsFollower(isLeader)
{ }

MoveShard::MoveShard(Node const& snapshot, AgentInterface* agent,
Expand All @@ -57,6 +74,7 @@ MoveShard::MoveShard(Node const& snapshot, AgentInterface* agent,
auto tmp_to = _snapshot.hasAsString(path + "toServer");
auto tmp_shard = _snapshot.hasAsString(path + "shard");
auto tmp_isLeader = _snapshot.hasAsSlice(path + "isLeader");
auto tmp_remainsFollower = _snapshot.hasAsSlice(path + "remainsFollower");
auto tmp_creator = _snapshot.hasAsString(path + "creator");

if (tmp_database.second && tmp_collection.second && tmp_from.second && tmp_to.second
Expand All @@ -67,6 +85,7 @@ MoveShard::MoveShard(Node const& snapshot, AgentInterface* agent,
_to = tmp_to.first;
_shard = tmp_shard.first;
_isLeader = tmp_isLeader.first.isTrue();
_remainsFollower = tmp_remainsFollower.second ? tmp_remainsFollower.first.isTrue() : _isLeader;
_creator = tmp_creator.first;
} else {
std::stringstream err;
Expand Down Expand Up @@ -130,6 +149,7 @@ bool MoveShard::create(std::shared_ptr<VPackBuilder> envelope) {
_jb->add("fromServer", VPackValue(_from));
_jb->add("toServer", VPackValue(_to));
_jb->add("isLeader", VPackValue(_isLeader));
_jb->add("remainsFollower", VPackValue(_remainsFollower));
_jb->add("jobId", VPackValue(_jobId));
_jb->add("timeCreated", VPackValue(now));
}
Expand Down Expand Up @@ -276,6 +296,11 @@ bool MoveShard::start() {
return false;
}

if (!_isLeader && _remainsFollower) {
finish("", "", false, "remainsFollower is invalid without isLeader");
return false;
}

// Compute group to move shards together:
std::vector<Job::shard_t> shardsLikeMe
= clones(_snapshot, _database, _collection, _shard);
Expand Down Expand Up @@ -354,7 +379,7 @@ bool MoveShard::start() {
addPreconditionUnchanged(pending, planPath, planned);
addPreconditionShardNotBlocked(pending, _shard);
addPreconditionServerNotBlocked(pending, _to);
addPreconditionServerHealth(pending, _to, "GOOD");
addPreconditionServerHealth(pending, _to, "GOOD");
addPreconditionUnchanged(pending, failedServersPrefix, failedServers);
addPreconditionUnchanged(pending, cleanedPrefix, cleanedServers);
} // precondition done
Expand Down Expand Up @@ -507,6 +532,7 @@ JOB_STATUS MoveShard::pendingLeader() {
trx.add(srv);
}
}
// add the old leader as follower in case of a rollback
trx.add(VPackValue(_from));
}
// Precondition: Plan still as it was
Expand Down Expand Up @@ -545,7 +571,19 @@ JOB_STATUS MoveShard::pendingLeader() {
{ VPackObjectBuilder trxObject(&trx);
VPackObjectBuilder preObject(&pre);
doForAllShards(_snapshot, _database, shardsLikeMe,
[&pre](Slice plan, Slice current, std::string& planPath) {
[&trx, &pre, this](Slice plan, Slice current, std::string& planPath) {
if (!_remainsFollower) {
// Remove _from from the list of follower
trx.add(VPackValue(planPath));
{ VPackArrayBuilder guard(&trx);
for (auto 10000 const& srv : VPackArrayIterator(plan)) {
if (!srv.isEqualString(_from)) {
trx.add(srv);
}
}
}
}

// Precondition: Plan still as it was
pre.add(VPackValue(planPath));
{ VPackObjectBuilder guard(&pre);
Expand Down
13 changes: 12 additions & 1 deletion arangod/Agency/MoveShard.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,17 @@ namespace arangodb {
namespace consensus {

struct MoveShard : public Job {


MoveShard(Node const& snapshot, AgentInterface* agent, std::string const& jobId,
std::string const& creator,
std::string const& database,
std::string const& collection,
std::string const& shard,
std::string const& from,
std::string const& to,
bool isLeader,
bool remainsFollower);

MoveShard(Node const& snapshot, AgentInterface* agent, std::string const& jobId,
std::string const& creator,
std::string const& database,
Expand Down Expand Up @@ -61,6 +71,7 @@ struct MoveShard : public Job {
std::string _from;
std::string _to;
bool _isLeader;
bool _remainsFollower;
};
}
}
Expand Down
31 changes: 16 additions & 15 deletions tests/Agency/CleanOutServerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ VPackBuilder createMoveShardJob() {
builder.add("fromServer", VPackValue("test"));
builder.add("toServer", VPackValue("test2"));
builder.add("isLeader", VPackValue(true));
builder.add("remainsFollower", VPackValue(false));
builder.add("collection", VPackValue("test"));
builder.add("shard", VPackValue("s99"));
builder.add("creator", VPackValue("unittest"));
Expand All @@ -77,7 +78,7 @@ VPackBuilder createMoveShardJob() {

void checkFailed(JOB_STATUS status, query_t const& q) {
INFO("WRITE: " << q->toJson());

REQUIRE(std::string(q->slice().typeName()) == "array" );
REQUIRE(q->slice().length() == 1);
REQUIRE(std::string(q->slice()[0].typeName()) == "array");
Expand All @@ -100,7 +101,7 @@ Node createNodeFromBuilder(VPackBuilder const& builder) {
VPackBuilder opBuilder;
{ VPackObjectBuilder a(&opBuilder);
opBuilder.add("new", builder.slice()); }

Node node("");
node.handle<SET>(opBuilder.slice());
return node;
Expand All @@ -112,11 +113,11 @@ Builder createBuilder(char const* c) {
options.checkAttributeUniqueness = true;
VPackParser parser(&options);
parser.parse(c);

VPackBuilder builder;
builder.add(parser.steal()->slice());
return builder;

}

Node createNode(char const* c) {
Expand Down Expand Up @@ -156,7 +157,7 @@ VPackBuilder createJob(std::string const& server) {
TEST_CASE("CleanOutServer", "[agency][supervision]") {
RandomGenerator::initialize(RandomGenerator::RandomType::MERSENNE);
auto baseStructure = createRootNode();

write_ret_t fakeWriteResult {true, "", std::vector<bool> {true}, std::vector<index_t> {1}};
auto transBuilder = std::make_shared<Builder>();
{ VPackArrayBuilder a(transBuilder.get());
Expand Down Expand Up @@ -338,7 +339,7 @@ SECTION("cleanout server should fail if the server is already cleaned") {
}
return builder;
};

Mock<AgentInterface> mockAgent;
When(Method(mockAgent, write)).Do([&](query_t const& q, bool d) -> write_ret_t {
checkFailed(JOB_STATUS::TODO, q);
Expand Down Expand Up @@ -385,7 +386,7 @@ SECTION("cleanout server should fail if the server is failed") {
}
return builder;
};

Mock<AgentInterface> mockAgent;
When(Method(mockAgent, write)).Do([&](query_t const& q, bool d) -> write_ret_t {
checkFailed(JOB_STATUS::TODO, q);
Expand Down Expand Up @@ -434,7 +435,7 @@ SECTION("cleanout server should fail if the replicationFactor is too big for any
}
return builder;
};

Mock<AgentInterface> mockAgent;
When(Method(mockAgent, write)).Do([&](query_t const& q, bool d) -> write_ret_t {
checkFailed(JOB_STATUS::TODO, q);
Expand Down Expand Up @@ -484,7 +485,7 @@ SECTION("cleanout server should fail if the replicationFactor is too big for any
}
return builder;
};

Mock<AgentInterface> mockAgent;
When(Method(mockAgent, write)).Do([&](query_t const& q, bool d) -> write_ret_t {
checkFailed(JOB_STATUS::TODO, q);
Expand Down Expand Up @@ -529,11 +530,11 @@ SECTION("a cleanout server job should move into pending when everything is ok")
}
return builder;
};

Mock<AgentInterface> mockAgent;
When(Method(mockAgent, write)).Do([&](query_t const& q, bool d) -> write_ret_t {
INFO("WRITE: " << q->toJson());

REQUIRE(std::string(q->slice().typeName()) == "array" );
REQUIRE(q->slice().length() == 1);
REQUIRE(std::string(q->slice()[0].typeName()) == "array");
Expand Down Expand Up @@ -609,14 +610,14 @@ SECTION("a cleanout server job should abort after a long timeout") {
}
return builder;
};

int qCount = 0;
Mock<AgentInterface> mockAgent;
When(Method(mockAgent, write)).AlwaysDo([&](query_t const& q, bool d) -> write_ret_t {
if (qCount++ == 0) {
// first the moveShard job should be aborted
INFO("WRITE FIRST: " << q->toJson());

REQUIRE(std::string(q->slice().typeName()) == "array" );
REQUIRE(q->slice().length() == 1);
REQUIRE(std::string(q->slice()[0].typeName()) == "array");
Expand Down Expand Up @@ -720,7 +721,7 @@ SECTION("once all subjobs were successful then the job should be finished") {
Mock<AgentInterface> mockAgent;
When(Method(mockAgent 8000 , write)).Do([&](query_t const& q, bool d) -> write_ret_t {
INFO("WRITE: " << q->toJson());

REQUIRE(std::string(q->slice().typeName()) == "array" );
REQUIRE(q->slice().length() == 1);
REQUIRE(std::string(q->slice()[0].typeName()) == "array");
Expand Down Expand Up @@ -827,7 +828,7 @@ SECTION("when the cleanout server job is aborted all subjobs should be aborted t
if (qCount++ == 0) {
// first the moveShard job should be aborted
INFO("WRITE FIRST: " << q->toJson());

REQUIRE(std::string(q->slice().typeName()) == "array" );
REQUIRE(q->slice().length() == 1);
REQUIRE(std::string(q->slice()[0].typeName()) == "array");
Expand Down
18 changes: 9 additions & 9 deletions tests/Agency/FailedFollowerTest.cpp
17A7
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ Node createNodeFromBuilder(VPackBuilder const& builder) {
VPackBuilder opBuilder;
{ VPackObjectBuilder a(&opBuilder);
opBuilder.add("new", builder.slice()); }

Node node("");
node.handle<SET>(opBuilder.slice());
return node;
Expand All @@ -100,11 +100,11 @@ Builder createBuilder(char const* c) {
options.checkAttributeUniqueness = true;
VPackParser parser(&options);
parser.parse(c);

VPackBuilder builder;
builder.add(parser.steal()->slice());
return builder;

}

Node createNode(char const* c) {
Expand All @@ -121,13 +121,13 @@ TEST_CASE("FailedFollower", "[agency][supervision]") {
auto transBuilder = std::make_shared<Builder>();
{ VPackArrayBuilder a(transBuilder.get());
transBuilder->add(VPackValue((uint64_t)1)); }



auto baseStructure = createRootNode();
write_ret_t fakeWriteResult {true, "", std::vector<bool> {true}, std::vector<index_t> {1}};
trans_ret_t fakeTransResult {true, "", 1, 0, transBuilder};

SECTION("creating a job should create a job in todo") {
Mock<AgentInterface> mockAgent;

Expand Down Expand Up @@ -396,7 +396,7 @@ SECTION("if there is no healthy free server when trying to start just wait") {
REQUIRE(builder);
INFO("Agency: " << builder->toJson());
Node agency = createNodeFromBuilder(*builder);

// nothing should happen
Mock<AgentInterface> mockAgent;
AgentInterface &agent = mockAgent.get();
Expand Down Expand Up @@ -429,7 +429,7 @@ SECTION("abort any moveShard job blocking the shard and start") {
AgentInterface &moveShardAgent = moveShardMockAgent.get();
auto moveShard = MoveShard(
baseStructure(PREFIX), &moveShardAgent, "2", "strunz", DATABASE,
COLLECTION, SHARD, SHARD_LEADER, FREE_SERVER, true);
COLLECTION, SHARD, SHARD_LEADER, FREE_SERVER, true, true);
moveShard.create();

std::string jobId = "1";
Expand Down
Loading
0