Fix an agency supervision bug. by neunhoef · Pull Request #11356 · arangodb/arangodb · GitHub

Fix an agency supervision bug. #11356


Merged: 5 commits merged on Apr 2, 2020
6 changes: 6 additions & 0 deletions CHANGELOG
@@ -1,6 +1,12 @@
devel
-----

+* Fixed a bug in the agency supervision, which ignored the `failoverCandidates`
+  field.
+
+* Fixed a bug in the agency supervision, which could declare an already FAILED
+  dbserver temporarily as GOOD again after an agency leader change.
+
* Added INTERLEAVE AQL function.

* Upgraded bundled RocksDB library to version 6.8.0.
2 changes: 1 addition & 1 deletion arangod/Agency/Job.cpp
@@ -471,7 +471,7 @@ std::string Job::findNonblockedCommonHealthyInSyncFollower( // Which is in "GOO
    auto sharedPath = db + "/" + clone.collection + "/";
    auto currentShardPath = curColPrefix + sharedPath + clone.shard + "/servers";
    auto currentFailoverCandidatesPath =
-       curColPrefix + sharedPath + clone.shard + "/servers";
+       curColPrefix + sharedPath + clone.shard + "/failoverCandidates";
    auto plannedShardPath = planColPrefix + sharedPath + "shards/" + clone.shard;

// start up race condition ... current might not have everything in plan
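The one-line change above is the core of the first CHANGELOG entry: the job now reads the `failoverCandidates` list from Current instead of reading the in-sync `servers` list twice, so the candidate list was previously never consulted. As a rough illustration of why that list matters when picking a replacement follower, here is a minimal sketch; the function name, parameters, and health map are hypothetical stand-ins, not the actual ArangoDB types or the real `Job::findNonblockedCommonHealthyInSyncFollower` signature.

```cpp
#include <optional>
#include <string>
#include <unordered_map>
#include <vector>

// Hypothetical sketch: choose a follower that is listed as a failover
// candidate and is currently reported healthy ("GOOD"). The real supervision
// job additionally intersects the lists across all shards that are clones of
// each other; that part is omitted here.
std::optional<std::string> pickHealthyFollower(
    std::vector<std::string> const& failoverCandidates,  // Current/.../failoverCandidates
    std::vector<std::string> const& inSyncServers,       // Current/.../servers
    std::unordered_map<std::string, std::string> const& health) {
  // Prefer failoverCandidates when present: it records which followers were
  // considered eligible for a failover, which is the safer list to promote from.
  auto const& candidates =
      !failoverCandidates.empty() ? failoverCandidates : inSyncServers;
  for (auto const& server : candidates) {
    auto it = health.find(server);
    if (it != health.end() && it->second == "GOOD") {
      return server;  // first healthy candidate wins in this sketch
    }
  }
  return std::nullopt;  // no suitable candidate found
}
```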
95 changes: 69 additions & 26 deletions arangod/Agency/Supervision.cpp
@@ -588,30 +588,66 @@ std::vector<check_t> Supervision::check(std::string const& type) {
    HealthRecord persist(shortName, endpoint, hostId, engine, serverVersion, externalEndpoint);

    // Get last health entries from transient and persistent key value stores
+   bool transientHealthRecordFound = true;
    if (_transient.has(healthPrefix + serverID)) {
      transist = _transient.hasAsNode(healthPrefix + serverID).first;
+   } else {
+     // In this case this is the first time we look at this server during our
+     // new leadership. So we do not touch the persisted health record and
+     // only create a new transient health record.
+     transientHealthRecordFound = false;
    }
    if (snapshot().has(healthPrefix + serverID)) {
      persist = snapshot().hasAsNode(healthPrefix + serverID).first;
    }

+   // Here is an important subtlety: We will derive the health status of this
+   // server a bit further down by looking at the time when we saw the last
+   // heartbeat. The heartbeat is stored in transient in `Sync/ServerStates`.
+   // It has a timestamp, however, this was taken on the other server, so we
+   // cannot trust this time stamp because of possible clock skew. Since we
+   // do not actually know when this heartbeat came in, we have to proceed as
+   // follows: We make a copy of the `syncTime` and store it into the health
+   // record in transient `Supervision/Health`. If we detect a difference, it
+   // is the first time we have seen the new heartbeat and we can then take
+   // a reading of our local system clock and use that time. However, if we
+   // do not yet have a previous reading in our transient health record,
+   // we must not touch the persisted health record at all. This is what the
+   // flag `transientHealthRecordFound` means.
+
    // New health record (start with old add current information from sync)
    // Sync.time is copied to Health.syncTime
    // Sync.status is copied to Health.syncStatus
-   std::string syncTime =
-       _transient.has(syncPrefix + serverID)
-           ? _transient.hasAsString(syncPrefix + serverID + "/time").first
-           : timepointToString(std::chrono::system_clock::time_point());
-   std::string syncStatus =
-       _transient.has(syncPrefix + serverID)
-           ? _transient.hasAsString(syncPrefix + serverID + "/status").first
-           : "UNKNOWN";
-
-   // Last change registered in sync (transient != sync)
-   // Either now or value in transient
-   auto lastAckedTime = (syncTime != transist.syncTime)
-                            ? startTimeLoop
-                            : stringToTimepoint(transist.lastAcked);
+   std::string syncTime;
+   std::string syncStatus;
+   bool heartBeatVisible = _transient.has(syncPrefix + serverID);
+   if (heartBeatVisible) {
+     syncTime = _transient.hasAsString(syncPrefix + serverID + "/time").first;
+     syncStatus = _transient.hasAsString(syncPrefix + serverID + "/status").first;
+   } else {
+     syncTime = timepointToString(std::chrono::system_clock::time_point());  // beginning of time
Contributor (inline review comment on the line above): Because the universe is ~50 years old
syncStatus = "UNKNOWN";
}

// Compute the time when we last discovered a new heartbeat from that server:
std::chrono::system_clock::time_point lastAckedTime;
if (heartBeatVisible) {
if (transientHealthRecordFound) {
lastAckedTime = (syncTime != transist.syncTime)
? startTimeLoop
: stringToTimepoint(transist.lastAcked);
} else {
// in this case we do no really know when this heartbeat came in,
// however, it must have been after we became a leader, and since we
// do not have a transient health record yet, we just assume that we
// got it recently and set it to "now". Note that this will not make
// a FAILED server "GOOD" again, since we do not touch the persisted
// health record if we do not have a transient health record yet.
lastAckedTime = startTimeLoop;
}
} else {
lastAckedTime = std::chrono::system_clock::time_point();
}
transist.lastAcked = timepointToString(lastAckedTime);
transist.syncTime = syncTime;
transist.syncStatus = syncStatus;
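The added block above implements what the long comment earlier in this hunk describes: only a change of the remote `syncTime` string counts as a fresh heartbeat, and the local clock reading `startTimeLoop` is what gets recorded. A self-contained sketch of that decision, with hypothetical names and types standing in for the real Supervision members:

```cpp
#include <chrono>
#include <optional>
#include <string>

using Clock = std::chrono::system_clock;

// Hypothetical stand-in for one server's transient health record.
struct TransientHealth {
  std::string syncTime;         // heartbeat timestamp string last copied from Sync/ServerStates
  Clock::time_point lastAcked;  // local time at which that heartbeat was first seen
};

// Sketch of the lastAckedTime decision in Supervision::check(): a heartbeat
// only counts as "new" if the remote timestamp string changed, and it is then
// stamped with the local clock (startTimeLoop), never with the remote clock,
// so clock skew between servers cannot distort the measurement.
Clock::time_point computeLastAcked(bool heartBeatVisible,
                                   std::string const& syncTime,
                                   std::optional<TransientHealth> const& previous,
                                   Clock::time_point startTimeLoop) {
  if (!heartBeatVisible) {
    // No entry in Sync/ServerStates at all: "beginning of time".
    return Clock::time_point{};
  }
  if (!previous) {
    // First time this supervision leader sees the server: assume the heartbeat
    // is recent. The persisted health record is left untouched in this case,
    // so an already FAILED server is not flipped back to GOOD.
    return startTimeLoop;
  }
  return (syncTime != previous->syncTime)
             ? startTimeLoop         // timestamp changed: new heartbeat, use local time
             : previous->lastAcked;  // unchanged: keep the earlier local reading
}
```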
@@ -623,34 +659,41 @@ std::vector<check_t> Supervision::check(std::string const& type) {
    transist.hostId = hostId;
    transist.endpoint = endpoint;

-   // Calculate elapsed since lastAcked
-   auto elapsed = std::chrono::duration<double>(startTimeLoop - lastAckedTime);
+   // We have now computed a new transient health record under all circumstances.

+   bool changed;
+   if (transientHealthRecordFound) {
+     // Calculate elapsed since lastAcked
+     auto elapsed = std::chrono::duration<double>(startTimeLoop - lastAckedTime);
+
-   if (elapsed.count() <= _okThreshold) {
-     transist.status = Supervision::HEALTH_STATUS_GOOD;
-   } else if (elapsed.count() <= _gracePeriod) {
-     transist.status = Supervision::HEALTH_STATUS_BAD;
+     if (elapsed.count() <= _okThreshold) {
+       transist.status = Supervision::HEALTH_STATUS_GOOD;
+     } else if (elapsed.count() <= _gracePeriod) {
+       transist.status = Supervision::HEALTH_STATUS_BAD;
+     } else {
+       transist.status = Supervision::HEALTH_STATUS_FAILED;
+     }
+
+     // Status changed?
+     changed = transist.statusDiff(persist);
    } else {
-     transist.status = Supervision::HEALTH_STATUS_FAILED;
+     transist.status = persist.status;
+     changed = false;
    }

-   // Status changed?
-   bool changed = transist.statusDiff(persist);

    // Take necessary actions if any
    std::shared_ptr<VPackBuilder> envelope;
    if (changed) {
      LOG_TOPIC("bbbde", DEBUG, Logger::SUPERVISION)
          << "Status of server " << serverID << " has changed from "
          << persist.status << " to " << transist.status;
      handleOnStatus(_agent, snapshot(), persist, transist, serverID, _jobId, envelope);
+     persist = transist;  // Now copy Status, SyncStatus from transient to persisted
    } else {
      LOG_TOPIC("44253", TRACE, Logger::SUPERVISION)
          << "Health of server " << machine.first << " remains " << transist.status;
    }

-   persist = transist;  // Now copy Status, SyncStatus from transient to persisted
-
    // Transient report
    std::shared_ptr<Builder> tReport = std::make_shared<Builder>();
    {
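To summarize the second hunk: the GOOD/BAD/FAILED classification from heartbeat age is now applied only when a previous transient health record exists; otherwise the persisted status is simply kept. A small sketch of that classification, where the parameter values would be placeholders rather than ArangoDB's actual defaults:

```cpp
#include <string>

// Sketch of the elapsed-time classification used in Supervision::check().
// okThreshold and gracePeriod correspond to the supervision's _okThreshold
// and _gracePeriod members; concrete values are illustrative only.
std::string statusFromElapsed(double elapsedSeconds, double okThreshold,
                              double gracePeriod) {
  if (elapsedSeconds <= okThreshold) {
    return "GOOD";    // heartbeat seen recently
  }
  if (elapsedSeconds <= gracePeriod) {
    return "BAD";     // heartbeat getting old, but still within the grace period
  }
  return "FAILED";    // no fresh heartbeat for too long
}
```

The fix for the second CHANGELOG entry is the surrounding `if (transientHealthRecordFound)`: right after an agency leader change there is no transient record yet, so this mapping is skipped, the persisted status is copied into the transient record, and a server that is already FAILED stays FAILED until fresh heartbeats say otherwise.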