@@ -402,8 +402,20 @@ Result EngineInfoContainerDBServerServerBased::buildEngines(
402
402
ErrorCode cleanupReason = TRI_ERROR_CLUSTER_TIMEOUT;
403
403
404
404
auto cleanupGuard = scopeGuard ([this , &serverToQueryId, &cleanupReason]() {
405
- // Fire and forget
406
- std::ignore = cleanupEngines (cleanupReason, _query.vocbase ().name (), serverToQueryId);
405
+ try {
406
+ transaction::Methods& trx = _query.trxForOptimization ();
407
+ auto requests = cleanupEngines (cleanupReason, _query.vocbase ().name (), serverToQueryId);
408
+ if (!trx.isMainTransaction ()) {
409
+ // for AQL queries in streaming transactions, we will wait for the
410
+ // complete shutdown to have finished before we return to the caller.
411
+ // this is done so that no two AQL queries are ever active in the same
412
+ // streaming transaction at the same time
413
+ futures::collectAll (requests).wait ();
414
+ }
415
+ } catch (std::exception const & ex) {
416
+ LOG_TOPIC (" 2a9fe" , WARN, Logger::AQL)
417
+ << " unable to clean up query snippets: " << ex.what ();
418
+ }
407
419
});
408
420
409
421
NetworkFeature const & nf = _query.vocbase ().server ().getFeature <NetworkFeature>();
@@ -412,6 +424,9 @@ Result EngineInfoContainerDBServerServerBased::buildEngines(
412
424
// nullptr only happens on controlled shutdown
413
425
return {TRI_ERROR_SHUTTING_DOWN};
414
426
}
427
+
428
+ // remember which servers we add during our setup request
429
+ ::arangodb::containers::HashSet<std::string> serversAdded;
415
430
416
431
transaction::Methods& trx = _query.trxForOptimization ();
417
432
std::vector<arangodb::futures::Future<Result>> networkCalls{};
@@ -427,9 +442,15 @@ Result EngineInfoContainerDBServerServerBased::buildEngines(
427
442
}
428
443
429
444
TRI_IF_FAILURE (" Query::setupTimeoutFailSequence" ) {
430
- options.timeout = network::Timeout (0.5 );
445
+ double t = 0.5 ;
446
+ TRI_IF_FAILURE (" Query::setupTimeoutFailSequenceRandom" ) {
447
+ if (RandomGenerator::interval (uint32_t (100 )) >= 95 ) {
448
+ t = 3.0 ;
449
+ }
450
+ }
451
+ options.timeout = network::Timeout (t);
431
452
}
432
-
453
+
433
454
// / cluster global query id, under which the query will be registered
434
455
// / on DB servers from 3.8 onwards.
435
456
QueryId clusterQueryId = _query.vocbase ().server ().getFeature <ClusterFeature>().clusterInfo ().uniqid ();
@@ -450,6 +471,13 @@ Result EngineInfoContainerDBServerServerBased::buildEngines(
450
471
continue ;
451
472
}
452
473
474
+ if (!trx.state ()->knownServers ().contains (server)) {
475
+ // we are about to add this server to the transaction.
476
+ // remember it, so we can roll the addition back for
477
+ // the second setup request if we need to
478
+ serversAdded.emplace (server);
479
+ }
480
+
453
481
networkCalls.emplace_back (
454
482
buildSetupRequest (trx, server, infoSlice, didCreateEngine, snippetIds,
455
483
serverToQueryId, serverToQueryIdLock, pool, options));
@@ -463,7 +491,7 @@ Result EngineInfoContainerDBServerServerBased::buildEngines(
463
491
// We can directly report a non TRI_ERROR_LOCK_TIMEOUT
464
492
// error as we need to abort after.
465
493
// Otherwise we need to report
466
- Result res{TRI_ERROR_NO_ERROR} ;
494
+ Result res;
467
495
for (auto const & tryRes : responses) {
468
496
auto response = tryRes.get ();
469
497
if (response.fail ()) {
@@ -478,7 +506,7 @@ Result EngineInfoContainerDBServerServerBased::buildEngines(
478
506
}
479
507
// Return what we have, this will be ok() if and only
480
508
// if none of the requests failed.
481
- // If will be LOCK_TIMEOUT if and only if the only error
509
+ // It will be LOCK_TIMEOUT if and only if the only error
482
510
// we see was LOCK_TIMEOUT.
483
511
return res;
484
512
});
@@ -490,26 +518,59 @@ Result EngineInfoContainerDBServerServerBased::buildEngines(
490
518
return fastPathResult.get ();
491
519
}
492
520
521
+ // we got a lock timeout response for the fast path locking...
493
522
{
494
523
// in case of fast path failure, we need to cleanup engines
495
524
auto requests = cleanupEngines (fastPathResult.get ().errorNumber (), _query.vocbase ().name (), serverToQueryId);
496
- // Wait for all requests to complete.
525
+ // Wait for all cleanup requests to complete.
497
526
// So we know that all Transactions are aborted.
498
- // We do NOT care for the actual result.
499
- futures::collectAll (requests).wait ();
500
- snippetIds.clear ();
527
+ Result res;
528
+ for (auto & tryRes : requests) {
529
+ network::Response const & response = tryRes.get ();
530
+ if (response.fail ()) {
531
+ // note the error (the last failure seen wins), but continue iterating over all results
532
+ LOG_TOPIC (" 2d319" , DEBUG, Logger::AQL)
533
+ << " received error from server " << response.destination
534
+ << " during query cleanup: " << response.combinedResult ().errorMessage ();
535
+ res.reset (response.combinedResult ());
536
+ }
537
+ }
538
+ if (res.fail ()) {
539
+ // unable to do a proper cleanup.
540
+ // it is not safe to go on here.
541
+ cleanupGuard.cancel ();
542
+ cleanupReason = res.errorNumber ();
543
+ return res;
544
+ }
501
545
}
502
-
546
+
547
+ // fast path locking rolled back successfully!
548
+ snippetIds.clear ();
549
+
550
+ // revert the addition of servers by us
551
+ for (auto const & s : serversAdded) {
552
+ trx.state ()->removeKnownServer (s);
553
+ }
554
+
503
555
// we must generate a new query id, because the fast path setup has failed
504
556
clusterQueryId = _query.vocbase ().server ().getFeature <ClusterFeature>().clusterInfo ().uniqid ();
505
557
558
+ if (trx.isMainTransaction () && !trx.state ()->isReadOnlyTransaction ()) {
559
+ // when we are not in a streaming transaction, it is ok to roll a new trx id.
560
+ // it is not ok to change the trx id inside a streaming transaction,
561
+ // because then the caller would not be able to "talk" to the transaction
562
+ // any further.
563
+ // note: read-only transactions do not need to reroll their id, as there will
564
+ // be no locks taken.
565
+ trx.state ()->coordinatorRerollTransactionId ();
566
+ }
567
+
506
568
// set back to default lock timeout for slow path fallback
507
569
_query.setLockTimeout (oldLockTimeout);
508
570
LOG_TOPIC (" f5022" , DEBUG, Logger::AQL)
509
571
<< " Potential deadlock detected, using slow path for locking. This "
510
572
" is expected if exclusive locks are used." ;
511
573
512
- trx.state ()->coordinatorRerollTransactionId ();
513
574
514
575
// Make sure we always use the same ordering on servers
515
576
std::sort (engineInformation.begin (), engineInformation.end (),
@@ -566,7 +627,7 @@ Result EngineInfoContainerDBServerServerBased::parseResponse(
566
627
QueryId& globalQueryId) const {
567
628
if (!response.isObject () || !response.get (" result" ).isObject ()) {
568
629
LOG_TOPIC (" 0c3f2" , WARN, Logger::AQL) << " Received error information from "
569
- << server << " : " << response.toJson ();
630
+ << server << " : " << response.toJson ();
570
631
if (response.hasKey (StaticStrings::ErrorNum) &&
571
632
response.hasKey (StaticStrings::ErrorMessage)) {
572
633
return network::resultFromBody (response, TRI_ERROR_CLUSTER_AQL_COMMUNICATION)
@@ -680,7 +741,7 @@ std::vector<arangodb::network::FutureRes> EngineInfoContainerDBServerServerBased
680
741
VPackBuffer<uint8_t > body;
681
742
VPackBuilder builder (body);
682
743
builder.openObject ();
683
- builder.add (" code" , VPackValue (to_string ( errorCode) ));
744
+ builder.add (" code" , VPackValue (errorCode));
684
745
builder.close ();
685
746
requests.reserve (serverQueryIds.size ());
686
747
for (auto const & [server, queryId] : serverQueryIds) {
0 commit comments