Feature/new server infra by maierlars · Pull Request #7733 · arangodb/arangodb · GitHub

Feature/new server infra #7733

Merged · 42 commits · Jan 8, 2019

Commits
6f3593b  Decoupled IO handling from Scheduler. (Jul 23, 2018)
1bd0db0  Fixed SSL start up bug. (Jul 24, 2018)
0a3fe50  Replaced Scheduler with new worker farm implementation. (Jul 25, 2018)
0400054  Added minimal statistics and info string for Scheduler. (Jul 25, 2018)
bc0f94c  Added support for timed submissions. (Jul 27, 2018)
fdd2033  Updated delayed submission api. Updated code that used timers. (Jul 31, 2018)
3cb9b62  Extracted new Scheduler into a virtual parent class. The implementati… (Aug 1, 2018)
bdc8d9b  Signal handler now working. (Aug 1, 2018)
3796c3b  Changed threads names, `_stop` is atomic, check for failure during th… (Aug 2, 2018)
5e6a3f2  Commented on source code and added TODOs. (Aug 3, 2018)
e9ccd72  Merge branch 'devel' into feature/new-server-infra (Aug 3, 2018)
0e99af6  Played around with start-stop-conditions (Aug 8, 2018)
6fd7f26  Play around with start stop condition. (Aug 8, 2018)
e65585c  start stop cond (Aug 8, 2018)
c6ac00f  Sart Stop Conditions (Aug 8, 2018)
a5516b2  Removed bad cv_status check. (Aug 9, 2018)
3e12cb0  Merge remote-tracking branch 'origin/devel' into feature/new-server-i… (Aug 10, 2018)
aa6672c  Merge remote-tracking branch 'origin/devel' into feature/new-server-i… (Aug 10, 2018)
43fb2d9  Bug fix: now compare the actual objects instead of pointer values. Se… (Aug 10, 2018)
c6a54c1  Moved most of the stuff now unrelated to the Scheduler to GeneralServ… (Aug 13, 2018)
556f3e9  Instead of waiting for a thread to terminate, put it on a clean up li… (Aug 15, 2018)
0ed5f82  Allow detaching long running threads. (Aug 15, 2018)
20132a0  Merge remote-tracking branch 'origin/devel' into feature/new-server-i… (Dec 11, 2018)
e2fc7e1  Fixed test mock. (Dec 12, 2018)
ebcd63a  Merge remote-tracking branch 'origin/devel' into feature/new-server-i… (Dec 17, 2018)
d186d14  Updated the WorkHandle logic. Removed post functions. (Dec 18, 2018)
80c9f6a  Fixed crash when obtaining shared_ptr from this in destructor. (Dec 18, 2018)
fdff089  Merge remote-tracking branch 'origin/devel' into feature/new-server-i… (Dec 18, 2018)
a61a809  Added lost mutex. (Dec 18, 2018)
8d4c003  Fixed memory leak. (Dec 18, 2018)
3ff4806  Fixed merge bug. (Dec 18, 2018)
e2fa338  Changed a lot of code to optimize the scheduler. (Dec 19, 2018)
090e2cd  Merge remote-tracking branch 'origin/devel' into feature/new-server-i… (Dec 19, 2018)
b37d5fd  Fixed bug of invalidated iterator. Dont remove task on shutdown at di… (Dec 19, 2018)
2ccf743  Only by value calls to queue. (Dec 20, 2018)
947fe2f  Added options again. (Dec 20, 2018)
992e44c  Clean up of code. (Dec 20, 2018)
b6bbc8c  UI Request Lane added. (Dec 20, 2018)
1e8d3bd  Bug fixes in Scheduler. (Dec 21, 2018)
4d6b0a0  Merge remote-tracking branch 'origin/devel' into feature/new-server-i… (Jan 7, 2019)
83a775e  Applied reformat. (Jan 7, 2019)
55e70a3  Use sigaction. (Jan 7, 2019)
Commented on source code and added TODOs.
Lars Maier committed Aug 3, 2018
commit 5e6a3f26513dcde1a6b5a572c01ee46b38d7adf6
2 changes: 1 addition & 1 deletion arangod/Pregel/Conductor.cpp
@@ -469,7 +469,7 @@ void Conductor::startRecovery() {
cancelNoLock();
LOG_TOPIC(ERR, Logger::PREGEL) << "Compensation failed";
}
-});
+}).detach();
}

// resolves into an ordered list of shards for each collection on each server
2 changes: 1 addition & 1 deletion arangod/Pregel/Worker.cpp
@@ -599,7 +599,7 @@ void Worker<V, E, M>::_continueAsync() {
_workerAggregators->resetValues();
_startProcessing();
}
-});
+}).detach();
}

template <typename V, typename E, typename M>
21 changes: 20 additions & 1 deletion arangod/Scheduler/Scheduler.h
@@ -67,12 +67,17 @@ class Scheduler {
uint64_t _queued;
};

// Enqueues a task at highest priority
virtual void post(std::function<void()> const& callback) = 0;
// Enqueues a task at given priority
virtual bool queue(RequestPriority prio, std::function<void()> const&) = 0;

// A spinning thread looks at the queues in a cyclic way, i.e.
// 0 1 2 0 1 2 0 1 2 0 1 2 ..

typedef std::chrono::steady_clock clock;

private:
struct DelayedWorkItem {
std::function<void(bool cancelled)> _handler;
const clock::time_point _due;
@@ -103,6 +108,7 @@ class Scheduler {
void detach() { _item.reset(); }
};

public:
class WorkHandle {
std::shared_ptr<WorkGuard> _guard;
public:
@@ -113,8 +119,14 @@ class Scheduler {
void detach() { _guard->detach(); }
};

// postDelay returns a WorkHandle. You can cancel the job by calling cancel. The job is also
// cancelled if all WorkHandles are destructed. To disable this behavior call detach.
WorkHandle postDelay(clock::duration delay, std::function<void(bool cancelled)> const& callback);

// TODO: This method is unintuitive to use. If you call it and you are not interested in canceling
// you have to call detach(). Otherwise the WorkHandle goes out of scope and the task is canceled
// immediately.

virtual void addQueueStatistics(velocypack::Builder&) const = 0;
virtual QueueStatistics queueStatistics() const = 0;
virtual std::string infoStatus() const = 0;
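
The handle semantics described in the comments on `postDelay` (cancel explicitly, cancel implicitly when the last handle is destroyed, or opt out with `detach`) can be sketched as follows. This is a minimal illustration and not the code from the PR: `DelayedItem`, `Guard` and `Handle` are hypothetical stand-ins for `DelayedWorkItem`, `WorkGuard` and `WorkHandle`, whose full bodies are not visible in this diff, and the `cancelled` flag is an assumption about how cancellation is recorded.

```cpp
#include <atomic>
#include <functional>
#include <memory>
#include <utility>

// Hypothetical stand-in for the delayed work item (not the PR's type).
struct DelayedItem {
  std::function<void(bool cancelled)> handler;
  std::atomic<bool> cancelled{false};

  // Invokes the handler and tells it whether the job was cancelled.
  void run() { handler(cancelled.load()); }
};

// Marks the item as cancelled on destruction unless detach() was called.
class Guard {
  std::shared_ptr<DelayedItem> _item;

 public:
  explicit Guard(std::shared_ptr<DelayedItem> item) : _item(std::move(item)) {}
  ~Guard() { cancel(); }

  void cancel() {
    if (_item) {
      _item->cancelled.store(true);
    }
  }
  // Drops the reference, so the destructor no longer cancels anything.
  void detach() { _item.reset(); }
};

// Copyable handle; the guard is destroyed with the last copy.
class Handle {
  std::shared_ptr<Guard> _guard;

 public:
  explicit Handle(std::shared_ptr<DelayedItem> item)
      : _guard(std::make_shared<Guard>(std::move(item))) {}

  void cancel() { _guard->cancel(); }
  void detach() { _guard->detach(); }
};
```

In this sketch, a caller that ignores the returned `Handle` cancels the job at the end of the statement, which is the pitfall the TODO comment points out. That appears to be why the Pregel call sites in this commit append `.detach()`.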
@@ -134,6 +146,14 @@ class Scheduler {
private:
friend class SchedulerCronThread;

// The priority queue is managed by a Cron Thread. It wakes up on a regular basis (10ms currently)
// and looks at queue.top(). If the _due time is smaller than now() and the task is not canceled
// it is posted on the scheduler. The next sleep time is computed depending on queue top.
//
// Note that tasks that have a delay of less than 1ms are posted directly.
// For tasks above 50ms the Cron Thread is woken up to potentially update its sleep time, which
// could now be shorter than before.

std::priority_queue<std::shared_ptr<DelayedWorkItem>,
std::vector<std::shared_ptr<DelayedWorkItem>>,
std::greater<std::shared_ptr<DelayedWorkItem>>> _priorityQueue;
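
One pass of the cron loop described in the comment above might look like the sketch below. It is written under assumptions: `Item`, `LaterDue`, `DueQueue` and `cronPass` are hypothetical names, the 10 ms cap is taken from the comment, and the direct-post shortcut for delays under 1 ms and the wake-up of the cron thread are left out. The sketch orders the queue with a comparator on the due time, because `std::greater` applied to `std::shared_ptr` compares the stored pointers and not the pointed-to objects. The later commit 43fb2d9 in this PR ("now compare the actual objects instead of pointer values") may address exactly that, but this diff does not show it.

```cpp
#include <algorithm>
#include <chrono>
#include <functional>
#include <memory>
#include <queue>
#include <vector>

using Clock = std::chrono::steady_clock;

// Hypothetical delayed item; field names are illustrative.
struct Item {
  Clock::time_point due;
  bool cancelled;
  std::function<void()> handler;
};

// Orders by due time so that top() is the item that is due first.
struct LaterDue {
  bool operator()(std::shared_ptr<Item> const& a,
                  std::shared_ptr<Item> const& b) const {
    return a->due > b->due;
  }
};

using DueQueue = std::priority_queue<std::shared_ptr<Item>,
                                     std::vector<std::shared_ptr<Item>>,
                                     LaterDue>;

// Moves every due, non-cancelled handler into `posted` and returns how long
// the cron thread could sleep before the next item is due (at most 10 ms).
Clock::duration cronPass(DueQueue& queue, Clock::time_point now,
                         std::vector<std::function<void()>>& posted) {
  while (!queue.empty() && queue.top()->due <= now) {
    std::shared_ptr<Item> item = queue.top();
    queue.pop();
    if (!item->cancelled) {
      posted.push_back(item->handler);
    }
  }
  Clock::duration maxSleep = std::chrono::milliseconds(10);
  if (queue.empty()) {
    return maxSleep;
  }
  return std::min(maxSleep, queue.top()->due - now);
}
```

Passing `now` in as a parameter keeps the pass deterministic, so it can be exercised without sleeping. A real cron thread would call `Clock::now()` and hand the collected handlers to the scheduler.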
@@ -142,7 +162,6 @@ class Scheduler {

void runCron();


std::unique_ptr<SchedulerCronThread> _cronThread;
};
}
33 changes: 13 additions & 20 deletions arangod/Scheduler/SupervisedScheduler.cpp
@@ -101,24 +101,6 @@ SupervisedScheduler::SupervisedScheduler(uint64_t minThreads,
_queue[1].reserve(fifo1Size);
_queue[2].reserve(fifo2Size);
_workerStates.reserve(maxThreads);

#ifdef _WIN32
// Windows does not support POSIX signal handling
#else
struct sigaction action;
memset(&action, 0, sizeof(action));
sigfillset(&action.sa_mask);

// ignore broken pipes
action.sa_handler = SIG_IGN;

int res = sigaction(SIGPIPE, &action, nullptr);

if (res < 0) {
LOG_TOPIC(ERR, arangodb::Logger::FIXME)
<< "cannot initialize signal handlers for pipe";
}
#endif
}

SupervisedScheduler::~SupervisedScheduler() {}
@@ -170,6 +152,8 @@ bool SupervisedScheduler::queue(RequestPriority prio, std::function<void()> cons


std::string SupervisedScheduler::infoStatus() const {
// TODO: compare with old output format
// Does some code rely on that string or is it for humans?
uint64_t numWorker = _numWorker.load(std::memory_order_relaxed);
uint64_t queueLength = _jobsSubmitted.load(std::memory_order_relaxed)
- _jobsDone.load(std::memory_order_relaxed);
@@ -193,6 +177,7 @@ void SupervisedScheduler::addQueueStatistics(velocypack::Builder& b) const {
uint64_t queueLength = _jobsSubmitted.load(std::memory_order_relaxed)
- _jobsDone.load(std::memory_order_relaxed);

// TODO: previous scheduler filled out a lot more fields, relevant?
b.add("scheduler-threads",
VPackValue(static_cast<int32_t>(numWorker)));
b.add("queued", VPackValue(static_cast<int32_t>(queueLength)));
@@ -280,12 +265,20 @@ void SupervisedScheduler::runSupervisor()

if (jobsDone == lastJobsDone && (jobsDone < jobsSubmitted)) {
jobsStallingTick++;
-} else {
-jobsStallingTick = jobsStallingTick == 0 ? 0 : jobsStallingTick - 1;
+} else if(jobsStallingTick != 0) {
+jobsStallingTick--;
}

queueLength = jobsSubmitted - jobsDone;

// TODO: Currently the conditions to spawn a new thread are not very sophisticated.
// A thread is stopped when the queue is empty. In real life this should rarely happen,
// thus a thread that was started once never terminates.
// A new thread is started when:
// 1. jobsStallingTick > 5, which means that 5 times in a row, the supervisor observed that no
// job finished.
// 2. The queue length is bigger than 50 and 1.5 times of what it was last time.
// (this is a spike detector)
bool doStartOneThread =
(jobsStallingTick > 5) || ((queueLength >= 50) && (1.5 * lastQueueLength < queueLength));
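
The supervisor's decision can be isolated into a small function for illustration. The sketch below follows the conditions shown in the hunk. `SupervisorView` and `supervisorTick` are hypothetical names, and updating `lastJobsDone` and `lastQueueLength` at the end of each tick is an assumption, since that part of `runSupervisor()` is not visible here. Note that the stall counter is decremented rather than reset when progress is seen, so `jobsStallingTick > 5` is a net count of stalled observations and not strictly a run of consecutive ones.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical snapshot of what the supervisor remembers between ticks.
struct SupervisorView {
  uint64_t lastJobsDone = 0;
  uint64_t lastQueueLength = 0;
  uint64_t jobsStallingTick = 0;
};

// Returns true if this tick should start one additional worker thread.
bool supervisorTick(SupervisorView& s, uint64_t jobsSubmitted,
                    uint64_t jobsDone) {
  assert(jobsDone <= jobsSubmitted);

  // No job finished since the last tick although work is pending: stalled.
  if (jobsDone == s.lastJobsDone && jobsDone < jobsSubmitted) {
    s.jobsStallingTick++;
  } else if (s.jobsStallingTick != 0) {
    s.jobsStallingTick--;
  }

  uint64_t queueLength = jobsSubmitted - jobsDone;

  // Condition 1: persistent stall. Condition 2: spike detector.
  bool doStartOneThread =
      (s.jobsStallingTick > 5) ||
      ((queueLength >= 50) && (1.5 * s.lastQueueLength < queueLength));

  // Assumed bookkeeping for the next tick.
  s.lastJobsDone = jobsDone;
  s.lastQueueLength = queueLength;
  return doStartOneThread;
}
```

With ten jobs submitted and none finishing, the first five ticks return false and the sixth returns true, because the counter must exceed 5.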

18 changes: 17 additions & 1 deletion arangod/Scheduler/SupervisedScheduler.h
@@ -80,18 +80,34 @@ class SupervisedScheduler : public Scheduler {

std::atomic<uint64_t> _jobsSubmitted, _jobsDone;

// During a queue operation there are two reasons to manually wake up a worker
// 1. the queue length is bigger than _wakeupQueueLength and the last submit time
// is bigger than _wakeupTime_ns.
// 2. the last submit time is bigger than _definitiveWakeupTime_ns.
//
// The last submit time is a thread local variable that stores the time of the last
// queue operation.
std::atomic<uint64_t> _wakeupQueueLength; // q1
std::atomic<uint64_t> _wakeupTime_ns, _definitiveWakeupTime_ns; // t3, t4


// each worker thread has a state block which contains configuration values.
// _queueRetryCount is the number of spins this particular thread should perform
// before going to sleep.
// _sleepTimeout_ms is the amount of ms the thread should sleep before waking
// up again. Note that each worker wakes up constantly, even if there is no work.
//
// All those values are maintained by the supervisor thread.
// Currently they are set once and for all the same, however a future
// implementation may alter those values for each thread individually.
struct WorkerState {
uint64_t _queueRetryCount; // t1
uint64_t _sleepTimeout_ms; // t2
std::atomic<bool> _stop;
std::unique_ptr<SupervisedSchedulerWorkerThread> _thread;
char padding[40];

-// initialise with harmless defaults: spin once, sleep forever
+// initialize with harmless defaults: spin once, sleep forever
WorkerState(SupervisedScheduler &scheduler);
WorkerState(WorkerState &&that);
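
The two wake-up conditions listed in the comment on `_wakeupQueueLength` can be written as a single predicate. This sketch reads "the last submit time is bigger than" as "the time elapsed since the last submission exceeds", which is an interpretation of the comment. `WakeupConfig` and `shouldWakeWorker` are hypothetical names, and the thread-local storage of the last submit time is not modeled.

```cpp
#include <cstdint>

// Hypothetical plain copy of the three tuning values named in the comment.
struct WakeupConfig {
  uint64_t wakeupQueueLength;       // q1
  uint64_t wakeupTimeNs;            // t3
  uint64_t definitiveWakeupTimeNs;  // t4
};

// Decides whether a queue operation should manually wake a worker.
bool shouldWakeWorker(WakeupConfig const& cfg, uint64_t queueLength,
                      uint64_t nsSinceLastSubmit) {
  // Reason 1: the queue is long and the last submission is not recent.
  bool longQueueAndStale = queueLength > cfg.wakeupQueueLength &&
                           nsSinceLastSubmit > cfg.wakeupTimeNs;
  // Reason 2: the last submission is so old that a wake-up happens regardless.
  bool veryStale = nsSinceLastSubmit > cfg.definitiveWakeupTimeNs;
  return longQueueAndStale || veryStale;
}
```

Under this reading, a burst of submissions from one thread triggers few explicit wake-ups, because the elapsed time between them stays below `t3`.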
