Soft coordinator shutdown (#14264) · arangodb/arangodb@6f73a6a · GitHub
[go: up one dir, main page]

Skip to content

Commit 6f73a6a

Browse files
neunhoef and jsteemann authored
Soft coordinator shutdown (#14264)
* Initial working version for AQL cursors.
* Prevent multiple soft shutdowns.
* Finish integration tests for soft shutdown.
* Report progress on soft shutdown.
* Add tests for soft shutdown progress.
* Add test for AQL cursors being blocked by a soft shutdown.
* Add transactions to the mix.
* Add async job tracking.
* Add Pregel tracking.
* Add tracking of low prio queue and ongoing.
* Apply suggestions from code review

Co-authored-by: Jan <jsteemann@users.noreply.github.com>
1 parent 60ddc36 commit 6f73a6a

31 files changed

+1180
-30
lines changed

CHANGELOG

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,19 @@
11
devel
22
-----
33

4+
* Add soft coordinator shutdown: This is a new option `soft=true` for the
5+
DELETE /_admin/shutdown API. Has only meaning for coordinators, otherwise
6+
ignored. A number of things are allowed to finish but no new things are
7+
allowed when in soft coordinator shutdown:
8+
- AQL cursors
9+
- transactions
10+
- asynchronous operations
11+
- Pregel runs
12+
Once all of the ongoing operations of these have finished and all requests
13+
on the low priority queue have been executed, the coordinator shuts down
14+
the normal way. This is supposed to make a coordinator restart less
15+
intrusive for clients.
16+
417
* Fix BTS-398: Cannot force index hint for primary index if FILTER has multiple
518
OR conditions that require different indexes.
619

arangod/Aql/QueryOptions.cpp

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -159,11 +159,6 @@ void QueryOptions::fromVelocyPack(VPackSlice slice) {
159159
traversalProfile = static_cast<TraversalProfileLevel>(value.getNumber<uint16_t>());
160160
}
161161

162-
value = slice.get("stream");
163-
if (value.isBool()) {
164-
stream = value.getBool();
165-
}
166-
167162
value = slice.get("allPlans");
168163
if (value.isBool()) {
169164
allPlans = value.getBool();

arangod/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -750,6 +750,7 @@ set(LIB_ARANGOSERVER_SOURCES
750750
RestServer/ScriptFeature.cpp
751751
RestServer/ServerFeature.cpp
752752
RestServer/ServerIdFeature.cpp
753+
RestServer/SoftShutdownFeature.cpp
753754
RestServer/SystemDatabaseFeature.cpp
754755
RestServer/TtlFeature.cpp
755756
RestServer/UpgradeFeature.cpp

arangod/FeaturePhases/FinalFeaturePhase.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
#include "FeaturePhases/AgencyFeaturePhase.h"
2828
#include "RestServer/ConsoleFeature.h"
2929
#include "RestServer/ScriptFeature.h"
30+
#include "RestServer/SoftShutdownFeature.h"
3031

3132
namespace arangodb {
3233
namespace application_features {
@@ -39,6 +40,7 @@ FinalFeaturePhase::FinalFeaturePhase(ApplicationServer& server)
3940
startsAfter<ConsoleFeature>();
4041
startsAfter<ScriptFeature>();
4142
startsAfter<ShutdownFeature>();
43+
startsAfter<SoftShutdownFeature>();
4244
}
4345

4446
} // namespace application_features

arangod/GeneralServer/AsyncJobManager.cpp

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,11 @@
2727
#include "Basics/WriteLocker.h"
2828
#include "Basics/system-functions.h"
2929
#include "Basics/voc-errors.h"
30+
#include "Cluster/ServerState.h"
3031
#include "GeneralServer/RestHandler.h"
3132
#include "Logger/Logger.h"
3233
#include "Rest/GeneralResponse.h"
34+
#include "RestServer/SoftShutdownFeature.h"
3335
#include "Utils/ExecContext.h"
3436

3537
namespace {
@@ -60,7 +62,8 @@ AsyncJobResult::AsyncJobResult(IdType jobId, Status status,
6062

6163
AsyncJobResult::~AsyncJobResult() = default;
6264

63-
AsyncJobManager::AsyncJobManager() : _lock(), _jobs() {}
65+
AsyncJobManager::AsyncJobManager()
66+
: _lock(), _jobs(), _softShutdownOngoing(false) {}
6467

6568
AsyncJobManager::~AsyncJobManager() {
6669
// remove all results that haven't been fetched
@@ -257,8 +260,29 @@ std::vector<AsyncJobResult::IdType> AsyncJobManager::byStatus(AsyncJobResult::St
257260
return jobs;
258261
}
259262

263+
//////////////////////////////////////////////////////////////////////////////
264+
/// @brief get numbers of pending and done jobs.
265+
//////////////////////////////////////////////////////////////////////////////
266+
//
267+
std::pair<uint64_t, uint64_t> AsyncJobManager::getNrPendingAndDone() {
268+
uint64_t pending{0};
269+
uint64_t done{0};
270+
{
271+
READ_LOCKER(readLocker, _lock);
272+
for (auto const& j : _jobs) {
273+
if (j.second.second._status == AsyncJobResult::JOB_PENDING) {
274+
++pending;
275+
} else if (j.second.second._status == AsyncJobResult::JOB_DONE) {
276+
++done;
277+
}
278+
}
279+
}
280+
return std::pair(pending, done);
281+
}
282+
260283
////////////////////////////////////////////////////////////////////////////////
261-
/// @brief initializes an async job
284+
/// @brief initializes an async job, throws if soft shutdown is already
285+
/// ongoing.
262286
////////////////////////////////////////////////////////////////////////////////
263287

264288
void AsyncJobManager::initAsyncJob(std::shared_ptr<RestHandler> handler) {
@@ -270,6 +294,11 @@ void AsyncJobManager::initAsyncJob(std::shared_ptr<RestHandler> handler) {
270294

271295
WRITE_LOCKER(writeLocker, _lock);
272296

297+
if (_softShutdownOngoing.load(std::memory_order_relaxed)) {
298+
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_SHUTTING_DOWN,
299+
"Soft shutdown ongoing.");
300+
}
301+
273302
_jobs.try_emplace(jobId, std::move(user), std::move(ajr));
274303
}
275304

arangod/GeneralServer/AsyncJobManager.h

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,10 +94,23 @@ class AsyncJobManager {
9494
std::vector<AsyncJobResult::IdType> byStatus(AsyncJobResult::Status, size_t maxCount);
9595
void initAsyncJob(std::shared_ptr<RestHandler>);
9696
void finishAsyncJob(RestHandler*);
97+
std::pair<uint64_t, uint64_t> getNrPendingAndDone();
98+
99+
void initiateSoftShutdown() {
100+
_softShutdownOngoing.store(true, std::memory_order_relaxed);
101+
}
97102

98103
private:
99104
basics::ReadWriteLock _lock;
100105
JobList _jobs;
106+
107+
////////////////////////////////////////////////////////////////////////////
108+
/// @brief flag, if a soft shutdown is ongoing, this is used for the soft
109+
/// shutdown feature in coordinators, it is initially `false` and is set
110+
/// to true once a soft shutdown has begun.
111+
////////////////////////////////////////////////////////////////////////////
112+
113+
std::atomic<bool> _softShutdownOngoing;
101114
};
102115
} // namespace rest
103116
} // namespace arangodb

arangod/GeneralServer/CommTask.cpp

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -528,7 +528,16 @@ bool CommTask::handleRequestAsync(std::shared_ptr<RestHandler> handler,
528528

529529
if (jobId != nullptr) {
530530
auto& jobManager = _server.server().getFeature<GeneralServerFeature>().jobManager();
531-
jobManager.initAsyncJob(handler);
531+
try {
532+
// This will throw if a soft shutdown is already going on on a
533+
// coordinator. But this can also throw if we have an
534+
// out of memory situation, so we better handle this anyway.
535+
jobManager.initAsyncJob(handler);
536+
} catch(std::exception const& exc) {
537+
LOG_TOPIC("fee34", INFO, Logger::STARTUP)
538+
<< "Async job rejected, exception: " << exc.what();
539+
return false;
540+
}
532541
*jobId = handler->handlerId();
533542

534543
// callback will persist the response with the AsyncJobManager

arangod/GeneralServer/GeneralServerFeature.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -261,6 +261,12 @@ void GeneralServerFeature::start() {
261261
}
262262
}
263263

264+
void GeneralServerFeature::initiateSoftShutdown() {
265+
if (_jobManager != nullptr) {
266+
_jobManager->initiateSoftShutdown();
267+
}
268+
}
269+
264270
void GeneralServerFeature::beginShutdown() {
265271
for (auto& server : _servers) {
266272
server->stopListening();

arangod/GeneralServer/GeneralServerFeature.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ class GeneralServerFeature final : public application_features::ApplicationFeatu
3939
void validateOptions(std::shared_ptr<options::ProgramOptions>) override final;
4040
void prepare() override final;
4141
void start() override final;
42+
void initiateSoftShutdown() override final;
4243
void beginShutdown() override final;
4344
void stop() override final;
4445
void unprepare() override final;

arangod/Pregel/PregelFeature.cpp

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ std::pair<Result, uint64_t> PregelFeature::startExecution(
8181
std::vector<std::string> const& edgeCollections,
8282
std::unordered_map<std::string, std::vector<std::string>> const& edgeCollectionRestrictions,
8383
VPackSlice const& params) {
84-
if (isStopping()) {
84+
if (isStopping() || _softShutdownOngoing.load(std::memory_order_relaxed)) {
8585
return std::make_pair(Result{TRI_ERROR_SHUTTING_DOWN,
8686
"pregel system not available"}, 0);
8787
}
@@ -218,7 +218,8 @@ uint64_t PregelFeature::createExecutionNumber() {
218218
}
219219

220220
PregelFeature::PregelFeature(application_features::ApplicationServer& server)
221-
: application_features::ApplicationFeature(server, "Pregel") {
221+
: application_features::ApplicationFeature(server, "Pregel"),
222+
_softShutdownOngoing(false) {
222223
setOptional(true);
223224
startsAfter<application_features::V8FeaturePhase>();
224225
}
@@ -284,7 +285,7 @@ bool PregelFeature::isStopping() const noexcept {
284285
}
285286

286287
void PregelFeature::addConductor(std::shared_ptr<Conductor>&& c, uint64_t executionNumber) {
287-
if (isStopping()) {
288+
if (isStopping() || _softShutdownOngoing.load(std::memory_order_relaxed)) {
288289
THROW_ARANGO_EXCEPTION(TRI_ERROR_SHUTTING_DOWN);
289290
}
290291

@@ -442,3 +443,17 @@ void PregelFeature::handleWorkerRequest(TRI_vocbase_t& vocbase,
442443
w->aqlResult(outBuilder, withId);
443444
}
444445
}
446+
447+
uint64_t PregelFeature::numberOfActiveConductors() const {
448+
MUTEX_LOCKER(guard, _mutex);
449+
uint64_t nr{0};
450+
for (auto const& p : _conductors) {
451+
std::shared_ptr<Conductor> const& c = p.second.second;
452+
if (c->_state == ExecutionState::DEFAULT ||
453+
c->_state == ExecutionState::RUNNING ||
454+
c->_state == ExecutionState::STORING) {
455+
++nr;
456+
}
457+
}
458+
return nr;
459+
}

0 commit comments

Comments
 (0)
0