Bug fix/fix transaction errors (#9929) · distrubuted/arangodb@91cdcb8 · GitHub
[go: up one dir, main page]

Skip to content

Commit 91cdcb8

Browse files
authored
Bug fix/fix transaction errors (arangodb#9929)
1 parent 9f078f3 commit 91cdcb8

File tree

5 files changed

+81
-20
lines changed

5 files changed

+81
-20
lines changed

arangod/RestHandler/RestCursorHandler.cpp

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -107,17 +107,21 @@ RestStatus RestCursorHandler::continueExecute() {
107107

108108
void RestCursorHandler::shutdownExecute(bool isFinalized) noexcept {
109109
TRI_DEFER(RestVocbaseBaseHandler::shutdownExecute(isFinalized));
110-
auto const type = _request->requestType();
111110

112111
// request not done yet
113112
if (_state == HandlerState::PAUSED) {
114113
return;
115114
}
116-
115+
117116
// only trace create cursor requests
118-
if (type != rest::RequestType::POST) {
117+
if (_request->requestType() != rest::RequestType::POST) {
119118
return;
120119
}
120+
121+
// destroy the query context.
122+
// this is needed because the context is managing resources (e.g. leases
123+
// for a managed transaction) that we want to free as early as possible
124+
_queryResult.context.reset();
121125

122126
if (!_isValidForFinalize || _auditLogged) {
123127
// set by RestCursorHandler before
@@ -344,6 +348,11 @@ RestStatus RestCursorHandler::handleQueryResult() {
344348
THROW_ARANGO_EXCEPTION(TRI_ERROR_OUT_OF_MEMORY);
345349
}
346350
generateResult(rest::ResponseCode::CREATED, std::move(buffer), _queryResult.context);
351+
// directly after returning from here, we will free the query's context and free the
352+
// resources it uses (e.g. leases for a managed transaction). this way the server
353+
// can send back the query result to the client and the client can make follow-up
354+
// requests on the same transaction (e.g. trx.commit()) without the server code for
355+
// freeing the resources and the client code racing for who's first
347356
return RestStatus::DONE;
348357
} else {
349358
// result is bigger than batchSize, and a cursor will be created

arangod/RestHandler/RestIndexHandler.cpp

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
#include "ApplicationFeatures/ApplicationServer.h"
2626
#include "Cluster/ClusterInfo.h"
2727
#include "Cluster/ServerState.h"
28+
#include "Transaction/StandaloneContext.h"
2829
#include "Utils/Events.h"
2930
#include "Utils/SingleCollectionTransaction.h"
3031
#include "VocBase/LogicalCollection.h"
@@ -171,14 +172,30 @@ RestStatus RestIndexHandler::getSelectivityEstimates() {
171172
// .............................................................................
172173

173174
bool found = false;
174-
std::string cName = _request->value("collection", found);
175+
std::string const& cName = _request->value("collection", found);
175176
if (cName.empty()) {
176177
generateError(rest::ResponseCode::NOT_FOUND, TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND);
177178
return RestStatus::DONE;
178179
}
179180

180181
// transaction protects access onto selectivity estimates
181-
auto trx = createTransaction(cName, AccessMode::Type::READ);
182+
std::unique_ptr<SingleCollectionTransaction> trx;
183+
184+
try {
185+
trx = createTransaction(cName, AccessMode::Type::READ);
186+
} catch (basics::Exception const& ex) {
187+
if (ex.code() == TRI_ERROR_TRANSACTION_NOT_FOUND) {
188+
// this will happen if the tid of a managed transaction is passed in,
189+
// but the transaction hasn't yet started on the DB server. in
190+
// this case, we create an ad-hoc transaction on the underlying
191+
// collection
192+
trx = std::make_unique<SingleCollectionTransaction>(transaction::StandaloneContext::Create(_vocbase), cName, AccessMode::Type::READ);
193+
} else {
194+
throw;
195+
}
196+
}
197+
198+
TRI_ASSERT(trx != nullptr);
182199

183200
Result res = trx->begin();
184201
if (res.fail()) {

arangod/Transaction/Manager.cpp

Lines changed: 45 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,8 @@
5050
#include <velocypack/Iterator.h>
5151
#include <velocypack/velocypack-aliases.h>
5252

53+
#include <thread>
54+
5355
namespace {
5456
bool authorized(std::string const& user) {
5557
auto const& exec = arangodb::ExecContext::current();
@@ -478,8 +480,8 @@ std::shared_ptr<transaction::Context> Manager::leaseManagedTrx(TRI_voc_tid_t tid
478480
if (_disallowInserts.load(std::memory_order_acquire)) {
479481
return nullptr;
480482
}
481-
482-
const size_t bucket = getBucket(tid);
483+
484+
size_t const bucket = getBucket(tid);
483485
int i = 0;
484486
TransactionState* state = nullptr;
485487
do {
@@ -511,8 +513,10 @@ std::shared_ptr<transaction::Context> Manager::leaseManagedTrx(TRI_voc_tid_t tid
511513
state = mtrx.state;
512514
break;
513515
}
514-
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_TRANSACTION_DISALLOWED_OPERATION,
515-
"transaction is already in use");
516+
517+
LOG_TOPIC("abd72", DEBUG, Logger::TRANSACTIONS) << "transaction '" << tid << "' is already in use";
518+
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_LOCKED,
519+
std::string("transaction '") + std::to_string(tid) + "' is already in use");
516520
}
517521

518522
writeLocker.unlock(); // failure;
@@ -594,13 +598,34 @@ transaction::Status Manager::getManagedTrxStatus(TRI_voc_tid_t tid) const {
594598
return transaction::Status::ABORTED;
595599
}
596600
}
601+
602+
603+
Result Manager::statusChangeWithTimeout(TRI_voc_tid_t tid, transaction::Status status) {
604+
double startTime = 0.0;
605+
constexpr double maxWaitTime = 2.0;
606+
Result res;
607+
while (true) {
608+
res = updateTransaction(tid, status, false);
609+
if (res.ok() || !res.is(TRI_ERROR_LOCKED)) {
610+
break;
611+
}
612+
if (startTime <= 0.0001) { // fp tolerance
613+
startTime = TRI_microtime();
614+
} else if (TRI_microtime() - startTime > maxWaitTime) {
615+
// timeout
616+
break;
617+
}
618+
std::this_thread::yield();
619+
}
620+
return res;
621+
}
597622

598623
Result Manager::commitManagedTrx(TRI_voc_tid_t tid) {
599-
return updateTransaction(tid, transaction::Status::COMMITTED, false);
624+
return statusChangeWithTimeout(tid, transaction::Status::COMMITTED);
600625
}
601626

602627
Result Manager::abortManagedTrx(TRI_voc_tid_t tid) {
603-
return updateTransaction(tid, transaction::Status::ABORTED, false);
628+
return statusChangeWithTimeout(tid, transaction::Status::ABORTED);
604629
}
605630

606631
Result Manager::updateTransaction(TRI_voc_tid_t tid, transaction::Status status,
@@ -612,7 +637,7 @@ Result Manager::updateTransaction(TRI_voc_tid_t tid, transaction::Status status,
612637
<< "managed trx '" << tid << " updating to '" << status << "'";
613638

614639
Result res;
615-
const size_t bucket = getBucket(tid);
640+
size_t const bucket = getBucket(tid);
616641
bool wasExpired = false;
617642

618643
std::unique_ptr<TransactionState> state;
@@ -629,9 +654,11 @@ Result Manager::updateTransaction(TRI_voc_tid_t tid, transaction::Status status,
629654
ManagedTrx& mtrx = it->second;
630655
TRY_WRITE_LOCKER(tryGuard, mtrx.rwlock);
631656
if (!tryGuard.isLocked()) {
632-
return res.reset(TRI_ERROR_TRANSACTION_DISALLOWED_OPERATION,
657+
LOG_TOPIC("dfc30", DEBUG, Logger::TRANSACTIONS) << "transaction '" << tid << "' is in use";
658+
return res.reset(TRI_ERROR_LOCKED,
633659
std::string("transaction '") + std::to_string(tid) + "' is in use");
634660
}
661+
TRI_ASSERT(tryGuard.isLocked());
635662

636663
if (mtrx.type == MetaType::StandaloneAQL) {
637664
return res.reset(TRI_ERROR_TRANSACTION_DISALLOWED_OPERATION,
@@ -649,7 +676,7 @@ Result Manager::updateTransaction(TRI_voc_tid_t tid, transaction::Status status,
649676
}
650677
}
651678

652-
if (mtrx.expired()) {
679+
if (mtrx.expired() && status != transaction::Status::ABORTED) {
653680
status = transaction::Status::ABORTED;
654681
wasExpired = true;
655682
}
@@ -742,10 +769,9 @@ bool Manager::garbageCollect(bool abortAll) {
742769
auto it = _transactions[bucket]._managed.begin();
743770
while (it != _transactions[bucket]._managed.end()) {
744771
ManagedTrx& mtrx = it->second;
745-
772+
746773
if (mtrx.type == MetaType::Managed) {
747774
TRI_ASSERT(mtrx.state != nullptr);
748-
749775
if (abortAll || mtrx.expired()) {
750776
TRY_READ_LOCKER(tryGuard, mtrx.rwlock); // needs lock to access state
751777

@@ -777,12 +803,18 @@ bool Manager::garbageCollect(bool abortAll) {
777803
"transaction: '"
778804
<< tid << "'";
779805
Result res = updateTransaction(tid, Status::ABORTED, /*clearSrvs*/ true);
780-
if (res.fail()) {
806+
// updateTransaction can return TRI_ERROR_TRANSACTION_ABORTED when it
807+
// successfully aborts, so ignore this error.
808+
// we can also get the TRI_ERROR_LOCKED error in case we cannot
809+
// immediately acquire the lock on the transaction. this _can_ happen
810+
// infrequently, but is not an error
811+
if (res.fail() &&
812+
!res.is(TRI_ERROR_TRANSACTION_ABORTED) &&
813+
!res.is(TRI_ERROR_LOCKED)) {
781814
LOG_TOPIC("0a07f", INFO, Logger::TRANSACTIONS) << "error while aborting "
782815
"transaction: '"
783816
<< res.errorMessage() << "'";
784817
}
785-
786818
didWork = true;
787819
}
788820

arangod/Transaction/Manager.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,10 @@ class Manager final {
191191
}
192192

193193
private:
194-
// hashes the transaction id into a bucket
194+
/// @brief performs a status change on a transaction using a timeout
195+
Result statusChangeWithTimeout(TRI_voc_tid_t tid, transaction::Status status);
196+
197+
/// @brief hashes the transaction id into a bucket
195198
inline size_t getBucket(TRI_voc_tid_t tid) const {
196199
return std::hash<TRI_voc_cid_t>()(tid) % numBuckets;
197200
}

tests/Transaction/Manager-test.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -261,7 +261,7 @@ TEST_F(TransactionManagerTest, simple_transaction_and_commit_while_in_use) {
261261
OperationOptions opts;
262262
auto opRes = trx.insert(coll->name(), doc->slice(), opts);
263263
ASSERT_TRUE(opRes.ok());
264-
ASSERT_TRUE(mgr->commitManagedTrx(tid).is(TRI_ERROR_TRANSACTION_DISALLOWED_OPERATION));
264+
ASSERT_EQ(TRI_ERROR_LOCKED, mgr->commitManagedTrx(tid).errorNumber());
265265
ASSERT_TRUE(trx.finish(opRes.result).ok());
266266
}
267267
ASSERT_TRUE((mgr->getManagedTrxStatus(tid) == transaction::Status::RUNNING));

0 commit comments

Comments
 (0)
0