added a TTL for Pregel conductors, plus GC by jsteemann · Pull Request #14311 · arangodb/arangodb · GitHub
[go: up one dir, main page]

Skip to content

added a TTL for Pregel conductors, plus GC #14311

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Jul 1, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions CHANGELOG
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
devel
-----

* Added garbage collection for finished and failed Pregel conductors.
Previously, Pregel executions that finished successfully or unsuccessfully
remained in memory until being explicitly canceled. This prevented a
cleanup of abandoned jobs. Such jobs are now automatically cleaned
about 10 minutes after finalization. The time-to-live values can be
overridden per Pregel job by passing a "ttl" value.

* Revive startup parameter `--server.session-timeout` to control the timeout
for web interface sessions and other sessions that are based on JWTs created
by the `/_open/auth` API.
Expand Down
81 changes: 64 additions & 17 deletions arangod/Pregel/Conductor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#include "Pregel/Recovery.h"
#include "Pregel/Utils.h"

#include "Agency/TimeString.h"
#include "Basics/FunctionUtils.h"
#include "Basics/MutexLocker.h"
#include "Basics/StringUtils.h"
Expand Down Expand Up @@ -67,6 +68,7 @@ Conductor::Conductor(uint64_t executionNumber, TRI_vocbase_t& vocbase,
std::string const& algoName, VPackSlice const& config,
PregelFeature& feature)
: _feature(feature),
_created(std::chrono::system_clock::now()),
_vocbaseGuard(vocbase),
_executionNumber(executionNumber),
_algorithm(AlgoRegistry::createAlgorithm(vocbase.server(), algoName, config)),
Expand Down Expand Up @@ -116,6 +118,11 @@ Conductor::Conductor(uint64_t executionNumber, TRI_vocbase_t& vocbase,
if (!_storeResults) {
LOG_TOPIC("f3817", DEBUG, Logger::PREGEL) << "Will keep results in-memory";
}

// time-to-live for finished/failed Pregel jobs before garbage collection.
// default timeout is 10 minutes for each conductor
uint64_t ttl = 600;
_ttl = std::chrono::seconds(VelocyPackHelper::getNumericValue(config, "ttl", ttl));
}

