8000 Soft coordinator shutdown by neunhoef · Pull Request #14331 · arangodb/arangodb · GitHub
[go: up one dir, main page]

Skip to content

Soft coordinator shutdown #14331

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Jun 10, 2021
Next Next commit
Backport soft coordinator shutdown to 3.7.
  • Loading branch information
neunhoef committed Jun 4, 2021
commit c5ee53c3da032c96a9eca24eeca19976f348a311
13 changes: 13 additions & 0 deletions CHANGELOG
Original file line number Diff line number Diff line change
@@ -1,6 +1,19 @@
v3.7.12 (XXXX-XX-XX)
--------------------

* Add soft coordinator shutdown: This is a new option `soft=true` for the
DELETE /_admin/shutdown API. Has only meaning for coordinators, otherwise
ignored. A number of things are allowed to finish but no new things are
allowed when in soft coordinator shutdown:
- AQL cursors
- transactions
- asynchronous operations
- Pregel runs
Once all of the ongoing operations of these have finished and all requests
on the low priority queue have been executed, the coordinator shuts down
the normal way. This is supposed to make a coordinator restart less
intrusive for clients.

* Fixed ES-881: ensure that LDAP options for async, referrals and restart set
the off value correctly. Otherwise, this can result in an "operations error".

Expand Down
5 changes: 0 additions & 5 deletions arangod/Aql/QueryOptions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -136,11 +136,6 @@ void QueryOptions::fromVelocyPack(VPackSlice const& slice) {
profile = static_cast<ProfileLevel>(value.getNumber<uint32_t>());
}

value = slice.get("stream");
if (value.isBool()) {
stream = value.getBool();
}

