8000 Soft coordinator shutdown by neunhoef · Pull Request #14331 · arangodb/arangodb · GitHub
[go: up one dir, main page]

Skip to content

Soft coordinator shutdown #14331

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for Gi 8000 tHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Jun 10, 2021
13 changes: 13 additions & 0 deletions CHANGELOG
Original file line number Diff line number Diff line change
@@ -1,6 +1,19 @@
v3.7.12 (XXXX-XX-XX)
--------------------

* Add soft coordinator shutdown: This is a new option `soft=true` for the DELETE
/_admin/shutdown API. Has only meaning for coordinators, otherwise ignored. A
number of things are allowed to finish but no new things are allowed when in
soft coordinator shutdown:
- AQL cursors
- transactions
- asynchronous operations
- Pregel runs
Once all of the ongoing operations of these have finished and all requests on
the low priority queue have been executed, the coordinator shuts down the
normal way. This is supposed to make a coordinator restart less intrusive for
clients.

* Updated arangosync to 1.5.0.

* Fixed ES-881: ensure that LDAP options for async, referrals and restart set
Expand Down
5 changes: 0 additions & 5 deletions arangod/Aql/QueryOptions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -136,11 +136,6 @@ void QueryOptions::fromVelocyPack(VPackSlice const& slice) {
profile = static_cast<ProfileLevel>(value.getNumber<uint32_t>());
}

value = slice.get("stream");
if (value.isBool()) {
stream = value.getBool();
}

