326 |
| - if (state->_sleepTimeout_ms >= 1000) { | 327 |
| - state->_sleepTimeout_ms = 1000; |
328 |
| - } |
| 321 | + state->_sleepTimeout_ms = 20 * (id + 1); |
| 322 | + // cap the timeout to some boundary value |
| 323 | + if (state->_sleepTimeout_ms >= 1000) { |
| 324 | + state->_sleepTimeout_ms = 1000; |
| 325 | + } |
329 | 326 |
|
330 |
| - if (id < 32U) { |
331 |
| - // 512 >> 32 => undefined behavior |
332 |
| - state->_queueRetryCount = (uint64_t(512) >> id) + 3; |
333 |
| - } else { |
334 |
| - // we want at least 3 retries |
335 |
| - state->_queueRetryCount = 3; |
| 327 | + if (id < 32U) { |
| 328 | + // 512 >> 32 => undefined behavior |
| 329 | + state->_queueRetryCount = (uint64_t(512) >> id) + 3; |
| 330 | + } else { |
| 331 | + // we want at least 3 retries |
| 332 | + state->_queueRetryCount = 3; |
| 333 | + } |
| 334 | + |
| 335 | + // inform the supervisor that this thread is alive |
| 336 | + state->_ready = true; |
| 337 | + _conditionSupervisor.notify_one(); |
336 | 338 | }
|
337 | 339 |
|
338 | 340 | while (true) {
|
@@ -548,16 +550,21 @@ void SupervisedScheduler::startOneThread() {
|
548 | 550 | #pragma warning(pop)
|
549 | 551 | #endif
|
550 | 552 |
|
551 |
| - if (!_workerStates.back()->start()) { |
| 553 | + auto& state = _workerStates.back(); |
| 554 | + |
| 555 | + if (!state->start()) { |
552 | 556 | // failed to start a worker
|
553 | 557 | _workerStates.pop_back(); // pop_back deletes shared_ptr, which deletes thread
|
554 | 558 | LOG_TOPIC("913b5", ERR, Logger::THREADS)
|
555 | 559 | << "could not start additional worker thread";
|
556 |
| - |
557 |
| - } else { |
558 |
| - LOG_TOPIC("f9de8", TRACE, Logger::THREADS) << "Started new thread"; |
559 |
| - _conditionSupervisor.wait(guard); |
| 560 | + return; |
560 | 561 | }
|
| 562 | + |
| 563 | + // sync with runWorker() |
| 564 | + _conditionSupervisor.wait(guard, [&state]() { |
| 565 | + return state->_ready; |
| 566 | + }); |
| 567 | + LOG_TOPIC("f9de8", TRACE, Logger::THREADS) << "Started new thread"; |
561 | 568 | }
|
562 | 569 |
|
563 | 570 | void SupervisedScheduler::stopOneThread() {
|
@@ -590,18 +597,12 @@ void SupervisedScheduler::stopOneThread() {
|
590 | 597 | }
|
591 | 598 | }
|
592 | 599 |
|
593 |
| -SupervisedScheduler::WorkerState::WorkerState(SupervisedScheduler::WorkerState&& that) noexcept |
594 |
| - : _queueRetryCount(that._queueRetryCount), |
595 |
| - _sleepTimeout_ms(that._sleepTimeout_ms), |
596 |
| - _stop(that._stop.load()), |
597 |
| - _working(false), |
598 |
| - _thread(std::move(that._thread)) {} |
599 |
| - |
600 | 600 | SupervisedScheduler::WorkerState::WorkerState(SupervisedScheduler& scheduler)
|
601 | 601 | : _queueRetryCount(100),
|
602 | 602 | _sleepTimeout_ms(100),
|
603 | 603 | _stop(false),
|
604 | 604 | _working(false),
|
| 605 | + _ready(false), |
605 | 606 | _thread(new SupervisedSchedulerWorkerThread(scheduler)) {}
|
606 | 607 |
|
607 | 608 | bool SupervisedScheduler::WorkerState::start() { return _thread->start(); }
|
|
0 commit comments