8000 allow for long supervision job runtimes by kvahed · Pull Request #14741 · arangodb/arangodb · GitHub
[go: up one dir, main page]

Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
8000
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
devel
-----

* No runtime limits for shard move and server cleanout jobs, instead
possibility to cancel them.

* Updated ArangoDB Starter to 0.15.2.

* SEARCH-238: Improved SortNodes placement optimization in cluster so
Expand Down
18 changes: 7 additions & 11 deletions arangod/Agency/CleanOutServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,17 +82,8 @@ JOB_STATUS CleanOutServer::status() {
}

if (found > 0) { // some subjob still running
// timeout here:
auto tmp_time =
_snapshot.hasAsString(pendingPrefix + _jobId + "/timeCreated");
std::string timeCreatedString = tmp_time.value();
Supervision::TimePoint timeCreated = stringToTimepoint(timeCreatedString);
Supervision::TimePoint now(std::chrono::system_clock::now());
if (now - timeCreated > std::chrono::duration<double>(86400.0)) { // 1 day
abort("job timed out");
return FAILED;
}
return PENDING;
// consider cancellation
return considerCancellation() ? FAILED : PENDING;
}

Node::Children const& failed = _snapshot.hasAsChildren(failedPrefix).value().get();
Expand Down Expand Up @@ -198,6 +189,11 @@ bool CleanOutServer::create(std::shared_ptr<VPackBuilder> envelope) {
}

