From ce911e07086ea1ee7c24002666ffff5e1cdb167d Mon Sep 17 00:00:00 2001 From: jsteemann Date: Wed, 2 Jun 2021 00:11:58 +0200 Subject: [PATCH 1/2] added a TTL for Pregel conductors, plus GC --- CHANGELOG | 8 +++++ arangod/Pregel/Conductor.cpp | 56 +++++++++++++++++++++++------ arangod/Pregel/Conductor.h | 7 ++++ arangod/Pregel/PregelFeature.cpp | 61 +++++++++++++++++++++++--------- arangod/Pregel/PregelFeature.h | 22 ++++++++++-- 5 files changed, 123 insertions(+), 31 deletions(-) diff --git a/CHANGELOG b/CHANGELOG index 2d67631d6c39..ad6a2d48f70e 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,5 +1,13 @@ devel ----- + +* Added garbage collection for finished and failed Pregel conductors. + Previously, Pregel executions that finished successfully or unsuccessfully + remained in memory until being explicitly canceled. This prevented a + cleanup of abandoned jobs. Such jobs are now automatically cleaned up + about 10 minutes after finalization. The time-to-live values can be + overridden per Pregel job by passing a "ttl" value. + * Added check for data type compatibility between members of pipeline ArangoSearch analyzer. diff --git a/arangod/Pregel/Conductor.cpp b/arangod/Pregel/Conductor.cpp index 767d34901c98..8832ff3a2645 100644 --- a/arangod/Pregel/Conductor.cpp +++ b/arangod/Pregel/Conductor.cpp @@ -116,6 +116,11 @@ Conductor::Conductor(uint64_t executionNumber, TRI_vocbase_t& vocbase, if (!_storeResults) { LOG_TOPIC("f3817", DEBUG, Logger::PREGEL) << "Will keep results in-memory"; } + + // time-to-live for finished/failed Pregel jobs before garbage collection. 
+ // default timeout is 10 minutes for each conductor + uint64_t ttl = 600; + _ttl = std::chrono::seconds(VelocyPackHelper::getNumericValue(config, "ttl", ttl)); } Conductor::~Conductor() { @@ -136,13 +141,13 @@ void Conductor::start() { _endTimeSecs = _startTimeSecs; _globalSuperstep = 0; - _state = ExecutionState::RUNNING; + updateState(ExecutionState::RUNNING); LOG_TOPIC("3a255", DEBUG, Logger::PREGEL) << "Telling workers to load the data"; auto res = _initializeWorkers(Utils::startExecutionPath, VPackSlice()); if (res != TRI_ERROR_NO_ERROR) { - _state = ExecutionState::CANCELED; + updateState(ExecutionState::CANCELED); LOG_TOPIC("30171", ERR, Logger::PREGEL) << "Not all DBServers started the execution"; } @@ -187,7 +192,7 @@ bool Conductor::_startGlobalStep() { }); if (res != TRI_ERROR_NO_ERROR) { - _state = ExecutionState::IN_ERROR; + updateState(ExecutionState::IN_ERROR); LOG_TOPIC("04189", ERR, Logger::PREGEL) << "Seems there is at least one worker out of order"; // the recovery mechanisms should take care of this @@ -235,10 +240,10 @@ bool Conductor::_startGlobalStep() { if (!proceed || done || _globalSuperstep >= _maxSuperstep) { // tells workers to store / discard results if (_storeResults) { - _state = ExecutionState::STORING; + updateState(ExecutionState::STORING); _finalizeWorkers(); } else { // just stop the timer - _state = _inErrorAbort ? ExecutionState::FATAL_ERROR : ExecutionState::DONE; + updateState(_inErrorAbort ? 
ExecutionState::FATAL_ERROR : ExecutionState::DONE); _endTimeSecs = TRI_microtime(); LOG_TOPIC("9e82c", INFO, Logger::PREGEL) << "Done, execution took: " << totalRuntimeSecs() << " s"; @@ -253,7 +258,7 @@ bool Conductor::_startGlobalStep() { _masterContext->_edgeCount = _totalEdgesCount; _masterContext->_reports = &_reports; if (!_masterContext->preGlobalSuperstepWithResult()) { - _state = ExecutionState::FATAL_ERROR; + updateState(ExecutionState::FATAL_ERROR); _endTimeSecs = TRI_microtime(); return false; } @@ -282,7 +287,7 @@ bool Conductor::_startGlobalStep() { // start vertex level operations, does not get a response auto res = _sendToAllDBServers(Utils::startGSSPath, b); // call me maybe if (res != TRI_ERROR_NO_ERROR) { - _state = ExecutionState::IN_ERROR; + updateState(ExecutionState::IN_ERROR); LOG_TOPIC("f34bb", ERR, Logger::PREGEL) << "Conductor could not start GSS " << _globalSuperstep; // the recovery mechanisms should take care od this @@ -449,7 +454,7 @@ void Conductor::finishedRecoveryStep(VPackSlice const& data) { b.close(); res = _sendToAllDBServers(Utils::finalizeRecoveryPath, b); if (res == TRI_ERROR_NO_ERROR) { - _state = ExecutionState::RUNNING; + updateState(ExecutionState::RUNNING); _startGlobalStep(); } } @@ -466,7 +471,7 @@ void Conductor::cancel() { void Conductor::cancelNoLock() { _callbackMutex.assertLockedByCurrentThread(); - _state = ExecutionState::CANCELED; + updateState(ExecutionState::CANCELED); bool ok = basics::function_utils::retryUntilTimeout( [this]() -> bool { return (_finalizeWorkers() != TRI_ERROR_QUEUE_FULL); }, Logger::PREGEL, "cancel worker execution"); @@ -490,7 +495,7 @@ void Conductor::startRecovery() { // we lost a DBServer, we need to reconfigure all remainging servers // so they load the data for the lost machine - _state = ExecutionState::RECOVERING; + updateState(ExecutionState::RECOVERING); _statistics.reset(); TRI_ASSERT(SchedulerFeature::SCHEDULER != nullptr); @@ -788,7 +793,7 @@ void 
Conductor::finishedWorkerFinalize(VPackSlice data) { // do not swap an error state to done bool didStore = false; if (_state == ExecutionState::STORING) { - _state = _inErrorAbort ? ExecutionState::FATAL_ERROR : ExecutionState::DONE; + updateState(_inErrorAbort ? ExecutionState::FATAL_ERROR : ExecutionState::DONE); didStore = true; } _endTimeSecs = TRI_microtime(); // offically done @@ -837,6 +842,25 @@ void Conductor::finishedWorkerFinalize(VPackSlice data) { } } +bool Conductor::canBeGarbageCollected() const { + // we don't want to block other operations for longer, so if we can't immediately + // acquire the mutex here, we assume a conductor cannot be garbage-collected. + // the same conductor will be probed later anyway, so we should be fine + TRY_MUTEX_LOCKER(guard, _callbackMutex); + + if (guard.isLocked()) { + if (_state == ExecutionState::CANCELED || + _state == ExecutionState::DONE || + _state == ExecutionState::IN_ERROR || + _state == ExecutionState::FATAL_ERROR) { + return (_expires != std::chrono::steady_clock::time_point{} && + _expires <= std::chrono::steady_clock::now()); + } + } + + return false; +} + void Conductor::collectAQLResults(VPackBuilder& outBuilder, bool withId) { MUTEX_LOCKER(guard, _callbackMutex); @@ -999,3 +1023,13 @@ std::vector Conductor::getShardIds(ShardID const& collection) const { return result; } + +void Conductor::updateState(ExecutionState state) { + _state = state; + if (_state == ExecutionState::CANCELED || + _state == ExecutionState::DONE || + _state == ExecutionState::IN_ERROR || + _state == ExecutionState::FATAL_ERROR) { + _expires = std::chrono::steady_clock::now() + _ttl; + } +} diff --git a/arangod/Pregel/Conductor.h b/arangod/Pregel/Conductor.h index d04a766fa533..76d0f71651b5 100644 --- a/arangod/Pregel/Conductor.h +++ b/arangod/Pregel/Conductor.h @@ -34,6 +34,8 @@ #include "Scheduler/Scheduler.h" #include "Utils/DatabaseGuard.h" +#include + namespace arangodb { namespace pregel { @@ -63,6 +65,8 @@ class Conductor : 
public std::enable_shared_from_this { ExecutionState _state = ExecutionState::DEFAULT; PregelFeature& _feature; + std::chrono::steady_clock::time_point _expires; + std::chrono::seconds _ttl = std::chrono::seconds(300); const DatabaseGuard _vocbaseGuard; const uint64_t _executionNumber; VPackBuilder _userParams; @@ -145,8 +149,11 @@ class Conductor : public std::enable_shared_from_this { return _endTimeSecs == 0.0 ? TRI_microtime() - _startTimeSecs : _endTimeSecs - _startTimeSecs; } + bool canBeGarbageCollected() const; + private: void cancelNoLock(); + void updateState(ExecutionState state); }; } // namespace pregel } // namespace arangodb diff --git a/arangod/Pregel/PregelFeature.cpp b/arangod/Pregel/PregelFeature.cpp index 9e1e9fd28097..df37f0f3c43a 100644 --- a/arangod/Pregel/PregelFeature.cpp +++ b/arangod/Pregel/PregelFeature.cpp @@ -53,14 +53,6 @@ bool authorized(std::string const& user) { return (user == exec.user()); } -bool authorized(std::pair> const& conductor) { - return ::authorized(conductor.first); -} - -bool authorized(std::pair> const& worker) { - return ::authorized(worker.first); -} - /// @brief custom deleter for the PregelFeature. /// it does nothing, i.e. doesn't delete it. This is because the ApplicationServer /// is managing the PregelFeature, but we need a shared_ptr to it here as well. The @@ -201,7 +193,7 @@ std::pair PregelFeature::startExecution( return std::make_pair(Result{TRI_ERROR_INTERNAL}, 0); } } - + uint64_t en = createExecutionNumber(); auto c = std::make_shared(en, vocbase, vertexCollections, edgeColls, edgeCollectionRestrictions, @@ -233,25 +225,49 @@ size_t PregelFeature::availableParallelism() { return procNum < 1 ? 
1 : procNum; } -void PregelFeature::start() { - if (ServerState::instance()->isAgent()) { +void PregelFeature::scheduleGarbageCollection() { + if (isStopping()) { return; } + std::chrono::seconds offset = std::chrono::seconds(10); + + TRI_ASSERT(SchedulerFeature::SCHEDULER != nullptr); + Scheduler* scheduler = SchedulerFeature::SCHEDULER; + auto [queued, handle] = scheduler->queueDelay(RequestLane::INTERNAL_LOW, offset, [this](bool canceled) { + if (!canceled) { + garbageCollectConductors(); + scheduleGarbageCollection(); + } + }); + if (!queued) { + THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_QUEUE_FULL, + "No thread available to queue Pregel garbage collection"); + } + + _gcHandle = std::move(handle); +} + +void PregelFeature::start() { if (ServerState::instance()->isCoordinator()) { auto& ci = server().getFeature().clusterInfo(); _recoveryManager = std::make_unique(ci); _recoveryManagerPtr.store(_recoveryManager.get(), std::memory_order_release); } + + if (!ServerState::instance()->isAgent()) { + scheduleGarbageCollection(); + } } void PregelFeature::beginShutdown() { TRI_ASSERT(isStopping()); + _gcHandle.reset(); MUTEX_LOCKER(guard, _mutex); // cancel all conductors and workers for (auto& it : _conductors) { - it.second.second->cancel(); + it.second.conductor->cancel(); } for (auto it : _workers) { it.second.second->cancelGlobalStep(VPackSlice()); @@ -270,7 +286,7 @@ void PregelFeature::unprepare() { // to conductors and workers been dropped! 
#ifdef ARANGODB_ENABLE_MAINTAINER_MODE for (auto& it : cs) { - TRI_ASSERT(it.second.second.use_count() == 1); + TRI_ASSERT(it.second.conductor.use_count() == 1); } for (auto it : _workers) { @@ -290,14 +306,25 @@ void PregelFeature::addConductor(std::shared_ptr&& c, uint64_t execut std::string user = ExecContext::current().user(); MUTEX_LOCKER(guard, _mutex); - _conductors.try_emplace(executionNumber, - std::move(user), std::move(c)); + _conductors.try_emplace(executionNumber, ConductorEntry{ std::move(user), std::chrono::steady_clock::time_point{}, std::move(c) }); } std::shared_ptr PregelFeature::conductor(uint64_t executionNumber) { MUTEX_LOCKER(guard, _mutex); auto it = _conductors.find(executionNumber); - return (it != _conductors.end() && ::authorized(it->second)) ? it->second.second : nullptr; + return (it != _conductors.end() && ::authorized(it->second.user)) ? it->second.conductor : nullptr; +} + +void PregelFeature::garbageCollectConductors() { + // iterate over all conductors and remove the ones which can be garbage-collected + MUTEX_LOCKER(guard, _mutex); + for (auto it = _conductors.begin(); it != _conductors.end(); /*no hoisting*/) { + if (it->second.conductor->canBeGarbageCollected()) { + it = _conductors.erase(it); + } else { + ++it; + } + } } void PregelFeature::addWorker(std::shared_ptr&& w, uint64_t executionNumber) { @@ -314,7 +341,7 @@ void PregelFeature::addWorker(std::shared_ptr&& w, uint64_t executionNu std::shared_ptr PregelFeature::worker(uint64_t executionNumber) { MUTEX_LOCKER(guard, _mutex); auto it = _workers.find(executionNumber); - return (it != _workers.end() && ::authorized(it->second)) ? it->second.second : nullptr; + return (it != _workers.end() && ::authorized(it->second.first)) ? 
it->second.second : nullptr; } void PregelFeature::cleanupConductor(uint64_t executionNumber) { diff --git a/arangod/Pregel/PregelFeature.h b/arangod/Pregel/PregelFeature.h index 1a31bd2613ea..318ee3546c54 100644 --- a/arangod/Pregel/PregelFeature.h +++ b/arangod/Pregel/PregelFeature.h @@ -24,8 +24,11 @@ #pragma once #include +#include #include +#include #include +#include #include #include @@ -34,6 +37,7 @@ #include "ApplicationFeatures/ApplicationFeature.h" #include "Basics/Common.h" #include "Basics/Mutex.h" +#include "Scheduler/Scheduler.h" struct TRI_vocbase_t; @@ -69,6 +73,8 @@ class PregelFeature final : public application_features::ApplicationFeature { void addConductor(std::shared_ptr&&, uint64_t executionNumber); std::shared_ptr conductor(uint64_t executionNumber); + void garbageCollectConductors(); + void addWorker(std::shared_ptr&&, uint64_t executionNumber); std::shared_ptr worker(uint64_t executionNumber); @@ -80,11 +86,13 @@ class PregelFeature final : public application_features::ApplicationFeature { } void handleConductorRequest(TRI_vocbase_t& vocbase, std::string const& path, - VPackSlice const& body, VPackBuilder& outResponse); + VPackSlice const& body, VPackBuilder& outResponse); void handleWorkerRequest(TRI_vocbase_t& vocbase, std::string const& path, - VPackSlice const& body, VPackBuilder& outBuilder); + VPackSlice const& body, VPackBuilder& outBuilder); private: + void scheduleGarbageCollection(); + Mutex _mutex; std::unique_ptr _recoveryManager; /// @brief _recoveryManagerPtr always points to the same object as _recoveryManager, but allows @@ -93,8 +101,16 @@ class PregelFeature final : public application_features::ApplicationFeature { /// pointer. This only works because _recoveryManager is only initialzed once and lives until the /// owning PregelFeature instance is also destroyed. 
std::atomic _recoveryManagerPtr{nullptr}; + + Scheduler::WorkHandle _gcHandle; + + struct ConductorEntry { + std::string user; + std::chrono::steady_clock::time_point expires; + std::shared_ptr conductor; + }; - std::unordered_map>> _conductors; + std::unordered_map _conductors; std::unordered_map>> _workers; }; From 11e7d47dd0259b9a08c90b408d89c1ae8437dc89 Mon Sep 17 00:00:00 2001 From: jsteemann Date: Tue, 29 Jun 2021 16:13:18 +0200 Subject: [PATCH 2/2] make pregel tasks GET API load-balancing aware --- arangod/Pregel/Conductor.cpp | 7 +- arangod/Pregel/PregelFeature.cpp | 83 ++++++++++++++++++- arangod/Pregel/PregelFeature.h | 3 +- .../RestHandler/RestControlPregelHandler.cpp | 5 +- arangod/V8Server/v8-collection.cpp | 2 +- .../load-balancing-pregel-noauth-cluster.js | 62 +++++++++++++- 6 files changed, 152 insertions(+), 10 deletions(-) diff --git a/arangod/Pregel/Conductor.cpp b/arangod/Pregel/Conductor.cpp index 29127ca926bc..b00d47dd10f8 100644 --- a/arangod/Pregel/Conductor.cpp +++ b/arangod/Pregel/Conductor.cpp @@ -899,7 +899,7 @@ void Conductor::toVelocyPack(VPackBuilder& result) const { result.add("algorithm", VPackValue(_algorithm->name())); } result.add("created", VPackValue(timepointToString(_created))); - if (_state != ExecutionState::DEFAULT) { + if (_expires != std::chrono::system_clock::time_point{}) { result.add("expires", VPackValue(timepointToString(_expires))); } result.add("ttl", VPackValue(_ttl.count())); @@ -919,7 +919,10 @@ void Conductor::toVelocyPack(VPackBuilder& result) const { result.add("vertexCount", VPackValue(_totalVerticesCount)); result.add("edgeCount", VPackValue(_totalEdgesCount)); } - result.add("parallelism", _userParams.slice().get(Utils::parallelismKey)); + VPackSlice p = _userParams.slice().get(Utils::parallelismKey); + if (!p.isNone()) { + result.add("parallelism", p); + } if (_masterContext) { VPackObjectBuilder ob(&result, "masterContext"); _masterContext->serializeValues(result); diff --git 
a/arangod/Pregel/PregelFeature.cpp b/arangod/Pregel/PregelFeature.cpp index 1bc0f813a4eb..a3d0acf4e73f 100644 --- a/arangod/Pregel/PregelFeature.cpp +++ b/arangod/Pregel/PregelFeature.cpp @@ -33,6 +33,9 @@ #include "Cluster/ClusterInfo.h" #include "Cluster/ServerState.h" #include "FeaturePhases/V8FeaturePhase.h" +#include "GeneralServer/AuthenticationFeature.h" +#include "Network/Methods.h" +#include "Network/NetworkFeature.h" #include "Pregel/AlgoRegistry.h" #include "Pregel/Conductor.h" #include "Pregel/Recovery.h" @@ -44,6 +47,9 @@ #include "VocBase/LogicalCollection.h" #include "VocBase/ticks.h" +using namespace arangodb; +using namespace arangodb::pregel; + namespace { bool authorized(std::string const& user) { auto const& exec = arangodb::ExecContext::current(); @@ -53,6 +59,17 @@ bool authorized(std::string const& user) { return (user == exec.user()); } +network::Headers buildHeaders() { + auto auth = AuthenticationFeature::instance(); + + network::Headers headers; + if (auth != nullptr && auth->isActive()) { + headers.try_emplace(StaticStrings::Authorization, + "bearer " + auth->tokenCache().jwtToken()); + } + return headers; +} + /// @brief custom deleter for the PregelFeature. /// it does nothing, i.e. doesn't delete it. This is because the ApplicationServer /// is managing the PregelFeature, but we need a shared_ptr to it here as well. 
The @@ -64,9 +81,6 @@ struct NonDeleter { } // namespace -using namespace arangodb; -using namespace arangodb::pregel; - std::pair PregelFeature::startExecution( TRI_vocbase_t& vocbase, std::string algorithm, std::vector const& vertexCollections, @@ -499,7 +513,11 @@ uint64_t PregelFeature::numberOfActiveConductors() const { return nr; } -void PregelFeature::toVelocyPack(arangodb::velocypack::Builder& result) const { +Result PregelFeature::toVelocyPack(TRI_vocbase_t& vocbase, + arangodb::velocypack::Builder& result, + bool allDatabases, bool fanout) const { + Result res; + result.openArray(); { MUTEX_LOCKER(guard, _mutex); @@ -513,5 +531,62 @@ void PregelFeature::toVelocyPack(arangodb::velocypack::Builder& result) const { ce.conductor->toVelocyPack(result); } } + + if (ServerState::instance()->isCoordinator() && fanout) { + // coordinator case, fan out to other coordinators! + NetworkFeature const& nf = vocbase.server().getFeature(); + network::ConnectionPool* pool = nf.pool(); + if (pool == nullptr) { + THROW_ARANGO_EXCEPTION(TRI_ERROR_SHUTTING_DOWN); + } + + std::vector futures; + + network::RequestOptions options; + options.timeout = network::Timeout(30.0); + options.database = vocbase.name(); + options.param("local", "true"); + options.param("all", allDatabases ? "true" : "false"); + + std::string const url = "/_api/control_pregel"; + + auto& ci = vocbase.server().getFeature().clusterInfo(); + for (auto const& coordinator : ci.getCurrentCoordinators()) { + if (coordinator == ServerState::instance()->getId()) { + // ourselves! 
+ continue; + } + + auto f = network::sendRequestRetry(pool, "server:" + coordinator, fuerte::RestVerb::Get, + url, VPackBuffer{}, options, ::buildHeaders()); + futures.emplace_back(std::move(f)); + } + + if (!futures.empty()) { + auto responses = futures::collectAll(futures).get(); + for (auto const& it : responses) { + auto& resp = it.get(); + res.reset(resp.combinedResult()); + if (res.is(TRI_ERROR_ARANGO_DATABASE_NOT_FOUND)) { + // it is expected in a multi-coordinator setup that a coordinator is not + // aware of a database that was created very recently. + res.reset(); + } + if (res.fail()) { + break; + } + auto slice = resp.slice(); + // copy results from other coordinators + if (slice.isArray()) { + for (auto const& entry : VPackArrayIterator(slice)) { + result.add(entry); + } + } + } + } + } + result.close(); + + return res; } diff --git a/arangod/Pregel/PregelFeature.h b/arangod/Pregel/PregelFeature.h index a5cf2b123a06..406f6ae87669 100644 --- a/arangod/Pregel/PregelFeature.h +++ b/arangod/Pregel/PregelFeature.h @@ -95,7 +95,8 @@ class PregelFeature final : public application_features::ApplicationFeature { _softShutdownOngoing.store(true, std::memory_order_relaxed); } - void toVelocyPack(arangodb::velocypack::Builder& result) const; + Result toVelocyPack(TRI_vocbase_t& vocbase, arangodb::velocypack::Builder& result, + bool allDatabases, bool fanout) const; private: void scheduleGarbageCollection(); diff --git a/arangod/RestHandler/RestControlPregelHandler.cpp b/arangod/RestHandler/RestControlPregelHandler.cpp index 6b913dcf9aad..035f81e89f7c 100644 --- a/arangod/RestHandler/RestControlPregelHandler.cpp +++ b/arangod/RestHandler/RestControlPregelHandler.cpp @@ -190,8 +190,11 @@ void RestControlPregelHandler::getExecutionStatus() { std::vector const& suffixes = _request->decodedSuffixes(); if (suffixes.empty()) { + bool const allDatabases = _request->parsedValue("all", false); + bool const fanout = ServerState::instance()->isCoordinator() && 
!_request->parsedValue("local", false); + VPackBuilder builder; - _pregel.toVelocyPack(builder); + _pregel.toVelocyPack(_vocbase, builder, allDatabases, fanout); generateResult(rest::ResponseCode::OK, builder.slice()); return; } diff --git a/arangod/V8Server/v8-collection.cpp b/arangod/V8Server/v8-collection.cpp index a573ec0d2ee0..3293d8b33488 100644 --- a/arangod/V8Server/v8-collection.cpp +++ b/arangod/V8Server/v8-collection.cpp @@ -1716,7 +1716,7 @@ static void JS_PregelStatus(v8::FunctionCallbackInfo const& args) { // check the arguments uint32_t const argLength = args.Length(); if (argLength == 0) { - pregel.toVelocyPack(builder); + pregel.toVelocyPack(vocbase, builder, false, true); TRI_V8_RETURN(TRI_VPackToV8(isolate, builder.slice())); return; } diff --git a/tests/js/client/load-balancing/load-balancing-pregel-noauth-cluster.js b/tests/js/client/load-balancing/load-balancing-pregel-noauth-cluster.js index c80fb4bc51d1..f2f4cf0e04b5 100644 --- a/tests/js/client/load-balancing/load-balancing-pregel-noauth-cluster.js +++ b/tests/js/client/load-balancing/load-balancing-pregel-noauth-cluster.js @@ -1,5 +1,5 @@ /* jshint globalstrict:true, strict:true, maxlen: 5000 */ -/* global assertTrue, assertFalse, assertEqual, require*/ +/* global assertTrue, assertFalse, assertEqual, assertNotEqual, require*/ //////////////////////////////////////////////////////////////////////////////// /// DISCLAIMER @@ -117,6 +117,66 @@ function PregelSuite () { cs = []; coordinators = []; }, + + testPregelAllJobs: function() { + let url = baseUrl; + const task = { + algorithm: "pagerank", + vertexCollections: ["vertices"], + edgeCollections: ["edges"], + params: { + resultField: "result", + store: false + }, + store: false + }; + let ids = []; + for (let i = 0; i < 5; ++i) { + let result = sendRequest('POST', url, task, i % 2 === 0); + + assertFalse(result === undefined || result === {}); + assertEqual(result.status, 200); + assertTrue(result.body > 1); + ids.push(result.body); + } + 
assertEqual(5, ids.length); + + let result = sendRequest('GET', url, {}, false); + assertEqual(result.status, 200); + assertEqual(5, result.body.length); + result.body.forEach((task) => { + assertEqual("PageRank", task.algorithm); + assertEqual(db._name(), task.database); + assertNotEqual(-1, ids.indexOf(task.id)); + }); + + ids.forEach((id) => { + const taskId = id; + url = `${baseUrl}/${taskId}`; + result = sendRequest('GET', url, {}, false); + + assertFalse(result === undefined || result === {}); + assertEqual(result.status, 200); + assertTrue(result.body.state === 'running' || result.body.state === 'done'); + }); + + require('internal').wait(5.0, false); + + ids.forEach((id) => { + const taskId = id; + url = `${baseUrl}/${taskId}`; + result = sendRequest('DELETE', url, {}, {}, false); + + assertFalse(result === undefined || result === {}); + assertFalse(result.body.error); + assertEqual(result.status, 200); + }); + + url = baseUrl; + result = sendRequest('GET', url, {}, false); + assertEqual(result.status, 200); + assertEqual([], result.body); + }, testPregelForwarding: function() { let url = baseUrl;