value = slice.get("allPlans");
if (value.isBool()) {
allPlans = value.getBool();
Expand Down
1 change: 1 addition & 0 deletions arangod/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -698,6 +698,7 @@ set(LIB_ARANGOSERVER_SOURCES
RestServer/ScriptFeature.cpp
RestServer/ServerFeature.cpp
RestServer/ServerIdFeature.cpp
RestServer/SoftShutdownFeature.cpp
RestServer/SystemDatabaseFeature.cpp
RestServer/TtlFeature.cpp
RestServer/UpgradeFeature.cpp
Expand Down
2 changes: 2 additions & 0 deletions arangod/FeaturePhases/FinalFeaturePhase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "FeaturePhases/AgencyFeaturePhase.h"
#include "RestServer/ConsoleFeature.h"
#include "RestServer/ScriptFeature.h"
#include "RestServer/SoftShutdownFeature.h"

namespace arangodb {
namespace application_features {
Expand All @@ -38,6 +39,7 @@ FinalFeaturePhase::FinalFeaturePhase(ApplicationServer& server)
startsAfter<ConsoleFeature>();
startsAfter<ScriptFeature>();
startsAfter<ShutdownFeature>();
startsAfter<SoftShutdownFeature>();
}

} // namespace application_features
Expand Down
33 changes: 31 additions & 2 deletions arangod/GeneralServer/AsyncJobManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,11 @@
#include "Basics/WriteLocker.h"
#include "Basics/system-functions.h"
#include "Basics/voc-errors.h"
#include "Cluster/ServerState.h"
#include "GeneralServer/RestHandler.h"
#include "Logger/Logger.h"
#include "Rest/GeneralResponse.h"
#include "RestServer/SoftShutdownFeature.h"
#include "Utils/ExecContext.h"

namespace {
Expand Down Expand Up @@ -60,7 +62,8 @@ AsyncJobResult::AsyncJobResult(IdType jobId, Status status,

AsyncJobResult::~AsyncJobResult() = default;

AsyncJobManager::AsyncJobManager() : _lock(), _jobs() {}
AsyncJobManager::AsyncJobManager()
: _lock(), _jobs(), _softShutdownOngoing(false) {}

AsyncJobManager::~AsyncJobManager() {
// remove all results that haven't been fetched
Expand Down Expand Up @@ -257,8 +260,29 @@ std::vector<AsyncJobResult::IdType> AsyncJobManager::byStatus(AsyncJobResult::St
return jobs;
}

//////////////////////////////////////////////////////////////////////////////
/// @brief get numbers of pending and done jobs.
//////////////////////////////////////////////////////////////////////////////
//
std::pair<uint64_t, uint64_t> AsyncJobManager::getNrPendingAndDone() {
uint64_t pending{0};
uint64_t done{0};
{
READ_LOCKER(readLocker, _lock);
for (auto const& j : _jobs) {
if (j.second.second._status == AsyncJobResult::JOB_PENDING) {
++pending;
} else if (j.second.second._status == AsyncJobResult::JOB_DONE) {
++done;
}
}
}
return std::pair(pending, done);
}

////////////////////////////////////////////////////////////////////////////////
/// @brief initializes an async job
/// @brief initializes an async job, throws if soft shutdown is already
/// ongoing.
////////////////////////////////////////////////////////////////////////////////

void AsyncJobManager::initAsyncJob(std::shared_ptr<RestHandler> handler) {
Expand All @@ -270,6 +294,11 @@ void AsyncJobManager::initAsyncJob(std::shared_ptr<RestHandler> handler) {

WRITE_LOCKER(writeLocker, _lock);

if (_softShutdownOngoing.load(std::memory_order_relaxed)) {
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_SHUTTING_DOWN,
"Soft shutdown ongoing.");
}

_jobs.try_emplace(jobId, std::move(user), std::move(ajr));
}

Expand Down
13 changes: 13 additions & 0 deletions arangod/GeneralServer/AsyncJobManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,10 +95,23 @@ class AsyncJobManager {
std::vector<AsyncJobResult::IdType> byStatus(AsyncJobResult::Status, size_t maxCount);
void initAsyncJob(std::shared_ptr<RestHandler>);
void finishAsyncJob(RestHandler*);
std::pair<uint64_t, uint64_t> getNrPendingAndDone();

void initiateSoftShutdown() {
_softShutdownOngoing.store(true, std::memory_order_relaxed);
}

private:
basics::ReadWriteLock _lock;
JobList _jobs;

////////////////////////////////////////////////////////////////////////////
/// @brief flag, if a soft shutdown is ongoing, this is used for the soft
/// shutdown feature in coordinators, it is initially `false` and is set
/// to true once a soft shutdown has begun.
////////////////////////////////////////////////////////////////////////////

std::atomic<bool> _softShutdownOngoing;
};
} // namespace rest
} // namespace arangodb
Expand Down
21 changes: 19 additions & 2 deletions arangod/GeneralServer/CommTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -498,7 +498,10 @@ bool CommTask::handleRequestSync(std::shared_ptr<RestHandler> handler) {
// and there is currently only a single client handled by the IoContext
auto cb = [self = shared_from_this(), handler = std::move(handler)]() mutable {
handler->statistics().SET_QUEUE_END();
handler->trackTaskStart();

handler->runHandler([self = std::move(self)](rest::RestHandler* handler) {
handler->trackTaskEnd();
try {
// Pass the response to the io context
self->sendResponse(handler->stealResponse(), handler->stealStatistics());
Expand Down Expand Up @@ -529,19 +532,33 @@ bool CommTask::handleRequestAsync(std::shared_ptr<RestHandler> handler,
auto const lane = handler->getRequestLane();

if (jobId != nullptr) {
GeneralServerFeature::JOB_MANAGER->initAsyncJob(handler);
try {
// This will throw if a soft shutdown is already going on on a
// coordinator. But this can also throw if we have an
// out of memory situation, so we better handle this anyway.
GeneralServerFeature::JOB_MANAGER->initAsyncJob(handler);
} catch(std::exception const& exc) {
LOG_TOPIC("fee34", INFO, Logger::STARTUP)
<< "Async job rejected, exception: " << exc.what();
return false;
}
*jobId = handler->handlerId();

// callback will persist the response with the AsyncJobManager
return SchedulerFeature::SCHEDULER->queue(lane, [handler = std::move(handler)] {
handler->trackTaskStart();
handler->runHandler([](RestHandler* h) {
h->trackTaskEnd();
GeneralServerFeature::JOB_MANAGER->finishAsyncJob(h);
});
});
} else {
// here the response will just be ignored
return SchedulerFeature::SCHEDULER->queue(lane, [handler = std::move(handler)] {
handler->runHandler([](RestHandler*) {});
handler->trackTaskStart();
handler->runHandler([](RestHandler* h) {
h->trackTaskEnd();
});
});
}
}
Expand Down
6 changes: 6 additions & 0 deletions arangod/GeneralServer/GeneralServerFeature.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,12 @@ void GeneralServerFeature::start() {
}
}

void GeneralServerFeature::initiateSoftShutdown() {
if (_jobManager != nullptr) {
_jobManager->initiateSoftShutdown();
}
}

void GeneralServerFeature::beginShutdown() {
for (auto& server : _servers) {
server->stopListening();
Expand Down
1 change: 1 addition & 0 deletions arangod/GeneralServer/GeneralServerFeature.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ class GeneralServerFeature final : public application_features::ApplicationFeatu
void validateOptions(std::shared_ptr<options::ProgramOptions>) override final;
void prepare() override final;
void start() override final;
void initiateSoftShutdown() override final;
void beginShutdown() override final;
void stop() override final;
void unprepare() override final;
Expand Down
16 changes: 16 additions & 0 deletions arangod/GeneralServer/RestHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
#include "Network/Utils.h"
#include "Rest/GeneralRequest.h"
#include "Rest/HttpResponse.h"
#include "Scheduler/SchedulerFeature.h"
#include "Statistics/RequestStatistics.h"
#include "Utils/ExecContext.h"
#include "VocBase/ticks.h"
Expand Down Expand Up @@ -93,6 +94,21 @@ uint64_t RestHandler::messageId() const {
return messageId;
}

void RestHandler::trackTaskStart() noexcept {
if (PriorityRequestLane(getRequestLane()) == RequestPriority::LOW) {
TRI_ASSERT(SchedulerFeature::SCHEDULER != nullptr);
SchedulerFeature::SCHEDULER->trackBeginOngoingLowPriorityTask();
}
}

void RestHandler::trackTaskEnd() noexcept {
if (PriorityRequestLane(getRequestLane()) == RequestPriority::LOW) {
TRI_ASSERT(SchedulerFeature::SCHEDULER != nullptr);
SchedulerFeature::SCHEDULER->trackEndOngoingLowPriorityTask();
}
}


RequestStatistics::Item&& RestHandler::stealStatistics() {
return std::move(_statistics);
}
Expand Down
7 changes: 7 additions & 0 deletions arangod/GeneralServer/RestHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,13 @@ class RestHandler : public std::enable_shared_from_this<RestHandler> {
uint64_t handlerId() const { return _handlerId; }
uint64_t messageId() const;

/// @brief called when the handler execution is started
void trackTaskStart() noexcept;

/// @brief called when the handler execution is finalized
void trackTaskEnd() noexcept;


GeneralRequest const* request() const { return _request.get(); }
GeneralResponse* response() const { return _response.get(); }
std::unique_ptr<GeneralResponse> stealResponse() {
Expand Down
23 changes: 20 additions & 3 deletions arangod/Pregel/PregelFeature.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@

#include "PregelFeature.h"

#include <atomic>

#include "ApplicationFeatures/ApplicationServer.h"
#include "Basics/MutexLocker.h"
#include "Basics/NumberOfCores.h"
Expand Down Expand Up @@ -222,7 +220,8 @@ uint64_t PregelFeature::createExecutionNumber() {
}

PregelFeature::PregelFeature(application_features::ApplicationServer& server)
: application_features::ApplicationFeature(server, "Pregel") {
: application_features::ApplicationFeature(server, "Pregel"),
_softShutdownOngoing(false) {
setOptional(true);
startsAfter<application_features::V8FeaturePhase>();
}
Expand Down Expand Up @@ -267,6 +266,10 @@ void PregelFeature::unprepare() {
}

void PregelFeature::addConductor(std::shared_ptr<Conductor>&& c, uint64_t executionNumber) {
if (_softShutdownOngoing.load(std::memory_order_relaxed)) {
THROW_ARANGO_EXCEPTION(TRI_ERROR_SHUTTING_DOWN);
}

std::string user = ExecContext::current().user();

MUTEX_LOCKER(guard, _mutex);
Expand Down Expand Up @@ -447,3 +450,17 @@ void PregelFeature::cleanupAll() {
w->aqlResult(outBuilder, withId);
}
}

uint64_t PregelFeature::numberOfActiveConductors() const {
MUTEX_LOCKER(guard, _mutex);
uint64_t nr{0};
for (auto const& p : _conductors) {
std::shared_ptr<Conductor> const& c = p.second.second;
if (c->_state == ExecutionState::DEFAULT ||
c->_state == ExecutionState::RUNNING ||
c->_state == ExecutionState::STORING) {
++nr;
}
}
return nr;
}
11 changes: 10 additions & 1 deletion arangod/Pregel/PregelFeature.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#define ARANGODB_PREGEL_FEATURE_H 1

#include <cstdint>
#include <atomic>

#include <velocypack/Builder.h>
#include <velocypack/Slice.h>
Expand Down Expand Up @@ -86,11 +87,19 @@ class PregelFeature final : public application_features::ApplicationFeature {
static void handleWorkerRequest(TRI_vocbase_t& vocbase, std::string const& path,
VPackSlice const& body, VPackBuilder& outBuilder);

uint64_t numberOfActiveConductors() const;

void initiateSoftShutdown() override final {
_softShutdownOngoing.store(true, std::memory_order_relaxed);
}

private:
Mutex _mutex;
mutable Mutex _mutex;
std::unique_ptr<RecoveryManager> _recoveryManager;
std::unordered_map<uint64_t, std::pair<std::string, std::shared_ptr<Conductor>>> _conductors;
std::unordered_map<uint64_t, std::pair<std::string, std::shared_ptr<IWorker>>> _workers;

std::atomic<bool> _softShutdownOngoing;
};

} // namespace pregel
Expand Down
2 changes: 2 additions & 0 deletions arangod/RestHandler/RestCursorHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,7 @@ RestStatus RestCursorHandler::registerQueryOrCursor(VPackSlice const& slice) {
CursorRepository* cursors = _vocbase.cursorRepository();
TRI_ASSERT(cursors != nullptr);
_cursor = cursors->createQueryStream(std::move(query), batchSize, ttl);
// Throws if soft shutdown is ongoing!
_cursor->setWakeupHandler([self = shared_from_this()]() { return self->wakeupHandler(); });

return generateCursorResult(rest::ResponseCode::CREATED);
Expand Down Expand Up @@ -412,6 +413,7 @@ RestStatus RestCursorHandler::handleQueryResult() {
TRI_ASSERT(_queryResult.data.get() != nullptr);
// steal the query result, cursor will take over the ownership
_cursor = cursors->createFromQueryResult(std::move(_queryResult), batchSize, ttl, count);
// throws if a coordinator soft shutdown is ongoing

return generateCursorResult(rest::ResponseCode::CREATED);
}
Expand Down
Loading
0