8000 Make scheduler enforce queue limits (#10027) · archerli/arangodb@aa532d6 · GitHub
[go: up one dir, main page]

Skip to content

Commit aa532d6

Browse files
authored
Make scheduler enforce queue limits (arangodb#10027)
1 parent 8a812ec commit aa532d6

File tree

4 files changed

+79
-56
lines changed

4 files changed

+79
-56
lines changed

arangod/GeneralServer/RequestLane.h

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,11 @@ enum class RequestLane {
9898
// AGENCY_CALLBACK`
9999
};
100100

101-
enum class RequestPriority { HIGH, MED, LOW };
101+
enum class RequestPriority {
102+
HIGH = 0,
103+
MED = 1,
104+
LOW = 2
105+
};
102106

103107
inline RequestPriority PriorityRequestLane(RequestLane lane) {
104108
switch (lane) {

arangod/Scheduler/SchedulerFeature.h

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -48,18 +48,17 @@ class SchedulerFeature final : public application_features::ApplicationFeature {
4848
private:
4949
uint64_t _nrMinimalThreads = 2;
5050
uint64_t _nrMaximalThreads = 0;
51-
uint64_t _queueSize = 128;
52-
uint64_t _fifo1Size = 1024 * 1024;
51+
uint64_t _queueSize = 4096;
52+
uint64_t _fifo1Size = 4096;
5353
uint64_t _fifo2Size = 4096;
5454

5555
std::unique_ptr<Scheduler> _scheduler;
5656

5757
// -------------------------------------------------------------------------
58-
// UNRELATED SECTION STARS HERE: Singals and other things creeped into Sched
58+
// UNRELATED SECTION STARTS HERE: Signals and other things crept into Sched
5959
// -------------------------------------------------------------------------
6060

6161
public:
62-
/*size_t concurrency() const { return static_cast<size_t>(_nrMaximalThreads); }*/
6362
void buildControlCHandler();
6463
void buildHangupHandler();
6564

arangod/Scheduler/SupervisedScheduler.cpp

Lines changed: 67 additions & 49 deletions
< 10000 tr class="diff-line-row">
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ bool isDirectDeadlockLane(RequestLane lane) {
6868
namespace {
6969
typedef std::chrono::time_point<std::chrono::steady_clock> time_point;
7070

71-
// value initialise these arrays, otherwise mac will crash
71+
// value-initialize these arrays, otherwise mac will crash
7272
thread_local time_point conditionQueueFullSince{};
7373
thread_local uint_fast32_t queueWarningTick{};
7474

@@ -80,7 +80,7 @@ time_point lastQueueFullWarning[3];
8080
int64_t fullQueueEvents[3] = {0, 0, 0};
8181
std::mutex fullQueueWarningMutex[3];
8282

83-
void logQueueWarningEveryNowAndThen(int64_t events) {
83+
void logQueueWarningEveryNowAndThen(int64_t events, uint64_t maxQueueSize) {
8484
auto const now = std::chrono::steady_clock::now();
8585
uint64_t totalEvents;
8686
bool printLog = false;
@@ -99,13 +99,13 @@ void logQueueWarningEveryNowAndThen(int64_t events) {
9999

100100
if (printLog) {
101101
LOG_TOPIC("dead2", WARN, Logger::THREADS)
102-
<< "Scheduler queue"
102+
<< "Scheduler queue with max capacity " << maxQueueSize
103103
<< " is filled more than 50% in last " << sinceLast.count()
104-
<< "s. (happened " << totalEvents << " times since last message)";
104< 8000 span class="diff-text-marker">+
<< "s (happened " << totalEvents << " times since last message)";
105105
}
106106
}
107107

108-
void logQueueFullEveryNowAndThen(int64_t fifo) {
108+
void logQueueFullEveryNowAndThen(int64_t fifo, uint64_t maxQueueSize) {
109109
auto const& now = std::chrono::steady_clock::now();
110110
uint64_t events;
111111
bool printLog = false;
@@ -122,7 +122,8 @@ void logQueueFullEveryNowAndThen(int64_t fifo) {
122122

123123
if (printLog) {
124124
LOG_TOPIC("dead1", WARN, Logger::THREADS)
125-
<< "Scheduler queue " << fifo << " is full. (happened " << events
125+
<< "Scheduler queue " << fifo << " with max capacity " << maxQueueSize
126+
<< " is full (happened " << events
126127
<< " times since last message)";
127128
}
128129
}
@@ -153,7 +154,7 @@ class SupervisedSchedulerWorkerThread final : public SupervisedSchedulerThread {
153154
explicit SupervisedSchedulerWorkerThread(SupervisedScheduler& scheduler)
154155
: Thread("SchedWorker"), SupervisedSchedulerThread(scheduler) {}
155156
~SupervisedSchedulerWorkerThread() { shutdown(); }
156-
void run() override { _scheduler.runWorker(); };
157+
void run() override { _scheduler.runWorker(); }
157158
};
158159

159160
} // namespace arangodb
@@ -172,28 +173,35 @@ SupervisedScheduler::SupervisedScheduler(uint64_t minThreads, uint64_t maxThread
172173
_definitiveWakeupTime_ns(100000),
173174
_maxNumWorker(maxThreads),
174175
_numIdleWorker(minThreads),
175-
_maxFifoSize(maxQueueSize) {
176-
_queue[0].reserve(maxQueueSize);
177-
_queue[1].reserve(fifo1Size);
178-
_queue[2].reserve(fifo2Size);
176+
_maxFifoSize(maxQueueSize),
177+
_fifo1Size(fifo1Size),
178+
_fifo2Size(fifo2Size) {
179+
_queues[0].reserve(maxQueueSize);
180+
_queues[1].reserve(fifo1Size);
181+
_queues[2].reserve(fifo2Size);
179182
}
180183

181184
SupervisedScheduler::~SupervisedScheduler() {}
182185

183186
bool SupervisedScheduler::queue(RequestLane lane, std::function<void()> handler,
184187
bool allowDirectHandling) {
185-
if (!isDirectDeadlockLane(lane) && allowDirectHandling &&
186-
!ServerState::instance()->isClusterRole() && (_jobsSubmitted - _jobsDone) < 2) {
187-
_jobsSubmitted.fetch_add(1, std::memory_order_relaxed);
188-
_jobsDequeued.fetch_add(1, std::memory_order_relaxed);
189-
_jobsDirectExec.fetch_add(1, std::memory_order_release);
190-
try {
191-
handler();
192-
_jobsDone.fetch_add(1, std::memory_order_release);
193-
return true;
194-
} catch (...) {
195-
_jobsDone.fetch_add(1, std::memory_order_release);
196-
throw;
188+
if (!isDirectDeadlockLane(lane) &&
189+
allowDirectHandling &&
190+
!ServerState::instance()->isClusterRole()) {
191+
uint64_t const jobsDone = _jobsDone.load(std::memory_order_acquire);
192+
uint64_t const jobsSubmitted = _jobsSubmitted.load(std::memory_order_relaxed);
193+
if (jobsSubmitted - jobsDone < 2) {
194+
_jobsSubmitted.fetch_add(1, std::memory_order_relaxed);
195+
_jobsDequeued.fetch_add(1, std::memory_order_relaxed);
196+
_jobsDirectExec.fetch_add(1, std::memory_order_relaxed);
197+
try {
198+
handler();
199+
_jobsDone.fetch_add(1, std::memory_order_release);
200+
return true;
201+
} catch (...) {
202+
_jobsDone.fetch_add(1, std::memory_order_release);
203+
throw;
204+
}
197205
}
198206
}
199207

@@ -204,8 +212,15 @@ bool SupervisedScheduler::queue(RequestLane lane, std::function<void()> handler,
204212

205213
auto work = std::make_unique<WorkItem>(std::move(handler));
206214

207-
if (!_queue[queueNo].push(work.get())) {
208-
logQueueFullEveryNowAndThen(queueNo);
215+
if (!_queues[queueNo].bounded_push(work.get())) {
216+
uint64_t maxSize = _maxFifoSize;
217+
if (queueNo == 1) {
218+
maxSize = _fifo1Size;
219+
} else if (queueNo == 2) {
220+
maxSize = _fifo2Size;
221+
}
222+
LOG_TOPIC("98d94", DEBUG, Logger::THREADS) << "unable to push job to scheduler queue: queue is full";
223+
logQueueFullEveryNowAndThen(queueNo, maxSize);
209224
return false;
210225
}
211226
// queue now has ownership for the WorkItem
@@ -214,27 +229,32 @@ bool SupervisedScheduler::queue(RequestLane lane, std::function<void()> handler,
214229
static thread_local uint64_t lastSubmitTime_ns;
215230

216231
// use memory order release to make sure, pushed item is visible
217-
uint64_t jobsSubmitted = _jobsSubmitted.fetch_add(1, std::memory_order_release);
218-
uint64_t approxQueueLength = jobsSubmitted - _jobsDone;
232+
uint64_t const jobsDone = _jobsDone.load(std::memory_order_acquire);
233+
uint64_t const jobsSubmitted = _jobsSubmitted.fetch_add(1, std::memory_order_relaxed);
234+
uint64_t const approxQueueLength = jobsSubmitted - jobsDone;
235+
236+
// to make sure the queue length hasn't underflowed
237+
TRI_ASSERT(jobsDone <= jobsSubmitted);
238+
219239
uint64_t now_ns = getTickCount_ns();
220240
uint64_t sleepyTime_ns = now_ns - lastSubmitTime_ns;
221241
lastSubmitTime_ns = now_ns;
222242

223243
if (approxQueueLength > _maxFifoSize / 2) {
224-
if ((queueWarningTick++ & 0xFF) == 0) {
244+
if ((::queueWarningTick++ & 0xFF) == 0) {
225245
auto const& now = std::chrono::steady_clock::now();
226-
if (conditionQueueFullSince == time_point{}) {
227-
logQueueWarningEveryNowAndThen(queueWarningTick);
228-
conditionQueueFullSince = now;
229-
} else if (now - conditionQueueFullSince > std::chrono::seconds(5)) {
230-
logQueueWarningEveryNowAndThen(queueWarningTick);
231-
queueWarningTick = 0;
232-
conditionQueueFullSince = now;
246+
if (::conditionQueueFullSince == time_point{}) {
247+
logQueueWarningEveryNowAndThen(::queueWarningTick, _maxFifoSize);
248+
::conditionQueueFullSince = now;
249+
} else if (now - ::conditionQueueFullSince > std::chrono::seconds(5)) {
250+
logQueueWarningEveryNowAndThen(::queueWarningTick, _maxFifoSize);
251+
::queueWarningTick = 0;
252+
::conditionQueueFullSince = now;
233253
}
234254
}
235255
} else {
236-
queueWarningTick = 0;
237-
conditionQueueFullSince = time_point{};
256+
::queueWarningTick = 0;
257+
::conditionQueueFullSince = time_point{};
238258
}
239259

240260
bool doNotify = false;
@@ -263,9 +283,6 @@ bool SupervisedScheduler::start() {
263283
}
264284

265285
void SupervisedScheduler::shutdown() {
266-
// THIS IS WHAT WE SHOULD AIM FOR, BUT NOBODY CARES
267-
// TRI_ASSERT(_jobsSubmitted <= _jobsDone);
268-
269286
{
270287
std::unique_lock<std::mutex> guard(_mutex);
271288
_stopping = true;
@@ -275,8 +292,8 @@ void SupervisedScheduler::shutdown() {
275292
Scheduler::shutdown();
276293

277294
while (true) {
278-
auto jobsSubmitted = _jobsSubmitted.load();
279-
auto jobsDone = _jobsDone.load();
295+
auto jobsDone = _jobsDone.load(std::memory_order_acquire);
296+
auto jobsSubmitted = _jobsSubmitted.load(std::memory_order_relaxed);
280297

281298
if (jobsSubmitted <= jobsDone) {
282299
break;
@@ -344,7 +361,7 @@ void SupervisedScheduler::runWorker() {
344361
break;
345362
}
346363

347-
_jobsDequeued++;
364+
_jobsDequeued.fetch_add(1, std::memory_order_relaxed);
348365

349366
state->_lastJobStarted = clock::now();
350367
state->_working = true;
@@ -372,8 +389,8 @@ void SupervisedScheduler::runSupervisor() {
372389

373390
while (!_stopping) {
374391
uint64_t jobsDone = _jobsDone.load(std::memory_order_acquire);
375-
uint64_t jobsSubmitted = _jobsSubmitted.load(std::memory_order_acquire);
376-
uint64_t jobsDequeued = _jobsDequeued.load(std::memory_order_acquire);
392+
uint64_t jobsSubmitted = _jobsSubmitted.load(std::memory_order_relaxed);
393+
uint64_t jobsDequeued = _jobsDequeued.load(std::memory_order_relaxed);
377394

378395
if (jobsDone == lastJobsDone && (jobsDequeued < jobsSubmitted)) {
379396
jobsStallingTick++;
@@ -485,8 +502,9 @@ bool SupervisedScheduler::canPullFromQueue(uint64_t queueIndex) const {
485502
// then a job gets done fast (eg dequeued++, done++)
486503
// and then we read done.
487504
uint64_t jobsDone = _jobsDone.load(std::memory_order_acquire);
488-
uint64_t jobsDequeued = _jobsDequeued.load(std::memory_order_acquire);
505+
uint64_t jobsDequeued = _jobsDequeued.load(std::memory_order_relaxed);
489506
TRI_ASSERT(jobsDequeued >= jobsDone);
507+
490508
switch (queueIndex) {
491509
case 0:
492510
// We can always! pull from high priority
@@ -511,7 +529,7 @@ std::unique_ptr<SupervisedScheduler::WorkItem> SupervisedScheduler::getWork(
511529
auto queueIdx = triesCount % 3;
512530
// Order of this if is important! First check if we are allowed to pull,
513531
// then really pull from queue
514-
if (canPullFromQueue(queueIdx) && _queue[queueIdx].pop(work)) {
532+
if (canPullFromQueue(queueIdx) && _queues[queueIdx].pop(work)) {
515533
return std::unique_ptr<WorkItem>(work);
516534
}
517535

@@ -537,7 +555,7 @@ std::unique_ptr<SupervisedScheduler::WorkItem> SupervisedScheduler::getWork(
537555
void SupervisedScheduler::startOneThread() {
538556
// TRI_ASSERT(_numWorkers < _maxNumWorker);
539557
if (_numWorkers + _abandonedWorkerStates.size() >= _maxNumWorker) {
540-
return; // do not add more threads, than maximum allows
558+
return; // do not add more threads than maximum allows
541559
}
542560

543561
std::unique_lock<std::mutex> guard(_mutexSupervisor);
@@ -622,7 +640,7 @@ Scheduler::QueueStatistics SupervisedScheduler::queueStatistics() const {
622640
uint64_t const numWorkers = _numWorkers.load(std::memory_order_relaxed);
623641

624642
// read _jobsDone first, so the differences of the counters cannot get negative
625-
uint64_t const jobsDone = _jobsDone.load(std::memory_order_relaxed);
643+
uint64_t const jobsDone = _jobsDone.load(std::memory_order_acquire);
626644
uint64_t const jobsDequeued = _jobsDequeued.load(std::memory_order_relaxed);
627645
uint64_t const jobsSubmitted = _jobsSubmitted.load(std::memory_order_relaxed);
628646

arangod/Scheduler/SupervisedScheduler.h

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ class SupervisedScheduler final : public Scheduler {
7777

7878
// Since the lockfree queue can only handle PODs, one has to wrap lambdas
7979
// in a container class and store pointers. -- Maybe there is a better way?
80-
boost::lockfree::queue<WorkItem*> _queue[3];
80+
boost::lockfree::queue<WorkItem*> _queues[3];
8181

8282
// aligning required to prevent false sharing - assumes cache line size is 64
8383
alignas(64) std::atomic<uint64_t> _jobsSubmitted;
@@ -144,7 +144,9 @@ class SupervisedScheduler final : public Scheduler {
144144
std::condition_variable _conditionSupervisor;
145145
std::unique_ptr<SupervisedSchedulerManagerThread> _manager;
146146

147-
size_t _maxFifoSize;
147+
uint64_t const _maxFifoSize;
148+
uint64_t const _fifo1Size;
149+
uint64_t const _fifo2Size;
148150

149151
std::unique_ptr<WorkItem> getWork(std::shared_ptr<WorkerState>& state);
150152

0 commit comments

Comments
 (0)
0