8000 Soft coordinator shutdown by neunhoef · Pull Request #14330 · arangodb/arangodb · GitHub
[go: up one dir, main page]

Skip to content

Soft coordinator shutdown #14330

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Jul 28, 2021
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions CHANGELOG
Original file line number Diff line number Diff line change
@@ -1,3 +1,18 @@
v3.8.1 (XXXX-XX-XX)

* Add soft coordinator shutdown: This is a new option `soft=true` for the
DELETE /_admin/shutdown API. Has only meaning for coordinators, otherwise
ignored. A number of things are allowed to finish but no new things are
allowed when in soft coordinator shutdown:
- AQL cursors
- transactions
- asynchronous operations
- Pregel runs
Once all of the ongoing operations of these have finished and all requests
on the low priority queue have been executed, the coordinator shuts down
the normal way. This is supposed to make a coordinator restart less
intrusive for clients.

v3.8.0 (XXXX-XX-XX)
-------------------

Expand Down
5 changes: 0 additions & 5 deletions arangod/Aql/QueryOptions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -159,11 +159,6 @@ void QueryOptions::fromVelocyPack(VPackSlice slice) {
traversalProfile = static_cast<TraversalProfileLevel>(value.getNumber<uint16_t>());
}

value = slice.get("stream");
if (value.isBool()) {
stream = value.getBool();
}

