8000 Dedicated thread for Phase 1&2 - devel by maierlars · Pull Request #6412 · arangodb/arangodb · GitHub
[go: up one dir, main page]

Skip to content

Dedicated thread for Phase 1&2 - devel #6412

New issue
8000

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Sep 7, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
238 changes: 138 additions & 100 deletions arangod/Cluster/HeartbeatThread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,115 @@ static std::chrono::system_clock::time_point deadThreadsPosted; // defaults to

static arangodb::Mutex deadThreadsMutex;

namespace arangodb {


////////////////////////////////////////////////////////////////////////////////
/// @brief dedicated worker thread that runs DBServerAgencySync jobs on behalf
/// of the HeartbeatThread. The thread sleeps on a condition variable until
/// notify() requests a run or stop() requests termination. Requests arriving
/// while a job is executing are coalesced into a single follow-up run.
////////////////////////////////////////////////////////////////////////////////
class HeartbeatBackgroundJobThread : public Thread {

 public:
  explicit HeartbeatBackgroundJobThread(HeartbeatThread* heartbeatThread)
      : Thread("Maintenance"),
        _heartbeatThread(heartbeatThread),
        _stop(false),
        _sleeping(false),
        // must be initialized explicitly: a default-constructed
        // std::atomic<bool> holds an indeterminate value before C++20
        _anotherRun(false),
        _backgroundJobsLaunched(0) {}

  ~HeartbeatBackgroundJobThread() {
    // Wake the worker before shutting down so that it cannot remain blocked
    // on the condition variable. stop() is idempotent, so it is harmless if
    // the owner has already called it.
    // NOTE(review): shutdown() is inherited from Thread and presumably waits
    // for run() to return -- confirm against Thread's implementation.
    stop();
    shutdown();
  }

  //////////////////////////////////////////////////////////////////////////////
  /// @brief asks the thread to stop, but does not wait.
  //////////////////////////////////////////////////////////////////////////////
  void stop() {
    std::lock_guard<std::mutex> guard(_mutex);
    _stop = true;
    _condition.notify_one();
  }

  //////////////////////////////////////////////////////////////////////////////
  /// @brief notifies the background thread: when the thread is sleeping, wakes
  /// it up. Otherwise sets a flag to start another round.
  //////////////////////////////////////////////////////////////////////////////
  void notify() {
    std::lock_guard<std::mutex> guard(_mutex);
    _anotherRun.store(true, std::memory_order_release);
    if (_sleeping.load(std::memory_order_acquire)) {
      _condition.notify_one();
    }
  }

 protected:
  //////////////////////////////////////////////////////////////////////////////
  /// @brief main loop: waits for a run request, executes one
  /// DBServerAgencySync job per request and returns once a stop was requested.
  //////////////////////////////////////////////////////////////////////////////
  void run() override {
    while (!_stop) {
      {
        std::unique_lock<std::mutex> guard(_mutex);

        // The predicate is evaluated under the lock *before* blocking, so a
        // stop() or notify() issued just before we acquired the mutex is not
        // lost. It also takes care of spurious wakeups.
        _sleeping.store(true, std::memory_order_release);
        _condition.wait(guard, [this] {
          return _stop.load(std::memory_order_acquire) ||
                 _anotherRun.load(std::memory_order_acquire);
        });
        _sleeping.store(false, std::memory_order_release);

        if (_stop) {
          return;
        }

        // consume the request; notifications arriving from now on trigger
        // a further round after the current job has finished
        _anotherRun.store(false, std::memory_order_release);
      }

      // the job itself runs without holding _mutex, so notify() and stop()
      // never block on a running job
      uint64_t const jobNr = ++_backgroundJobsLaunched;
      LOG_TOPIC(DEBUG, Logger::HEARTBEAT) << "sync callback started " << jobNr;
      {
        DBServerAgencySync job(_heartbeatThread);
        job.work();
      }
      LOG_TOPIC(DEBUG, Logger::HEARTBEAT) << "sync callback ended " << jobNr;
    }
  }

 private:
  /// @brief heartbeat thread handed to every DBServerAgencySync job (not owned)
  HeartbeatThread* _heartbeatThread;

  /// @brief protects the wait/notify handshake between run(), notify(), stop()
  std::mutex _mutex;

  //////////////////////////////////////////////////////////////////////////////
  /// @brief used to wake up the background thread
  /// always waited on and signalled while holding _mutex.
  //////////////////////////////////////////////////////////////////////////////
  std::condition_variable _condition;

  //////////////////////////////////////////////////////////////////////////////
  /// @brief set by stop() when the background thread should terminate
  /// written under _mutex; additionally read without the mutex in the loop
  /// condition of run(), hence atomic.
  //////////////////////////////////////////////////////////////////////////////
  std::atomic<bool> _stop;

  //////////////////////////////////////////////////////////////////////////////
  /// @brief whether the background thread is waiting on _condition or not
  /// only accessed while holding _mutex.
  //////////////////////////////////////////////////////////////////////////////
  std::atomic<bool> _sleeping;

