8000 fix potential spurious wakeups in scheduler code (#9770) · arangodb/arangodb@df504bb · GitHub
[go: up one dir, main page]

Skip to content

Commit df504bb

Browse files
authored
fix potential spurious wakeups in scheduler code (#9770)
1 parent 49dfb0a commit df504bb

File tree

2 files changed

+33
-27
lines changed

2 files changed

+33
-27
lines changed

arangod/Scheduler/SupervisedScheduler.cpp

Lines changed: 27 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -317,22 +317,24 @@ void SupervisedScheduler::runWorker() {
317317
id = _numWorkers++; // increase the number of workers here, to obtain the id
318318
// copy shared_ptr with worker state
319319
state = _workerStates.back();
320-
// inform the supervisor that this thread is alive
321-
_conditionSupervisor.notify_one();
322-
}
323320

324-
state->_sleepTimeout_ms = 20 * (id + 1);
325-
// cap the timeout to some boundary value
326-
if (state->_sleepTimeout_ms >= 1000) {
327-
state->_sleepTimeout_ms = 1000;
328-
}
321+
state->_sleepTimeout_ms = 20 * (id + 1);
322+
// cap the timeout to some boundary value
323+
if (state->_sleepTimeout_ms >= 1000) {
324+
state->_sleepTimeout_ms = 1000;
325+
}
329326

330-
if (id < 32U) {
331-
// 512 >> 32 => undefined behavior
332-
state->_queueRetryCount = (uint64_t(512) >> id) + 3;
333-
} else {
334-
// we want at least 3 retries
335-
state->_queueRetryCount = 3;
327+
if (id < 32U) {
328+
// 512 >> 32 => undefined behavior
329+
state->_queueRetryCount = (uint64_t(512) >> id) + 3;
330+
} else {
331+
// we want at least 3 retries
332+
state->_queueRetryCount = 3;
333+
}
334+
335+
// inform the supervisor that this thread is alive
336+
state->_ready = true;
337+
_conditionSupervisor.notify_one();
336338
}
337339

338340
while (true) {
@@ -548,16 +550,21 @@ void SupervisedScheduler::startOneThread() {
548550
#pragma warning(pop)
549551
#endif
550552

551-
if (!_workerStates.back()->start()) {
553+
auto& state = _workerStates.back();
554+
555+
if (!state->start()) {
552556
// failed to start a worker
553557
_workerStates.pop_back(); // pop_back deletes shared_ptr, which deletes thread
554558
LOG_TOPIC("913b5", ERR, Logger::THREADS)
555559
<< "could not start additional worker thread";
556-
557-
} else {
558-
LOG_TOPIC("f9de8", TRACE, Logger::THREADS) << "Started new thread";
559-
_conditionSupervisor.wait(guard);
560+
return;
560561
}
562+
563+
// sync with runWorker()
564+
_conditionSupervisor.wait(guard, [&state]() {
565+
return state->_ready;
566+
});
567+
LOG_TOPIC("f9de8", TRACE, Logger::THREADS) << "Started new thread";
561568
}
562569

563570
void SupervisedScheduler::stopOneThread() {
@@ -590,18 +597,12 @@ void SupervisedScheduler::stopOneThread() {
590597
}
591598
}
592599

593-
SupervisedScheduler::WorkerState::WorkerState(SupervisedScheduler::WorkerState&& that) noexcept
594-
: _queueRetryCount(that._queueRetryCount),
595-
_sleepTimeout_ms(that._sleepTimeout_ms),
596-
_stop(that._stop.load()),
597-
_working(false),
598-
_thread(std::move(that._thread)) {}
599-
600600
SupervisedScheduler::WorkerState::WorkerState(SupervisedScheduler& scheduler)
601601
: _queueRetryCount(100),
602602
_sleepTimeout_ms(100),
603603
_stop(false),
604604
_working(false),
605+
_ready(false),
605606
_thread(new SupervisedSchedulerWorkerThread(scheduler)) {}
606607

607608
bool SupervisedScheduler::WorkerState::start() { return _thread->start(); }

arangod/Scheduler/SupervisedScheduler.h

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,12 +114,17 @@ class SupervisedScheduler final : public Scheduler {
114114
uint64_t _queueRetryCount; // t1
115115
uint64_t _sleepTimeout_ms; // t2
116116
std::atomic<bool> _stop, _working;
117+
// _ready = false means the Worker is not properly initialized
118+
// _ready = true means it is initialized and can be used to dispatch tasks to
119+
// _ready is protected by the Scheduler's condition variable & mutex
120+
bool _ready;
117121
clock::time_point _lastJobStarted;
118122
std::unique_ptr<SupervisedSchedulerWorkerThread> _thread;
119123

120124
// initialize with harmless defaults: spin once, sleep forever
121125
explicit WorkerState(SupervisedScheduler& scheduler);
122-
WorkerState(WorkerState&& that) noexcept;
126+
WorkerState(WorkerState const&) = delete;
127+
WorkerState& operator=(WorkerState const&) = delete;
123128

124129
// cppcheck-suppress missingOverride
125130
bool start();

0 commit comments

Comments
 (0)
0