[3.5] Check scheduler queue return value by dhly-etc · Pull Request #9759 · arangodb/arangodb · GitHub
[go: up one dir, main page]

Skip to content

[3.5] Check scheduler queue return value #9759

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Aug 20, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions CHANGELOG
Original file line number Diff line number Diff line change
@@ -1,6 +1,17 @@
v3.5.1 (XXXX-XX-XX)
-------------------

* Consistently honor the return value of all attempts to queue tasks in the
internal scheduler.

Previously some call sites did not check the return value of internal queueing
operations, and if the scheduler queue was full, operations that were thought
to be requeued were silently dropped. Now, there will be reactions on such
failures. Requeuing an important task with a time offset (Scheduler::queueDelay)
is now also retried on failure (queue full) for up to five minutes. If after
five minutes such a task still cannot be queued, a fatal error will be logged
and the server process will be aborted.

* Made index selection much more deterministic in case there are
multiple competing indexes.

Expand Down
3 changes: 1 addition & 2 deletions arangod/Aql/SharedQueryState.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,5 @@ bool SharedQueryState::executeContinueCallback() const {
}
// do NOT use scheduler->post(), can have high latency that
// then backs up libcurl callbacks to other objects
scheduler->queue(RequestLane::CLIENT_AQL, _continueCallback);
return true;
return scheduler->queue(RequestLane::CLIENT_AQL, _continueCallback);
}
7 changes: 5 additions & 2 deletions arangod/Cache/CacheManagerFeature.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,11 @@ void CacheManagerFeature::start() {

auto scheduler = SchedulerFeature::SCHEDULER;
auto postFn = [scheduler](std::function<void()> fn) -> bool {
scheduler->queue(RequestLane::INTERNAL_LOW, fn);
return true;
try {
return scheduler->queue(RequestLane::INTERNAL_LOW, fn);
} catch (...) {
return false;
}
};
_manager.reset(new Manager(postFn, _cacheSize));
MANAGER = _manager.get();
Expand Down
43 changes: 38 additions & 5 deletions arangod/Pregel/Conductor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
/// @author Simon Grätzer
////////////////////////////////////////////////////////////////////////////////

#include <chrono>
#include <thread>

#include "Conductor.h"

#include "Pregel/Aggregator.h"
Expand All @@ -30,6 +33,7 @@
#include "Pregel/Recovery.h"
#include "Pregel/Utils.h"

#include "Basics/FunctionUtils.h"
#include "Basics/MutexLocker.h"
#include "Basics/StringUtils.h"
#include "Basics/VelocyPackHelper.h"
Expand Down Expand Up @@ -305,7 +309,7 @@ VPackBuilder Conductor::finishedWorkerStep(VPackSlice const& data) {
Scheduler* scheduler = SchedulerFeature::SCHEDULER;
// don't block the response for workers waiting on this callback
// this should allow workers to go into the IDLE state
scheduler->queue(RequestLane::INTERNAL_LOW, [this] {
bool queued = scheduler->queue(RequestLane::INTERNAL_LOW, [this] {
MUTEX_LOCKER(guard, _callbackMutex);

if (_state == ExecutionState::RUNNING) {
Expand All @@ -319,6 +323,11 @@ VPackBuilder Conductor::finishedWorkerStep(VPackSlice const& data) {
<< "No further action taken after receiving all responses";
}
});
if (!queued) {
LOG_TOPIC("038db", ERR, Logger::PREGEL)
<< "No thread available to queue response, canceling execution";
cancel();
}
return VPackBuilder();
}

Expand Down Expand Up @@ -389,7 +398,18 @@ void Conductor::cancel() {
void Conductor::cancelNoLock() {
_callbackMutex.assertLockedByCurrentThread();
_state = ExecutionState::CANCELED;
_finalizeWorkers();
bool ok;
int res;
std::tie(ok, res) = basics::function_utils::retryUntilTimeout<int>(
[this]() -> std::pair<bool, int> {
int res = _finalizeWorkers();
return std::make_pair(res != TRI_ERROR_QUEUE_FULL, res);
},
Logger::PREGEL, "cancel worker execution");
if (!ok) {
LOG_TOPIC("f8b3c", ERR, Logger::PREGEL)
<< "Failed to cancel worker execution for five minutes, giving up.";
}
_workHandle.reset();
}

Expand All @@ -412,7 +432,8 @@ void Conductor::startRecovery() {
TRI_ASSERT(SchedulerFeature::SCHEDULER != nullptr);

// let's wait for a final state in the cluster
_workHandle = SchedulerFeature::SCHEDULER->queueDelay(
bool queued = false;
std::tie(queued, _workHandle) = SchedulerFeature::SCHEDULER->queueDelay(
RequestLane::CLUSTER_AQL, std::chrono::seconds(2), [this](bool cancelled) {
if (cancelled || _state != ExecutionState::RECOVERING) {
return; // seems like we are canceled
Expand Down Expand Up @@ -460,6 +481,10 @@ void Conductor::startRecovery() {
LOG_TOPIC("fefc6", ERR, Logger::PREGEL) << "Compensation failed";
}
});
if (!queued) {
LOG_TOPIC("92a8d", ERR, Logger::PREGEL)
<< "No thread available to queue recovery, may be in dirty state.";
}
}

// resolves into an ordered list of shards for each collection on each server
Expand Down Expand Up @@ -691,12 +716,17 @@ void Conductor::finishedWorkerFinalize(VPackSlice data) {
auto* scheduler = SchedulerFeature::SCHEDULER;
if (scheduler) {
uint64_t exe = _executionNumber;
scheduler->queue(RequestLane::CLUSTER_AQL, [exe] {
bool queued = scheduler->queue(RequestLane::CLUSTER_AQL, [exe] {
auto pf = PregelFeature::instance();
if (pf) {
pf->cleanupConductor(exe);
}
});
if (!queued) {
LOG_TOPIC("038da", ERR, Logger::PREGEL)
<< "No thread available to queue cleanup, canceling execution";
cancel();
}
}
}
}
Expand Down Expand Up @@ -766,7 +796,7 @@ int Conductor::_sendToAllDBServers(std::string const& path, VPackBuilder const&
TRI_ASSERT(SchedulerFeature::SCHEDULER != nullptr);
uint64_t exe = _executionNumber;
Scheduler* scheduler = SchedulerFeature::SCHEDULER;
scheduler->queue(RequestLane::INTERNAL_LOW, [path, message, exe] {
bool queued = scheduler->queue(RequestLane::INTERNAL_LOW, [path, message, exe] {
auto pf = PregelFeature::instance();
if (!pf) {
return;
Expand All @@ -778,6 +808,9 @@ int Conductor::_sendToAllDBServers(std::string const& path, VPackBuilder const&
PregelFeature::handleWorkerRequest(vocbase, path, message.slice(), response);
}
});
if (!queued) {
return TRI_ERROR_QUEUE_FULL;
}
}
return TRI_ERROR_NO_ERROR;
}
Expand Down
43 changes: 31 additions & 12 deletions arangod/Pregel/GraphStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,16 +129,25 @@ void GraphStore<V, E>::loadShards(WorkerConfig* config,
_runningThreads++;
Scheduler* scheduler = SchedulerFeature::SCHEDULER;
TRI_ASSERT(scheduler);
scheduler->queue(RequestLane::INTERNAL_LOW,
[this, vertexShard, edges] {
TRI_DEFER(_runningThreads--); // exception safe
try {
_loadVertices(vertexShard, edges);
} catch (std::exception const& ex) {
LOG_TOPIC("c87c9", WARN, Logger::PREGEL) << "caught exception while "
<< "loading pregel graph: " << ex.what();
}
});
bool queued =
scheduler->queue(RequestLane::INTERNAL_LOW, [this, vertexShard, edges] {
TRI_DEFER(_runningThreads--); // exception safe
try {
_loadVertices(vertexShard, edges);
} catch (std::exception const& ex) {
LOG_TOPIC("c87c9", WARN, Logger::PREGEL)
<< "caught exception while "
<< "loading pregel graph: " << ex.what();
}
});
if (!queued) {
LOG_TOPIC("38da2", WARN, Logger::PREGEL)
<< "No thread available to queue vertex loading";
}
} catch (basics::Exception const& ex) {
LOG_TOPIC("3f283", WARN, Logger::PREGEL)
<< "unhandled exception while "
<< "loading pregel graph: " << ex.what();
} catch (...) {
LOG_TOPIC("3f282", WARN, Logger::PREGEL) << "unhandled exception while "
<< "loading pregel graph";
Expand All @@ -151,7 +160,12 @@ void GraphStore<V, E>::loadShards(WorkerConfig* config,
}

Scheduler* scheduler = SchedulerFeature::SCHEDULER;
scheduler->queue(RequestLane::INTERNAL_LOW, cb);
bool queued = scheduler->queue(RequestLane::INTERNAL_LOW, cb);
if (!queued) {
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_QUEUE_FULL,
"No thread available to queue callback, "
"canceling execution");
}
}

template <typename V, typename E>
Expand Down Expand Up @@ -563,7 +577,7 @@ void GraphStore<V, E>::storeResults(WorkerConfig* config,
numT << " threads";

for (size_t i = 0; i < numT; i++) {
SchedulerFeature::SCHEDULER->queue(RequestLane::INTERNAL_LOW, [=]{
bool queued = SchedulerFeature::SCHEDULER->queue(RequestLane::INTERNAL_LOW, [=] {
size_t startI = i * (numSegments / numT);
size_t endI = (i + 1) * (numSegments / numT);
TRI_ASSERT(endI <= numSegments);
Expand All @@ -584,6 +598,11 @@ void GraphStore<V, E>::storeResults(WorkerConfig* config,
cb();
}
});
if (!queued) {
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_QUEUE_FULL,
"No thread available to queue vertex "
"storage, canceling execution");
}
}
}

Expand Down
6 changes: 5 additions & 1 deletion arangod/Pregel/PregelFeature.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -308,10 +308,14 @@ void PregelFeature::cleanupWorker(uint64_t executionNumber) {
// unmapping etc might need a few seconds
TRI_ASSERT(SchedulerFeature::SCHEDULER != nullptr);
Scheduler* scheduler = SchedulerFeature::SCHEDULER;
scheduler->queue(RequestLane::INTERNAL_LOW, [this, executionNumber, instance] {
bool queued = scheduler->queue(RequestLane::INTERNAL_LOW, [this, executionNumber, instance] {
MUTEX_LOCKER(guard, _mutex);
_workers.erase(executionNumber);
});
if (!queued) {
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_QUEUE_FULL,
"No thread available to queue cleanup.");
}
}

void PregelFeature::cleanupAll() {
Expand Down
9 changes: 7 additions & 2 deletions arangod/Pregel/Recovery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,13 @@ void RecoveryManager::updatedFailedServers(std::vector<ServerID> const& failed)

TRI_ASSERT(SchedulerFeature::SCHEDULER != nullptr);
Scheduler* scheduler = SchedulerFeature::SCHEDULER;
scheduler->queue(RequestLane::INTERNAL_LOW,
[this, shard] { _renewPrimaryServer(shard); });
bool queued = scheduler->queue(RequestLane::INTERNAL_LOW, [this, shard] {
_renewPrimaryServer(shard);
});
if (!queued) {
LOG_TOPIC("038de", ERR, Logger::PREGEL)
<< "No thread available to queue pregel recovery manager request";
}
}
}
}
Expand Down
31 changes: 26 additions & 5 deletions arangod/Pregel/Worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -166,9 +166,13 @@ void Worker<V, E, M>::setupWorker() {
TRI_ASSERT(SchedulerFeature::SCHEDULER != nullptr);
Scheduler* scheduler = SchedulerFeature::SCHEDULER;
auto self = shared_from_this();
scheduler->queue(RequestLane::INTERNAL_LOW, [self, this, cb] {
bool queued = scheduler->queue(RequestLane::INTERNAL_LOW, [self, this, cb] {
_graphStore->loadShards(&_config, cb);
});
if (!queued) {
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_QUEUE_FULL,
"No available thread to load shards");
}
}
}

Expand Down Expand Up @@ -329,7 +333,7 @@ void Worker<V, E, M>::_startProcessing() {

auto self = shared_from_this();
for (size_t i = 0; i < numT; i++) {
scheduler->queue(RequestLane::INTERNAL_LOW, [self, this, i, numT, numSegments] {
bool queued = scheduler->queue(RequestLane::INTERNAL_LOW, [self, this, i, numT, numSegments] {
if (_state != WorkerState::COMPUTING) {
LOG_TOPIC("f0e3d", WARN, Logger::PREGEL) << "Execution aborted prematurely.";
return;
Expand All @@ -344,6 +348,10 @@ void Worker<V, E, M>::_startProcessing() {
_finishedProcessing(); // last thread turns the lights out
}
});
if (!queued) {
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_QUEUE_FULL,
"No thread available to start processing");
}
}

// TRI_ASSERT(_runningThreads == i);
Expand Down Expand Up @@ -565,7 +573,8 @@ void Worker<V, E, M>::_continueAsync() {
// wait for new messages before beginning to process
int64_t milli = _writeCache->containedMessageCount() < _messageBatchSize ? 50 : 5;
// start next iteration in $milli mseconds.
_workHandle = SchedulerFeature::SCHEDULER->queueDelay(
bool queued = false;
std::tie(queued, _workHandle) = SchedulerFeature::SCHEDULER->queueDelay(
RequestLane::INTERNAL_LOW, std::chrono::milliseconds(milli), [this](bool cancelled) {
if (!cancelled) {
{ // swap these pointers atomically
Expand All @@ -583,6 +592,10 @@ void Worker<V, E, M>::_continueAsync() {
_startProcessing();
}
});
if (!queued) {
THROW_ARANGO_EXCEPTION_MESSAGE(
TRI_ERROR_QUEUE_FULL, "No thread available to continue execution.");
}
}

template <typename V, typename E, typename M>
Expand Down Expand Up @@ -703,7 +716,7 @@ void Worker<V, E, M>::compensateStep(VPackSlice const& data) {
TRI_ASSERT(SchedulerFeature::SCHEDULER != nullptr);
Scheduler* scheduler = SchedulerFeature::SCHEDULER;
auto self = shared_from_this();
scheduler->queue(RequestLane::INTERNAL_LOW, [self, this] {
bool queued = scheduler->queue(RequestLane::INTERNAL_LOW, [self, this] {
if (_state != WorkerState::RECOVERING) {
LOG_TOPIC("554e2", WARN, Logger::PREGEL) << "Compensation aborted prematurely.";
return;
Expand Down Expand Up @@ -740,6 +753,10 @@ void Worker<V, E, M>::compensateStep(VPackSlice const& data) {
package.close();
_callConductor(Utils::finishedRecoveryPath, package);
});
if (!queued) {
THROW_ARANGO_EXCEPTION_MESSAGE(
TRI_ERROR_QUEUE_FULL, "No thread available to queue compensation.");
}
}

template <typename V, typename E, typename M>
Expand Down Expand Up @@ -768,10 +785,14 @@ void Worker<V, E, M>::_callConductor(std::string const& path, VPackBuilder const
TRI_ASSERT(SchedulerFeature::SCHEDULER != nullptr);
Scheduler* scheduler = SchedulerFeature::SCHEDULER;
auto self = shared_from_this();
scheduler->queue(RequestLane::INTERNAL_LOW, [self, path, message] {
bool queued = scheduler->queue(RequestLane::INTERNAL_LOW, [self, path, message] {
VPackBuilder response;
PregelFeature::handleConductorRequest(path, message.slice(), response);
});
if (!queued) {
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_QUEUE_FULL,
"No thread available to call conductor");
}
} else {
std::shared_ptr<ClusterComm> cc = ClusterComm::instance();
std::string baseUrl = Utils::baseUrl(_config.database(), Utils::conductorPrefix);
Expand Down
Loading
0