fix potential spurious wakeups in scheduler code by jsteemann · Pull Request #9770 · arangodb/arangodb · GitHub
[go: up one dir, main page]

Skip to content

fix potential spurious wakeups in scheduler code #9770

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
fix potential spurious wakeups in scheduler code
  • Loading branch information
jsteemann committed Aug 20, 2019
commit 865e864b618acf5474177b06c83d1102c1999711
53 changes: 27 additions & 26 deletions arangod/Scheduler/SupervisedScheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -317,22 +317,24 @@ void SupervisedScheduler::runWorker() {
id = _numWorkers++; // increase the number of workers here, to obtain the id
// copy shared_ptr with worker state
state = _workerStates.back();
// inform the supervisor that this thread is alive
_conditionSupervisor.notify_one();
}

state->_sleepTimeout_ms = 20 * (id + 1);
// cap the timeout to some boundary value
if (state->_sleepTimeout_ms >= 1000) {
state->_sleepTimeout_ms = 1000;
}
state->_sleepTimeout_ms = 20 * (id + 1);
// cap the timeout to some boundary value
if (state->_sleepTimeout_ms >= 1000) {
state->_sleepTimeout_ms = 1000;
}

if (id < 32U) {
// 512 >> 32 => undefined behavior
state->_queueRetryCount = (uint64_t(512) >> id) + 3;
} else {
// we want at least 3 retries
state->_queueRetryCount = 3;
if (id < 32U) {
// 512 >> 32 => undefined behavior
state->_queueRetryCount = (uint64_t(512) >> id) + 3;
} else {
// we want at least 3 retries
state->_queueRetryCount = 3;
}

// inform the supervisor that this thread is alive
state->_ready = true;
_conditionSupervisor.notify_one();
}

while (true) {
Expand Down Expand Up @@ -548,16 +550,21 @@ void SupervisedScheduler::startOneThread() {
#pragma warning(pop)
#endif

if (!_workerStates.back()->start()) {
auto& state = _workerStates.back();

if (!state->start()) {
// failed to start a worker
_workerStates.pop_back(); // pop_back deletes shared_ptr, which deletes thread
LOG_TOPIC("913b5", ERR, Logger::THREADS)
<< "could not start additional worker thread";

} else {
LOG_TOPIC("f9de8", TRACE, Logger::THREADS) << "Started new thread";
_conditionSupervisor.wait(guard);
return;
}

// sync with runWorker()
_conditionSupervisor.wait(guard, [&state]() {
return state->_ready;
});
LOG_TOPIC("f9de8", TRACE, Logger::THREADS) << "Started new thread";
}

void SupervisedScheduler::stopOneThread() {
Expand Down Expand Up @@ -590,18 +597,12 @@ void SupervisedScheduler::stopOneThread() {
}
}

// Move constructor: takes over the worker thread and the per-worker tuning
// values (_queueRetryCount, _sleepTimeout_ms) of `that`.
//  - _stop is atomic, so its current value is read via load() and copied.
//  - _working always starts out false in the new object, regardless of the
//    value held by `that`.
//  - _ready is carried over from `that`; without this the plain bool would be
//    left indeterminate and any later read of it (e.g. the wait predicate in
//    startOneThread()) would be undefined behavior.
//  - _lastJobStarted is not listed and is therefore default-constructed.
// NOTE(review): the header states that _ready is protected by the scheduler's
// mutex; presumably the caller holds it while moving a state - confirm.
SupervisedScheduler::WorkerState::WorkerState(SupervisedScheduler::WorkerState&& that) noexcept
    : _queueRetryCount(that._queueRetryCount),
      _sleepTimeout_ms(that._sleepTimeout_ms),
      _stop(that._stop.load()),
      _working(false),
      _ready(that._ready),
      _thread(std::move(that._thread)) {}

SupervisedScheduler::WorkerState::WorkerState(SupervisedScheduler& scheduler)
: _queueRetryCount(100),
_sleepTimeout_ms(100),
_stop(false),
_working(false),
_ready(false),
_thread(new SupervisedSchedulerWorkerThread(scheduler)) {}

// Starts the thread owned by this worker state and hands back whatever the
// thread's own start() reports.
bool SupervisedScheduler::WorkerState::start() {
  bool const started = _thread->start();
  return started;
}
Expand Down
7 changes: 6 additions & 1 deletion arangod/Scheduler/SupervisedScheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,12 +114,17 @@ class SupervisedScheduler final : public Scheduler {
uint64_t _queueRetryCount; // t1
uint64_t _sleepTimeout_ms; // t2
std::atomic<bool> _stop, _working;
// _ready = false means the Worker is not properly initialized
// _ready = true means it is initialized and can be used to dispatch tasks to
// _ready is protected by the Scheduler's condition variable & mutex
bool _ready;
clock::time_point _lastJobStarted;
std::unique_ptr<SupervisedSchedulerWorkerThread> _thread;

// initialize with harmless defaults: spin once, sleep forever
explicit WorkerState(SupervisedScheduler& scheduler);
WorkerState(WorkerState&& that) noexcept;
WorkerState(WorkerState const&) = delete;
WorkerState& operator=(WorkerState const&) = delete;

// cppcheck-suppress missingOverride
bool start();
Expand Down
0