@@ -116,24 +116,31 @@ Result FollowerInfo::add(ServerID const& sid) {
116
116
}
117
117
#endif
118
118
}
119
+ std::string planId = std::to_string (_docColl->planId ());
119
120
// Now tell the agency, path is
120
121
// Current/Collections/<dbName>/<collectionID>/<shardID>
121
- std::string path = " Current/Collections /" ;
122
- path += _docColl->vocbase (). name ();
123
- path += " / " ;
124
- path += std::to_string (_docColl-> planId ());
125
- path += " / " ;
126
- path += _docColl->name ();
122
+ std::string path = _docColl-> vocbase (). name () + " / " + planId + " /" ;
123
+ std::string curPath = " Current/Collections/ " + path + _docColl->name ();
124
+ // We also need the corresponding Plan entry, path is
125
+ // Plan/Collections/<dbName>/<collectionID>/shards/<shardID>
126
+ std::string planPath = " Plan/Collections/ " + path + " shards/ " +
127
+ _docColl->name ();
127
128
AgencyComm ac;
128
129
double startTime = TRI_microtime ();
129
130
do {
130
- AgencyCommResult res = ac.getValues (path);
131
+ AgencyReadTransaction trx (std::vector<std::string>(
132
+ {AgencyCommManager::path (planPath), AgencyCommManager::path (curPath)}));
133
+ AgencyCommResult res = ac.sendTransactionWithFailover (trx);
131
134
132
135
if (res.successful ()) {
136
+ // Let's look at the results, note that both can be None!
137
+ velocypack::Slice planEntry = res.slice ()[0 ].get (
138
+ std::vector<std::string>(
139
+ {AgencyCommManager::path (), " Plan" , " Collections" ,
140
+ _docColl->vocbase ().name (), planId, " shards" , _docColl->name ()}));
133
141
velocypack::Slice currentEntry = res.slice ()[0 ].get (std::vector<std::string>(
134
142
{AgencyCommManager::path (), " Current" , " Collections" ,
135
- _docColl->vocbase ().name (), std::to_string (_docColl->planId ()),
136
- _docColl->name ()}));
143
+ _docColl->vocbase ().name (), planId, _docColl->name ()}));
137
144
138
145
if (!currentEntry.isObject ()) {
139
146
LOG_TOPIC (" b753d" , ERR, Logger::CLUSTER)
@@ -142,31 +149,47 @@ Result FollowerInfo::add(ServerID const& sid) {
142
149
LOG_TOPIC (" 568de" , ERR, Logger::CLUSTER) << " Found: " << currentEntry.toJson ();
143
150
}
144
151
} else {
145
- auto newValue = newShardEntry (currentEntry, sid, true );
146
- std::string key = " Current/Collections/" + _docColl->vocbase ().name () +
147
- " /" + std::to_string (_docColl->planId ()) + " /" +
148
- _docColl->name ();
149
- AgencyWriteTransaction trx;
150
- trx.preconditions .push_back (
151
- AgencyPrecondition (key, AgencyPrecondition::Type::VALUE, currentEntry));
152
- trx.operations .push_back (
153
- AgencyOperation (key, AgencyValueOperationType::SET, newValue.slice ()));
154
- trx.operations .push_back (
155
- AgencyOperation (" Current/Version" , AgencySimpleOperationType::INCREMENT_OP));
156
- AgencyCommResult res2 = ac.sendTransactionWithFailover (trx);
157
- if (res2.successful ()) {
158
- // we are finished!
159
- return {TRI_ERROR_NO_ERROR};
152
+ if (!planEntry.isArray () || planEntry.length () == 0 ||
153
+ !planEntry[0 ].isString () ||
154
+ !planEntry[0 ].isEqualString (ServerState::instance ()->getId ())) {
155
+ LOG_TOPIC (" 54555" , INFO, Logger::CLUSTER)
156
+ << " FollowerInfo::add, did not find myself in Plan: " << path
157
+ << " (can happen when the leader changed recently)." ;
158
+ if (!planEntry.isNone ()) {
159
+ LOG_TOPIC (" 66762" , INFO, Logger::CLUSTER) << " Found: " << planEntry.toJson ();
160
+ }
161
+ return {TRI_ERROR_CLUSTER_NOT_LEADER};
162
+ } else {
163
+ auto newValue = newShardEntry (currentEntry, sid, true );
164
+ AgencyWriteTransaction trx;
165
+ trx.preconditions .push_back (
166
+ AgencyPrecondition (curPath, AgencyPrecondition::Type::VALUE, currentEntry));
167
+ trx.preconditions .push_back (
168
+ AgencyPrecondition (planPath, AgencyPrecondition::Type::VALUE, planEntry));
169
+ trx.operations .push_back (
170
+ AgencyOperation (curPath, AgencyValueOperationType::SET, newValue.slice ()));
171
+ trx.operations .push_back (
172
+ AgencyOperation (" Current/Version" , AgencySimpleOperationType::INCREMENT_OP));
173
+ AgencyCommResult res2 = ac.sendTransactionWithFailover (trx);
174
+ if (res2.successful ()) {
175
+ return {TRI_ERROR_NO_ERROR};
176
+ } else {
177
+ LOG_TOPIC (" daeda" , WARN, Logger::CLUSTER)
178
+ << " FollowerInfo::add, could not cas key " << path;
179
+ }
160
180
}
161
181
}
162
182
} else {
163
- LOG_TOPIC (" dcf54" , ERR, Logger::CLUSTER)
164
- << " FollowerInfo::add, could not read " << path << " in agency" ;
183
+ LOG_TOPIC (" dcf54" , WARN, Logger::CLUSTER)
184
+ << " FollowerInfo::add, could not read " << planPath
185
+ << " and " << curPath << " in agency." ;
165
186
}
166
187
std::this_thread::sleep_for (std::chrono::microseconds (500000 ));
167
- } while (TRI_microtime () < startTime + 30 &&
188
+ } while (TRI_microtime () < startTime + 3600 &&
168
189
application_features::ApplicationServer::isRetryOK ());
169
-
190
+ // This is important, give it 1h if needed. We really do not want to get
191
+ // into the position to not accept a shard getting-in-sync just because
192
+ // we cannot talk to the agency temporarily.
170
193
int errorCode = (application_features::ApplicationServer::isRetryOK ()) ? TRI_ERROR_CLUSTER_AGENCY_COMMUNICATION_FAILED : TRI_ERROR_SHUTTING_DOWN;
171
194
std::string errorMessage = " unable to add follower in agency, timeout in agency CAS operation for key " + path + " : " + TRI_errno_string (errorCode);
172
195
LOG_TOPIC (" 6295b" , ERR, Logger::CLUSTER) << errorMessage;
@@ -177,7 +200,9 @@ Result FollowerInfo::add(ServerID const& sid) {
177
200
// //////////////////////////////////////////////////////////////////////////////
178
201
// / @brief remove a follower from a shard, this is only done by the
179
202
// / server if a synchronous replication request fails. This reports to
180
- // / the agency under `/Current` but in asynchronous "fire-and-forget"
203
+ // / the agency under `/Current`. This method can fail, which is critical,
204
+ // / because we cannot drop a follower ourselves and not report this to the
205
+ // / agency, since then a failover to a not-in-sync follower might happen.
181
206
 // / The method fails silently if the follower information has
182
207
// / since been dropped (see `dropFollowerInfo` below).
183
208
// //////////////////////////////////////////////////////////////////////////////
@@ -229,19 +254,28 @@ Result FollowerInfo::remove(ServerID const& sid) {
229
254
return {TRI_ERROR_NO_ERROR};
230
255
}
231
256
#endif
257
+
258
+ std::string planId = std::to_string (_docColl->planId ());
232
259
// Now tell the agency, path is
233
260
// Current/Collections/<dbName>/<collectionID>/<shardID>
234
- std::string path = " Current/Collections /" ;
235
- path += _docColl->vocbase (). name ();
236
- path += " / " ;
237
- path += std::to_string (_docColl-> planId ());
238
- path += " / " ;
239
- path += _docColl->name ();
261
+ std::string path = _docColl-> vocbase (). name () + " / " + planId + " /" ;
262
+ std::string curPath = " Current/Collections/ " + path + _docColl->name ();
263
+ // We also need the corresponding Plan entry, path is
264
+ // Plan/Collections/<dbName>/<collectionID>/shards/<shardID>
265
+ std::string planPath = " Plan/Collections/ " + path + " shards/ " +
266
+ _docColl->name ();
240
267
AgencyComm ac;
241
268
double startTime = TRI_microtime ();
242
269
do {
243
- AgencyCommResult res = ac.getValues (path);
270
+ AgencyReadTransaction trx (std::vector<std::string>(
271
+ {AgencyCommManager::path (planPath), AgencyCommManager::path (curPath)}));
272
+ AgencyCommResult res = ac.sendTransactionWithFailover (trx);
244
273
if (res.successful ()) {
274
+ // Let's look at the results, note that both can be None!
275
+ velocypack::Slice planEntry = res.slice ()[0 ].get (
276
+ std::vector<std::string>(
277
+ {AgencyCommManager::path (), " Plan" , " Collections" ,
278
+ _docColl->vocbase ().name (), planId, " shards" , _docColl->name ()}));
245
279
velocypack::Slice currentEntry = res.slice ()[0 ].get (std::vector<std::string>(
246
280
{AgencyCommManager::path (), " Current" , " Collections" ,
247
281
_docColl->vocbase ().name (), std::to_string (_docColl->planId ()),
@@ -254,34 +288,61 @@ Result FollowerInfo::remove(ServerID const& sid) {
254
288
LOG_TOPIC (" 57c84" , ERR, Logger::CLUSTER) << " Found: " << currentEntry.toJson ();
255
289
}
256
290
} else {
257
- auto newValue = newShardEntry (currentEntry, sid, false );
258
- std::string key = " Current/Collections/" + _docColl->vocbase ().name () +
259
- " /" + std::to_string (_docColl->planId ()) + " /" +
260
- _docColl->name ();
261
- AgencyWriteTransaction trx;
262
- trx.preconditions .push_back (
263
- AgencyPrecondition (key, AgencyPrecondition::Type::VALUE, currentEntry));
264
- trx.operations .push_back (
265
- AgencyOperation (key, AgencyValueOperationType::SET, newValue.slice ()));
266
- trx.operations .push_back (
267
- AgencyOperation (" Current/Version" , AgencySimpleOperationType::INCREMENT_OP));
268
- AgencyCommResult res2 = ac.sendTransactionWithFailover (trx);
269
- if (res2.successful ()) {
270
- // we are finished
271
- LOG_TOPIC (" be0cb" , DEBUG, Logger::CLUSTER) << " Removing follower " << sid << " from "
291
+ if (!planEntry.isArray () || planEntry.length () == 0 ||
292
+ !planEntry[0 ].isString () ||
293
+ !planEntry[0 ].isEqualString (ServerState::instance ()->getId ())) {
<
10000
/code>
294
+ LOG_TOPIC (" 42231" , INFO, Logger::CLUSTER)
295
+ << " FollowerInfo::remove, did not find myself in Plan: " << path
296
+ << " (can happen when the leader changed recently)." ;
297
+ if (!planEntry.isNone ()) {
298
+ LOG_TOPIC (" ffede" , INFO, Logger::CLUSTER) << " Found: " << planEntry.toJson ();
299
+ }
300
+ return {TRI_ERROR_CLUSTER_NOT_LEADER};
301
+ } else {
302
+ auto newValue = newShardEntry (currentEntry, sid, false );
303
+ std::string key = " Current/Collections/" + _docColl->vocbase ().name () +
304
+ " /" + std::to_string (_docColl->planId ()) + " /" +
305
+ _docColl->name ();
306
+ AgencyWriteTransaction trx;
307
+ trx.preconditions .push_back (
308
+ AgencyPrecondition (key, AgencyPrecondition::Type::VALUE, currentEntry));
309
+ trx.preconditions .push_back (
310
+ AgencyPrecondition (planPath, AgencyPrecondition::Type::VALUE, planEntry));
311
+ trx.operations .push_back (
312
+ AgencyOperation (key, AgencyValueOperationType::SET, newValue.slice ()));
313
+ trx.operations .push_back (
314
+ AgencyOperation (" Current/Version" , AgencySimpleOperationType::INCREMENT_OP));
315
+ AgencyCommResult res2 = ac.sendTransactionWithFailover (trx);
316
+ if (res2.successful ()) {
317
+ // we are finished
318
+ LOG_TOPIC (" be0cb" , DEBUG, Logger::CLUSTER) << " Removing follower " << sid << " from "
272
319
<< _docColl->name () << " succeeded" ;
273
- return {TRI_ERROR_NO_ERROR};
320
+ return {TRI_ERROR_NO_ERROR};
321
+ } else {
322
+ LOG_TOPIC (" 67778" , WARN, Logger::CLUSTER)
323
+ << " FollowerInfo::remove, could not cas key " << path
324
+ << " . status code: " << res2._statusCode
325
+ << " , incriminating body: " << res2.bodyRef ();
326
+ }
274
327
}
275
328
}
276
329
} else {
277
- LOG_TOPIC (" b7333" , ERR, Logger::CLUSTER)
278
- << " FollowerInfo::remove, could not read " << path << " in agency." ;
330
+ LOG_TOPIC (" b7333" , WARN, Logger::CLUSTER)
331
+ << " FollowerInfo::remove, could not read " << planPath
332
+ << " and " << curPath << " in agency." ;
279
333
}
280
334
std::this_thread::sleep_for (std::chrono::microseconds (500000 ));
281
- } while (TRI_microtime () < startTime + 30 &&
335
+ } while (TRI_microtime () < startTime + 7200 &&
282
336
application_features::ApplicationServer::isRetryOK ());
283
-
284
- // rollback
337
+ // This is important, give it 2h if needed. We really do not want to get
338
+ // into the position to fail to drop a follower, just because we cannot
339
+ // talk to the agency temporarily. The worst would be to drop the follower
340
+ // locally but not report the fact to the agency. The second worst is to
341
+ // not be able to drop the follower, despite the fact that a replication
342
+ // was not successful. All else is less dramatic. Therefore we try for
343
+ // a long time.
344
+
345
+ // rollback:
285
346
_followers = _oldFollowers;
286
347
287
348
int errorCode = (application_features::ApplicationServer::isRetryOK ()) ? TRI_ERROR_CLUSTER_AGENCY_COMMUNICATION_FAILED : TRI_ERROR_SHUTTING_DOWN;
0 commit comments