value = slice.get("allPlans");
if (value.isBool()) {
allPlans = value.getBool();
Expand Down
1 change: 1 addition & 0 deletions arangod/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -753,6 +753,7 @@ set(LIB_ARANGOSERVER_SOURCES
RestServer/ScriptFeature.cpp
RestServer/ServerFeature.cpp
RestServer/ServerIdFeature.cpp
RestServer/SoftShutdownFeature.cpp
RestServer/SystemDatabaseFeature.cpp
RestServer/TtlFeature.cpp
RestServer/UpgradeFeature.cpp
Expand Down
2 changes: 2 additions & 0 deletions arangod/FeaturePhases/FinalFeaturePhase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "FeaturePhases/AgencyFeaturePhase.h"
#include "RestServer/ConsoleFeature.h"
#include "RestServer/ScriptFeature.h"
#include "RestServer/SoftShutdownFeature.h"

namespace arangodb {
namespace application_features {
Expand All @@ -39,6 +40,7 @@ FinalFeaturePhase::FinalFeaturePhase(ApplicationServer& server)
startsAfter<ConsoleFeature>();
startsAfter<ScriptFeature>();
startsAfter<ShutdownFeature>();
startsAfter<SoftShutdownFeature>();
}

} // namespace application_features
Expand Down
33 changes: 31 additions & 2 deletions arangod/GeneralServer/AsyncJobManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,11 @@
#include "Basics/WriteLocker.h"
#include "Basics/system-functions.h"
#include "Basics/voc-errors.h"
#include "Cluster/ServerState.h"
#include "GeneralServer/RestHandler.h"
#include "Logger/Logger.h"
#include "Rest/GeneralResponse.h"
#include "RestServer/SoftShutdownFeature.h"
#include "Utils/ExecContext.h"

namespace {
Expand Down Expand Up @@ -60,7 +62,8 @@ AsyncJobResult::AsyncJobResult(IdType jobId, Status status,

AsyncJobResult::~AsyncJobResult() = default;

AsyncJobManager::AsyncJobManager() : _lock(), _jobs() {}
AsyncJobManager::AsyncJobManager()
: _lock(), _jobs(), _softShutdownOngoing(false) {}

AsyncJobManager::~AsyncJobManager() {
// remove all results that haven't been fetched
Expand Down Expand Up @@ -257,8 +260,29 @@ std::vector<AsyncJobResult::IdType> AsyncJobManager::byStatus(AsyncJobResult::St
return jobs;
}

//////////////////////////////////////////////////////////////////////////////
/// @brief get numbers of pending and done jobs.
//////////////////////////////////////////////////////////////////////////////
//
std::pair<uint64_t, uint64_t> AsyncJobManager::getNrPendingAndDone() {
uint64_t pending{0};
uint64_t done{0};
{
READ_LOCKER(readLocker, _lock);
for (auto const& j : _jobs) {
if (j.second.second._status == AsyncJobResult::JOB_PENDING) {
++pending;
} else if (j.second.second._status == AsyncJobResult::JOB_DONE) {
++done;
}
}
}
return std::pair(pending, done);
}

////////////////////////////////////////////////////////////////////////////////
/// @brief initializes an async job
/// @brief initializes an async job, throws if soft shutdown is already
/// ongoing.
////////////////////////////////////////////////////////////////////////////////

void AsyncJobManager::initAsyncJob(std::shared_ptr<RestHandler> handler) {
Expand All @@ -270,6 +294,11 @@ void AsyncJobManager::initAsyncJob(std::shared_ptr<RestHandler> handler) {

WRITE_LOCKER(writeLocker, _lock);

if (_softShutdownOngoing.load(std::memory_order_relaxed)) {
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_SHUTTING_DOWN,
"Soft shutdown ongoing.");
}

_jobs.try_emplace(jobId, std::move(user), std::move(ajr));
}

Expand Down
13 changes: 13 additions & 0 deletions arangod/GeneralServer/AsyncJobManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,10 +95,23 @@ class AsyncJobManager {
std::vector<AsyncJobResult::IdType> byStatus(AsyncJobResult::Status, size_t maxCount);
void initAsyncJob(std::shared_ptr<RestHandler>);
void finishAsyncJob(RestHandler*);
std::pair<uint64_t, uint64_t> getNrPendingAndDone();

void initiateSoftShutdown() {
_softShutdownOngoing.store(true, std::memory_order_relaxed);
}

private:
basics::ReadWriteLock _lock;
JobList _jobs;

////////////////////////////////////////////////////////////////////////////
/// @brief flag, if a soft shutdown is ongoing, this is used for the soft
/// shutdown feature in coordinators, it is initially `false` and is set
/// to true once a soft shutdown has begun.
////////////////////////////////////////////////////////////////////////////

std::atomic<bool> _softShutdownOngoing;
};
} // namespace rest
} // namespace arangodb
Expand Down
11 changes: 10 additions & 1 deletion arangod/GeneralServer/CommTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -528,7 +528,16 @@ bool CommTask::handleRequestAsync(std::shared_ptr<RestHandler> handler,

if (jobId != nullptr) {
auto& jobManager = _server.server().getFeature<GeneralServerFeature>().jobManager();
jobManager.initAsyncJob(handler);
try {
// This will throw if a soft shutdown is already going on on a
// coordinator. But this can also throw if we have an
// out of memory situation, so we better handle this anyway.
jobManager.initAsyncJob(handler);
} catch(std::exception const& exc) {
LOG_TOPIC("fee34", INFO, Logger::STARTUP)
<< "Async job rejected, exception: " << exc.what();
return false;
}
*jobId = handler->handlerId();

// callback will persist the response with the AsyncJobManager
Expand Down
6 changes: 6 additions & 0 deletions arangod/GeneralServer/GeneralServerFeature.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,12 @@ void GeneralServerFeature::start() {
}
}

void GeneralServerFeature::initiateSoftShutdown() {
if (_jobManager != nullptr) {
_jobManager->initiateSoftShutdown();
}
}

void GeneralServerFeature::beginShutdown() {
for (auto& server : _servers) {
server->stopListening();
Expand Down
1 change: 1 addition & 0 deletions arangod/GeneralServer/GeneralServerFeature.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ class GeneralServerFeature final : public application_features::ApplicationFeatu
void validateOptions(std::shared_ptr<options::ProgramOptions>) override final;
void prepare() override final;
void start() override final;
void initiateSoftShutdown() override final;
10000 void beginShutdown() override final;
void stop() override final;
void unprepare() override final;
Expand Down
21 changes: 18 additions & 3 deletions arangod/Pregel/PregelFeature.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ std::pair<Result, uint64_t> PregelFeature::startExecution(
std::vector<std::string> const& edgeCollections,
std::unordered_map<std::string, std::vector<std::string>> const& edgeCollectionRestrictions,
VPackSlice const& params) {
if (isStopping()) {
if (isStopping() || _softShutdownOngoing.load(std::memory_order_relaxed)) {
return std::make_pair(Result{TRI_ERROR_SHUTTING_DOWN,
"pregel system not available"}, 0);
}
Expand Down Expand Up @@ -218,7 +218,8 @@ uint64_t PregelFeature::createExecutionNumber() {
}

PregelFeature::PregelFeature(application_features::ApplicationServer& server)
: application_features::ApplicationFeature(server, "Pregel") {
: application_features::ApplicationFeature(server, "Pregel"),
_softShutdownOngoing(false) {
setOptional(true);
startsAfter<application_features::V8FeaturePhase>();
}
Expand Down Expand Up @@ -284,7 +285,7 @@ bool PregelFeature::isStopping() const noexcept {
}

void PregelFeature::addConductor(std::shared_ptr<Conductor>&& c, uint64_t executionNumber) {
if (isStopping()) {
if (isStopping() || _softShutdownOngoing.load(std::memory_order_relaxed)) {
THROW_ARANGO_EXCEPTION(TRI_ERROR_SHUTTING_DOWN);
}

Expand Down Expand Up @@ -442,3 +443,17 @@ void PregelFeature::handleWorkerRequest(TRI_vocbase_t& vocbase,
w->aqlResult(outBuilder, withId);
}
}

uint64_t PregelFeature::numberOfActiveConductors() const {
MUTEX_LOCKER(guard, _mutex);
uint64_t nr{0};
for (auto const& p : _conductors) {
std::shared_ptr<Conductor> const& c = p.second.second;
if (c->_state == ExecutionState::DEFAULT ||
c->_state == ExecutionState::RUNNING ||
c->_state == ExecutionState::STORING) {
++nr;
}
}
return nr;
}
10 changes: 9 additions & 1 deletion arangod/Pregel/PregelFeature.h 57AE
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,14 @@ class PregelFeature final : public application_features::ApplicationFeature {
void handleWorkerRequest(TRI_vocbase_t& vocbase, std::string const& path,
VPackSlice const& body, VPackBuilder& outBuilder);

uint64_t numberOfActiveConductors() const;

void initiateSoftShutdown() override final {
_softShutdownOngoing.store(true, std::memory_order_relaxed);
}

private:
Mutex _mutex;
mutable Mutex _mutex;
std::unique_ptr<RecoveryManager> _recoveryManager;
/// @brief _recoveryManagerPtr always points to the same object as _recoveryManager, but allows
/// the pointer to be read atomically. This is necessary because _recoveryManager is initialized
Expand All @@ -97,6 +103,8 @@ class PregelFeature final : public application_features::ApplicationFeature {

std::unordered_map<uint64_t, std::pair<std::string, std::shared_ptr<Conductor>>> _conductors;
std::unordered_map<uint64_t, std::pair<std::string, std::shared_ptr<IWorker>>> _workers;

std::atomic<bool> _softShutdownOngoing;
};

} // namespace pregel
Expand Down
2 changes: 2 additions & 0 deletions arangod/RestHandler/RestCursorHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,7 @@ RestStatus RestCursorHandler::registerQueryOrCursor(VPackSlice const& slice) {
CursorRepository* cursors = _vocbase.cursorRepository();
TRI_ASSERT(cursors != nullptr);
_cursor = cursors->createQueryStream(std::move(query), batchSize, ttl);
// Throws if soft shutdown is ongoing!
_cursor->setWakeupHandler([self = shared_from_this()]() { return self->wakeupHandler(); });

return generateCursorResult(rest::ResponseCode::CREATED);
Expand Down Expand Up @@ -368,6 +369,7 @@ RestStatus RestCursorHandler::handleQueryResult() {
TRI_ASSERT(_queryResult.data.get() != nullptr);
// steal the query result, cursor will take over the ownership
_cursor = cursors->createFromQueryResult(std::move(_queryResult), batchSize, ttl, count);
// throws if a coordinator soft shutdown is ongoing

return generateCursorResult(rest::ResponseCode::CREATED);
}
Expand Down
29 changes: 28 additions & 1 deletion arangod/RestHandler/RestShutdownHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include "Agency/AsyncAgencyComm.h"
#include "Cluster/ClusterFeature.h"
#include "GeneralServer/AuthenticationFeature.h"
#include "RestServer/SoftShutdownFeature.h"
#include "Scheduler/Scheduler.h"
#include "Scheduler/SchedulerFeature.h"
#include "Utils/ExecContext.h"
Expand All @@ -47,7 +48,8 @@ RestShutdownHandler::RestShutdownHandler(application_features::ApplicationServer
////////////////////////////////////////////////////////////////////////////////

RestStatus RestShutdownHandler::execute() {
if (_request->requestType() != rest::RequestType::DELETE_REQ) {
if (_request->requestType() != rest::RequestType::DELETE_REQ &&
_request->requestType() != rest::RequestType::GET) {
generateError(rest::ResponseCode::METHOD_NOT_ALLOWED, TRI_ERROR_HTTP_METHOD_NOT_ALLOWED);
return RestStatus::DONE;
}
Expand All @@ -68,6 +70,21 @@ RestStatus RestShutdownHandler::execute() {
}
}

auto const& softShutdownFeature{server().getFeature<SoftShutdownFeature>()};
auto& softShutdownTracker{softShutdownFeature.softShutdownTracker()};

if (_request->requestType() == rest::RequestType::GET) {
if (!ServerState::instance()->isCoordinator()) {
generateError(rest::ResponseCode::METHOD_NOT_ALLOWED,
TRI_ERROR_HTTP_METHOD_NOT_ALLOWED);
return RestStatus::DONE;
}
VPackBuilder builder;
softShutdownTracker.toVelocyPack(builder);
generateResult(rest::ResponseCode::OK, builder.slice());
return RestStatus::DONE;
}

bool removeFromCluster;
std::string const& remove = _request->value("remove_from_cluster", removeFromCluster);
removeFromCluster = removeFromCluster && remove == "1";
Expand All @@ -91,6 +108,16 @@ RestStatus RestShutdownHandler::execute() {
clusterFeature.setUnregisterOnShutdown(true);
}

bool soft = _request->parsedValue("soft", false);

if (ServerState::instance()->isCoordinator() && soft) {
softShutdownTracker.initiateSoftShutdown();
VPackBuilder result;
result.add(VPackValue("OK"));
generateResult(rest::ResponseCode::OK, result.slice());
return RestStatus::DONE;
}

auto self = shared_from_this();
Scheduler* scheduler = SchedulerFeature::SCHEDULER;
// don't block the response for workers waiting on this callback
Expand Down
Loading
0