50
50
#include < velocypack/Iterator.h>
51
51
#include < velocypack/velocypack-aliases.h>
52
52
53
+ #include < thread>
54
+
53
55
namespace {
54
56
bool authorized (std::string const & user) {
55
57
auto const & exec = arangodb::ExecContext::current ();
@@ -478,8 +480,8 @@ std::shared_ptr<transaction::Context> Manager::leaseManagedTrx(TRI_voc_tid_t tid
478
480
if (_disallowInserts.load (std::memory_order_acquire)) {
479
481
return nullptr ;
480
482
}
481
-
482
- const size_t bucket = getBucket (tid);
483
+
484
+ size_t const bucket = getBucket (tid);
483
485
int i = 0 ;
484
486
TransactionState* state = nullptr ;
485
487
do {
@@ -511,8 +513,10 @@ std::shared_ptr<transaction::Context> Manager::leaseManagedTrx(TRI_voc_tid_t tid
511
513
state = mtrx.state ;
512
514
break ;
513
515
}
514
- THROW_ARANGO_EXCEPTION_MESSAGE (TRI_ERROR_TRANSACTION_DISALLOWED_OPERATION,
515
- " transaction is already in use" );
516
+
517
+ LOG_TOPIC (" abd72" , DEBUG, Logger::TRANSACTIONS) << " transaction '" << tid << " ' is already in use" ;
518
+ THROW_ARANGO_EXCEPTION_MESSAGE (TRI_ERROR_LOCKED,
519
+ std::string (" transaction '" ) + std::to_string (tid) + " ' is already in use" );
516
520
}
517
521
518
522
writeLocker.unlock (); // failure;
@@ -594,13 +598,34 @@ transaction::Status Manager::getManagedTrxStatus(TRI_voc_tid_t tid) const {
594
598
return transaction::Status::ABORTED;
595
599
}
596
600
}
601
+
602
+
603
+ Result Manager::statusChangeWithTimeout (TRI_voc_tid_t tid, transaction::Status status) {
604
+ double startTime = 0.0 ;
605
+ constexpr double maxWaitTime = 2.0 ;
606
+ Result res;
607
+ while (true ) {
608
+ res = updateTransaction (tid, status, false );
609
+ if (res.ok () || !res.is (TRI_ERROR_LOCKED)) {
610
+ break ;
611
+ }
612
+ if (startTime <= 0.0001 ) { // fp tolerance
613
+ startTime = TRI_microtime ();
614
+ } else if (TRI_microtime () - startTime > maxWaitTime) {
615
+ // timeout
616
+ break ;
617
+ }
618
+ std::this_thread::yield ();
619
+ }
620
+ return res;
621
+ }
597
622
598
623
Result Manager::commitManagedTrx (TRI_voc_tid_t tid) {
599
- return updateTransaction (tid, transaction::Status::COMMITTED, false );
624
+ return statusChangeWithTimeout (tid, transaction::Status::COMMITTED);
600
625
}
601
626
602
627
Result Manager::abortManagedTrx (TRI_voc_tid_t tid) {
603
- return updateTransaction (tid, transaction::Status::ABORTED, false );
628
+ return statusChangeWithTimeout (tid, transaction::Status::ABORTED);
604
629
}
605
630
606
631
Result Manager::updateTransaction (TRI_voc_tid_t tid, transaction::Status status,
@@ -612,7 +637,7 @@ Result Manager::updateTransaction(TRI_voc_tid_t tid, transaction::Status status,
612
637
<< " managed trx '" << tid << " updating to '" << status << " '" ;
613
638
614
639
Result res;
615
- const size_t bucket = getBucket (tid);
640
+ size_t const bucket = getBucket (tid);
616
641
bool wasExpired = false ;
617
642
618
643
std::unique_ptr<TransactionState> state;
@@ -629,9 +654,11 @@ Result Manager::updateTransaction(TRI_voc_tid_t tid, transaction::Status status,
629
654
ManagedTrx& mtrx = it->second ;
630
655
TRY_WRITE_LOCKER (tryGuard, mtrx.rwlock );
631
656
if (!tryGuard.isLocked ()) {
632
- return res.reset (TRI_ERROR_TRANSACTION_DISALLOWED_OPERATION,
657
+ LOG_TOPIC (" dfc30" , DEBUG, Logger::TRANSACTIONS) << " transaction '" << tid << " ' is in use" ;
658
+ return res.reset (TRI_ERROR_LOCKED,
633
659
std::string (" transaction '" ) + std::to_string (tid) + " ' is in use" );
634
660
}
661
+ TRI_ASSERT (tryGuard.isLocked ());
635
662
636
663
if (mtrx.type == MetaType::StandaloneAQL) {
637
664
return res.reset (TRI_ERROR_TRANSACTION_DISALLOWED_OPERATION,
@@ -649,7 +676,7 @@ Result Manager::updateTransaction(TRI_voc_tid_t tid, transaction::Status status,
649
676
}
650
677
}
651
678
652
- if (mtrx.expired ()) {
679
+ if (mtrx.expired () && status != transaction::Status::ABORTED ) {
653
680
status = transaction::Status::ABORTED;
654
681
wasExpired = true ;
655
682
}
@@ -742,10 +769,9 @@ bool Manager::garbageCollect(bool abortAll) {
742
769
auto it = _transactions[bucket]._managed .begin ();
743
770
while (it != _transactions[bucket]._managed .end ()) {
744
771
ManagedTrx& mtrx = it->second ;
745
-
772
+
746
773
if (mtrx.type == MetaType::Managed) {
747
774
TRI_ASSERT (mtrx.state != nullptr );
748
-
749
775
if (abortAll || mtrx.expired ()) {
750
776
TRY_READ_LOCKER (tryGuard, mtrx.rwlock ); // needs lock to access state
751
777
@@ -777,12 +803,18 @@ bool Manager::garbageCollect(bool abortAll) {
777
803
" transaction: '"
778
804
<< tid << " '" ;
779
805
Result res = updateTransaction (tid, Status::ABORTED, /* clearSrvs*/ true );
780
- if (res.fail ()) {
806
+ // updateTransaction can return TRI_ERROR_TRANSACTION_ABORTED when it
807
+ // successfully aborts, so ignore this error.
808
+ // we can also get the TRI_ERROR_LOCKED error in case we cannot
809
+ // immediately acquire the lock on the transaction. this _can_ happen
810
+ // infrequently, but is not an error
811
+ if (res.fail () &&
812
+ !res.is (TRI_ERROR_TRANSACTION_ABORTED) &&
813
+ !res.is (TRI_ERROR_LOCKED)) {
781
814
LOG_TOPIC (" 0a07f" , INFO, Logger::TRANSACTIONS) << " error while aborting "
782
815
" transaction: '"
783
816
<< res.errorMessage () << " '
F42D
" ;
784
817
}
785
-
786
818
didWork = true ;
787
819
}
788
820
0 commit comments