Conductor::~Conductor() {
Expand All @@ -136,13 +143,13 @@ void Conductor::start() {
_endTimeSecs = _startTimeSecs;

_globalSuperstep = 0;
_state = ExecutionState::RUNNING;
updateState(ExecutionState::RUNNING);

LOG_TOPIC("3a255", DEBUG, Logger::PREGEL)
<< "Telling workers to load the data";
auto res = _initializeWorkers(Utils::startExecutionPath, VPackSlice());
if (res != TRI_ERROR_NO_ERROR) {
_state = ExecutionState::CANCELED;
updateState(ExecutionState::CANCELED);
LOG_TOPIC("30171", ERR, Logger::PREGEL)
<< "Not all DBServers started the execution";
}
Expand Down Expand Up @@ -187,7 +194,7 @@ bool Conductor::_startGlobalStep() {
});

if (res != TRI_ERROR_NO_ERROR) {
_state = ExecutionState::IN_ERROR;
updateState(ExecutionState::IN_ERROR);
LOG_TOPIC("04189", ERR, Logger::PREGEL)
<< "Seems there is at least one worker out of order";
// the recovery mechanisms should take care of this
Expand Down Expand Up @@ -235,10 +242,10 @@ bool Conductor::_startGlobalStep() {
if (!proceed || done || _globalSuperstep >= _maxSuperstep) {
// tells workers to store / discard results
if (_storeResults) {
_state = ExecutionState::STORING;
updateState(ExecutionState::STORING);
_finalizeWorkers();
} else { // just stop the timer
_state = _inErrorAbort ? ExecutionState::FATAL_ERROR : ExecutionState::DONE;
updateState(_inErrorAbort ? ExecutionState::FATAL_ERROR : ExecutionState::DONE);
_endTimeSecs = TRI_microtime();
LOG_TOPIC("9e82c", INFO, Logger::PREGEL)
<< "Done, execution took: " << totalRuntimeSecs() << " s";
Expand All @@ -253,7 +260,7 @@ bool Conductor::_startGlobalStep() {
_masterContext->_edgeCount = _totalEdgesCount;
_masterContext->_reports = &_reports;
if (!_masterContext->preGlobalSuperstepWithResult()) {
_state = ExecutionState::FATAL_ERROR;
updateState(ExecutionState::FATAL_ERROR);
_endTimeSecs = TRI_microtime();
return false;
}
Expand Down Expand Up @@ -282,7 +289,7 @@ bool Conductor::_startGlobalStep() {
// start vertex level operations, does not get a response
auto res = _sendToAllDBServers(Utils::startGSSPath, b); // call me maybe
if (res != TRI_ERROR_NO_ERROR) {
_state = ExecutionState::IN_ERROR;
updateState(ExecutionState::IN_ERROR);
LOG_TOPIC("f34bb", ERR, Logger::PREGEL)
<< "Conductor could not start GSS " << _globalSuperstep;
// the recovery mechanisms should take care of this
Expand Down Expand Up @@ -449,7 +456,7 @@ void Conductor::finishedRecoveryStep(VPackSlice const& data) {
b.close();
res = _sendToAllDBServers(Utils::finalizeRecoveryPath, b);
if (res == TRI_ERROR_NO_ERROR) {
_state = ExecutionState::RUNNING;
updateState(ExecutionState::RUNNING);
_startGlobalStep();
}
}
Expand All @@ -466,7 +473,7 @@ void Conductor::cancel() {

void Conductor::cancelNoLock() {
_callbackMutex.assertLockedByCurrentThread();
_state = ExecutionState::CANCELED;
updateState(ExecutionState::CANCELED);
bool ok = basics::function_utils::retryUntilTimeout(
[this]() -> bool { return (_finalizeWorkers() != TRI_ERROR_QUEUE_FULL); },
Logger::PREGEL, "cancel worker execution");
Expand All @@ -490,7 +497,7 @@ void Conductor::startRecovery() {

// we lost a DBServer, we need to reconfigure all remaining servers
// so they load the data for the lost machine
_state = ExecutionState::RECOVERING;
updateState(ExecutionState::RECOVERING);
_statistics.reset();

TRI_ASSERT(SchedulerFeature::SCHEDULER != nullptr);
Expand Down Expand Up @@ -788,7 +795,7 @@ void Conductor::finishedWorkerFinalize(VPackSlice data) {
// do not swap an error state to done
bool didStore = false;
if (_state == ExecutionState::STORING) {
_state = _inErrorAbort ? ExecutionState::FATAL_ERROR : ExecutionState::DONE;
updateState(_inErrorAbort ? ExecutionState::FATAL_ERROR : ExecutionState::DONE);
didStore = true;
}
_endTimeSecs = TRI_microtime(); // offically done
Expand Down Expand Up @@ -837,6 +844,25 @@ void Conductor::finishedWorkerFinalize(VPackSlice data) {
}
}

bool Conductor::canBeGarbageCollected() const {
// we don't want to block other operations for longer, so if we can't immediately
// acuqire the mutex here, we assume a conductor cannot be garbage-collected.
// the same conductor will be probed later anyway, so we should be fine
TRY_MUTEX_LOCKER(guard, _callbackMutex);

if (guard.isLocked()) {
if (_state == ExecutionState::CANCELED ||
_state == ExecutionState::DONE ||
_state == ExecutionState::IN_ERROR ||
_state == ExecutionState::FATAL_ERROR) {
return (_expires != std::chrono::system_clock::time_point{} &&
_expires <= std::chrono::system_clock::now());
}
}

return false;
}

void Conductor::collectAQLResults(VPackBuilder& outBuilder, bool withId) {
MUTEX_LOCKER(guard, _callbackMutex);

Expand All @@ -863,11 +889,20 @@ void Conductor::collectAQLResults(VPackBuilder& outBuilder, bool withId) {
}
}

VPackBuilder Conductor::toVelocyPack() const {
void Conductor::toVelocyPack(VPackBuilder& result) const {
MUTEX_LOCKER(guard, _callbackMutex);

VPackBuilder result;
result.openObject();
result.add("id", VPackValue(std::to_string(_executionNumber)));
result.add("database", VPackValue(_vocbaseGuard.database().name()));
if (_algorithm != nullptr) {
result.add("algorithm", VPackValue(_algorithm->name()));
}
result.add("created", VPackValue(timepointToString(_created)));
if (_expires != std::chrono::system_clock::time_point{}) {
result.add("expires", VPackValue(timepointToString(_expires)));
}
result.add("ttl", VPackValue(_ttl.count()));
result.add("state", VPackValue(pregel::ExecutionStateNames[_state]));
result.add("gss", VPackValue(_globalSuperstep));
result.add("totalRuntime", VPackValue(totalRuntimeSecs()));
Expand All @@ -884,13 +919,15 @@ VPackBuilder Conductor::toVelocyPack() const {
result.add("vertexCount", VPackValue(_totalVerticesCount));
result.add("edgeCount", VPackValue(_totalEdgesCount));
}
result.add("parallelism", _userParams.slice().get(Utils::parallelismKey));
VPackSlice p = _userParams.slice().get(Utils::parallelismKey);
if (!p.isNone()) {
result.add("parallelism", p);
}
if (_masterContext) {
VPackObjectBuilder ob(&result, "masterContext");
_masterContext->serializeValues(result);
}
result.close();
return result;
}

ErrorCode Conductor::_sendToAllDBServers(std::string const& path, VPackBuilder const& message) {
Expand All @@ -903,7 +940,7 @@ ErrorCode Conductor::_sendToAllDBServers(std::string const& path, VPackBuilder c
_respondedServers.clear();

// to support the single server case, we handle it without optimizing it
if (ServerState::instance()->isRunningInCluster() == false) {
if (!ServerState::instance()->isRunningInCluster()) {
if (handle) {
VPackBuilder response;
_feature.handleWorkerRequest(_vocbaseGuard.database(), path, message.slice(), response);
Expand All @@ -925,7 +962,7 @@ ErrorCode Conductor::_sendToAllDBServers(std::string const& path, VPackBuilder c
return TRI_ERROR_NO_ERROR;
}

if (_dbServers.size() == 0) {
if (_dbServers.empty()) {
LOG_TOPIC("a14fa", WARN, Logger::PREGEL) << "No servers registered";
return TRI_ERROR_FAILED;
}
Expand Down Expand Up @@ -999,3 +1036,13 @@ std::vector<ShardID> Conductor::getShardIds(ShardID const& collection) const {

return result;
}

// Switches the conductor to `state` and keeps the expiry timestamp in sync.
//
// If `state` is one of CANCELED, DONE, IN_ERROR or FATAL_ERROR, `_expires` is
// set to the current system time plus `_ttl`. For any other state `_expires`
// is reset to the default-constructed time point, which the rest of this
// class treats as "no expiry set" (see toVelocyPack() and
// canBeGarbageCollected()).
void Conductor::updateState(ExecutionState state) {
  _state = state;

  bool const finished = _state == ExecutionState::CANCELED ||
                        _state == ExecutionState::DONE ||
                        _state == ExecutionState::IN_ERROR ||
                        _state == ExecutionState::FATAL_ERROR;

  if (finished) {
    _expires = std::chrono::system_clock::now() + _ttl;
  } else {
    // A conductor can leave IN_ERROR again (RECOVERING, then RUNNING). Drop
    // the expiry recorded earlier so that an active job does not keep
    // reporting a stale "expires" value.
    _expires = std::chrono::system_clock::time_point{};
  }
}
10 changes: 9 additions & 1 deletion arangod/Pregel/Conductor.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
#include "Scheduler/Scheduler.h"
#include "Utils/DatabaseGuard.h"

#include <chrono>

namespace arangodb {
namespace pregel {

Expand Down Expand Up @@ -63,6 +65,9 @@ class Conductor : public std::enable_shared_from_this<Conductor> {

ExecutionState _state = ExecutionState::DEFAULT;
PregelFeature& _feature;
std::chrono::system_clock::time_point _created;
std::chrono::system_clock::time_point _expires;
std::chrono::seconds _ttl = std::chrono::seconds(300);
const DatabaseGuard _vocbaseGuard;
const uint64_t _executionNumber;
VPackBuilder _userParams;
Expand Down Expand Up @@ -139,14 +144,17 @@ class Conductor : public std::enable_shared_from_this<Conductor> {
void cancel();
void startRecovery();
void collectAQLResults(velocypack::Builder& outBuilder, bool withId);
VPackBuilder toVelocyPack() const;
void toVelocyPack(arangodb::velocypack::Builder& result) const;

double totalRuntimeSecs() const {
return _endTimeSecs == 0.0 ? TRI_microtime() - _startTimeSecs : _endTimeSecs - _startTimeSecs;
}

bool canBeGarbageCollected() const;

private:
void cancelNoLock();
void updateState(ExecutionState state);
};
} // namespace pregel
} // namespace arangodb
Loading
0