value = slice.get("allPlans");
if (value.isBool()) {
allPlans = value.getBool();
Expand Down
1 change: 1 addition & 0 deletions arangod/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -698,6 +698,7 @@ set(LIB_ARANGOSERVER_SOURCES
RestServer/ScriptFeature.cpp
RestServer/ServerFeature.cpp
RestServer/ServerIdFeature.cpp
RestServer/SoftShutdownFeature.cpp
RestServer/SystemDatabaseFeature.cpp
RestServer/TtlFeature.cpp
RestServer/UpgradeFeature.cpp
Expand Down
2 changes: 2 additions & 0 deletions arangod/FeaturePhases/FinalFeaturePhase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "FeaturePhases/AgencyFeaturePhase.h"
#include "RestServer/ConsoleFeature.h"
#include "RestServer/ScriptFeature.h"
#include "RestServer/SoftShutdownFeature.h"

namespace arangodb {
namespace application_features {
Expand All @@ -38,6 +39,7 @@ FinalFeaturePhase::FinalFeaturePhase(ApplicationServer& server)
startsAfter<ConsoleFeature>();
startsAfter<ScriptFeature>();
startsAfter<ShutdownFeature>();
startsAfter<SoftShutdownFeature>();
}

} // namespace application_features
Expand Down
32 changes: 30 additions & 2 deletions arangod/GeneralServer/AsyncJobManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,11 @@
#include "Basics/WriteLocker.h"
#include "Basics/system-functions.h"
#include "Basics/voc-errors.h"
#include "Cluster/ServerState.h"
#include "GeneralServer/RestHandler.h"
#include "Logger/Logger.h"
#include "Rest/GeneralResponse.h"
#include "RestServer/SoftShutdownFeature.h"
#include "Utils/ExecContext.h"

namespace {
Expand Down Expand Up @@ -60,7 +62,8 @@ AsyncJobResult::AsyncJobResult(IdType jobId, Status status,

AsyncJobResult::~AsyncJobResult() = default;

AsyncJobManager::AsyncJobManager() : _lock(), _jobs() {}
AsyncJobManager::AsyncJobManager()
: _lock(), _jobs(), _softShutdownOngoing(false) {}

AsyncJobManager::~AsyncJobManager() {
// remove all results that haven't been fetched
Expand Down Expand Up @@ -257,11 +260,36 @@ std::vector<AsyncJobResult::IdType> AsyncJobManager::byStatus(AsyncJobResult::St
return jobs;
}

//////////////////////////////////////////////////////////////////////////////
/// @brief get numbers of pending and done jobs.
//////////////////////////////////////////////////////////////////////////////
//
std::pair<uint64_t, uint64_t> AsyncJobManager::getNrPendingAndDone() {
uint64_t pending{0};
uint64_t done{0};
{
READ_LOCKER(readLocker, _lock);
for (auto const& j : _jobs) {
if (j.second.second._status == AsyncJobResult::JOB_PENDING) {
++pending;
} else if (j.second.second._status == AsyncJobResult::JOB_DONE) {
++done;
}
}
}
return std::pair(pending, done);
}

////////////////////////////////////////////////////////////////////////////////
/// @brief initializes an async job
/// @brief initializes an async job, throws if soft shutdown is already
/// ongoing.
////////////////////////////////////////////////////////////////////////////////

void AsyncJobManager::initAsyncJob(std::shared_ptr<RestHandler> handler) {
if (_softShutdownOngoing.load(std::memory_order_relaxed)) {
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_SHUTTING_DOWN,
"Soft shutdown ongoing.");
}
handler->assignHandlerId();
AsyncJobResult::IdType jobId = handler->handlerId();

Expand Down
13 changes: 13 additions & 0 deletions arangod/GeneralServer/AsyncJobManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,10 +95,23 @@ class AsyncJobManager {
std::vector<AsyncJobResult::IdType> byStatus(AsyncJobResult::Status, size_t maxCount);
void initAsyncJob(std::shared_ptr<RestHandler>);
void finishAsyncJob(RestHandler*);
std::pair<uint64_t, uint64_t> getNrPendingAndDone();

void initiateSoftShutdown() {
_softShutdownOngoing.store(true, std::memory_order_relaxed);
}

private:
basics::ReadWriteLock _lock;
JobList _jobs;

////////////////////////////////////////////////////////////////////////////
/// @brief flag, if a soft shutdown is ongoing, this is used for the soft
/// shutdown feature in coordinators, it is initially `false` and is set
/// to true once a soft shutdown has begun.
////////////////////////////////////////////////////////////////////////////

std::atomic<bool> _softShutdownOngoing;
};
} // namespace rest
} // namespace arangodb
Expand Down
16 changes: 16 additions & 0 deletions arangod/GeneralServer/CommTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -530,6 +530,22 @@ bool CommTask::handleRequestAsync(std::shared_ptr<RestHandler> handler,

if (jobId != nullptr) {
GeneralServerFeature::JOB_MANAGER->initAsyncJob(handler);
try {
// This will throw if a soft shutdown is already going on on a
// coordinator. But this can also throw if we have an
// out of memory situation, so we better handle this anyway.
GeneralServerFeature::JOB_MANAGER->initAsyncJob(handler);
} catch(arangodb::basics::Exception const& exc) {
LOG_TOPIC("fee33", INFO, Logger::STARTUP)
<< "Async job rejected due to soft shutdown, exception:"
<< exc.what();
return false;
} catch(std::exception const& exc) {
LOG_TOPIC("fee34", INFO, Logger::STARTUP)
<< "Async job rejected due to out of memory, exception:"
<< exc.what();
return false;
}
*jobId = handler->handlerId();

// callback will persist the response with the AsyncJobManager
Expand Down
20 changes: 19 additions & 1 deletion arangod/Pregel/PregelFeature.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,8 @@ uint64_t PregelFeature::createExecutionNumber() {
}

PregelFeature::PregelFeature(application_features::ApplicationServer& server)
: application_features::ApplicationFeature(server, "Pregel") {
: application_features::ApplicationFeature(server, "Pregel"),
_softShutdownOngoing(false) {
setOptional(true);
startsAfter<application_features::V8FeaturePhase>();
}
Expand Down Expand Up @@ -267,6 +268,10 @@ void PregelFeature::unprepare() {
}

void PregelFeature::addConductor(std::shared_ptr<Conductor>&& c, uint64_t executionNumber) {
if (_softShutdownOngoing.load(std::memory_order_relaxed)) {
THROW_ARANGO_EXCEPTION(TRI_ERROR_SHUTTING_DOWN);
}

std::string user = ExecContext::current().user();

MUTEX_LOCKER(guard, _mutex);
Expand Down Expand Up @@ -447,3 +452,16 @@ void PregelFeature::cleanupAll() {
w->aqlResult(outBuilder, withId);
}
}

uint64_t PregelFeature::numberOfActiveConductors() const {
MUTEX_LOCKER(guard, _mutex);
uint64_t nr{0};
for (auto const& p : _conductors) {
std::shared_ptr<Conductor> c = p.second.second;
if (c->_state == ExecutionState::RUNNING ||
c->_state == ExecutionState::STORING) {
++nr;
}
}
return nr;
}
10 changes: 9 additions & 1 deletion arangod/Pregel/PregelFeature.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,19 @@ class PregelFeature final : public application_features::ApplicationFeature {
static void handleWorkerRequest(TRI_vocbase_t& vocbase, std::string const& path,
VPackSlice const& body, VPackBuilder& outBuilder);

uint64_t numberOfActiveConductors() const;

void initiateSoftShutdown() {
_softShutdownOngoing.store(true, std::memory_order_relaxed);
}

private:
Mutex _mutex;
mutable Mutex _mutex;
std::unique_ptr<RecoveryManager> _recoveryManager;
std::unordered_map<uint64_t, std::pair<std::string, std::shared_ptr<Conductor>>> _conductors;
std::unordered_map<uint64_t, std::pair<std::string, std::shared_ptr<IWorker>>> _workers;

std::atomic<bool> _softShutdownOngoing;
};

} // namespace pregel
Expand Down
10 changes: 10 additions & 0 deletions arangod/RestHandler/RestCursorHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,11 @@ RestStatus RestCursorHandler::registerQueryOrCursor(VPackSlice const& slice) {
CursorRepository* cursors = _vocbase.cursorRepository();
TRI_ASSERT(cursors != nullptr);
_cursor = cursors->createQueryStream(std::move(query), batchSize, ttl);
if (_cursor == nullptr) { // soft shutdown, no longer allowed
generateError(Result(TRI_ERROR_SHUTTING_DOWN,
"coordinator soft shutdown, new cursors blocked"));
return RestStatus::DONE;
}
_cursor->setWakeupHandler([self = shared_from_this()]() { return self->wakeupHandler(); });

return generateCursorResult(rest::ResponseCode::CREATED);
Expand Down Expand Up @@ -412,6 +417,11 @@ RestStatus RestCursorHandler::handleQueryResult() {
TRI_ASSERT(_queryResult.data.get() != nullptr);
// steal the query result, cursor will take over the ownership
_cursor = cursors->createFromQueryResult(std::move(_queryResult), batchSize, ttl, count);
if (_cursor == nullptr) {
generateError(Result(TRI_ERROR_SHUTTING_DOWN,
"coordinator soft shutdown, new cursors blocked"));
return RestStatus::DONE;
}

return generateCursorResult(rest::ResponseCode::CREATED);
}
Expand Down
29 changes: 28 additions & 1 deletion arangod/RestHandler/RestShutdownHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include "Agency/AsyncAgencyComm.h"
#include "Cluster/ClusterFeature.h"
#include "GeneralServer/AuthenticationFeature.h"
#include "RestServer/SoftShutdownFeature.h"
#include "Scheduler/Scheduler.h"
#include "Scheduler/SchedulerFeature.h"
#include "Utils/ExecContext.h"
Expand All @@ -47,7 +48,8 @@ RestShutdownHandler::RestShutdownHandler(application_features::ApplicationServer
////////////////////////////////////////////////////////////////////////////////

RestStatus RestShutdownHandler::execute() {
if (_request->requestType() != rest::RequestType::DELETE_REQ) {
if (_request->requestType() != rest::RequestType::DELETE_REQ &&
_request->requestType() != rest::RequestType::GET) {
generateError(rest::ResponseCode::METHOD_NOT_ALLOWED, 405);
return RestStatus::DONE;
}
Expand All @@ -68,6 +70,16 @@ RestStatus RestShutdownHandler::execute() {
}
}

auto const& softShutdownFeature{server().getFeature<SoftShutdownFeature>()};
auto& softShutdownTracker{softShutdownFeature.softShutdownTracker()};

if (_request->requestType() == rest::RequestType::GET) {
VPackBuilder builder;
softShutdownTracker.toVelocyPack(builder, softShutdownTracker.getStatus());
generateResult(rest::ResponseCode::OK, builder.slice());
return RestStatus::DONE;
}

bool removeFromCluster;
std::string const& remove = _request->value("remove_from_cluster", removeFromCluster);
removeFromCluster = removeFromCluster && remove == "1";
Expand All @@ -91,6 +103,21 @@ RestStatus RestShutdownHandler::execute() {
clusterFeature.setUnregisterOnShutdown(true);
}

bool soft = false;
bool softFound = false;
std::string const& softString = _request->value("soft", softFound);
if (softFound && softString.compare("true") == 0) {
soft = true;
}

if (ServerState::instance()->isCoordinator() && soft) {
softShutdownTracker.initiateSoftShutdown();
VPackBuilder result;
result.add(VPackValue("OK"));
generateResult(rest::ResponseCode::OK, result.slice());
return RestStatus::DONE;
}

auto self = shared_from_this();
Scheduler* scheduler = SchedulerFeature::SCHEDULER;
// don't block the response for workers waiting on this callback
Expand Down
Loading
0