@@ -42,18 +42,15 @@ ActiveFailoverJob::ActiveFailoverJob(Node const& snapshot, AgentInterface* agent
42
42
JOB_STATUS status, std::string const & jobId)
43
43
: Job(status, snapshot, agent, jobId) {
44
44
// Get job details from agency:
45
- std::string path = pos[status] + _jobId + " /" ;
46
- auto tmp_server = _snapshot.hasAsString (path + " server" );
47
- auto tmp_creator = _snapshot.hasAsString (path + " creator" );
48
-
49
- if (tmp_server.second && tmp_creator.second ) {
50
- _server = tmp_server.first ;
51
- _creator = tmp_creator.first ;
52
- } else {
45
+ try {
46
+ std::string path = pos[status] + _jobId + " /" ;
47
+ _server = _snapshot.get (path + " server" ).getString ();
48
+ _creator = _snapshot.get (path + " creator" ).getString ();
49
+ } catch (std::exception const & e) {
53
50
std::stringstream err;
54
51
err << " Failed to find job " << _jobId << " in agency." ;
55
52
LOG_TOPIC (ERR, Logger::SUPERVISION) << err.str ();
56
- finish (tmp_server. first , " " , false , err.str ());
53
+ finish (_server , " " , false , err.str ());
57
54
_status = FAILED;
58
55
}
59
56
}
@@ -139,20 +136,22 @@ bool ActiveFailoverJob::start() {
139
136
return finish (_server, " " , true , reason); // move to /Target/Finished
140
137
}
141
138
142
- auto leader = _snapshot.hasAsSlice (asyncReplLeader);
143
- if (!leader.second || leader.first .compareString (_server) != 0 ) {
139
+ Node leader = _snapshot.get (asyncReplLeader);
140
+ if (!leader.isString () || leader.slice () .compareString (_server) != 0 ) {
144
141
std::string reason = " Server " + _server + " is not the current replication leader" ;
145
142
LOG_TOPIC (INFO, Logger::SUPERVISION) << reason;
146
143
return finish (_server, " " , true , reason); // move to /Target/Finished
147
144
}
148
145
149
146
// Abort job blocking server if abortable
150
- auto jobId = _snapshot.hasAsString (blockedServersPrefix + _server);
151
- if (jobId.second && !abortable (_snapshot, jobId.first )) {
152
- return false ;
153
- } else if (jobId.second ) {
154
- JobContext (PENDING, jobId.first , _snapshot, _agent).abort ();
155
- }
147
+ try {
148
+ std::string jobId = _snapshot (blockedServersPrefix + _server).getString ();
149
+ if (!abortable (_snapshot, jobId)) {
150
+ return false ;
151
+ } else {
152
+ JobContext (PENDING, jobId, _snapshot, _agent).abort ();
153
+ }
154
+ } catch (...) {}
156
155
157
156
// Todo entry
158
157
Builder todo;
@@ -195,7 +194,7 @@ bool ActiveFailoverJob::start() {
195
194
// Destination server should not be blocked by another job
196
195
addPreconditionServerNotBlocked (pending, newLeader);
197
196
// AsyncReplication leader must be the failed server
198
- addPreconditionUnchanged (pending, asyncReplLeader, leader.first );
197
+ addPreconditionUnchanged (pending, asyncReplLeader, leader.slice () );
199
198
} // precondition done
200
199
201
200
} // array for transaction done
@@ -247,20 +246,17 @@ arangodb::Result ActiveFailoverJob::abort() {
247
246
typedef std::pair<std::string, TRI_voc_tick_t> ServerTick;
248
247
// / Try to select the follower most in-sync with failed leader
249
248
std::string ActiveFailoverJob::findBestFollower () {
250
- std::vector<std::string> healthy = healthyServers (_snapshot);
251
- // the failed leader should never appear as healthy
252
- TRI_ASSERT (std::find (healthy.begin (), healthy.end (), _server) == healthy.end ());
249
+ std::vector<std::string> as = healthyServers (_snapshot);
253
250
254
251
// blocked; (not sure if this can even happen)
255
252
try {
256
253
for (auto const & srv : _snapshot (blockedServersPrefix).children ()) {
257
- healthy .erase (std::remove (healthy .begin (), healthy .end (), srv.first ), healthy .end ());
254
+ as .erase (std::remove (as .begin (), as .end (), srv.first ), as .end ());
258
255
}
259
256
} catch (...) {}
260
257
261
258
std::vector<ServerTick> ticks;
262
- try {
263
- // collect tick values from transient state
259
+ try { // collect tick values from transient state
264
260
query_t trx = std::make_unique<VPackBuilder>();
265
261
{
266
262
VPackArrayBuilder transactions (trx.get ());
@@ -277,15 +273,13 @@ std::string ActiveFailoverJob::findBestFollower() {
277
273
VPackSlice obj = resp.at (0 ).get ({ Job::agencyPrefix, " AsyncReplication" });
278
274
for (VPackObjectIterator::ObjectPair pair : VPackObjectIterator (obj)) {
279
275
std::string srvUUID = pair.key .copyString ();
280
- bool isAvailable = std::find (healthy.begin (), healthy.end (), srvUUID) != healthy.end ();
281
- if (!isAvailable) {
276
+ if (std::find (as.begin (), as.end (), srvUUID) == as.end ()) {
282
277
continue ; // skip inaccessible servers
283
278
}
284
- TRI_ASSERT (srvUUID != _server);
285
279
286
280
VPackSlice leader = pair.value .get (" leader" ); // broken leader
287
281
VPackSlice lastTick = pair.value .get (" lastTick" );
288
- if (leader.isString () && leader.isEqualString (_server) &&
282
+ if (leader.isString () && leader.compareString (_server) == 0 &&
289
283
lastTick.isNumber ()) {
290
284
ticks.emplace_back (std::move (srvUUID), lastTick.getUInt ());
291
285
}
@@ -298,7 +292,6 @@ std::string ActiveFailoverJob::findBestFollower() {
298
292
return a.second > b.second ;
299
293
});
300
294
if (!ticks.empty ()) {
301
- TRI_ASSERT (ticks.size () == 1 || ticks[0 ].second >= ticks[1 ].second );
302
295
return ticks[0 ].first ;
303
296
}
304
297
return " " ; // fallback to any available server
0 commit comments