bool CleanOutServer::start(bool& aborts) {

if (considerCancellation()) {
return false;
}

// If anything throws here, the run() method catches it and finishes
// the job.

10BC0 Expand Down
11 changes: 11 additions & 0 deletions arangod/Agency/Job.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,17 @@ Job::~Job() = default;
// this will be initialized in the AgencyFeature
std::string Job::agencyPrefix = "arango";

bool Job::considerCancellation() {
// Allow for cancellation of shard moves
auto [cancel,exists] =
_snapshot.hasAsBool(std::string("/Target/") + jobStatus[_status] + "/" + _jobId + "/abort");
auto cancelled = exists && cancel;
if (cancelled) {
abort("Killed via API");
}
return cancelled;
};

bool Job::finish(std::string const& server, std::string const& shard,
bool success, std::string const& reason, query_t const payload) {
try { // protect everything, just in case
Expand Down
3 changes: 3 additions & 0 deletions arangod/Agency/Job.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ namespace consensus {
class Node;

enum JOB_STATUS { TODO, PENDING, FINISHED, FAILED, NOTFOUND };
const std::vector<std::string> jobStatus {"ToDo", "Pending", "Finished", "Failed"};
const std::vector<std::string> pos({"/Target/ToDo/", "/Target/Pending/",
"/Target/Finished/", "/Target/Failed/"});
extern std::string const mapUniqueToShortID;
Expand Down Expand Up @@ -88,6 +89,8 @@ struct Job {

virtual void run(bool& aborts) = 0;

bool considerCancellation();

void runHelper(std::string const& server, std::string const& shard, bool& aborts) {
if (_status == FAILED) { // happens when the constructor did not work
return;
Expand Down
58 changes: 20 additions & 38 deletions arangod/Agency/MoveShard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,11 @@ bool MoveShard::create(std::shared_ptr<VPackBuilder> envelope) {
}

bool MoveShard::start(bool&) {

if (considerCancellation()) {
return false;
}

// If anything throws here, the run() method catches it and finishes
// the job.

Expand Down Expand Up @@ -476,6 +481,13 @@ bool MoveShard::start(bool&) {
}

JOB_STATUS MoveShard::status() {

if (_status == PENDING || _status == TODO) {
if (considerCancellation()) {
return FAILED;
}
}

if (_status != PENDING) {
return _status;
}
Expand All @@ -494,19 +506,6 @@ JOB_STATUS MoveShard::status() {

JOB_STATUS MoveShard::pendingLeader() {

auto considerTimeout = [&]() -> bool {
// Not yet all in sync, consider timeout:
std::string timeCreatedString =
_snapshot.hasAsString(pendingPrefix + _jobId + "/timeCreated").value();
Supervision::TimePoint timeCreated = stringToTimepoint(timeCreatedString);
Supervision::TimePoint now(std::chrono::system_clock::now());
if (now - timeCreated > std::chrono::duration<double>(43200.0)) { // 12h
abort("MoveShard timed out in pending leader");
return true;
}
return false;
};

// Find the other shards in the same distributeShardsLike group:
std::vector<Job::shard_t> shardsLikeMe =
clones(_snapshot, _database, _collection, _shard);
Expand Down Expand Up @@ -555,12 +554,9 @@ JOB_STATUS MoveShard::pendingLeader() {
}
});

// Consider timeout:
// Consider cancellation:
if (done < shardsLikeMe.size()) {
if (considerTimeout()) {
return FAILED;
}
return PENDING; // do not act
return considerCancellation() ? FAILED : PENDING;
}

// We need to ask the old leader to retire:
Expand Down Expand Up @@ -622,12 +618,9 @@ JOB_STATUS MoveShard::pendingLeader() {
}
});

// Consider timeout:
// Consider cancellation:
if (done < shardsLikeMe.size()) {
if (considerTimeout()) {
return FAILED;
}
return PENDING; // do not act!
return considerCancellation() ? FAILED : PENDING;
}

// We need to switch leaders:
Expand Down Expand Up @@ -751,12 +744,9 @@ JOB_STATUS MoveShard::pendingLeader() {
}
});

// Consider timeout:
// Consider cancellation
if (done < shardsLikeMe.size()) {
if (considerTimeout()) {
return FAILED;
}
return PENDING; // do not act!
return considerCancellation() ? FAILED : PENDING;
}

// We need to end the job, Plan remains unchanged:
Expand Down Expand Up @@ -840,6 +830,7 @@ JOB_STATUS MoveShard::pendingLeader() {
}

JOB_STATUS MoveShard::pendingFollower() {

// Check if any of the servers in the Plan are FAILED, if so,
// we abort:
std::string planPath =
Expand Down Expand Up @@ -867,16 +858,7 @@ JOB_STATUS MoveShard::pendingFollower() {
});

if (done < shardsLikeMe.size()) {
// Not yet all in sync, consider timeout:
std::string timeCreatedString =
_snapshot.hasAsString(pendingPrefix + _jobId + "/timeCreated").value();
Supervision::TimePoint timeCreated = stringToTimepoint(timeCreatedString);
Supervision::TimePoint now(std::chrono::system_clock::now());
if (now - timeCreated > std::chrono::duration<double>(10000.0)) {
abort("MoveShard timed out in pending follower");
return FAILED;
}
return PENDING;
return considerCancellation() ? FAILED : PENDING;
}

// All in sync, so move on and remove the fromServer, for all shards,
Expand Down
142 changes: 141 additions & 1 deletion arangod/RestHandler/RestAdminClusterHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
#include "Agency/TransactionBuilder.h"
#include "ApplicationFeatures/ApplicationServer.h"
#include "Basics/ResultT.h"
#include "Cluster/AgencyCache.h"
#include "Cluster/ClusterFeature.h"
#include "Cluster/ClusterHelpers.h"
#include "Cluster/ClusterInfo.h"
Expand Down Expand Up @@ -270,6 +271,7 @@ std::string const RestAdminClusterHandler::ResignLeadership =
"resignLeadership";
std::string const RestAdminClusterHandler::MoveShard = "moveShard";
std::string const RestAdminClusterHandler::QueryJobStatus = "queryAgencyJob";
std::string const RestAdminClusterHandler::CancelJob = "cancelAgencyJob";
std::string const RestAdminClusterHandler::RemoveServer = "removeServer";
std::string const RestAdminClusterHandler::Rebalan 29C2 ceShards = "rebalanceShards";
std::string const RestAdminClusterHandler::ShardStatistics = "shardStatistics";
Expand Down Expand Up @@ -331,6 +333,8 @@ RestStatus RestAdminClusterHandler::execute() {
return handleResignLeadership();
} else if (command == MoveShard) {
return handleMoveShard();
} else if (command == CancelJob) {
return handleCancelJob();
} else if (command == QueryJobStatus) {
return handleQueryJobStatus();
} else if (command == RemoveServer) {

Expand Down Expand Up

@@ -881,6 +885,142 @@ RestStatus RestAdminClusterHandler::handleQueryJobStatus() {
}));
}

RestStatus RestAdminClusterHandler::handleCancelJob() {
if (!ExecContext::current().isAdminUser()) {
generateError(rest::ResponseCode::FORBIDDEN, TRI_ERROR_HTTP_FORBIDDEN);
return RestStatus::DONE;
}

if (!ServerState::instance()->isCoordinator()) {
generateError(rest::ResponseCode::FORBIDDEN, TRI_ERROR_HTTP_FORBIDDEN,
"only allowed on coordinators");
return RestStatus::DONE;
}

if (request()->requestType() != rest::RequestType::POST) {
generateError(rest::ResponseCode::METHOD_NOT_ALLOWED, TRI_ERROR_HTTP_METHOD_NOT_ALLOWED);
return RestStatus::DONE;
}

bool parseSuccess;
VPackSlice body = parseVPackBody(parseSuccess);
if (!parseSuccess) {
return RestStatus::DONE;
}

std::string jobId;
if (body.isObject() && body.hasKey("id") && body.get("id").isString()) {
jobId = body.get("id").copyString();
} else {
generateError(rest::ResponseCode::BAD, TRI_ERROR_BAD_PARAMETER, "object with key `id`");
return RestStatus::DONE;
}

auto targetPath = arangodb::cluster::paths::root()->arango()->target();
std::vector<std::string> paths = {
targetPath->pending()->job(jobId)->str(),
targetPath->failed()->job(jobId)->str(),
targetPath->finished()->job(jobId)->str(),
targetPath->toDo()->job(jobId)->str(),
};

try {
auto& agencyCache = server().getFeature<ClusterFeature>().agencyCache();
auto [acb, idx] = agencyCache.read(paths);
auto res = acb->slice();

auto paths = std::vector{
targetPath->pending()->job(jobId)->vec(),
targetPath->failed()->job(jobId)->vec(),
targetPath->finished()->job(jobId)->vec(),
targetPath->toDo()->job(jobId)->vec(),
};

for (auto const& path : paths) {
VPackSlice job = res.at(0).get(path);

if (job.isObject()) {
LOG_DEVEL << job.toJson();
LOG_TOPIC("eb139", INFO, Logger::SUPERVISION)
<< "Attempting to abort supervision job " << job.toJson();
auto type = job.get("type").copyString();
if (type != "moveShard") { // only moveshard may be aborted
generateError(Result{400, "Only MoveShard jobs can be aborted"});
return RestStatus::DONE;
} else if (path[2] != "Pending" && path[2] != "ToDo") {
generateError(Result{400, path[2] + " jobs can no longer be cancelled."});
return RestStatus::DONE;
}

auto sendTransaction = [&] {
VPackBuffer<uint8_t> trxBody;
{ VPackBuilder builder(trxBody);
{ VPackArrayBuilder trxs(&builder);
if (path[2] == "ToDo") {
VPackArrayBuilder trx(&builder);
{ VPackObjectBuilder op(&builder);
builder.add("arango/Target/ToDo/" + jobId + "/abort", VPackValue(true)); }
{ VPackObjectBuilder pre(&builder);
builder.add(VPackValue("arango/Target/ToDo/" + jobId));
{ VPackObjectBuilder val(&builder);
builder.add("oldEmpty", VPackValue(false)); }}
}
VPackArrayBuilder trx(&builder);
{ VPackObjectBuilder op(&builder);
builder.add("arango/Target/Pending/" + jobId + "/abort", VPackValue(true)); }
{ VPackObjectBuilder pre(&builder);
builder.add(VPackValue("arango/Target/Pending/" + jobId));
{ VPackObjectBuilder val(&builder);
builder.add("oldEmpty", VPackValue(false)); }}
}
}
return AsyncAgencyComm().sendWriteTransaction(60s, std::move(trxBody));
};

waitForFuture(
sendTransaction()
.thenValue([this](AsyncAgencyCommResult&& wr) {
if (!wr.ok()) {
if (wr.statusCode() == 412) {
generateError(Result{412, "Job is no longer running"});
} else {
generateError(wr.asResult());
}
}
return futures::makeFuture();
})
.thenError<VPackException>([this](VPackException const& e) {
generateError(Result{e.errorCode(), e.what()});
})
.thenError<std::exception>([this](std::exception const& e) {
generateError(rest::ResponseCode::SERVER_ERROR,
TRI_ERROR_HTTP_SERVER_ERROR, e.what());
}));
VPackBuffer<uint8_t> payload;
{
VPackBuilder builder(payload);
VPackObjectBuilder ob(&builder);
builder.add("job", VPackValue(jobId));
builder.add("status", VPackValue("aborted"));
builder.add("error", VPackValue(false));
}
resetResponse(rest::ResponseCode::OK);
response()->setPayload(std::move(payload));
return RestStatus::DONE;
}
}

generateError(rest::ResponseCode::NOT_FOUND, TRI_ERROR_HTTP_NOT_FOUND);

} catch (VPackException const& e) {
generateError(Result{e.errorCode(), e.what()});
} catch (std::exception const& e) {
generateError(rest::ResponseCode::SERVER_ERROR, TRI_ERROR_HTTP_SERVER_ERROR, e.what());
}

return RestStatus::DONE;
}

RestStatus RestAdminClusterHandler::handleSingleServerJob(std::string const& job) {
if (!ExecContext::current().isAdminUser()) {
generateError(rest::ResponseCode::FORBIDDEN, TRI_ERROR_HTTP_FORBIDDEN);
Expand Down Expand Up @@ -1204,7 +1344,7 @@ RestStatus RestAdminClusterHandler::setMaintenance(bool wantToActivate) {
auto sendTransaction = [&] {
if (wantToActivate) {
constexpr int timeout = 3600; // 1 hour
return AsyncAgencyComm().setValue(60s, maintenancePath,
return AsyncAgencyComm().setValue(60s, maintenancePath,
VPackValue(timepointToString(
std::chrono::system_clock::now() + std::chrono::seconds(timeout))), 3600);
} else {
Expand Down
2 changes: 2 additions & 0 deletions arangod/RestHandler/RestAdminClusterHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ class RestAdminClusterHandler : public RestVocbaseBaseHandler {
RequestLane lane() const override final { return RequestLane::CLIENT_SLOW; }

private:
static std::string const CancelJob;
static std::string const Health;
static std::string const NumberOfServers;
static std::string const Maintenance;
Expand Down Expand Up @@ -86,6 +87,7 @@ class RestAdminClusterHandler : public RestVocbaseBaseHandler {
RestStatus handleCleanoutServer();
RestStatus handleResignLeadership();
RestStatus handleMoveShard();
RestStatus handleCancelJob();
RestStatus handleQueryJobStatus();

RestStatus handleRemoveServer();
Expand Down
Loading
0