8000 fix JSON statistics (#9385) · Mars2018/arangodb@727bb39 · GitHub
[go: up one dir, main page]

Skip to content

Commit 727bb39

Browse files
authored
fix JSON statistics (arangodb#9385)
1 parent 1f0d24f commit 727bb39

File tree

5 files changed

+48
-55
lines changed

5 files changed

+48
-55
lines changed

arangod/Scheduler/Scheduler.h

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -152,17 +152,15 @@ class Scheduler {
152152
// ---------------------------------------------------------------------------
153153
public:
154154
struct QueueStatistics {
155-
uint64_t _running;
156-
uint64_t _working;
155+
uint64_t _running; // numWorkers
156+
uint64_t _blocked; // obsolete, always 0 now
157157
uint64_t _queued;
158-
uint64_t _fifo1;
159-
uint64_t _fifo2;
160-
uint64_t _fifo3;
158+
uint64_t _working;
159+
uint64_t _directExec;
161160
};
162161

163-
virtual void addQueueStatistics(velocypack::Builder&) const = 0;
162+
virtual void toVelocyPack(velocypack::Builder&) const = 0;
164163
virtual QueueStatistics queueStatistics() const = 0;
165-
virtual std::string infoStatus() const = 0;
166164

167165
// ---------------------------------------------------------------------------
168166
// Start/Stop/IsRunning stuff

arangod/Scheduler/SupervisedScheduler.cpp

Lines changed: 39 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ class SupervisedSchedulerWorkerThread final : public SupervisedSchedulerThread {
9797
SupervisedScheduler::SupervisedScheduler(uint64_t minThreads, uint64_t maxThreads,
9898
uint64_t maxQueueSize,
9999
uint64_t fifo1Size, uint64_t fifo2Size)
100-
: _numWorker(0),
100+
: _numWorkers(0),
101101
_stopping(false),
102102
_jobsSubmitted(0),
103103
_jobsDequeued(0),
@@ -138,12 +138,13 @@ bool SupervisedScheduler::queue(RequestLane lane, std::function<void()> handler,
138138
TRI_ASSERT(queueNo <= 2);
139139
TRI_ASSERT(isStopping() == false);
140140

141-
WorkItem* work = new WorkItem(std::move(handler));
141+
auto work = std::make_unique<WorkItem>(std::move(handler));
142142

143-
if (!_queue[queueNo].push(work)) {
144-
delete work;
143+
if (!_queue[queueNo].push(work.get())) {
145144
return false;
146145
}
146+
// queue now has ownership for the WorkItem
147+
work.release();
147148

148149
static thread_local uint64_t lastSubmitTime_ns;
149150

@@ -208,7 +209,7 @@ void SupervisedScheduler::shutdown() {
208209
// call the destructor of all threads
209210
_manager.reset();
210211

211-
while (_numWorker > 0) {
212+
while (_numWorkers > 0) {
212213
stopOneThread();
213214
}
214215

@@ -230,7 +231,7 @@ void SupervisedScheduler::runWorker() {
230231

231232
{
232233
std::lock_guard<std::mutex> guard(_mutexSupervisor);
233-
id = _numWorker++; // increase the number of workers here, to obtains the id
234+
id = _numWorkers++; // increase the number of workers here, to obtains the id
234235
// copy shared_ptr with worker state
235236
state = _workerStates.back();
236237
// inform the supervisor that this thread is alive
@@ -266,7 +267,7 @@ void SupervisedScheduler::runWorker() {
266267
}
267268

268269
void SupervisedScheduler::runSupervisor() {
269-
while (_numWorker < _numIdleWorker) {
270+
while (_numWorkers < _numIdleWorker) {
270271
startOneThread();
271272
}
272273

@@ -286,8 +287,8 @@ void SupervisedScheduler::runSupervisor() {
286287

287288
uint64_t queueLength = jobsSubmitted - jobsDequeued;
288289

289-
bool doStartOneThread = (((queueLength >= 3 * _numWorker) &&
290-
((lastQueueLength + _numWorker) < queueLength)) ||
290+
bool doStartOneThread = (((queueLength >= 3 * _numWorkers) &&
291+
((lastQueueLength + _numWorkers) < queueLength)) ||
291292
(lastJobsSubmitted > jobsDone)) &&
292293
(queueLength != 0);
293294

@@ -300,10 +301,10 @@ void SupervisedScheduler::runSupervisor() {
300301
lastQueueLength = queueLength;
301302
lastJobsSubmitted = jobsSubmitted;
302303

303-
if (doStartOneThread && _numWorker < _maxNumWorker) {
304+
if (doStartOneThread && _numWorkers < _maxNumWorker) {
304305
jobsStallingTick = 0;
305306
startOneThread();
306-
} else if (doStopOneThread && _numWorker > _numIdleWorker) {
307+
} else if (doStopOneThread && _numWorkers > _numIdleWorker) {
307308
stopOneThread();
308309
}
309310

@@ -361,7 +362,7 @@ void SupervisedScheduler::sortoutLongRunningThreads() {
361362
// Move that thread to the abandoned thread
362363
_abandonedWorkerStates.push_back(std::move(state));
363364
i = _workerStates.erase(i);
364-
_numWorker--;
365+
_numWorkers--;
365366

366367
// and now start another thread!
367368
startOneThread();
@@ -407,8 +408,8 @@ std::unique_ptr<SupervisedScheduler::WorkItem> SupervisedScheduler::getWork(
407408
}
408409

409410
void SupervisedScheduler::startOneThread() {
410-
// TRI_ASSERT(_numWorker < _maxNumWorker);
411-
if (_numWorker + _abandonedWorkerStates.size() >= _maxNumWorker) {
411+
// TRI_ASSERT(_numWorkers < _maxNumWorker);
412+
if (_numWorkers + _abandonedWorkerStates.size() >= _maxNumWorker) {
412413
return; // do not add more threads, than maximum allows
413414
}
414415

@@ -439,7 +440,7 @@ void SupervisedScheduler::startOneThread() {
439440
}
440441

441442
void SupervisedScheduler::stopOneThread() {
442-
TRI_ASSERT(_numWorker > 0);
443+
TRI_ASSERT(_numWorkers > 0);
443444

444445
// copy shared_ptr
445446
auto state = _workerStates.back();
@@ -457,7 +458,7 @@ void SupervisedScheduler::stopOneThread() {
457458
// the cleanup list and wait for its termination.
458459
//
459460
// Since the thread is effectively taken out of the pool, decrease the number of worker.
460-
_numWorker--;
461+
_numWorkers--;
461462

462463
if (state->_thread->isRunning()) {
463464
LOG_TOPIC("73365", TRACE, Logger::THREADS) << "Abandon one thread.";
@@ -487,35 +488,32 @@ bool SupervisedScheduler::WorkerState::start() { return _thread->start(); }
487488
// ---------------------------------------------------------------------------
488489
// Statistics Stuff
489490
// ---------------------------------------------------------------------------
490-
std::string SupervisedScheduler::infoStatus() const {
491-
// TODO: compare with old output format
492-
// Does some code rely on that string or is it for humans?
493-
uint64_t numWorker = _numWorker.load(std::memory_order_relaxed);
494-
uint64_t queueLength = _jobsSubmitted.load(std::memory_order_relaxed) -
495-
_jobsDone.load(std::memory_order_relaxed);
496-
497-
return "scheduler threads " + std::to_string(numWorker) + " (" +
498-
std::to_string(_numIdleWorker) + "<" + std::to_string(_maxNumWorker) +
499-
") queued " + std::to_string(queueLength) +
500-
" directly exec " + std::to_string(_jobsDirectExec.load(std::memory_order_relaxed));
501-
}
502491

503492
Scheduler::QueueStatistics SupervisedScheduler::queueStatistics() const {
504-
uint64_t numWorker = _numWorker.load(std::memory_order_relaxed);
505-
uint64_t queueLength = _jobsSubmitted.load(std::memory_order_relaxed) -
506-
_jobsDone.load(std::memory_order_relaxed);
493+
// we need to read multiple atomics here. as all atomic reads happen independently
494+
// without a mutex outside, the overall picture may be inconsistent
495+
496+
uint64_t const numWorkers = _numWorkers.load(std::memory_order_relaxed);
497+
498+
// read _jobsDone first, so the differences of the counters cannot get negative
499+
uint64_t const jobsDone = _jobsDone.load(std::memory_order_relaxed);
500+
uint64_t const jobsDequeued = _jobsDequeued.load(std::memory_order_relaxed);
501+
uint64_t const jobsSubmitted = _jobsSubmitted.load(std::memory_order_relaxed);
502+
503+
uint64_t const queued = jobsSubmitted - jobsDone;
504+
uint64_t const working = jobsDequeued - jobsDone;
505+
506+
uint64_t const directExec = _jobsDirectExec.load(std::memory_order_relaxed);
507507

508-
return QueueStatistics{numWorker, numWorker, queueLength, 0, 0, 0};
508+
return QueueStatistics{numWorkers, 0, queued, working, directExec};
509509
}
510510

511-
void SupervisedScheduler::addQueueStatistics(velocypack::Builder& b) const {
512-
uint64_t numWorker = _numWorker.load(std::memory_order_relaxed);
513-
uint64_t queueLength = _jobsSubmitted.load(std::memory_order_relaxed) -
514-
_jobsDone.load(std::memory_order_relaxed);
515-
uint64_t directExec = _jobsDirectExec.load(std::memory_order_relaxed);
511+
void SupervisedScheduler::toVelocyPack(velocypack::Builder& b) const {
512+
QueueStatistics qs = queueStatistics();
516513

517-
// TODO: previous scheduler filled out a lot more fields, relevant?
518-
b.add("scheduler-threads", VPackValue(numWorker));
519-
b.add("queued", VPackValue(queueLength));
520-
b.add("directExec", VPackValue(directExec));
514+
b.add("scheduler-threads", VPackValue(qs._running)); // numWorkers
515+
b.add("blocked", VPackValue(qs._blocked)); // obsolete
516+
b.add("queued", VPackValue(qs._queued));
517+
b.add("in-progress", VPackValue(qs._working));
518+
b.add("direct-exec", VPackValue(qs._directExec));
521519
}

arangod/Scheduler/SupervisedScheduler.h

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ class SupervisedScheduler final : public Scheduler {
4646
bool queue(RequestLane lane, std::function<void()>, bool allowDirectHandling = false) override;
4747

4848
private:
49-
std::atomic<size_t> _numWorker;
49+
std::atomic<size_t> _numWorkers;
5050
std::atomic<bool> _stopping;
5151

5252
protected:
@@ -56,9 +56,8 @@ class SupervisedScheduler final : public Scheduler {
5656
bool start() override;
5757
void shutdown() override;
5858

59-
void addQueueStatistics(velocypack::Builder&) const override;
59+
void toVelocyPack(velocypack::Builder&) const override;
6060
Scheduler::QueueStatistics queueStatistics() const override;
61-
std::string infoStatus() const override;
6261

6362
private:
6463
friend class SupervisedSchedulerManagerThread;

arangod/Statistics/Descriptions.cpp

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -370,9 +370,7 @@ void stats::Descriptions::serverStatistics(velocypack::Builder& b) const {
370370
}
371371

372372
b.add("threads", VPackValue(VPackValueType::Object, true));
373-
374-
SchedulerFeature::SCHEDULER->addQueueStatistics(b);
375-
373+
SchedulerFeature::SCHEDULER->toVelocyPack(b);
376374
b.close();
377375
}
378376

arangod/Statistics/StatisticsWorker.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -939,7 +939,7 @@ void StatisticsWorker::generateRawStatistics(VPackBuilder& builder, double const
939939

940940
// export threads statistics
941941
builder.add("threads", VPackValue(VPackValueType::Object));
942-
SchedulerFeature::SCHEDULER->addQueueStatistics(builder);
942+
SchedulerFeature::SCHEDULER->toVelocyPack(builder);
943943
builder.close();
944944

945945
// export ttl statistics

0 commit comments

Comments
 (0)
0