@@ -97,7 +97,7 @@ class SupervisedSchedulerWorkerThread final : public SupervisedSchedulerThread {
97
97
SupervisedScheduler::SupervisedScheduler (uint64_t minThreads, uint64_t maxThreads,
98
98
uint64_t maxQueueSize,
99
99
uint64_t fifo1Size, uint64_t fifo2Size)
100
- : _numWorker (0 ),
100
+ : _numWorkers (0 ),
101
101
_stopping(false ),
102
102
_jobsSubmitted(0 ),
103
103
_jobsDequeued(0 ),
@@ -138,12 +138,13 @@ bool SupervisedScheduler::queue(RequestLane lane, std::function<void()> handler,
138
138
TRI_ASSERT (queueNo <= 2 );
139
139
TRI_ASSERT (isStopping () == false );
140
140
141
- WorkItem* work = new WorkItem (std::move (handler));
141
+ auto work = std::make_unique< WorkItem> (std::move (handler));
142
142
143
- if (!_queue[queueNo].push (work)) {
144
- delete work;
143
+ if (!_queue[queueNo].push (work.get ())) {
145
144
return false ;
146
145
}
146
+ // queue now has ownership for the WorkItem
147
+ work.release ();
147
148
148
149
static thread_local uint64_t lastSubmitTime_ns;
149
150
@@ -208,7 +209,7 @@ void SupervisedScheduler::shutdown() {
208
209
// call the destructor of all threads
209
210
_manager.reset ();
210
211
211
- while (_numWorker > 0 ) {
212
+ while (_numWorkers > 0 ) {
212
213
stopOneThread ();
213
214
}
214
215
@@ -230,7 +231,7 @@ void SupervisedScheduler::runWorker() {
230
231
231
232
{
232
233
std::lock_guard<std::mutex> guard (_mutexSupervisor);
233
- id = _numWorker ++; // increase the number of workers here, to obtains the id
234
+ id = _numWorkers ++; // increase the number of workers here, to obtains the id
234
235
// copy shared_ptr with worker state
235
236
state = _workerStates.back ();
236
237
// inform the supervisor that this thread is alive
@@ -266,7 +267,7 @@ void SupervisedScheduler::runWorker() {
266
267
}
267
268
268
269
void SupervisedScheduler::runSupervisor () {
269
- while (_numWorker < _numIdleWorker) {
270
+ while (_numWorkers < _numIdleWorker) {
270
271
startOneThread ();
271
272
}
272
273
@@ -286,8 +287,8 @@ void SupervisedScheduler::runSupervisor() {
286
287
287
288
uint64_t queueLength = jobsSubmitted - jobsDequeued;
288
289
289
- bool doStartOneThread = (((queueLength >= 3 * _numWorker ) &&
290
- ((lastQueueLength + _numWorker ) < queueLength)) ||
290
+ bool doStartOneThread = (((queueLength >= 3 * _numWorkers ) &&
291
+ ((lastQueueLength + _numWorkers ) < queueLength)) ||
291
292
(lastJobsSubmitted > jobsDone)) &&
292
293
(queueLength != 0 );
293
294
@@ -300,10 +301,10 @@ void SupervisedScheduler::runSupervisor() {
300
301
lastQueueLength = queueLength;
301
302
lastJobsSubmitted = jobsSubmitted;
302
303
303
- if (doStartOneThread && _numWorker < _maxNumWorker) {
304
+ if (doStartOneThread && _numWorkers < _maxNumWorker) {
304
305
jobsStallingTick = 0 ;
305
306
startOneThread ();
306
- } else if (doStopOneThread && _numWorker > _numIdleWorker) {
307
+ } else if (doStopOneThread && _numWorkers > _numIdleWorker) {
307
308
stopOneThread ();
308
309
}
309
310
@@ -361,7 +362,7 @@ void SupervisedScheduler::sortoutLongRunningThreads() {
361
362
// Move that thread to the abandoned thread
362
363
_abandonedWorkerStates.push_back (std::move (state));
363
364
i = _workerStates.erase (i);
364
- _numWorker --;
365
+ _numWorkers --;
365
366
366
367
// and now start another thread!
367
368
startOneThread ();
@@ -407,8 +408,8 @@ std::unique_ptr<SupervisedScheduler::WorkItem> SupervisedScheduler::getWork(
407
408
}
408
409
409
410
void SupervisedScheduler::startOneThread () {
410
- // TRI_ASSERT(_numWorker < _maxNumWorker);
411
- if (_numWorker + _abandonedWorkerStates.size () >= _maxNumWorker) {
411
+ // TRI_ASSERT(_numWorkers < _maxNumWorker);
412
+ if (_numWorkers + _abandonedWorkerStates.size () >= _maxNumWorker) {
412
413
return ; // do not add more threads, than maximum allows
413
414
}
414
415
@@ -439,7 +440,7 @@ void SupervisedScheduler::startOneThread() {
439
440
}
440
441
441
442
void SupervisedScheduler::stopOneThread () {
442
- TRI_ASSERT (_numWorker > 0 );
443
+ TRI_ASSERT (_numWorkers > 0 );
443
444
444
445
// copy shared_ptr
445
446
auto state = _workerStates.back ();
@@ -457,7 +458,7 @@ void SupervisedScheduler::stopOneThread() {
457
458
// the cleanup list and wait for its termination.
458
459
//
459
460
// Since the thread is effectively taken out of the pool, decrease the number of worker.
460
- _numWorker --;
461
+ _numWorkers --;
461
462
462
463
if (state->_thread ->isRunning ()) {
463
464
LOG_TOPIC (" 73365" , TRACE, Logger::THREADS) << " Abandon one thread." ;
@@ -487,35 +488,32 @@ bool SupervisedScheduler::WorkerState::start() { return _thread->start(); }
487
488
// ---------------------------------------------------------------------------
488
489
// Statistics Stuff
489
490
// ---------------------------------------------------------------------------
490
- std::string SupervisedScheduler::infoStatus () const {
491
- // TODO: compare with old output format
492
- // Does some code rely on that string or is it for humans?
493
- uint64_t numWorker = _numWorker.load (std::memory_order_relaxed);
494
- uint64_t queueLength = _jobsSubmitted.load (std::memory_order_relaxed) -
495
- _jobsDone.load (std::memory_order_relaxed);
496
-
497
- return " scheduler threads " + std::to_string (numWorker) + " (" +
498
- std::to_string (_numIdleWorker) + " <" + std::to_string (_maxNumWorker) +
499
- " ) queued " + std::to_string (queueLength) +
500
- " directly exec " + std::to_string (_jobsDirectExec.load (std::memory_order_relaxed));
501
- }
502
491
503
492
Scheduler::QueueStatistics SupervisedScheduler::queueStatistics () const {
504
- uint64_t numWorker = _numWorker.load (std::memory_order_relaxed);
505
- uint64_t queueLength = _jobsSubmitted.load (std::memory_order_relaxed) -
506
- _jobsDone.load (std::memory_order_relaxed);
493
+ // we need to read multiple atomics here. as all atomic reads happen independently
494
+ // without a mutex outside, the overall picture may be inconsistent
495
+
496
+ uint64_t const numWorkers = _numWorkers.load (std::memory_order_relaxed);
497
+
498
+ // read _jobsDone first, so the differences of the counters cannot get negative
499
+ uint64_t const jobsDone = _jobsDone.load (std::memory_order_relaxed);
500
+ uint64_t const jobsDequeued = _jobsDequeued.load(std::memory_order_relaxed);
501
+ uint64_t const jobsSubmitted = _jobsSubmitted.load (std::memory_order_relaxed);
502
+
503
+ uint64_t const queued = jobsSubmitted - jobsDone;
504
+ uint64_t const working = jobsDequeued - jobsDone;
505
+
506
+ uint64_t const directExec = _jobsDirectExec.load (std::memory_order_relaxed);
507
507
508
- return QueueStatistics{numWorker, numWorker, queueLength, 0 , 0 , 0 };
508
+ return QueueStatistics{numWorkers, 0 , queued, working, directExec };
509
509
}
510
510
511
- void SupervisedScheduler::addQueueStatistics (velocypack::Builder& b) const {
512
- uint64_t numWorker = _numWorker.load (std::memory_order_relaxed);
513
- uint64_t queueLength = _jobsSubmitted.load (std::memory_order_relaxed) -
514
- _jobsDone.load (std::memory_order_relaxed);
515
- uint64_t directExec = _jobsDirectExec.load (std::memory_order_relaxed);
511
+ void SupervisedScheduler::toVelocyPack (velocypack::Builder& b) const {
512
+ QueueStatistics qs = queueStatistics ();
516
513
517
- // TODO: previous scheduler filled out a lot more fields, relevant?
518
- b.add (" scheduler-threads" , VPackValue (numWorker));
519
- b.add (" queued" , VPackValue (queueLength));
520
- b.add (" directExec" , VPackValue (directExec));
514
+ b.add (" scheduler-threads" , VPackValue (qs._running )); // numWorkers
515
+ b.add (" blocked" , VPackValue (qs._blocked )); // obsolete
516
+ b.add (" queued" , VPackValue (qs._queued ));
517
+ b.add (" in-progress" , VPackValue (qs._working ));
518
+ b.add (" direct-exec" , VPackValue (qs._directExec ));
521
519
}
0 commit comments