@@ -68,7 +68,7 @@ bool isDirectDeadlockLane(RequestLane lane) {
68
68
namespace {
69
69
typedef std::chrono::time_point<std::chrono::steady_clock> time_point;
70
70
71
- // value initialise these arrays, otherwise mac will crash
71
+ // value-initialize these arrays, otherwise mac will crash
72
72
thread_local time_point conditionQueueFullSince{};
73
73
thread_local uint_fast32_t queueWarningTick{};
74
74
@@ -80,7 +80,7 @@ time_point lastQueueFullWarning[3];
80
80
int64_t fullQueueEvents[3 ] = {0 , 0 , 0 };
81
81
std::mutex fullQueueWarningMutex[3 ];
82
82
83
- void logQueueWarningEveryNowAndThen (int64_t events) {
83
+ void logQueueWarningEveryNowAndThen (int64_t events, uint64_t maxQueueSize ) {
84
84
auto const now = std::chrono::steady_clock::now ();
85
85
uint64_t totalEvents;
86
86
bool printLog = false ;
@@ -99,13 +99,13 @@ void logQueueWarningEveryNowAndThen(int64_t events) {
99
99
100
100
if (printLog) {
101
101
LOG_TOPIC (" dead2" , WARN, Logger::THREADS)
102
- << " Scheduler queue"
102
+ << " Scheduler queue with max capacity " << maxQueueSize
103
103
<< " is filled more than 50% in last " << sinceLast.count ()
104
- << " s. (happened " << totalEvents << " times since last message)" ;
104
<
8000
span class="diff-text-marker">+ << " s (happened " << totalEvents << " times since last message)" ;
105
105
}
106
106
}
107
107
108
- void logQueueFullEveryNowAndThen (int64_t fifo) {
108
+ void logQueueFullEveryNowAndThen (int64_t fifo, uint64_t maxQueueSize ) {
109
109
auto const & now = std::chrono::steady_clock::now ();
110
110
uint64_t events;
111
111
bool printLog = false ;
@@ -122,7 +122,8 @@ void logQueueFullEveryNowAndThen(int64_t fifo) {
122
122
123
123
if (printLog) {
124
124
LOG_TOPIC (" dead1" , WARN, Logger::THREADS)
125
- << " Scheduler queue " << fifo << " is full. (happened " << events
125
+ << " Scheduler queue " << fifo << " with max capacity " << maxQueueSize
126
+ << " is full (happened " << events
126
127
<< " times since last message)" ;
127
128
}
128
129
}
@@ -153,7 +154,7 @@ class SupervisedSchedulerWorkerThread final : public SupervisedSchedulerThread {
153
154
explicit SupervisedSchedulerWorkerThread (SupervisedScheduler& scheduler)
154
155
: Thread(" SchedWorker" ), SupervisedSchedulerThread(scheduler) {}
155
156
~SupervisedSchedulerWorkerThread () { shutdown (); }
156
8000
code>
- void run () override { _scheduler.runWorker (); };
157
+ void run () override { _scheduler.runWorker (); }
157
158
};
158
159
159
160
} // namespace arangodb
@@ -172,28 +173,35 @@ SupervisedScheduler::SupervisedScheduler(uint64_t minThreads, uint64_t maxThread
172
173
_definitiveWakeupTime_ns(100000 ),
173
174
_maxNumWorker(maxThreads),
174
175
_numIdleWorker(minThreads),
175
- _maxFifoSize(maxQueueSize) {
176
- _queue[0 ].reserve (maxQueueSize);
177
- _queue[1 ].reserve (fifo1Size);
178
- _queue[2 ].reserve (fifo2Size);
176
+ _maxFifoSize(maxQueueSize),
177
+ _fifo1Size(fifo1Size),
178
+ _fifo2Size(fifo2Size) {
179
+ _queues[0 ].reserve (maxQueueSize);
180
+ _queues[1 ].reserve (fifo1Size);
181
+ _queues[2 ].reserve (fifo2Size);
179
182
}
180
183
181
184
SupervisedScheduler::~SupervisedScheduler () {}
182
185
183
186
bool SupervisedScheduler::queue (RequestLane lane, std::function<void ()> handler,
184
187
bool allowDirectHandling) {
185
- if (!isDirectDeadlockLane (lane) && allowDirectHandling &&
186
- !ServerState::instance ()->isClusterRole () && (_jobsSubmitted - _jobsDone) < 2 ) {
187
- _jobsSubmitted.fetch_add (1 , std::memory_order_relaxed);
188
- _jobsDequeued.fetch_add (1 , std::memory_order_relaxed);
189
- _jobsDirectExec.fetch_add (1 , std::memory_order_release);
190
- try {
191
- handler ();
192
- _jobsDone.fetch_add (1 , std::memory_order_release);
193
- return true ;
194
- } catch (...) {
195
- _jobsDone.fetch_add (1 , std::memory_order_release);
196
- throw ;
188
+ if (!isDirectDeadlockLane (lane) &&
189
+ allowDirectHandling &&
190
+ !ServerState::instance ()->isClusterRole ()) {
191
+ uint64_t const jobsDone = _jobsDone.load (std::memory_order_acquire);
192
+ uint64_t const jobsSubmitted = _jobsSubmitted.load (std::memory_order_relaxed);
193
+ if (jobsSubmitted - jobsDone < 2 ) {
194
+ _jobsSubmitted.fetch_add (1 , std::memory_order_relaxed);
195
+ _jobsDequeued.fetch_add (1 , std::memory_order_relaxed);
196
+ _jobsDirectExec.fetch_add (1 , std::memory_order_relaxed);
197
+ try {
198
+ handler ();
199
+ _jobsDone.fetch_add (1 , std::memory_order_release);
200
+ return true ;
201
+ } catch (...) {
202
+ _jobsDone.fetch_add (1 , std::memory_order_release);
203
+ throw ;
204
+ }
197
205
}
198
206
}
199
207
@@ -204,8 +212,15 @@ bool SupervisedScheduler::queue(RequestLane lane, std::function<void()> handler,
204
212
205
213
auto work = std::make_unique<WorkItem>(std::move (handler));
206
214
207
- if (!_queue[queueNo].push (work.get ())) {
208
- logQueueFullEveryNowAndThen (queueNo);
215
+ if (!_queues[queueNo].bounded_push (work.get ())) {
216
+ uint64_t maxSize = _maxFifoSize;
217
+ if (queueNo == 1 ) {
218
+ maxSize = _fifo1Size;
219
+ } else if (queueNo == 2 ) {
220
+ maxSize = _fifo2Size;
221
+ }
222
+ LOG_TOPIC (" 98d94" , DEBUG, Logger::THREADS) << " unable to push job to scheduler queue: queue is full" ;
223
+ logQueueFullEveryNowAndThen (queueNo, maxSize);
209
224
return false ;
210
225
}
211
226
// queue now has ownership for the WorkItem
@@ -214,27 +229,32 @@ bool SupervisedScheduler::queue(RequestLane lane, std::function<void()> handler,
214
229
static thread_local uint64_t lastSubmitTime_ns;
215
230
216
231
// use memory order release to make sure, pushed item is visible
217
- uint64_t jobsSubmitted = _jobsSubmitted.fetch_add (1 , std::memory_order_release);
218
- uint64_t approxQueueLength = jobsSubmitted - _jobsDone;
232
+ uint64_t const jobsDone = _jobsDone.load (std::memory_order_acquire);
233
+ uint64_t const jobsSubmitted = _jobsSubmitted.fetch_add (1 , std::memory_order_relaxed);
234
+ uint64_t const approxQueueLength = jobsSubmitted - jobsDone;
235
+
236
+ // to make sure the queue length hasn't underflowed
237
+ TRI_ASSERT (jobsDone <= jobsSubmitted);
238
+
219
239
uint64_t now_ns = getTickCount_ns ();
220
240
uint64_t sleepyTime_ns = now_ns - lastSubmitTime_ns;
221
241
lastSubmitTime_ns = now_ns;
222
242
<
10000
tr class="diff-line-row">223
243
if (approxQueueLength > _maxFifoSize / 2 ) {
224
- if ((queueWarningTick++ & 0xFF ) == 0 ) {
244
+ if ((:: queueWarningTick++ & 0xFF ) == 0 ) {
225
245
auto const & now = std::chrono::steady_clock::now ();
226
- if (conditionQueueFullSince == time_point{}) {
227
- logQueueWarningEveryNowAndThen (queueWarningTick);
228
- conditionQueueFullSince = now;
229
- } else if (now - conditionQueueFullSince > std::chrono::seconds (5 )) {
230
- logQueueWarningEveryNowAndThen (queueWarningTick);
231
- queueWarningTick = 0 ;
232
- conditionQueueFullSince = now;
246
+ if (:: conditionQueueFullSince == time_point{}) {
247
+ logQueueWarningEveryNowAndThen (:: queueWarningTick, _maxFifoSize );
248
+ :: conditionQueueFullSince = now;
249
+ } else if (now - :: conditionQueueFullSince > std::chrono::seconds (5 )) {
250
+ logQueueWarningEveryNowAndThen (:: queueWarningTick, _maxFifoSize );
251
+ :: queueWarningTick = 0 ;
252
+ :: conditionQueueFullSince = now;
233
253
}
234
254
}
235
255
} else {
236
- queueWarningTick = 0 ;
237
- conditionQueueFullSince = time_point{};
256
+ :: queueWarningTick = 0 ;
257
+ :: conditionQueueFullSince = time_point{};
238
258
}
239
259
240
260
bool doNotify = false ;
@@ -263,9 +283,6 @@ bool SupervisedScheduler::start() {
263
283
}
264
284
265
285
void SupervisedScheduler::shutdown () {
266
- // THIS IS WHAT WE SHOULD AIM FOR, BUT NOBODY CARES
267
- // TRI_ASSERT(_jobsSubmitted <= _jobsDone);
268
-
269
286
{
270
287
std::unique_lock<std::mutex> guard (_mutex);
271
288
_stopping = true ;
@@ -275,8 +292,8 @@ void SupervisedScheduler::shutdown() {
275
292
Scheduler::shutdown ();
276
293
277
294
while (true ) {
278
- auto jobsSubmitted = _jobsSubmitted .load ();
279
- auto jobsDone = _jobsDone .load ();
295
+ auto jobsDone = _jobsDone .load (std::memory_order_acquire );
296
+ auto jobsSubmitted = _jobsSubmitted .load (std::memory_order_relaxed );
280
297
281
298
if (jobsSubmitted <= jobsDone) {
282
299
break ;
@@ -344,7 +361,7 @@ void SupervisedScheduler::runWorker() {
344
361
break ;
345
362
}
346
363
347
- _jobsDequeued++ ;
364
+ _jobsDequeued. fetch_add ( 1 , std::memory_order_relaxed) ;
348
365
349
366
state->_lastJobStarted = clock::now ();
350
367
state->_working = true ;
@@ -372,8 +389,8 @@ void SupervisedScheduler::runSupervisor() {
372
389
373
390
while (!_stopping) {
374
391
uint64_t jobsDone = _jobsDone.load (std::memory_order_acquire);
375
- uint64_t jobsSubmitted = _jobsSubmitted.load (std::memory_order_acquire );
376
- uint64_t jobsDequeued = _jobsDequeued.load (std::memory_order_acquire );
392
+ uint64_t jobsSubmitted = _jobsSubmitted.load (std::memory_order_relaxed );
393
+ uint64_t jobsDequeued = _jobsDequeued.load (std::memory_order_relaxed );
377
394
378
395
if (jobsDone == lastJobsDone && (jobsDequeued < jobsSubmitted)) {
379
396
jobsStallingTick++;
@@ -485,8 +502,9 @@ bool SupervisedScheduler::canPullFromQueue(uint64_t queueIndex) const {
485
502
// then a job gets done fast (eg dequeued++, done++)
486
503
// and then we read done.
487
504
uint64_t jobsDone = _jobsDone.load (std::memory_order_acquire);
488
- uint64_t jobsDequeued = _jobsDequeued.load (std::memory_order_acquire );
505
+ uint64_t jobsDequeued = _jobsDequeued.load (std::memory_order_relaxed );
489
506
TRI_ASSERT (jobsDequeued >= jobsDone);
507
+
490
508
switch (queueIndex) {
491
509
case 0 :
492
510
// We can always! pull from high priority
@@ -511,7 +529,7 @@ std::unique_ptr<SupervisedScheduler::WorkItem> SupervisedScheduler::getWork(
511
529
auto queueIdx = triesCount % 3 ;
512
530
// Order of this if is important! First check if we are allowed to pull,
513
531
// then really pull from queue
514
- if (canPullFromQueue (queueIdx) && _queue [queueIdx].pop (work)) {
532
+ if (canPullFromQueue (queueIdx) && _queues [queueIdx].pop (work)) {
515
533
return std::unique_ptr<WorkItem>(work);
516
534
}
517
535
@@ -537,7 +555,7 @@ std::unique_ptr<SupervisedScheduler::WorkItem> SupervisedScheduler::getWork(
537
555
void SupervisedScheduler::startOneThread () {
538
556
// TRI_ASSERT(_numWorkers < _maxNumWorker);
539
557
if (_numWorkers + _abandonedWorkerStates.size () >= _maxNumWorker) {
540
- return ; // do not add more threads, than maximum allows
558
+ return ; // do not add more threads than maximum allows
541
559
}
542
560
543
561
std::unique_lock<std::mutex> guard (_mutexSupervisor);
@@ -622,7 +640,7 @@ Scheduler::QueueStatistics SupervisedScheduler::queueStatistics() const {
622
640
uint64_t const numWorkers = _numWorkers.load (std::memory_order_relaxed);
623
641
624
642
// read _jobsDone first, so the differences of the counters cannot get negative
625
- uint64_t const jobsDone = _jobsDone.load (std::memory_order_relaxed );
643
+ uint64_t const jobsDone = _jobsDone.load (std::memory_order_acquire );
626
644
uint64_t const jobsDequeued = _jobsDequeued.load (std::memory_order_relaxed);
627
645
uint64_t const jobsSubmitted = _jobsSubmitted.load (std::memory_order_relaxed);
628
646
0 commit comments