  //////////////////////////////////////////////////////////////////////////////
  /// @brief when awake, the background thread will execute another round of
  /// phase 1 and phase 2, after resetting this to false
  /// only accessed while holding _mutex.
  //////////////////////////////////////////////////////////////////////////////
  std::atomic<bool> _anotherRun;

  /// @brief number of jobs started so far; only touched by run(), used for
  /// the debug log messages
  uint64_t _backgroundJobsLaunched;
};
}

////////////////////////////////////////////////////////////////////////////////
/// @brief constructs a heartbeat thread
Expand All @@ -97,93 +206,22 @@ HeartbeatThread::HeartbeatThread(AgencyCallbackRegistry* agencyCallbackRegistry,
_desiredVersions(std::make_shared<AgencyVersions>(0, 0)),
_wasNotified(false),
_backgroundJobsPosted(0),
_backgroundJobsLaunched(0),
_backgroundJobScheduledOrRunning(false),
_launchAnotherBackgroundJob(false),
_lastSyncTime(0) {
_lastSyncTime(0),
_maintenanceThread(nullptr) {
}

////////////////////////////////////////////////////////////////////////////////
/// @brief destroys a heartbeat thread
////////////////////////////////////////////////////////////////////////////////

HeartbeatThread::~HeartbeatThread() {
  // destruction consists solely of shutting the thread down
  shutdown();
}

////////////////////////////////////////////////////////////////////////////////
/// @brief running of heartbeat background jobs (in JavaScript), we run
/// these by instantiating an object in class HeartbeatBackgroundJob,
/// which is a std::function<void()> and holds a shared_ptr to the
/// HeartbeatThread singleton itself. This instance is then posted to
/// the io_service for execution in the thread pool. Should the heartbeat
/// thread itself terminate during shutdown, then the HeartbeatThread
/// singleton itself is still kept alive by the shared_ptr in the instance
/// of HeartbeatBackgroundJob. The operator() method simply calls the
/// runBackgroundJob() method of the heartbeat thread. Should this have
/// to schedule another background job, then it can simply create a new
/// HeartbeatBackgroundJob instance, again using shared_from_this() to
/// create a new shared_ptr keeping the HeartbeatThread object alive.
////////////////////////////////////////////////////////////////////////////////

class HeartbeatBackgroundJob {
std::shared_ptr<HeartbeatThread> _heartbeatThread;
double _startTime;
std::string _schedulerInfo;
public:
explicit HeartbeatBackgroundJob(std::shared_ptr<HeartbeatThread> hbt,
double startTime)
: _heartbeatThread(hbt), _startTime(startTime),_schedulerInfo(SchedulerFeature::SCHEDULER->infoStatus()) {
}

void operator()() {
// first tell the scheduler that this thread is working:
JobGuard guard(SchedulerFeature::SCHEDULER);
guard.work();

double now = TRI_microtime();
if (now > _startTime + 5.0) {
LOG_TOPIC(ERR, Logger::HEARTBEAT) << "ALARM: Scheduling background job "
"took " << now - _startTime
<< " seconds, scheduler info at schedule time: " << _schedulerInfo
<< ", scheduler info now: "
<< SchedulerFeature::SCHEDULER->infoStatus();
}
_heartbeatThread->runBackgroundJob();
}
};

////////////////////////////////////////////////////////////////////////////////
/// @brief method runBackgroundJob()
////////////////////////////////////////////////////////////////////////////////

void HeartbeatThread::runBackgroundJob() {
uint64_t jobNr = ++_backgroundJobsLaunched;
LOG_TOPIC(DEBUG, Logger::HEARTBEAT) << "sync callback started " << jobNr;
{
DBServerAgencySync job(this);
job.work();
}
LOG_TOPIC(DEBUG, Logger::HEARTBEAT) << "sync callback ended " << jobNr;

{
MUTEX_LOCKER(mutexLocker, *_statusLock);
TRI_ASSERT(_backgroundJobScheduledOrRunning);

if (_launchAnotherBackgroundJob) {
jobNr = ++_backgroundJobsPosted;
LOG_TOPIC(DEBUG, Logger::HEARTBEAT) << "dispatching sync tail " << jobNr;
_launchAnotherBackgroundJob = false;

// the JobGuard is in the operator() of HeartbeatBackgroundJob
_lastSyncTime = TRI_microtime();
SchedulerFeature::SCHEDULER->post(
HeartbeatBackgroundJob(shared_from_this(), _lastSyncTime), false);
} else {
_backgroundJobScheduledOrRunning = false;
_launchAnotherBackgroundJob = false;
}
HeartbeatThread::~HeartbeatThread() {
  // ask the maintenance thread (if one was ever created) to stop before
  // shutting down the heartbeat thread itself
  if (_maintenanceThread != nullptr) {
    _maintenanceThread->stop();
  }

  shutdown();
}


////////////////////////////////////////////////////////////////////////////////
/// @brief heartbeat main loop
/// the heartbeat thread constantly reports the current server status to the
Expand All @@ -198,6 +236,7 @@ void HeartbeatThread::runBackgroundJob() {
////////////////////////////////////////////////////////////////////////////////

void HeartbeatThread::run() {

ServerState::RoleEnum role = ServerState::instance()->getRole();

// mop: the heartbeat thread itself is now ready
Expand Down Expand Up @@ -248,6 +287,12 @@ void HeartbeatThread::run() {

void HeartbeatThread::runDBServer() {

_maintenanceThread = std::make_unique<HeartbeatBackgroundJobThread>(this);
if (!_maintenanceThread->start()) {
// WHAT TO DO NOW?
LOG_TOPIC(ERR, Logger::HEARTBEAT) << "Failed to start dedicated thread for maintenance";
}

std::function<bool(VPackSlice const& result)> updatePlan =
[=](VPackSlice const& result) {

Expand Down Expand Up @@ -612,7 +657,7 @@ void HeartbeatThread::runSingleServer() {
continue; // try again next time
}
}

TRI_voc_tick_t lastTick = 0; // we always want to set lastTick
auto sendTransient = [&]() {
VPackBuilder builder;
Expand All @@ -631,7 +676,7 @@ void HeartbeatThread::runSingleServer() {
applier->stopAndJoin();
}
lastTick = EngineSelectorFeature::ENGINE->currentTick();

// put the leader in optional read-only mode
auto readOnlySlice = response.get(std::vector<std::string>(
{AgencyCommManager::path(), "Readonly"}));
Expand All @@ -654,7 +699,7 @@ void HeartbeatThread::runSingleServer() {

ServerState::instance()->setFoxxmaster(leaderStr); // leader is foxxmater
ServerState::instance()->setReadOnly(true); // Disable writes with dirty-read header

std::string endpoint = ci->getServerEndpoint(leaderStr);
if (endpoint.empty()) {
LOG_TOPIC(ERR, Logger::HEARTBEAT) << "Failed to resolve leader endpoint";
Expand Down Expand Up @@ -748,7 +793,7 @@ void HeartbeatThread::updateServerMode(VPackSlice const& readOnlySlice) {
if (readOnlySlice.isBoolean()) {
readOnly = readOnlySlice.getBool();
}

ServerState::instance()->setReadOnly(readOnly);
}

Expand Down Expand Up @@ -1086,7 +1131,7 @@ bool HeartbeatThread::handlePlanChangeCoordinator(uint64_t currentPlanVersion) {
}
std::string const name = options.value.get("name").copyString();
TRI_ASSERT(!name.empty());

VPackSlice const idSlice = options.value.get("id");
if (!idSlice.isString()) {
LOG_TOPIC(ERR, Logger::HEARTBEAT) << "Missing id in agency database plan";
Expand Down Expand Up @@ -1168,7 +1213,7 @@ void HeartbeatThread::syncDBServerStatusQuo(bool asyncPush) {

MUTEX_LOCKER(mutexLocker, *_statusLock);
bool shouldUpdate = false;

if (_desiredVersions->plan > _currentVersions.plan) {
LOG_TOPIC(DEBUG, Logger::HEARTBEAT)
<< "Plan version " << _currentVersions.plan
Expand All @@ -1181,18 +1226,18 @@ void HeartbeatThread::syncDBServerStatusQuo(bool asyncPush) {
<< " is lower than desired version " << _desiredVersions->current;
shouldUpdate = true;
}

// 7.4 seconds is just under half of the 15 seconds the agency waits before declaring a server dead;
// run the job as a safety measure in case other plan changes were incomplete or went undetected
double now = TRI_microtime();
if (now > _lastSyncTime + 7.4 || asyncPush) {
shouldUpdate = true;
}

if (!shouldUpdate) {
return;
}

// First invalidate the caches in ClusterInfo:
auto ci = ClusterInfo::instance();
if (_desiredVersions->plan > ci->getPlanVersion()) {
Expand All @@ -1201,22 +1246,15 @@ void HeartbeatThread::syncDBServerStatusQuo(bool asyncPush) {
if (_desiredVersions->current > ci->getCurrentVersion()) {
ci->invalidateCurrent();
}

if (_backgroundJobScheduledOrRunning) {
_launchAnotherBackgroundJob = true;
return;
}


// schedule a job for the change:
uint64_t jobNr = ++_backgroundJobsPosted;
LOG_TOPIC(DEBUG, Logger::HEARTBEAT) << "dispatching sync " << jobNr;
_backgroundJobScheduledOrRunning = true;

// the JobGuard is in the operator() of HeartbeatBackgroundJob

_lastSyncTime = TRI_microtime();
SchedulerFeature::SCHEDULER->post(
HeartbeatBackgroundJob(shared_from_this(), _lastSyncTime), false);
TRI_ASSERT(_maintenanceThread != nullptr);
_maintenanceThread->notify();

}

////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -1289,7 +1327,7 @@ void HeartbeatThread::logThreadDeaths(bool force) {
LOG_TOPIC(DEBUG, Logger::HEARTBEAT) << "HeartbeatThread ok.";
std::string buffer;
buffer.reserve(40);

for (auto const& it : deadThreads) {
buffer = date::format("%FT%TZ", date::floor<std::chrono::milliseconds>(it.first));

Expand Down
Loading
0