diff --git a/CHANGELOG b/CHANGELOG index 3dcce87f0bd4..399604220486 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,6 +1,9 @@ devel ----- +* Shard move and server cleanout jobs no longer have hard-coded runtime + limits; instead, they can now be cancelled via the API. + * Fix cluster-internal network protocol to HTTP/1 for now. Any other protocol selected via the startup option `--network.protocol` will automatically be switched to HTTP/1. The startup option `--network.protocol` is now deprecated @@ -22,30 +25,30 @@ devel now the cluster-one-shard-rule can be applied to improve query performance. -* (Enterprise Edition only): added query option `forceOneShardAttributeValue` - to explicitly set a shard key value that will be used during query snippet +* (Enterprise Edition only): added query option `forceOneShardAttributeValue` + to explicitly set a shard key value that will be used during query snippet distribution to limit the query to a specific server in the cluster. - This query option can be used in complex queries in case the query optimizer - cannot automatically detect that the query can be limited to only a single + This query option can be used in complex queries in case the query optimizer + cannot automatically detect that the query can be limited to only a single server (e.g. in a disjoint smart graph case). - When the option is set to the correct shard key value, the query will be - limited to the target server determined by the shard key value. It thus + When the option is set to the correct shard key value, the query will be + limited to the target server determined by the shard key value. It thus requires that all collections in the query use the same distribution (i.e. `distributeShardsLike` attribute via disjoint SmartGraphs). - - Limiting the query to a single DB server is a performance optimization - and may make complex queries run a lot faster because of the reduced - setup and teardown costs and the reduced cluster-internal traffic during + + Limiting the query to a single DB server is a performance optimization + and may make complex queries run a lot faster because of the reduced + setup and teardown costs and the reduced cluster-internal traffic during query execution. - If the option is set incorrectly, i.e. to a wrong shard key value, then - the query may be shipped to a wrong DB server and may not return results - (i.e. empty result set). It is thus the caller's responsibility to set + If the option is set incorrectly, i.e. to a wrong shard key value, then + the query may be shipped to a wrong DB server and may not return results + (i.e. empty result set). It is thus the caller's responsibility to set the `forceOneShardAttributeValue` correctly or not use it. - The `forceOneShardAttributeValue` option will only honor string values. - All other values as well as the empty string will be ignored and treated + The `forceOneShardAttributeValue` option will only honor string values. + All other values as well as the empty string will be ignored and treated as if the option is not set. If the option is set and the query satisfies the requirements for using @@ -54,7 +57,7 @@ devel * Updated ArangoDB Starter to 0.15.2. -* SEARCH-238: Improved SortNodes placement optimization in cluster so +* SEARCH-238: Improved SortNodes placement optimization in cluster so late materialization could cover more cases * Fix some memory leaks after adding optimization rule for AqlAnalyzer. @@ -62,7 +65,7 @@ devel * Fix internal iterator states after intermediate commits in write transactions.
Iterators could point to invalid data after an intermediate commit, producing undefined behavior. - + * Fix read-own-write behavior in different scenarios: - in some cases writes performed by an AQL query could be observed within the same query. This was not intended and is fixed now. diff --git a/arangod/Agency/CleanOutServer.cpp b/arangod/Agency/CleanOutServer.cpp index a1d85486e00f..39c3d3acde50 100644 --- a/arangod/Agency/CleanOutServer.cpp +++ b/arangod/Agency/CleanOutServer.cpp @@ -82,17 +82,8 @@ JOB_STATUS CleanOutServer::status() { } if (found > 0) { // some subjob still running - // timeout here: - auto tmp_time = - _snapshot.hasAsString(pendingPrefix + _jobId + "/timeCreated"); - std::string timeCreatedString = tmp_time.value(); - Supervision::TimePoint timeCreated = stringToTimepoint(timeCreatedString); - Supervision::TimePoint now(std::chrono::system_clock::now()); - if (now - timeCreated > std::chrono::duration<double>(86400.0)) { // 1 day - abort("job timed out"); - return FAILED; - } - return PENDING; + // consider cancellation + return considerCancellation() ? FAILED : PENDING; } Node::Children const& failed = _snapshot.hasAsChildren(failedPrefix).value().get(); @@ -198,6 +189,11 @@ bool CleanOutServer::create(std::shared_ptr<VPackBuilder> envelope) { } bool CleanOutServer::start(bool& aborts) { + + if (considerCancellation()) { + return false; + } + // If anything throws here, the run() method catches it and finishes // the job. diff --git a/arangod/Agency/Job.cpp b/arangod/Agency/Job.cpp index 0ae43648ce4c..7f41e8b5d080 100644 --- a/arangod/Agency/Job.cpp +++ b/arangod/Agency/Job.cpp @@ -88,6 +88,17 @@ Job::~Job() = default; // this will be initialized in the AgencyFeature std::string Job::agencyPrefix = "arango"; +bool Job::considerCancellation() { + // Allow cancellation of shard move and server cleanout jobs + auto val = + _snapshot.hasAsBool(std::string("/Target/") + jobStatus[_status] + "/" + _jobId + "/abort"); + auto cancelled = val && val.value(); + if (cancelled) { + abort("Killed via API"); + } + return cancelled; +} + bool Job::finish(std::string const& server, std::string const& shard, bool success, std::string const& reason, query_t const payload) { try { // protect everything, just in case diff --git a/arangod/Agency/Job.h b/arangod/Agency/Job.h index fff39d9b1410..f03aa1d6abf9 100644 --- a/arangod/Agency/Job.h +++ b/arangod/Agency/Job.h @@ -48,6 +48,7 @@ namespace consensus { class Node; enum JOB_STATUS { TODO, PENDING, FINISHED, FAILED, NOTFOUND }; +const std::vector<std::string> jobStatus {"ToDo", "Pending", "Finished", "Failed"}; const std::vector<std::string> pos({"/Target/ToDo/", "/Target/Pending/", "/Target/Finished/", "/Target/Failed/"}); extern std::string const mapUniqueToShortID; @@ -88,6 +89,8 @@ struct Job { virtual void run(bool& aborts) = 0; + bool considerCancellation(); + void runHelper(std::string const& server, std::string const& shard, bool& aborts) { if (_status == FAILED) { // happens when the constructor did not work return; diff --git a/arangod/Agency/MoveShard.cpp b/arangod/Agency/MoveShard.cpp index 2d0f1c9ddc02..698db1667d77 100644 --- a/arangod/Agency/MoveShard.cpp +++ b/arangod/Agency/MoveShard.cpp @@ -181,6 +181,11 @@ bool MoveShard::create(std::shared_ptr<VPackBuilder> envelope) { } bool MoveShard::start(bool&) { + + if (considerCancellation()) { + return false; + } + // If anything throws here, the run() method catches it and finishes // the job.
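The cancellation handshake above is driven by a plain agency write: the coordinator sets an `abort` flag under the job's current status key, guarded by a precondition that the job still exists there, and `Job::considerCancellation()` picks the flag up on the next supervision run. A minimal sketch of the transaction envelope (job ID `1-0` is a placeholder; the real payload is built via VPackBuilder in `handleCancelJob` further down):

    [
      [ { "arango/Target/ToDo/1-0/abort": true },
        { "arango/Target/ToDo/1-0": { "oldEmpty": false } } ],
      [ { "arango/Target/Pending/1-0/abort": true },
        { "arango/Target/Pending/1-0": { "oldEmpty": false } } ]
    ]

If the job has already moved from ToDo to Pending, the first transaction's precondition fails and the second one applies; since a pending job can never move back to to-do, at most one of the two writes wins.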
@@ -476,6 +481,13 @@ bool MoveShard::start(bool&) { } JOB_STATUS MoveShard::status() { + + if (_status == PENDING || _status == TODO) { + if (considerCancellation()) { + return FAILED; + } + } + if (_status != PENDING) { return _status; } @@ -494,19 +506,6 @@ JOB_STATUS MoveShard::status() { JOB_STATUS MoveShard::pendingLeader() { - auto considerTimeout = [&]() -> bool { - // Not yet all in sync, consider timeout: - std::string timeCreatedString = - _snapshot.hasAsString(pendingPrefix + _jobId + "/timeCreated").value(); - Supervision::TimePoint timeCreated = stringToTimepoint(timeCreatedString); - Supervision::TimePoint now(std::chrono::system_clock::now()); - if (now - timeCreated > std::chrono::duration<double>(43200.0)) { // 12h - abort("MoveShard timed out in pending leader"); - return true; - } - return false; - }; - // Find the other shards in the same distributeShardsLike group: std::vector<Job::shard_t> shardsLikeMe = clones(_snapshot, _database, _collection, _shard); @@ -555,12 +554,9 @@ JOB_STATUS MoveShard::pendingLeader() { } }); - // Consider timeout: + // Consider cancellation: if (done < shardsLikeMe.size()) { - if (considerTimeout()) { - return FAILED; - } - return PENDING; // do not act + return considerCancellation() ? FAILED : PENDING; } // We need to ask the old leader to retire: @@ -622,12 +618,9 @@ JOB_STATUS MoveShard::pendingLeader() { } }); - // Consider timeout: + // Consider cancellation: if (done < shardsLikeMe.size()) { - if (considerTimeout()) { - return FAILED; - } - return PENDING; // do not act! + return considerCancellation() ? FAILED : PENDING; } // We need to switch leaders: @@ -751,12 +744,9 @@ JOB_STATUS MoveShard::pendingLeader() { } }); - // Consider timeout: + // Consider cancellation: if (done < shardsLikeMe.size()) { - if (considerTimeout()) { - return FAILED; - } - return PENDING; // do not act! + return considerCancellation() ? FAILED : PENDING; } // We need to end the job, Plan remains unchanged: @@ -840,6 +830,7 @@ JOB_STATUS MoveShard::pendingLeader() { } JOB_STATUS MoveShard::pendingFollower() { + // Check if any of the servers in the Plan are FAILED, if so, // we abort: std::string planPath = @@ -867,16 +858,7 @@ JOB_STATUS MoveShard::pendingFollower() { }); if (done < shardsLikeMe.size()) { - // Not yet all in sync, consider timeout: - std::string timeCreatedString = - _snapshot.hasAsString(pendingPrefix + _jobId + "/timeCreated").value(); - Supervision::TimePoint timeCreated = stringToTimepoint(timeCreatedString); - Supervision::TimePoint now(std::chrono::system_clock::now()); - if (now - timeCreated > std::chrono::duration<double>(10000.0)) { - abort("MoveShard timed out in pending follower"); - return FAILED; - } - return PENDING; + return considerCancellation() ?
FAILED : PENDING; } // All in sync, so move on and remove the fromServer, for all shards, @@ -980,7 +962,7 @@ arangodb::Result MoveShard::abort(std::string const& reason) { } } - if (finish("", "", true, "job aborted (1): " + reason, todoPrec)) { + if (finish("", "", false, "job aborted (1): " + reason, todoPrec)) { return result; } _status = PENDING; diff --git a/arangod/RestHandler/RestAdminClusterHandler.cpp b/arangod/RestHandler/RestAdminClusterHandler.cpp index 48b444a791c5..b1cba06b1d08 100644 --- a/arangod/RestHandler/RestAdminClusterHandler.cpp +++ b/arangod/RestHandler/RestAdminClusterHandler.cpp @@ -37,6 +37,7 @@ #include "Agency/TransactionBuilder.h" #include "ApplicationFeatures/ApplicationServer.h" #include "Basics/ResultT.h" +#include "Cluster/AgencyCache.h" #include "Cluster/ClusterFeature.h" #include "Cluster/ClusterHelpers.h" #include "Cluster/ClusterInfo.h" @@ -270,6 +271,7 @@ std::string const RestAdminClusterHandler::ResignLeadership = "resignLeadership"; std::string const RestAdminClusterHandler::MoveShard = "moveShard"; std::string const RestAdminClusterHandler::QueryJobStatus = "queryAgencyJob"; +std::string const RestAdminClusterHandler::CancelJob = "cancelAgencyJob"; std::string const RestAdminClusterHandler::RemoveServer = "removeServer"; std::string const RestAdminClusterHandler::RebalanceShards = "rebalanceShards"; std::string const RestAdminClusterHandler::ShardStatistics = "shardStatistics"; @@ -289,7 +291,7 @@ RestStatus RestAdminClusterHandler::execute() { bool const isWriteOperation = (request()->requestType() != rest::RequestType::GET); std::string const& apiJwtPolicy = server().getFeature<GeneralServerFeature>().apiJwtPolicy(); - if (apiJwtPolicy == "jwt-all" || + if (apiJwtPolicy == "jwt-all" || (apiJwtPolicy == "jwt-write" && isWriteOperation)) { generateError(rest::ResponseCode::FORBIDDEN, TRI_ERROR_HTTP_FORBIDDEN); return RestStatus::DONE; } @@ -331,6 +333,8 @@ RestStatus RestAdminClusterHandler::execute() { return handleResignLeadership(); } else if (command == MoveShard) { return handleMoveShard(); + } else if (command == CancelJob) { + return handleCancelJob(); } else if (command == QueryJobStatus) { return handleQueryJobStatus(); } else if (command == RemoveServer) { @@ -452,7 +456,7 @@ RestAdminClusterHandler::FutureVoid RestAdminClusterHandler::tryDeleteServer( return futures::makeFuture(); }); } - + TRI_IF_FAILURE("removeServer::noRetry") { generateError(Result(TRI_ERROR_HTTP_PRECONDITION_FAILED)); return futures::makeFuture(); @@ -528,18 +532,18 @@ RestStatus RestAdminClusterHandler::handleShardStatistics() { generateError(rest::ResponseCode::METHOD_NOT_ALLOWED, TRI_ERROR_HTTP_METHOD_NOT_ALLOWED); return RestStatus::DONE; } - + if (!ServerState::instance()->isCoordinator()) { generateError(rest::ResponseCode::NOT_IMPLEMENTED, TRI_ERROR_CLUSTER_ONLY_ON_COORDINATOR); return RestStatus::DONE; } - + if (!_vocbase.isSystem()) { generateError(GeneralResponse::responseCode(TRI_ERROR_ARANGO_USE_SYSTEM_DATABASE), TRI_ERROR_ARANGO_USE_SYSTEM_DATABASE); return RestStatus::DONE; } - + std::string const& restrictServer = _request->value("DBserver"); bool details = _request->parsedValue("details", false); @@ -881,6 +885,154 @@ RestStatus RestAdminClusterHandler::handleQueryJobStatus() { })); } +RestStatus RestAdminClusterHandler::handleCancelJob() { + if (!ExecContext::current().isAdminUser()) { + generateError(rest::ResponseCode::FORBIDDEN, TRI_ERROR_HTTP_FORBIDDEN); + return RestStatus::DONE; + } + + if (!ServerState::instance()->isCoordinator()) { +
generateError(rest::ResponseCode::FORBIDDEN, TRI_ERROR_HTTP_FORBIDDEN, + "only allowed on coordinators"); + return RestStatus::DONE; + } + + if (request()->requestType() != rest::RequestType::POST) { + generateError(rest::ResponseCode::METHOD_NOT_ALLOWED, TRI_ERROR_HTTP_METHOD_NOT_ALLOWED); + return RestStatus::DONE; + } + + bool parseSuccess; + VPackSlice body = parseVPackBody(parseSuccess); + if (!parseSuccess) { + return RestStatus::DONE; + } + + std::string jobId; + if (body.isObject() && body.hasKey("id") && body.get("id").isString()) { + jobId = body.get("id").copyString(); + } else { + generateError(rest::ResponseCode::BAD, TRI_ERROR_BAD_PARAMETER, "expecting object with string attribute `id`"); + return RestStatus::DONE; + } + + auto targetPath = arangodb::cluster::paths::root()->arango()->target(); + std::vector<std::string> paths = { + targetPath->pending()->job(jobId)->str(), + targetPath->failed()->job(jobId)->str(), + targetPath->finished()->job(jobId)->str(), + targetPath->toDo()->job(jobId)->str(), + }; + + try { + auto& agencyCache = server().getFeature<ClusterFeature>().agencyCache(); + auto [acb, idx] = agencyCache.read(paths); + auto res = acb->slice(); + + // Note: shadows the string paths above; same keys, as vectors for slice lookups. + auto paths = std::vector<std::vector<std::string>>{ + targetPath->pending()->job(jobId)->vec(), + targetPath->failed()->job(jobId)->vec(), + targetPath->finished()->job(jobId)->vec(), + targetPath->toDo()->job(jobId)->vec(), + }; + + for (auto const& path : paths) { + VPackSlice job = res.at(0).get(path); + + if (job.isObject()) { + LOG_TOPIC("eb139", INFO, Logger::SUPERVISION) + << "Attempting to abort supervision job " << job.toJson(); + auto type = job.get("type").stringView(); + // only moveShard and cleanOutServer jobs may be aborted + if (type != "moveShard" && type != "cleanOutServer") { + generateError( + Result{TRI_ERROR_BAD_PARAMETER, "Only MoveShard and CleanOutServer jobs can be aborted"}); + return RestStatus::DONE; + } else if (path[2] != "Pending" && path[2] != "ToDo") { + generateError(Result{TRI_ERROR_BAD_PARAMETER, path[2] + " jobs can no longer be cancelled."}); + return RestStatus::DONE; + } + + // This transaction aims at killing a job that is to-do or pending. + // A to-do job may have become pending in the meantime, but a pending + // job can never go back to to-do. The response is evaluated in the + // 412 handling below. + auto sendTransaction = [&] { + VPackBuffer<uint8_t> trxBody; + { VPackBuilder builder(trxBody); + { VPackArrayBuilder trxs(&builder); + if (path[2] == "ToDo") { + VPackArrayBuilder trx(&builder); + { VPackObjectBuilder op(&builder); + builder.add("arango/Target/ToDo/" + jobId + "/abort", VPackValue(true)); } + { VPackObjectBuilder pre(&builder); + builder.add(VPackValue("arango/Target/ToDo/" + jobId)); + { VPackObjectBuilder val(&builder); + builder.add("oldEmpty", VPackValue(false)); }} + } + VPackArrayBuilder trx(&builder); + { VPackObjectBuilder op(&builder); + builder.add("arango/Target/Pending/" + jobId + "/abort", VPackValue(true)); } + { VPackObjectBuilder pre(&builder); + builder.add(VPackValue("arango/Target/Pending/" + jobId)); + { VPackObjectBuilder val(&builder); + builder.add("oldEmpty", VPackValue(false)); }} + } + } + return AsyncAgencyComm().sendWriteTransaction(60s, std::move(trxBody)); + }; + + return waitForFuture( + sendTransaction() + .thenValue([=](AsyncAgencyCommResult&& wr) { + if (!wr.ok()) { + // Only if no longer pending or todo.
+ if (wr.statusCode() == 412) { + auto results = wr.slice().get("results"); + if (results[0].getNumber<uint64_t>() == 0 && + results[1].getNumber<uint64_t>() == 0) { + generateError( + Result{TRI_ERROR_HTTP_PRECONDITION_FAILED, "Job is no longer pending or to do"}); + return; + } + } else { + generateError(wr.asResult()); + return; + } + } + + VPackBuffer<uint8_t> payload; + { + VPackBuilder builder(payload); + VPackObjectBuilder ob(&builder); + builder.add("job", VPackValue(jobId)); + builder.add("status", VPackValue("aborted")); + builder.add("error", VPackValue(false)); + } + resetResponse(rest::ResponseCode::OK); + response()->setPayload(std::move(payload)); + }) + .thenError<VPackException>([this](VPackException const& e) { + generateError(Result{TRI_ERROR_HTTP_SERVER_ERROR, e.what()}); + }) + .thenError<std::exception>([this](std::exception const& e) { + generateError(rest::ResponseCode::SERVER_ERROR, + TRI_ERROR_HTTP_SERVER_ERROR, e.what()); + })); + } + } + + generateError(rest::ResponseCode::NOT_FOUND, TRI_ERROR_HTTP_NOT_FOUND); + + } catch (VPackException const& e) { + generateError(Result{TRI_ERROR_HTTP_SERVER_ERROR, e.what()}); + } catch (std::exception const& e) { + generateError(rest::ResponseCode::SERVER_ERROR, TRI_ERROR_HTTP_SERVER_ERROR, e.what()); + } + + return RestStatus::DONE; +} + RestStatus RestAdminClusterHandler::handleSingleServerJob(std::string const& job) { if (!ExecContext::current().isAdminUser()) { generateError(rest::ResponseCode::FORBIDDEN, TRI_ERROR_HTTP_FORBIDDEN); return RestStatus::DONE; } @@ -1062,7 +1214,7 @@ RestStatus RestAdminClusterHandler::handleShardDistribution() { builder.add(VPackValue("results")); reporter->getDistributionForDatabase(_vocbase.name(), builder); } - generateOk(rest::ResponseCode::OK, builder); + generateOk(rest::ResponseCode::OK, builder); return RestStatus::DONE; } @@ -1081,7 +1233,7 @@ RestStatus RestAdminClusterHandler::handleGetCollectionShardDistribution(std::st builder.add(VPackValue("results")); reporter->getCollectionDistributionForDatabase(_vocbase.name(), collection, builder); } - generateOk(rest::ResponseCode::OK, builder); + generateOk(rest::ResponseCode::OK, builder); return RestStatus::DONE; } @@ -1204,7 +1356,7 @@ RestStatus RestAdminClusterHandler::setMaintenance(bool wantToActivate) { auto sendTransaction = [&] { if (wantToActivate) { constexpr int timeout = 3600; // 1 hour - return AsyncAgencyComm().setValue(60s, maintenancePath, + return AsyncAgencyComm().setValue(60s, maintenancePath, VPackValue(timepointToString( std::chrono::system_clock::now() + std::chrono::seconds(timeout))), 3600); } else { @@ -1331,7 +1483,7 @@ RestStatus RestAdminClusterHandler::handleGetNumberOfServers() { targetPath->cleanedServers()->vec())); } - generateOk(rest::ResponseCode::OK, builder); + generateOk(rest::ResponseCode::OK, builder); } else { generateError(rest::ResponseCode::SERVER_ERROR, TRI_ERROR_HTTP_SERVER_ERROR, "agency communication failed"); diff --git a/arangod/RestHandler/RestAdminClusterHandler.h b/arangod/RestHandler/RestAdminClusterHandler.h index 2bb593cd5ab4..b0f52cb30263 100644 --- a/arangod/RestHandler/RestAdminClusterHandler.h +++ b/arangod/RestHandler/RestAdminClusterHandler.h @@ -46,6 +46,7 @@ class RestAdminClusterHandler : public RestVocbaseBaseHandler { RequestLane lane() const override final { return RequestLane::CLIENT_SLOW; } private: + static std::string const CancelJob; static std::string const Health; static std::string const NumberOfServers; static std::string const Maintenance; @@ -86,6 +87,7 @@ class RestAdminClusterHandler : public RestVocbaseBaseHandler { RestStatus
handleCleanoutServer(); RestStatus handleResignLeadership(); RestStatus handleMoveShard(); + RestStatus handleCancelJob(); RestStatus handleQueryJobStatus(); RestStatus handleRemoveServer(); diff --git a/tests/Agency/CleanOutServerTest.cpp b/tests/Agency/CleanOutServerTest.cpp index cabaf286586a..614088f7cec5 100644 --- a/tests/Agency/CleanOutServerTest.cpp +++ b/tests/Agency/CleanOutServerTest.cpp @@ -585,7 +585,7 @@ TEST_F(CleanOutServerTest, cleanout_server_job_should_move_into_pending_if_ok) { Verify(Method(mockAgent, write)); } -TEST_F(CleanOutServerTest, cleanout_server_job_should_abort_after_timeout) { +TEST_F(CleanOutServerTest, test_cancel_pending_job) { TestStructureType createTestStructure = [&](VPackSlice const& s, std::string const& path) { std::unique_ptr<VPackBuilder> builder; builder.reset(new VPackBuilder()); if (s.isObject()) { builder->add(VPackValue(VPackValueType::Object)); for (auto it : VPackObjectIterator(s)) { auto childBuilder = createTestStructure(it.value, path + "/" + it.key.copyString()); if (childBuilder) { builder->add(it.key.copyString(), childBuilder->slice()); } } if (path == "/arango/Target/Pending") { auto job = createJob(SERVER); builder->add(VPackValue(JOBID)); builder->add(VPackValue(VPackValueType::Object)); for (auto const& jobIt : VPackObjectIterator(job.slice())) { - if (jobIt.key.copyString() == "timeCreated") { - builder->add(jobIt.key.copyString(), - VPackValue("2015-01-03T20:00:00Z")); - } else { builder->add(jobIt.key.copyString(), jobIt.value); - } } + builder->add("abort", VPackValue(true)); builder->close(); } else if (path == "/arango/Target/ToDo") { VPackBuilder moveBuilder = createMoveShardJob(); builder->add("1-0", moveBuilder.slice()); @@ -657,8 +653,69 @@ TEST_F(CleanOutServerTest, test_cancel_pending_job) { Node agency = createAgency(createTestStructure); // should not throw auto cleanOutServer = CleanOutServer(agency, &agent, JOB_STATUS::PENDING, JOBID); - cleanOutServer.run(aborts); - Verify(Method(mockAgent, write)); + + Mock<CleanOutServer> spy(cleanOutServer); + Fake(Method(spy, abort)); + + Job& spyCleanOutServer = spy.get(); + spyCleanOutServer.run(aborts); + Verify(Method(spy, abort)); + +} + +TEST_F(CleanOutServerTest, test_cancel_todo_job) { + TestStructureType createTestStructure = [&](VPackSlice const& s, std::string const& path) { + std::unique_ptr<VPackBuilder> builder; + builder.reset(new VPackBuilder()); + if (s.isObject()) { + builder->add(VPackValue(VPackValueType::Object)); + for (auto it : VPackObjectIterator(s)) { + auto childBuilder = + createTestStructure(it.value, path + "/" + it.key.copyString()); + if (childBuilder) { + builder->add(it.key.copyString(), childBuilder->slice()); + } + } + + if (path == "/arango/Target/ToDo") { + auto job = createJob(SERVER); + builder->add(VPackValue(JOBID)); + builder->add(VPackValue(VPackValueType::Object)); + for (auto const& jobIt : VPackObjectIterator(job.slice())) { + builder->add(jobIt.key.copyString(), jobIt.value); + } + builder->add("abort", VPackValue(true)); + builder->close(); + VPackBuilder moveBuilder = createMoveShardJob(); + builder->add("1-0", moveBuilder.slice()); + } + builder->close(); + } else { + builder->add(s); + } + return builder; + }; + + int qCount = 0; + Mock<AgentInterface> mockAgent; + When(Method(mockAgent, write)).AlwaysDo([&](query_t const& q, consensus::AgentInterface::WriteMode w) -> write_ret_t { + if (qCount++ == 0) { + checkFailed(JOB_STATUS::TODO, q); + } + return fakeWriteResult; + }); + AgentInterface& agent = mockAgent.get(); + + Node agency = createAgency(createTestStructure); + // should not throw + auto cleanOutServer = CleanOutServer(agency, &agent, JOB_STATUS::TODO, JOBID); + Mock<CleanOutServer> spy(cleanOutServer); + Fake(Method(spy, abort)); + + Job& spyCleanOutServer = spy.get(); + spyCleanOutServer.run(aborts); + Verify(Method(spy, abort)); + }
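The two tests above simulate cancellation by planting `abort: true` directly into the agency snapshot; in a live cluster the flag is set through the coordinator route added above. A sketch of the HTTP exchange (job ID `1-0` is a placeholder):

    POST /_admin/cluster/cancelAgencyJob
    { "id": "1-0" }

    HTTP/1.1 200 OK
    { "job": "1-0", "status": "aborted", "error": false }

Job types other than moveShard and cleanOutServer are rejected with 400, a job that is no longer to-do or pending yields 412, and an unknown job ID yields 404.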
TEST_F(CleanOutServerTest, when_there_are_still_subjobs_it_should_wait) { @@ -838,7 +895,7 @@ TEST_F(CleanOutServerTest, when_the_cleanout_server_job_aborts_abort_all_subjobs EXPECT_TRUE(writes.get("/arango/Target/ToDo/1-0").get("op").copyString() == "delete"); - // a not yet started job will be moved to finished - EXPECT_TRUE(std::string(writes.get("/arango/Target/Finished/1-0").typeName()) == + // a not yet started job will be moved to failed + EXPECT_TRUE(std::string(writes.get("/arango/Target/Failed/1-0").typeName()) == "object"); auto preconds = q->slice()[0][1]; EXPECT_TRUE(preconds.get("/arango/Target/ToDo/1-0").get("oldEmpty").isFalse()); diff --git a/tests/Agency/MoveShardTest.cpp b/tests/Agency/MoveShardTest.cpp index 08950b06c3eb..8a3da6323f9b 100644 --- a/tests/Agency/MoveShardTest.cpp +++ b/tests/Agency/MoveShardTest.cpp @@ -935,65 +935,6 @@ TEST_F(MoveShardTest, when_moving_a_shard_that_is_a_distributeshardslike_leader_ Verify(Method(mockAgent, write)); } -TEST_F(MoveShardTest, if_the_job_is_too_old_it_should_be_aborted_to_prevent_a_deadloop) { - std::function<std::unique_ptr<VPackBuilder>(VPackSlice const&, std::string const&)> createTestStructure = - [&](VPackSlice const& s, std::string const& path) { - std::unique_ptr<VPackBuilder> builder; - builder.reset(new VPackBuilder()); - if (s.isObject()) { - builder->add(VPackValue(VPackValueType::Object)); - for (auto it : VPackObjectIterator(s)) { - auto childBuilder = - createTestStructure(it.value, path + "/" + it.key.copyString()); - if (childBuilder) { - builder->add(it.key.copyString(), childBuilder->slice()); - } - } - - if (path == "/arango/Target/Pending") { - VPackBuilder pendingJob; - { - VPackObjectBuilder b(&pendingJob); - auto plainJob = createJob(COLLECTION, SHARD_FOLLOWER1, FREE_SERVER); - for (auto it : VPackObjectIterator(plainJob.slice())) { - pendingJob.add(it.key.copyString(), it.value); - } - pendingJob.add("timeCreated", VPackValue("2015-01-03T20:00:00Z")); - } - builder->add(jobId, pendingJob.slice()); - } - builder->close(); - } else { - if (path == "/arango/Plan/Collections/" + DATABASE + "/" + - COLLECTION + "/shards/" + SHARD) { - builder->add(VPackValue(VPackValueType::Array)); - builder->add(VPackValue(SHARD_LEADER)); - builder->add(VPackValue(SHARD_FOLLOWER1)); - builder->add(VPackValue(FREE_SERVER)); - builder->close(); - } else { - builder->add(s); - } - } - return builder; - }; - - auto builder = createTestStructure(baseStructure.toBuilder().slice(), ""); - Node agency = createAgencyFromBuilder(*builder); - - Mock<AgentInterface> mockAgent; - AgentInterface& agent = mockAgent.get(); - - auto moveShard = MoveShard(agency, &agent, PENDING, jobId); - Mock<MoveShard> spy(moveShard); - Fake(Method(spy, abort)); - - Job& spyMoveShard = spy.get(); - spyMoveShard.run(aborts); - - Verify(Method(spy, abort)); -} - TEST_F(MoveShardTest, if_the_to_server_no_longer_replica_we_should_abort) { std::function<std::unique_ptr<VPackBuilder>(VPackSlice const&, std::string const&)> createTestStructure = [&](VPackSlice const& s, std::string const& path) { @@ -1059,65 +1000,6 @@ TEST_F(MoveShardTest, if_the_to_server_no_longer_replica_we_should_abort) { Verify(Method(spy, abort)); } -TEST_F(MoveShardTest, if_the_job_is_too_old_leader_case_it_should_be_aborted_to_prevent_deadloop) { - std::function<std::unique_ptr<VPackBuilder>(VPackSlice const&, std::string const&)> createTestStructure = - [&](VPackSlice const& s, std::string const& path) { - std::unique_ptr<VPackBuilder> builder; - builder.reset(new VPackBuilder()); - if (s.isObject()) { - builder->add(VPackValue(VPackValueType::Object)); - for (auto it : VPackObjectIterator(s)) { - auto childBuilder = - createTestStructure(it.value, path + "/" +
it.key.copyString()); - if (childBuilder) { - builder->add(it.key.copyString(), childBuilder->slice()); - } - } - - if (path == "/arango/Target/Pending") { - VPackBuilder pendingJob; - { - VPackObjectBuilder b(&pendingJob); - auto plainJob = createJob(COLLECTION, SHARD_LEADER, FREE_SERVER); - for (auto it : VPackObjectIterator(plainJob.slice())) { - pendingJob.add(it.key.copyString(), it.value); - } - pendingJob.add("timeCreated", VPackValue("2015-01-03T20:00:00Z")); - } - builder->add(jobId, pendingJob.slice()); - } - builder->close(); - } else { - if (path == "/arango/Plan/Collections/" + DATABASE + "/" + - COLLECTION + "/shards/" + SHARD) { - builder->add(VPackValue(VPackValueType::Array)); - builder->add(VPackValue(SHARD_LEADER)); - builder->add(VPackValue(SHARD_FOLLOWER1)); - builder->add(VPackValue(FREE_SERVER)); - builder->close(); - } else { - builder->add(s); - } - } - return builder; - }; - - auto builder = createTestStructure(baseStructure.toBuilder().slice(), ""); - Node agency = createAgencyFromBuilder(*builder); - - Mock<AgentInterface> mockAgent; - AgentInterface& agent = mockAgent.get(); - - auto moveShard = MoveShard(agency, &agent, PENDING, jobId); - Mock<MoveShard> spy(moveShard); - Fake(Method(spy, abort)); - - Job& spyMoveShard = spy.get(); - spyMoveShard.run(aborts); - - Verify(Method(spy, abort)); -} - TEST_F(MoveShardTest, if_the_collection_was_dropped_while_moving_finish_the_job) { std::function<std::unique_ptr<VPackBuilder>(VPackSlice const&, std::string const&)> createTestStructure = [&](VPackSlice const& s, std::string const& path) { @@ -1728,7 +1610,7 @@ TEST_F(MoveShardTest, a_moveshard_job_that_just_made_it_to_todo_can_simply_be_ab auto writes = q->slice()[0][0]; EXPECT_TRUE(writes.get("/arango/Target/ToDo/1").get("op").copyString() == "delete"); - EXPECT_TRUE(std::string(writes.get("/arango/Target/Finished/1").typeName()) == + EXPECT_TRUE(std::string(writes.get("/arango/Target/Failed/1").typeName()) == "object"); auto precond = q->slice()[0][1]; EXPECT_TRUE(precond.get("/arango/Target/ToDo/1").get("oldEmpty").isFalse()); @@ -2834,7 +2716,7 @@ TEST_F(MoveShardTest, trying_to_abort_a_finished_should_result_in_failure) { EXPECT_EQ(result.errorNumber(), TRI_ERROR_SUPERVISION_GENERAL_FAILURE); } -TEST_F(MoveShardTest, if_the_job_fails_while_trying_to_switch_over_leadership_it_should_be_aborted) { +TEST_F(MoveShardTest, test_cancel_pending_job) { std::function<std::unique_ptr<VPackBuilder>(VPackSlice const&, std::string const&)> createTestStructure = [&](VPackSlice const& s, std::string const& path) { std::unique_ptr<VPackBuilder> builder; @@ -2857,7 +2739,7 @@ TEST_F(MoveShardTest, test_cancel_pending_job) { for (auto it : VPackObjectIterator(plainJob.slice())) { pendingJob.add(it.key.copyString(), it.value); } - pendingJob.add("timeCreated", VPackValue("2015-01-03T20:00:00Z")); + pendingJob.add("abort", VPackValue(true)); } builder->add(jobId, pendingJob.slice()); } @@ -2866,9 +2748,8 @@ TEST_F(MoveShardTest, test_cancel_pending_job) { if (path == "/arango/Plan/Collections/" + DATABASE + "/" + COLLECTION + "/shards/" + SHARD) { builder->add(VPackValue(VPackValueType::Array)); - builder->add(VPackValue("_" + SHARD_LEADER)); - builder->add(VPackValue(SHARD_FOLLOWER1)); + builder->add(VPackValue(FREE_SERVER)); + builder->add(VPackValue(SHARD_FOLLOWER1)); builder->close(); } else { builder->add(s); @@ -2893,7 +2774,7 @@ TEST_F(MoveShardTest, test_cancel_pending_job) { Verify(Method(spy, abort)); } -TEST_F(MoveShardTest,
if_the_job_timeouts_while_the_new_leader_is_trying_to_take_over_the_job_should_be_aborted) { +TEST_F(MoveShardTest, test_cancel_todo_job) { std::function<std::unique_ptr<VPackBuilder>(VPackSlice const&, std::string const&)> createTestStructure = [&](VPackSlice const& s, std::string const& path) { std::unique_ptr<VPackBuilder> builder; @@ -2908,7 +2789,7 @@ TEST_F(MoveShardTest, test_cancel_todo_job) { } } - if (path == "/arango/Target/Pending") { + if (path == "/arango/Target/ToDo") { VPackBuilder pendingJob; { VPackObjectBuilder b(&pendingJob); @@ -2916,7 +2797,7 @@ TEST_F(MoveShardTest, test_cancel_todo_job) { for (auto it : VPackObjectIterator(plainJob.slice())) { pendingJob.add(it.key.copyString(), it.value); } - pendingJob.add("timeCreated", VPackValue("2015-01-03T20:00:00Z")); + pendingJob.add("abort", VPackValue(true)); } builder->add(jobId, pendingJob.slice()); } @@ -2941,7 +2822,7 @@ TEST_F(MoveShardTest, test_cancel_todo_job) { Mock<AgentInterface> mockAgent; AgentInterface& agent = mockAgent.get(); - auto moveShard = MoveShard(agency, &agent, PENDING, jobId); + auto moveShard = MoveShard(agency, &agent, TODO, jobId); Mock<MoveShard> spy(moveShard); Fake(Method(spy, abort)); diff --git a/tests/js/server/resilience/move/moving-shards-cluster.js b/tests/js/server/resilience/move/moving-shards-cluster.js index 6c1873c97e69..c57d30a4958d 100644 --- a/tests/js/server/resilience/move/moving-shards-cluster.js +++ b/tests/js/server/resilience/move/moving-shards-cluster.js @@ -366,7 +366,7 @@ function MovingShardsSuite ({useData}) { /// @brief order the cluster to clean out a server: //////////////////////////////////////////////////////////////////////////////// - function cleanOutServer(id) { + function cleanOutServer(id, dontwait = false) { var coordEndpoint = global.ArangoClusterInfo.getServerEndpoint("Coordinator0001"); var request = require("@arangodb/request"); @@ -385,6 +385,9 @@ function MovingShardsSuite ({useData}) { } console.info("cleanOutServer job:", JSON.stringify(body)); console.info("result of request:", JSON.stringify(result.json)); + if (dontwait) { + return result; + } // Now wait until the job we triggered is finished: var count = 1200; // seconds while (true) { @@ -535,11 +538,11 @@ function MovingShardsSuite ({useData}) { "Exception for PUT /_admin/cluster/moveShard:", err.stack); return false; } + console.info("moveShard job:", JSON.stringify(body)); + console.info("result of request:", JSON.stringify(result.json)); if (dontwait) { return result; } - console.info("moveShard job:", JSON.stringify(body)); - console.info("result of request:", JSON.stringify(result.json)); // Now wait until the job we triggered is finished: var count = 600; // seconds while (true) { @@ -585,12 +588,36 @@ function MovingShardsSuite ({useData}) { } +//////////////////////////////////////////////////////////////////////////////// +/// @brief abort a supervision job +//////////////////////////////////////////////////////////////////////////////// + + function cancelAgencyJob(id) { + console.log("Killing job " + id); + var coordEndpoint = + global.ArangoClusterInfo.getServerEndpoint("Coordinator0001"); + var request = require("@arangodb/request"); + var endpointToURL = require("@arangodb/cluster").endpointToURL; + var url = endpointToURL(coordEndpoint); + var req; + try { + req = request({ method: "POST", + url: url + "/_admin/cluster/cancelAgencyJob", + body: JSON.stringify({id:id}) }); + } catch (err) { + console.error( + "Exception for POST
/_admin/cluster/cancelAgencyJob: " + JSON.stringify({id:id}), err.stack); + return false; + } + return JSON.parse(req.body).error === false; + } + //////////////////////////////////////////////////////////////////////////////// /// @brief create some collections //////////////////////////////////////////////////////////////////////////////// - function createSomeCollections(n, nrShards, replFactor, useData) { + function createSomeCollections(n, nrShards, replFactor, useData, otherNumDocuments = 0) { assertEqual('boolean', typeof useData); var systemCollServers = findCollectionServers("_system", "_graphs"); console.info("System collections use servers:", systemCollServers); @@ -605,7 +632,8 @@ function MovingShardsSuite ({useData}) { if (useData) { // insert some documents - coll.insert(_.range(0, numDocuments).map(v => ({ value: v, x: "someString" + v }))); + let nd = (otherNumDocuments === 0) ? numDocuments : otherNumDocuments; + coll.insert(_.range(0, nd).map(v => ({ value: v, x: "someString" + v }))); } var servers = findCollectionServers("_system", name); @@ -623,8 +651,9 @@ function MovingShardsSuite ({useData}) { } } - function checkCollectionContents() { - const numDocs = useData ? numDocuments : 0; + function checkCollectionContents(otherNumDocuments = 0) { + let nd = (otherNumDocuments === 0) ? numDocuments : otherNumDocuments; + const numDocs = useData ? nd : 0; for(const collection of c) { assertEqual(numDocs, collection.count()); const values = db._query( @@ -1052,6 +1081,130 @@ function MovingShardsSuite ({useData}) { checkCollectionContents(); }, +//////////////////////////////////////////////////////////////////////////////// +/// @brief kill todo moveShard job +//////////////////////////////////////////////////////////////////////////////// + + testCancelToDoMoveShard : function() { + createSomeCollections(1, 1, 3, useData); + assertTrue(waitForSynchronousReplication("_system")); + var servers = findCollectionServers("_system", c[1].name()); + var fromServer = servers[0]; + var toServer = findServerNotOnList(servers); + var cinfo = global.ArangoClusterInfo.getCollectionInfo( + "_system", c[1].name()); + var shard = Object.keys(cinfo.shards)[0]; + assertTrue(maintenanceMode("on")); + let result = moveShard("_system", c[1]._id, shard, fromServer, toServer, true, "Finished"); + assertEqual(result.statusCode, 202); + let id = JSON.parse(result.body).id; + assertTrue(cancelAgencyJob(id)); + assertTrue(maintenanceMode("off")); + var job = queryAgencyJob(result.json.id); + assertEqual(job.status, "Failed"); + assertTrue(job.abort); + assertTrue(waitForSupervision()); + checkCollectionContents(); + }, + +//////////////////////////////////////////////////////////////////////////////// +/// @brief kill pending moveShard job +//////////////////////////////////////////////////////////////////////////////// + + testCancelPendingMoveShard : function() { + if (useData) { + for (var i = 0; i < c.length; ++i) { + c[i].drop(); + } + c = []; + let otherNumDocuments = 100000; + createSomeCollections(1, 1, 3, useData, otherNumDocuments); + assertTrue(waitForSynchronousReplication("_system")); + var servers = findCollectionServers("_system", c[0].name()); + var fromServer = servers[0]; + var toServer = findServerNotOnList(servers); + var cinfo = global.ArangoClusterInfo.getCollectionInfo( + "_system", c[0].name()); + var shard = Object.keys(cinfo.shards)[0]; + let result = moveShard("_system", c[0]._id, shard, fromServer, toServer, true, "Finished"); + assertEqual(result.statusCode, 202); + let id =
JSON.parse(result.body).id; + while (queryAgencyJob(result.json.id).status === "ToDo") { // wait for job to start + wait(0.1); + } + assertTrue(cancelAgencyJob(id)); + while (queryAgencyJob(result.json.id).status === "Pending") { // wait for job to be killed + wait(0.1); + } + var job = queryAgencyJob(result.json.id); + assertEqual(job.status, "Failed"); + assertTrue(job.abort); + assertTrue(waitForSupervision()); + checkCollectionContents(otherNumDocuments); + } + }, + +//////////////////////////////////////////////////////////////////////////////// +/// @brief kill todo cleanOutServer job +//////////////////////////////////////////////////////////////////////////////// + + testCancelToDoCleanOutServer : function() { + createSomeCollections(1, 1, 3, useData); + assertTrue(waitForSynchronousReplication("_system")); + var servers = findCollectionServers("_system", c[1].name()); + var fromServer = servers[0]; + var cinfo = global.ArangoClusterInfo.getCollectionInfo( + "_system", c[1].name()); + var shard = Object.keys(cinfo.shards)[0]; + assertTrue(maintenanceMode("on")); + let result = cleanOutServer(fromServer, true); + assertEqual(result.statusCode, 202); + let id = JSON.parse(result.body).id; + assertTrue(cancelAgencyJob(id)); + assertTrue(maintenanceMode("off")); + var job = queryAgencyJob(result.json.id); + assertEqual(job.status, "Failed"); + assertTrue(job.abort); + assertTrue(waitForSupervision()); + checkCollectionContents(); + }, + +//////////////////////////////////////////////////////////////////////////////// +/// @brief kill pending cleanOutServer job +//////////////////////////////////////////////////////////////////////////////// + + testCancelPendingCleanOutServer : function() { + if (useData) { + for (var i = 0; i < c.length; ++i) { + c[i].drop(); + } + c = []; + let otherNumDocuments = 100000; + createSomeCollections(1, 1, 3, useData, otherNumDocuments); + assertTrue(waitForSynchronousReplication("_system")); + var servers = findCollectionServers("_system", c[0].name()); + var fromServer = servers[0]; + var cinfo = global.ArangoClusterInfo.getCollectionInfo( + "_system", c[0].name()); + var shard = Object.keys(cinfo.shards)[0]; + let result = cleanOutServer(fromServer, true); + assertEqual(result.statusCode, 202); + let id = JSON.parse(result.body).id; + while (queryAgencyJob(result.json.id).status === "ToDo") { // wait for job to start + wait(0.1); + } + assertTrue(cancelAgencyJob(id)); + while (queryAgencyJob(result.json.id).status === "Pending") { // wait for job to be killed + wait(0.1); + } + var job = queryAgencyJob(result.json.id); + assertEqual(job.status, "Failed"); + assertTrue(job.abort); + assertTrue(waitForSupervision()); + checkCollectionContents(otherNumDocuments); + } + }, + }; }
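Taken together, the new test cases follow one pattern, condensed below for a cleanout (a sketch using the suite's local helpers `cleanOutServer`, `cancelAgencyJob`, and `queryAgencyJob` defined above; `serverId` is a placeholder):

    // Start the job without waiting for its completion:
    let result = cleanOutServer(serverId, true);   // POST /_admin/cluster/cleanOutServer
    let id = JSON.parse(result.body).id;
    // Flag the job for abortion; the supervision acts on the flag on its next run:
    assertTrue(cancelAgencyJob(id));               // POST /_admin/cluster/cancelAgencyJob
    // The job eventually lands in Target/Failed with abort: true set:
    while (["ToDo", "Pending"].includes(queryAgencyJob(id).status)) {
      wait(0.1);
    }
    assertEqual("Failed", queryAgencyJob(id).status);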