8000 Leader updates Current precondition fixes. (#9410) · Mars2018/arangodb@7aa0c19 · GitHub
[go: up one dir, main page]

Skip to content

Commit 7aa0c19

Browse files
authored
Leader updates Current precondition fixes. (arangodb#9410)
* Better logging and error reporting. * Preconditions for FollowerInfo. * Preconditions when updating Current as leader. * Change a log level. * Fix unit tests. * CHANGELOG. * LOG_TOPIC ids. * Fix a log id. * Fix Windows compilation.
1 parent 595728e commit 7aa0c19

File tree

9 files changed

+215
-91
lines changed

9 files changed

+215
-91
lines changed

CHANGELOG

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,11 @@ devel
1616

1717
* Fixed agency bug with election lock step.
1818

19+
* Fixed some error reporting and logging in Maintenance.
20+
21+
* Fixed an error condition in which an ex-leader for a short still believed
22+
to be the leader and wrongly reported to Current.
23+
1924

2025
v3.5.0-rc.4 (2019-06-15)
2126
------------------------

arangod/Cluster/DBServerAgencySync.cpp

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -183,11 +183,6 @@ DBServerAgencySyncResult DBServerAgencySync::execute() {
183183
VPackBuilder local;
184184
Result glc = getLocalCollections(local);
185185
if (!glc.ok()) {
186-
// FIXMEMAINTENANCE: if this fails here, then result is empty, is this
187-
// intended? I also notice that there is another Result object "tmp"
188-
// that is going to eat bad results in few lines later. Again, is
189-
// that the correct action? If so, how about supporting comments in
190-
// the code for both.
191186
result.errorMessage = "Could not do getLocalCollections for phase 1: '";
192187
result.errorMessage.append(glc.errorMessage()).append("'");
193188
return result;
@@ -315,6 +310,30 @@ DBServerAgencySyncResult DBServerAgencySync::execute() {
315310
result = DBServerAgencySyncResult(false, "Error in phase 2: " + tmp.errorMessage(),
316311
0, 0);
317312
}
313+
} else {
314+
// This code should never run, it is only there to debug problems if
315+
// we mess up in other places.
316+
result.errorMessage = "Report from phase 1 and 2 was no object.";
317+
try {
318+
std::string json = report.toJson();
319+
LOG_TOPIC("65fde", WARN, Logger::MAINTENANCE) << "Report from phase 1 and 2 was: " << json;
320+
} catch(std::exception const& exc) {
321+
LOG_TOPIC("54de2", WARN, Logger::MAINTENANCE)
322+
<< "Report from phase 1 and 2 could not be dumped to JSON, error: "
323+
<< exc.what() << ", head byte:" << report.head();
324+
uint64_t l = 0;
325+
try {
326+
l = report.byteSize();
327+
LOG_TOPIC("54dda", WARN, Logger::MAINTENANCE)
328+
<< "Report from phase 1 and 2, byte size: " << l;
329+
LOG_TOPIC("67421", WARN, Logger::MAINTENANCE)
330+
<< "Bytes: "
331+
<< arangodb::basics::StringUtils::encodeHex((char const*) report.start(), l);
332+
} catch(...) {
333+
LOG_TOPIC("76124", WARN, Logger::MAINTENANCE)
334+
<< "Report from phase 1 and 2, byte size throws.";
335+
}
336+
}
318337
}
319338
} else {
320339
result.errorMessage = "Report from phase 1 and 2 was not closed.";

arangod/Cluster/DBServerAgencySync.h

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
#include "Basics/VelocyPackHelper.h"
3030

3131
namespace arangodb {
32+
3233
class HeartbeatThread;
3334

3435
struct DBServerAgencySyncResult {
@@ -45,9 +46,6 @@ struct DBServerAgencySyncResult {
4546

4647
DBServerAgencySyncResult(bool s, std::string const& e, uint64_t p, uint64_t c)
4748
: success(s), errorMessage(e), planVersion(p), currentVersion(c) {}
48-
49-
DBServerAgencySyncResult(DBServerAgencySyncResult const& other) = default;
50-
DBServerAgencySyncResult& operator=(DBServerAgencySyncResult const& other) = default;
5149
};
5250

5351
class DBServerAgencySync {

arangod/Cluster/FollowerInfo.cpp

Lines changed: 118 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -116,24 +116,31 @@ Result FollowerInfo::add(ServerID const& sid) {
116116
}
117117
#endif
118118
}
119+
std::string planId = std::to_string(_docColl->planId());
119120
// Now tell the agency, path is
120121
// Current/Collections/<dbName>/<collectionID>/<shardID>
121-
std::string path = "Current/Collections/";
122-
path += _docColl->vocbase().name();
123-
path += "/";
124-
path += std::to_string(_docColl->planId());
125-
path += "/";
126-
path += _docColl->name();
122+
std::string path = _docColl->vocbase().name() + "/" + planId + "/";
123+
std::string curPath = "Current/Collections/" + path + _docColl->name();
124+
// We also need the corresponding Plan entry, path is
125+
// Plan/Collections/<dbName>/<collectionID>/shards/<shardID>
126+
std::string planPath = "Plan/Collections/" + path + "shards/" +
127+
_docColl->name();
127128
AgencyComm ac;
128129
double startTime = TRI_microtime();
129130
do {
130-
AgencyCommResult res = ac.getValues(path);
131+
AgencyReadTransaction trx(std::vector<std::string>(
132+
{AgencyCommManager::path(planPath), AgencyCommManager::path(curPath)}));
133+
AgencyCommResult res = ac.sendTransactionWithFailover(trx);
131134

132135
if (res.successful()) {
136+
// Let's look at the results, note that both can be None!
137+
velocypack::Slice planEntry = res.slice()[0].get(
138+
std::vector<std::string>(
139+
{AgencyCommManager::path(), "Plan", "Collections",
140+
_docColl->vocbase().name(), planId, "shards", _docColl->name()}));
133141
velocypack::Slice currentEntry = res.slice()[0].get(std::vector<std::string>(
134142
{AgencyCommManager::path(), "Current", "Collections",
135-
_docColl->vocbase().name(), std::to_string(_docColl->planId()),
136-
_docColl->name()}));
143+
_docColl->vocbase().name(), planId, _docColl->name()}));
137144

138145
if (!currentEntry.isObject()) {
139146
LOG_TOPIC("b753d", ERR, Logger::CLUSTER)
@@ -142,31 +149,47 @@ Result FollowerInfo::add(ServerID const& sid) {
142149
LOG_TOPIC("568de", ERR, Logger::CLUSTER) << "Found: " << currentEntry.toJson();
143150
}
144151
} else {
145-
auto newValue = newShardEntry(currentEntry, sid, true);
146-
std::string key = "Current/Collections/" + _docColl->vocbase().name() +
147-
"/" + std::to_string(_docColl->planId()) + "/" +
148-
_docColl->name();
149-
AgencyWriteTransaction trx;
150-
trx.preconditions.push_back(
151-
AgencyPrecondition(key, AgencyPrecondition::Type::VALUE, currentEntry));
152-
trx.operations.push_back(
153-
AgencyOperation(key, AgencyValueOperationType::SET, newValue.slice()));
154-
trx.operations.push_back(
155-
AgencyOperation("Current/Version", AgencySimpleOperationType::INCREMENT_OP));
156-
AgencyCommResult res2 = ac.sendTransactionWithFailover(trx);
157-
if (res2.successful()) {
158-
// we are finished!
159-
return {TRI_ERROR_NO_ERROR};
152+
if (!planEntry.isArray() || planEntry.length() == 0 ||
153+
!planEntry[0].isString() ||
154+
!planEntry[0].isEqualString(ServerState::instance()->getId())) {
155+
LOG_TOPIC("54555", INFO, Logger::CLUSTER)
156+
<< "FollowerInfo::add, did not find myself in Plan: " << path
157+
<< " (can happen when the leader changed recently).";
158+
if (!planEntry.isNone()) {
159+
LOG_TOPIC("66762", INFO, Logger::CLUSTER) << "Found: " << planEntry.toJson();
160+
}
161+
return {TRI_ERROR_CLUSTER_NOT_LEADER};
162+
} else {
163+
auto newValue = newShardEntry(currentEntry, sid, true);
164+
AgencyWriteTransaction trx;
165+
trx.preconditions.push_back(
166+
AgencyPrecondition(curPath, AgencyPrecondition::Type::VALUE, currentEntry));
167+
trx.preconditions.push_back(
168+
AgencyPrecondition(planPath, AgencyPrecondition::Type::VALUE, planEntry));
169+
trx.operations.push_back(
170+
AgencyOperation(curPath, AgencyValueOperationType::SET, newValue.slice()));
171+
trx.operations.push_back(
172+
AgencyOperation("Current/Version", AgencySimpleOperationType::INCREMENT_OP));
173+
AgencyCommResult res2 = ac.sendTransactionWithFailover(trx);
174+
if (res2.successful()) {
175+
return {TRI_ERROR_NO_ERROR};
176+
} else {
177+
LOG_TOPIC("daeda", WARN, Logger::CLUSTER)
178+
<< "FollowerInfo::add, could not cas key " << path;
179+
}
160180
}
161181
}
162182
} else {
163-
LOG_TOPIC("dcf54", ERR, Logger::CLUSTER)
164-
<< "FollowerInfo::add, could not read " << path << " in agency";
183+
LOG_TOPIC("dcf54", WARN, Logger::CLUSTER)
184+
<< "FollowerInfo::add, could not read " << planPath
185+
<< " and " << curPath << " in agency.";
165186
}
166187
std::this_thread::sleep_for(std::chrono::microseconds(500000));
167-
} while (TRI_microtime() < startTime + 30 &&
188+
} while (TRI_microtime() < startTime + 3600 &&
168189
application_features::ApplicationServer::isRetryOK());
169-
190+
// This is important, give it 1h if needed. We really do not want to get
191+
// into the position to not accept a shard getting-in-sync just because
192+
// we cannot talk to the agency temporarily.
170193
int errorCode = (application_features::ApplicationServer::isRetryOK()) ? TRI_ERROR_CLUSTER_AGENCY_COMMUNICATION_FAILED : TRI_ERROR_SHUTTING_DOWN;
171194
std::string errorMessage = "unable to add follower in agency, timeout in agency CAS operation for key " + path + ": " + TRI_errno_string(errorCode);
172195
LOG_TOPIC("6295b", ERR, Logger::CLUSTER) << errorMessage;
@@ -177,7 +200,9 @@ Result FollowerInfo::add(ServerID const& sid) {
177200
////////////////////////////////////////////////////////////////////////////////
178201
/// @brief remove a follower from a shard, this is only done by the
179202
/// server if a synchronous replication request fails. This reports to
180-
/// the agency under `/Current` but in asynchronous "fire-and-forget"
203+
/// the agency under `/Current`. This method can fail, which is critical,
204+
/// because we cannot drop a follower ourselves and not report this to the
205+
/// agency, since then a failover to a not-in-sync follower might happen.
181206
/// way. The method fails silently, if the follower information has
182207
/// since been dropped (see `dropFollowerInfo` below).
183208
////////////////////////////////////////////////////////////////////////////////
@@ -229,19 +254,28 @@ Result FollowerInfo::remove(ServerID const& sid) {
229254
return {TRI_ERROR_NO_ERROR};
230255
}
231256
#endif
257+
258+
std::string planId = std::to_string(_docColl->planId());
232259
// Now tell the agency, path is
233260
// Current/Collections/<dbName>/<collectionID>/<shardID>
234-
std::string path = "Current/Collections/";
235-
path += _docColl->vocbase().name();
236-
path += "/";
237-
path += std::to_string(_docColl->planId());
238-
path += "/";
239-
path += _docColl->name();
261+
std::string path = _docColl->vocbase().name() + "/" + planId + "/";
262+
std::string curPath = "Current/Collections/" + path + _docColl->name();
263+
// We also need the corresponding Plan entry, path is
264+
// Plan/Collections/<dbName>/<collectionID>/shards/<shardID>
265+
std::string planPath = "Plan/Collections/" + path + "shards/" +
266+
_docColl->name();
240267
AgencyComm ac;
241268
double startTime = TRI_microtime();
242269
do {
243-
AgencyCommResult res = ac.getValues(path);
270+
AgencyReadTransaction trx(std::vector<std::string>(
271+
{AgencyCommManager::path(planPath), AgencyCommManager::path(curPath)}));
272+
AgencyCommResult res = ac.sendTransactionWithFailover(trx);
244273
if (res.successful()) {
274+
// Let's look at the results, note that both can be None!
275+
velocypack::Slice planEntry = res.slice()[0].get(
276+
std::vector<std::string>(
277+
{AgencyCommManager::path(), "Plan", "Collections",
278+
_docColl->vocbase().name(), planId, "shards", _docColl->name()}));
245279
velocypack::Slice currentEntry = res.slice()[0].get(std::vector<std::string>(
246280
{AgencyCommManager::path(), "Current", "Collections",
247281
_docColl->vocbase().name(), std::to_string(_docColl->planId()),
@@ -254,34 +288,61 @@ Result FollowerInfo::remove(ServerID const& sid) {
254288
LOG_TOPIC("57c84", ERR, Logger::CLUSTER) << "Found: " << currentEntry.toJson();
255289
}
256290
} else {
257-
auto newValue = newShardEntry(currentEntry, sid, false);
258-
std::string key = "Current/Collections/" + _docColl->vocbase().name() +
259-
"/" + std::to_string(_docColl->planId()) + "/" +
260-
_docColl->name();
261-
AgencyWriteTransaction trx;
262-
trx.preconditions.push_back(
263-
AgencyPrecondition(key, AgencyPrecondition::Type::VALUE, currentEntry));
264-
trx.operations.push_back(
265-
AgencyOperation(key, AgencyValueOperationType::SET, newValue.slice()));
266-
trx.operations.push_back(
267-
AgencyOperation("Current/Version", AgencySimpleOperationType::INCREMENT_OP));
268-
AgencyCommResult res2 = ac.sendTransactionWithFailover(trx);
269-
if (res2.successful()) {
270-
// we are finished
271-
LOG_TOPIC("be0cb", DEBUG, Logger::CLUSTER) << "Removing follower " << sid << " from "
291+
if (!planEntry.isArray() || planEntry.length() == 0 ||
292+
!planEntry[0].isString() ||
293+
!planEntry[0].isEqualString(ServerState::instance()->getId())) {
< 10000 /code>
294+
LOG_TOPIC("42231", INFO, Logger::CLUSTER)
295+
<< "FollowerInfo::remove, did not find myself in Plan: " << path
296+
<< " (can happen when the leader changed recently).";
297+
if (!planEntry.isNone()) {
298+
LOG_TOPIC("ffede", INFO, Logger::CLUSTER) << "Found: " << planEntry.toJson();
299+
}
300+
return {TRI_ERROR_CLUSTER_NOT_LEADER};
301+
} else {
302+
auto newValue = newShardEntry(currentEntry, sid, false);
303+
std::string key = "Current/Collections/" + _docColl->vocbase().name() +
304+
"/" + std::to_string(_docColl->planId()) + "/" +
305+
_docColl->name();
306+
AgencyWriteTransaction trx;
307+
trx.preconditions.push_back(
308+
AgencyPrecondition(key, AgencyPrecondition::Type::VALUE, currentEntry));
309+
trx.preconditions.push_back(
310+
AgencyPrecondition(planPath, AgencyPrecondition::Type::VALUE, planEntry));
311+
trx.operations.push_back(
312+
AgencyOperation(key, AgencyValueOperationType::SET, newValue.slice()));
313+
trx.operations.push_back(
314+
AgencyOperation("Current/Version", AgencySimpleOperationType::INCREMENT_OP));
315+
AgencyCommResult res2 = ac.sendTransactionWithFailover(trx);
316+
if (res2.successful()) {
317+
// we are finished
318+
LOG_TOPIC("be0cb", DEBUG, Logger::CLUSTER) << "Removing follower " << sid << " from "
272319
<< _docColl->name() << "succeeded";
273-
return {TRI_ERROR_NO_ERROR};
320+
return {TRI_ERROR_NO_ERROR};
321+
} else {
322+
LOG_TOPIC("67778", WARN, Logger::CLUSTER)
323+
<< "FollowerInfo::remove, could not cas key " << path
324+
<< ". status code: " << res2._statusCode
325+
<< ", incriminating body: " << res2.bodyRef();
326+
}
274327
}
275328
}
276329
} else {
277-
LOG_TOPIC("b7333", ERR, Logger::CLUSTER)
278-
<< "FollowerInfo::remove, could not read " << path << " in agency.";
330+
LOG_TOPIC("b7333", WARN, Logger::CLUSTER)
331+
<< "FollowerInfo::remove, could not read " << planPath
332+
<< " and " << curPath << " in agency.";
279333
}
280334
std::this_thread::sleep_for(std::chrono::microseconds(500000));
281-
} while (TRI_microtime() < startTime + 30 &&
335+
} while (TRI_microtime() < startTime + 7200 &&
282336
application_features::ApplicationServer::isRetryOK());
283-
284-
// rollback
337+
// This is important, give it 2h if needed. We really do not want to get
338+
// into the position to fail to drop a follower, just because we cannot
339+
// talk to the agency temporarily. The worst would be to drop the follower
340+
// locally but not report the fact to the agency. The second worst is to
341+
// not be able to drop the follower, despite the fact that a replication
342+
// was not successful. All else is less dramatic. Therefore we try for
343+
// a long time.
344+
345+
// rollback:
285346
_followers = _oldFollowers;
286347

287348
int errorCode = (application_features::ApplicationServer::isRetryOK()) ? TRI_ERROR_CLUSTER_AGENCY_COMMUNICATION_FAILED : TRI_ERROR_SHUTTING_DOWN;

0 commit comments

Comments
 (0)
0