@@ -335,19 +335,34 @@ std::shared_ptr<Conductor> PregelFeature::conductor(uint64_t executionNumber) {
335
335
336
336
void PregelFeature::garbageCollectConductors () try {
337
337
// iterate over all conductors and remove the ones which can be garbage-collected
338
- MUTEX_LOCKER (guard, _mutex);
339
- for (auto it = _conductors.begin (); it != _conductors.end (); /* no hoisting*/ ) {
340
- if (it->second .conductor ->canBeGarbageCollected ()) {
341
- uint64_t executionNumber = it->first ;
342
-
343
- it->second .conductor ->cancel ();
344
- it = _conductors.erase (it);
345
-
346
- _workers.erase (executionNumber);
347
- } else {
348
- ++it;
338
+ std::vector<std::shared_ptr<Conductor>> conductors;
339
+
340
+ // copy out shared-ptrs of Conductors under the mutex
341
+ {
342
+ MUTEX_LOCKER (guard, _mutex);
343
+ for (auto const & it : _conductors) {
344
+ if (it.second .conductor ->canBeGarbageCollected ()) {
345
+ if (conductors.empty ()) {
346
+ conductors.reserve (8 );
347
+ }
348
+ conductors.emplace_back (it.second .conductor );
349
+ }
349
350
}
350
351
}
352
+
353
+ // cancel and kill conductors without holding the mutex
354
+ // permanently
355
+ for (auto & c : conductors) {
356
+ c->cancel ();
357
+ }
358
+
359
+ MUTEX_LOCKER (guard, _mutex);
360
+ for (auto & c : conductors) {
361
+ uint64_t executionNumber = c->executionNumber ();
362
+
363
+ _conductors.erase (executionNumber);
364
+ _workers.erase (executionNumber);
365
+ }
351
366
} catch (...) {}
352
367
353
368
void PregelFeature::addWorker (std::shared_ptr<IWorker>&& w, uint64_t executionNumber) {
@@ -521,21 +536,30 @@ uint64_t PregelFeature::numberOfActiveConductors() const {
521
536
Result PregelFeature::toVelocyPack (TRI_vocbase_t& vocbase,
522
537
arangodb::velocypack::Builder& result,
523
538
bool allDatabases, bool fanout) const {
524
- Result res ;
539
+ std::vector<std::shared_ptr<Conductor>> conductors ;
525
540
526
- result. openArray ();
541
+ // make a copy of all conductor shared-ptrs under the mutex
527
542
{
528
543
MUTEX_LOCKER (guard, _mutex);
544
+ conductors.reserve (_conductors.size ());
529
545
530
546
for (auto const & p : _conductors) {
531
547
auto const & ce = p.second ;
532
548
if (!::authorized (ce.user )) {
533
549
continue ;
534
550
}
535
551
536
- ce.conductor -> toVelocyPack (result );
552
+ conductors. emplace_back ( ce.conductor );
537
553
}
538
554
}
555
+
556
+ // release lock, and now velocypackify all conductors
557
+ result.openArray ();
558
+ for (auto const & c : conductors) {
559
+ c->toVelocyPack (result);
560
+ }
561
+
562
+ Result res;
539
563
540
564
if (ServerState::instance ()->isCoordinator () && fanout) {
541
565
// coordinator case, fan out to other coordinators!
0 commit comments