@@ -68,7 +68,7 @@ bool isDirectDeadlockLane(RequestLane lane) {
 namespace {
 typedef std::chrono::time_point<std::chrono::steady_clock> time_point;
 
-// value initialise these arrays, otherwise mac will crash
+// value-initialize these arrays, otherwise mac will crash
 thread_local time_point conditionQueueFullSince{};
 thread_local uint_fast32_t queueWarningTick{};
 
@@ -80,7 +80,7 @@ time_point lastQueueFullWarning[3];
 int64_t fullQueueEvents[3] = {0, 0, 0};
 std::mutex fullQueueWarningMutex[3];
 
-void logQueueWarningEveryNowAndThen(int64_t events) {
+void logQueueWarningEveryNowAndThen(int64_t events, uint64_t maxQueueSize) {
   auto const now = std::chrono::steady_clock::now();
   uint64_t totalEvents;
   bool printLog = false;
@@ -99,13 +99,13 @@ void logQueueWarningEveryNowAndThen(int64_t events) {
 
   if (printLog) {
     LOG_TOPIC("dead2", WARN, Logger::THREADS)
-        << "Scheduler queue"
+        << "Scheduler queue with max capacity " << maxQueueSize
         << " is filled more than 50% in last " << sinceLast.count()
-        << "s. (happened " << totalEvents << " times since last message)";
+        << "s (happened " << totalEvents << " times since last message)";
   }
 }
 
-void logQueueFullEveryNowAndThen(int64_t fifo) {
+void logQueueFullEveryNowAndThen(int64_t fifo, uint64_t maxQueueSize) {
   auto const& now = std::chrono::steady_clock::now();
   uint64_t events;
   bool printLog = false;
@@ -122,7 +122,8 @@ void logQueueFullEveryNowAndThen(int64_t fifo) {
 
   if (printLog) {
     LOG_TOPIC("dead1", WARN, Logger::THREADS)
-        << "Scheduler queue " << fifo << " is full. (happened " << events
+        << "Scheduler queue " << fifo << " with max capacity " << maxQueueSize
+        << " is full (happened " << events
         << " times since last message)";
   }
 }
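Both helpers above use the same rate-limiting idea: count every overflow event, but emit at most one WARN line per interval, reporting how many events accumulated since the previous message. A minimal self-contained sketch of that pattern follows; the function name, the 10-second interval, and the plain `std::cerr` output are illustrative assumptions, not the scheduler's actual code:

```cpp
#include <chrono>
#include <cstdint>
#include <iostream>
#include <mutex>

void logEveryNowAndThen(int64_t events) {
  using clock = std::chrono::steady_clock;
  static std::mutex warnMutex;
  static clock::time_point lastWarning{};  // value-initialized: "never warned"
  static int64_t pendingEvents = 0;

  auto const now = clock::now();
  int64_t toReport = 0;
  {
    std::lock_guard<std::mutex> guard(warnMutex);
    pendingEvents += events;
    if (lastWarning != clock::time_point{} &&
        now - lastWarning < std::chrono::seconds(10)) {
      return;  // rate-limited: stay silent, keep accumulating
    }
    toReport = pendingEvents;
    pendingEvents = 0;
    lastWarning = now;
  }
  std::cerr << "scheduler queue is full (happened " << toReport
            << " times since last message)" << std::endl;
}
```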
@@ -153,7 +154,7 @@ class SupervisedSchedulerWorkerThread final : public SupervisedSchedulerThread {
   explicit SupervisedSchedulerWorkerThread(SupervisedScheduler& scheduler)
       : Thread("SchedWorker"), SupervisedSchedulerThread(scheduler) {}
   ~SupervisedSchedulerWorkerThread() { shutdown(); }
-  void run() override { _scheduler.runWorker(); };
+  void run() override { _scheduler.runWorker(); }
 };
 
 }  // namespace arangodb
@@ -172,69 +173,92 @@ SupervisedScheduler::SupervisedScheduler(uint64_t minThreads, uint64_t maxThread
       _definitiveWakeupTime_ns(100000),
       _maxNumWorker(maxThreads),
       _numIdleWorker(minThreads),
-      _maxFifoSize(maxQueueSize) {
-  _queue[0].reserve(maxQueueSize);
-  _queue[1].reserve(fifo1Size);
-  _queue[2].reserve(fifo2Size);
+      _maxFifoSize(maxQueueSize),
+      _fifo1Size(fifo1Size),
+      _fifo2Size(fifo2Size) {
+  _queues[0].reserve(maxQueueSize);
+  _queues[1].reserve(fifo1Size);
+  _queues[2].reserve(fifo2Size);
 }
 
 SupervisedScheduler::~SupervisedScheduler() {}
 
 bool SupervisedScheduler::queue(RequestLane lane, std::function<void()> handler,
                                 bool allowDirectHandling) {
-  if (!isDirectDeadlockLane(lane) && allowDirectHandling &&
-      !ServerState::instance()->isClusterRole() && (_jobsSubmitted - _jobsDone) < 2) {
-    _jobsSubmitted.fetch_add(1, std::memory_order_relaxed);
-    _jobsDequeued.fetch_add(1, std::memory_order_relaxed);
-    _jobsDirectExec.fetch_add(1, std::memory_order_release);
-    try {
-      handler();
-      _jobsDone.fetch_add(1, std::memory_order_release);
-      return true;
-    } catch (...) {
-      _jobsDone.fetch_add(1, std::memory_order_release);
-      throw;
+  if (!isDirectDeadlockLane(lane) &&
+      allowDirectHandling &&
+      !ServerState::instance()->isClusterRole()) {
+    uint64_t const jobsDone = _jobsDone.load(std::memory_order_acquire);
+    uint64_t const jobsSubmitted = _jobsSubmitted.load(std::memory_order_relaxed);
+    if (jobsSubmitted - jobsDone < 2) {
+      _jobsSubmitted.fetch_add(1, std::memory_order_relaxed);
+      _jobsDequeued.fetch_add(1, std::memory_order_relaxed);
+      _jobsDirectExec.fetch_add(1, std::memory_order_relaxed);
+      try {
+        handler();
+        _jobsDone.fetch_add(1, std::memory_order_release);
+        return true;
+      } catch (...) {
+        _jobsDone.fetch_add(1, std::memory_order_release);
+        throw;
+      }
     }
   }
-
-  size_t queueNo = static_cast<size_t>(PriorityRequestLane(lane));
+
+  auto work = std::make_unique<WorkItem>(std::move(handler));
+
+  // use memory order acquire to make sure the pushed item is visible
+  uint64_t const jobsDone = _jobsDone.load(std::memory_order_acquire);
+  uint64_t const jobsSubmitted = _jobsSubmitted.fetch_add(1, std::memory_order_relaxed);
+
+  // to make sure the queue length hasn't underflowed
+  TRI_ASSERT(jobsDone <= jobsSubmitted);
+
+  uint64_t const approxQueueLength = jobsSubmitted - jobsDone;
+
+  size_t const queueNo = static_cast<size_t>(PriorityRequestLane(lane));
 
   TRI_ASSERT(queueNo <= 2);
   TRI_ASSERT(isStopping() == false);
 
-  auto work = std::make_unique<WorkItem>(std::move(handler));
+  if (!_queues[queueNo].bounded_push(work.get())) {
+    _jobsSubmitted.fetch_sub(1, std::memory_order_release);
 
-  if (!_queue[queueNo].push(work.get())) {
-    logQueueFullEveryNowAndThen(queueNo);
+    uint64_t maxSize = _maxFifoSize;
+    if (queueNo == 1) {
+      maxSize = _fifo1Size;
+    } else if (queueNo == 2) {
+      maxSize = _fifo2Size;
+    }
+    LOG_TOPIC("98d94", DEBUG, Logger::THREADS) << "unable to push job to scheduler queue: queue is full";
+    logQueueFullEveryNowAndThen(queueNo, maxSize);
     return false;
   }
+
   // queue now has ownership for the WorkItem
   work.release();
 
-  static thread_local uint64_t lastSubmitTime_ns;
+  static thread_local uint64_t lastSubmitTime_ns = 0;
 
-  // use memory order release to make sure, pushed item is visible
-  uint64_t jobsSubmitted = _jobsSubmitted.fetch_add(1, std::memory_order_release);
-  uint64_t approxQueueLength = jobsSubmitted - _jobsDone;
   uint64_t now_ns = getTickCount_ns();
   uint64_t sleepyTime_ns = now_ns - lastSubmitTime_ns;
   lastSubmitTime_ns = now_ns;
 
   if (approxQueueLength > _maxFifoSize / 2) {
-    if ((queueWarningTick++ & 0xFF) == 0) {
+    if ((::queueWarningTick++ & 0xFF) == 0) {
       auto const& now = std::chrono::steady_clock::now();
-      if (conditionQueueFullSince == time_point{}) {
-        logQueueWarningEveryNowAndThen(queueWarningTick);
-        conditionQueueFullSince = now;
-      } else if (now - conditionQueueFullSince > std::chrono::seconds(5)) {
-        logQueueWarningEveryNowAndThen(queueWarningTick);
-        queueWarningTick = 0;
-        conditionQueueFullSince = now;
+      if (::conditionQueueFullSince == time_point{}) {
+        logQueueWarningEveryNowAndThen(::queueWarningTick, _maxFifoSize);
+        ::conditionQueueFullSince = now;
+      } else if (now - ::conditionQueueFullSince > std::chrono::seconds(5)) {
+        logQueueWarningEveryNowAndThen(::queueWarningTick, _maxFifoSize);
+        ::queueWarningTick = 0;
+        ::conditionQueueFullSince = now;
       }
     }
   } else {
-    queueWarningTick = 0;
-    conditionQueueFullSince = time_point{};
+    ::queueWarningTick = 0;
+    ::conditionQueueFullSince = time_point{};
   }
 
   bool doNotify = false;
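The reworked queue() depends on a small protocol between the counters: producers load _jobsDone with acquire before bumping _jobsSubmitted with a relaxed fetch_add, while workers publish completion with a release increment, so the difference is a safe approximation of the queue length. A stripped-down sketch of just that pairing, with simplified stand-in names for the member atomics:

```cpp
#include <atomic>
#include <cstdint>

std::atomic<uint64_t> jobsSubmitted{0};
std::atomic<uint64_t> jobsDone{0};

// Producer side: read done first (the acquire pairs with the worker's
// release increment), then bump submitted. Because submitted only grows and
// done never overtakes it, the later read of submitted is >= the earlier
// read of done, so the subtraction cannot underflow.
uint64_t submitAndMeasure() {
  uint64_t const done = jobsDone.load(std::memory_order_acquire);
  uint64_t const submitted =
      jobsSubmitted.fetch_add(1, std::memory_order_relaxed);
  return submitted - done;  // approximate queue length before this job
}

// Worker side: publish completion so producers' acquire loads observe it.
void finishOneJob() {
  jobsDone.fetch_add(1, std::memory_order_release);
}
```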
@@ -263,9 +287,6 @@ bool SupervisedScheduler::start() {
 }
 
 void SupervisedScheduler::shutdown() {
-  // THIS IS WHAT WE SHOULD AIM FOR, BUT NOBODY CARES
-  // TRI_ASSERT(_jobsSubmitted <= _jobsDone);
-
   {
     std::unique_lock<std::mutex> guard(_mutex);
     _stopping = true;
@@ -275,8 +296,8 @@ void SupervisedScheduler::shutdown() {
   Scheduler::shutdown();
 
   while (true) {
-    auto jobsSubmitted = _jobsSubmitted.load();
-    auto jobsDone = _jobsDone.load();
+    auto jobsDone = _jobsDone.load(std::memory_order_acquire);
+    auto jobsSubmitted = _jobsSubmitted.load(std::memory_order_relaxed);
 
     if (jobsSubmitted <= jobsDone) {
       break;
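shutdown() now reads _jobsDone first (acquire) and _jobsSubmitted second (relaxed), mirroring the load order used elsewhere so the pending count can never appear negative. A hedged sketch of that drain loop in isolation; the 10 ms poll interval is an assumption, and the real loop's wakeup and timeout handling are not shown in this hunk:

```cpp
#include <atomic>
#include <chrono>
#include <cstdint>
#include <thread>

// Poll until every submitted job has completed. Reading done first (acquire)
// and submitted second (relaxed) matches the shutdown loop above.
void drainUntilIdle(std::atomic<uint64_t>& jobsSubmitted,
                    std::atomic<uint64_t>& jobsDone) {
  while (true) {
    auto const done = jobsDone.load(std::memory_order_acquire);
    auto const submitted = jobsSubmitted.load(std::memory_order_relaxed);
    if (submitted <= done) {
      break;  // queue fully drained
    }
    std::this_thread::sleep_for(std::chrono::milliseconds(10));
  }
}
```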
@@ -344,7 +365,7 @@ void SupervisedScheduler::runWorker() {
       break;
     }
 
-    _jobsDequeued++;
+    _jobsDequeued.fetch_add(1, std::memory_order_relaxed);
 
     state->_lastJobStarted = clock::now();
     state->_working = true;
@@ -372,8 +393,8 @@ void SupervisedScheduler::runSupervisor() {
 
   while (!_stopping) {
     uint64_t jobsDone = _jobsDone.load(std::memory_order_acquire);
-    uint64_t jobsSubmitted = _jobsSubmitted.load(std::memory_order_acquire);
-    uint64_t jobsDequeued = _jobsDequeued.load(std::memory_order_acquire);
+    uint64_t jobsSubmitted = _jobsSubmitted.load(std::memory_order_relaxed);
+    uint64_t jobsDequeued = _jobsDequeued.load(std::memory_order_relaxed);
 
     if (jobsDone == lastJobsDone && (jobsDequeued < jobsSubmitted)) {
       jobsStallingTick++;
@@ -485,8 +506,9 @@ bool SupervisedScheduler::canPullFromQueue(uint64_t queueIndex) const {
   // then a job gets done fast (eg dequeued++, done++)
   // and then we read done.
   uint64_t jobsDone = _jobsDone.load(std::memory_order_acquire);
-  uint64_t jobsDequeued = _jobsDequeued.load(std::memory_order_acquire);
+  uint64_t jobsDequeued = _jobsDequeued.load(std::memory_order_relaxed);
   TRI_ASSERT(jobsDequeued >= jobsDone);
+
   switch (queueIndex) {
     case 0:
       // We can always! pull from high priority
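canPullFromQueue() gates the lower-priority queues on how much work is already in flight (jobsDequeued - jobsDone), while queue 0 may always be drained. The following sketch only illustrates that shape; the hunk shows just the high-priority case, and the thresholds below are invented for the example, not the scheduler's real limits:

```cpp
#include <cstdint>

// queueIndex 0 = high, 1 = medium, 2 = low priority. jobsInFlight is the
// dequeued-but-not-done count computed from the two atomic loads above.
bool canPullSketch(uint64_t queueIndex, uint64_t jobsInFlight,
                   uint64_t maxWorkers) {
  switch (queueIndex) {
    case 0:
      return true;  // high priority may always be drained
    case 1:
      return jobsInFlight < maxWorkers;  // keep headroom for high priority
    default:
      return jobsInFlight < maxWorkers / 2;  // throttle low priority hardest
  }
}
```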
@@ -511,7 +533,7 @@ std::unique_ptr<SupervisedScheduler::WorkItem> SupervisedScheduler::getWork(
     auto queueIdx = triesCount % 3;
     // Order of this if is important! First check if we are allowed to pull,
     // then really pull from queue
-    if (canPullFromQueue(queueIdx) && _queue[queueIdx].pop(work)) {
+    if (canPullFromQueue(queueIdx) && _queues[queueIdx].pop(work)) {
       return std::unique_ptr<WorkItem>(work);
     }
 
@@ -537,7 +559,7 @@ std::unique_ptr<SupervisedScheduler::WorkItem> SupervisedScheduler::getWork(
 void SupervisedScheduler::startOneThread() {
   // TRI_ASSERT(_numWorkers < _maxNumWorker);
   if (_numWorkers + _abandonedWorkerStates.size() >= _maxNumWorker) {
-    return;  // do not add more threads, than maximum allows
+    return;  // do not add more threads than maximum allows
   }
 
   std::unique_lock<std::mutex> guard(_mutexSupervisor);
@@ -622,7 +644,7 @@ Scheduler::QueueStatistics SupervisedScheduler::queueStatistics() const {
   uint64_t const numWorkers = _numWorkers.load(std::memory_order_relaxed);
 
   // read _jobsDone first, so the differences of the counters cannot get negative
-  uint64_t const jobsDone = _jobsDone.load(std::memory_order_relaxed);
+  uint64_t const jobsDone = _jobsDone.load(std::memory_order_acquire);
   uint64_t const jobsDequeued = _jobsDequeued.load(std::memory_order_relaxed);
   uint64_t const jobsSubmitted = _jobsSubmitted.load(std::memory_order_relaxed);
 