@@ -89,12 +89,19 @@ class HeartbeatBackgroundJobThread : public Thread {
89
89
90
90
~HeartbeatBackgroundJobThread () { shutdown (); }
91
91
92
+ // ////////////////////////////////////////////////////////////////////////////
93
+ // / @brief asks the thread to stop, but does not wait.
94
+ // ////////////////////////////////////////////////////////////////////////////
92
95
void stop () {
93
96
std::unique_lock<std::mutex> guard (_mutex);
94
97
_stop = true ;
95
98
_condition.notify_one ();
96
99
}
97
100
101
+ // ////////////////////////////////////////////////////////////////////////////
102
+ // / @brief notifies the background thread: when the thread is sleeping, wakes
103
+ // / it up. Otherwise sets a flag to start another round.
104
+ // ////////////////////////////////////////////////////////////////////////////
98
105
void notify () {
99
106
std::unique_lock<std::mutex> guard (_mutex);
100
107
_anotherRun.store (true , std::memory_order_release);
@@ -146,11 +153,32 @@ class HeartbeatBackgroundJobThread : public Thread {
146
153
HeartbeatThread *_heartbeatThread;
147
154
148
155
std::mutex _mutex;
156
+
157
+ // ////////////////////////////////////////////////////////////////////////////
158
+ // / @brief used to wake up the background thread
159
+ // / guarded via _mutex.
160
+ // ////////////////////////////////////////////////////////////////////////////
149
161
std::condition_variable _condition;
150
162
163
+ // ////////////////////////////////////////////////////////////////////////////
164
+ // / @brief Set by the HeartbeatThread when the BackgroundThread should stop
165
+ // / guarded via _mutex.
166
+ // ////////////////////////////////////////////////////////////////////////////
151
167
std::atomic<bool > _stop;
168
+
169
+ // ////////////////////////////////////////////////////////////////////////////
170
+ // / @brief wether the background thread sleeps or not
171
+ // / guarded via _mutex.
172
+ // ////////////////////////////////////////////////////////////////////////////
152
173
std::atomic<bool > _sleeping;
174
+
175
+ // ////////////////////////////////////////////////////////////////////////////
176
+ // / @brief when awake, the background thread will execute another round of
177
+ // / phase 1 and phase 2, after resetting this to false
178
+ // / guarded via _mutex.
179
+ // ////////////////////////////////////////////////////////////////////////////
153
180
std::atomic<bool > _anotherRun;
181
+
154
182
uint64_t _backgroundJobsLaunched;
155
183
};
156
184
}
@@ -178,9 +206,6 @@ HeartbeatThread::HeartbeatThread(AgencyCallbackRegistry* agencyCallbackRegistry,
178
206
_desiredVersions(std::make_shared<AgencyVersions>(0 , 0 )),
179
207
_wasNotified(false ),
180
208
_backgroundJobsPosted(0 ),
181
- _backgroundJobsLaunched(0 ),
182
- _backgroundJobScheduledOrRunning(false ),
183
- _launchAnotherBackgroundJob(false ),
184
209
_lastSyncTime(0 ),
185
210
_maintenanceThread(nullptr ) {
186
211
}
@@ -196,81 +221,6 @@ HeartbeatThread::~HeartbeatThread() {
196
221
shutdown ();
197
222
}
198
223
199
- // //////////////////////////////////////////////////////////////////////////////
200
- // / @brief running of heartbeat background jobs (in JavaScript), we run
201
- // / these by instantiating an object in class HeartbeatBackgroundJob,
202
- // / which is a std::function<void()> and holds a shared_ptr to the
203
- // / HeartbeatThread singleton itself. This instance is then posted to
204
- // / the io_service for execution in the thread pool. Should the heartbeat
205
- // / thread itself terminate during shutdown, then the HeartbeatThread
206
- // / singleton itself is still kept alive by the shared_ptr in the instance
207
- // / of HeartbeatBackgroundJob. The operator() method simply calls the
208
- // / runBackgroundJob() method of the heartbeat thread. Should this have
209
- // / to schedule another background job, then it can simply create a new
210
- // / HeartbeatBackgroundJob instance, again using shared_from_this() to
211
- // / create a new shared_ptr keeping the HeartbeatThread object alive.
212
- // //////////////////////////////////////////////////////////////////////////////
213
-
214
- /* class HeartbeatBackgroundJob {
215
- std::shared_ptr<HeartbeatThread> _heartbeatThread;
216
- double _startTime;
217
- std::string _schedulerInfo;
218
- public:
219
- explicit HeartbeatBackgroundJob(std::shared_ptr<HeartbeatThread> hbt,
220
- double startTime)
221
- : _heartbeatThread(hbt), _startTime(startTime),_schedulerInfo(SchedulerFeature::SCHEDULER->infoStatus()) {
222
- }
223
-
224
- void operator()() {
225
- // first tell the scheduler that this thread is working:
226
- JobGuard guard(SchedulerFeature::SCHEDULER);
227
- guard.work();
228
-
229
- double now = TRI_microtime();
230
- if (now > _startTime + 5.0) {
231
- LOG_TOPIC(ERR, Logger::HEARTBEAT) << "ALARM: Scheduling background job "
232
- "took " << now - _startTime
233
- << " seconds, scheduler info at schedule time: " << _schedulerInfo
234
- << ", scheduler info now: "
235
- << SchedulerFeature::SCHEDULER->infoStatus();
236
- }
237
- //_heartbeatThread->runBackgroundJob();
238
- }
239
- };*/
240
-
241
-
242
- // //////////////////////////////////////////////////////////////////////////////
243
- // / @brief method runBackgroundJob()
244
- // //////////////////////////////////////////////////////////////////////////////
245
-
246
- /* void HeartbeatThread::runBackgroundJob() {
247
- uint64_t jobNr = ++_backgroundJobsLaunched;
248
- LOG_TOPIC(DEBUG, Logger::HEARTBEAT) << "sync callback started " << jobNr;
249
- {
250
- DBServerAgencySync job(this);
251
- job.work();
252
- }
253
- LOG_TOPIC(DEBUG, Logger::HEARTBEAT) << "sync callback ended " << jobNr;
254
-
255
- {
256
- MUTEX_LOCKER(mutexLocker, *_statusLock);
257
- TRI_ASSERT(_backgroundJobScheduledOrRunning);
258
-
259
- if (_launchAnotherBackgroundJob) {
260
- jobNr = ++_backgroundJobsPosted;
261
- LOG_TOPIC(DEBUG, Logger::HEARTBEAT) << "dispatching sync tail " << jobNr;
262
- _launchAnotherBackgroundJob = false;
263
-
264
- // the JobGuard is in the operator() of HeartbeatBackgroundJob
265
- _lastSyncTime = TRI_microtime();
266
- SchedulerFeature::SCHEDULER->post(
267
- HeartbeatBackgroundJob(shared_from_this(), _lastSyncTime), false);
268
- } else {
269
- _backgroundJobScheduledOrRunning = false;
270
- _launchAnotherBackgroundJob = false;
271
- }
272
- }
273
- }*/
274
224
275
225
// //////////////////////////////////////////////////////////////////////////////
276
226
// / @brief heartbeat main loop
@@ -1297,20 +1247,11 @@ void HeartbeatThread::syncDBServerStatusQuo(bool asyncPush) {
1297
1247
ci->invalidateCurrent ();
1298
1248
}
1299
1249
1300
- /* if (_backgroundJobScheduledOrRunning) {
1301
- _launchAnotherBackgroundJob = true;
1302
- return;
1303
- }*/
1304
-
1305
1250
// schedule a job for the change:
1306
1251
uint64_t jobNr = ++_backgroundJobsPosted;
1307
1252
LOG_TOPIC (DEBUG, Logger::HEARTBEAT) << " dispatching sync " << jobNr;
1308
- _backgroundJobScheduledOrRunning = true ;
1309
1253
1310
- // the JobGuard is in the operator() of HeartbeatBackgroundJob
1311
1254
_lastSyncTime = TRI_microtime ();
1312
- // SchedulerFeature::SCHEDULER->post(
1313
- // HeartbeatBackgroundJob(shared_from_this(), _lastSyncTime), false);
1314
1255
TRI_ASSERT (_maintenanceThread != nullptr );
1315
1256
_maintenanceThread->notify ();
1316
1257
0 commit comments