8000 simplify blocker handling (#14508) · arangodb/arangodb@e76f873 · GitHub
[go: up one dir, main page]

Skip to content

Commit e76f873

Browse files
authored
simplify blocker handling (#14508)
1 parent d9e2450 commit e76f873

File tree

8 files changed

+146
-166
lines changed

8 files changed

+146
-166
lines changed

arangod/RocksDBEngine/RocksDBCollection.cpp

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -733,17 +733,14 @@ Result RocksDBCollection::truncate(transaction::Methods& trx, OperationOptions&
733733
}
734734

735735
// pre commit sequence needed to place a blocker
736-
rocksdb::SequenceNumber seq = db->GetLatestSequenceNumber();
737-
auto guard = scopeGuard([&] { // remove blocker afterwards
738-
_meta.removeBlocker(state->id());
739-
});
740-
_meta.placeBlocker(state->id(), seq);
736+
RocksDBBlockerGuard blocker(&_logicalCollection);
737+
blocker.placeBlocker(state->id());
741738

742739
rocksdb::WriteBatch batch;
743740
// delete documents
744741
RocksDBKeyBounds bounds = RocksDBKeyBounds::CollectionDocuments(objectId());
745742
rocksdb::Status s =
746-
batch.DeleteRange(bounds.columnFamily(), bounds.start(), bounds.end());
743+
batch.DeleteRange(bounds.columnFamily(), bounds.start(), bounds.end());
747744
if (!s.ok()) {
748745
return rocksutils::convertStatus(s);
749746
}
@@ -779,7 +776,7 @@ Result RocksDBCollection::truncate(transaction::Methods& trx, OperationOptions&
779776
return rocksutils::convertStatus(s);
780777
}
781778

782-
seq = db->GetLatestSequenceNumber() - 1; // post commit sequence
779+
rocksdb::SequenceNumber seq = db->GetLatestSequenceNumber() - 1; // post commit sequence
783780

784781
uint64_t numDocs = _meta.numberDocuments();
785782
_meta.adjustNumberDocuments(seq, /*revision*/ newRevisionId(),
@@ -793,14 +790,12 @@ Result RocksDBCollection::truncate(transaction::Methods& trx, OperationOptions&
793790
}
794791
bufferTruncate(seq);
795792

796-
guard.fire(); // remove blocker
797-
798793
TRI_ASSERT(!state->hasOperations()); // not allowed
799-
return Result{};
794+
return {};
800795
}
801796

802797
TRI_IF_FAILURE("RocksDBRemoveLargeRangeOff") {
803-
return Result(TRI_ERROR_DEBUG);
798+
return {TRI_ERROR_DEBUG};
804799
}
805800

806801
// normal transactional truncate

arangod/RocksDBEngine/RocksDBMetaCollection.cpp

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -184,12 +184,7 @@ uint64_t RocksDBMetaCollection::recalculateCounts() {
184184
// makes sure collection doesn't get unloaded
185185
CollectionGuard collGuard(&vocbase, _logicalCollection.id());
186186

187-
TransactionId trxId{0};
188-
auto blockerGuard = scopeGuard([&] { // remove blocker afterwards
189-
if (trxId.isSet()) {
190-
_meta.removeBlocker(trxId);
191-
}
192-
});
187+
RocksDBBlockerGuard blocker(&_logicalCollection);
193188

194189
uint64_t snapNumberOfDocuments = 0;
195190
{
@@ -202,12 +197,8 @@ uint64_t RocksDBMetaCollection::recalculateCounts() {
202197
THROW_ARANGO_EXCEPTION(res);
203198
}
204199

205-
// generate a unique transaction id for a blocker
206-
trxId = TransactionId(transaction::Context::makeTransactionId());
207-
208200
// place a blocker. will be removed by blockerGuard automatically
209-
rocksdb::SequenceNumber seqNo = engine.db()->GetLatestSequenceNumber();
210-
_meta.placeBlocker(trxId, seqNo);
201+
blocker.placeBlocker();
211202

212203
snapshot = engine.db()->GetSnapshot();
213204
snapNumberOfDocuments = _meta.numberDocuments();

arangod/RocksDBEngine/RocksDBMetaCollection.h

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,6 @@
2222
////////////////////////////////////////////////////////////////////////////////
2323

2424
#pragma once
25-
#ifndef ARANGOD_ROCKSDB_ENGINE_ROCKSDB_META_COLLECTION_H
26-
#define ARANGOD_ROCKSDB_ENGINE_ROCKSDB_META_COLLECTION_H 1
2725

2826
#include "Basics/Common.h"
2927
#include "Basics/ReadWriteLock.h"
@@ -244,5 +242,3 @@ class RocksDBMetaCollection : public PhysicalCollection {
244242
};
245243

246244
} // namespace arangodb
247-
248-
#endif

arangod/RocksDBEngine/RocksDBMetadata.cpp

Lines changed: 83 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -98,77 +98,33 @@ RocksDBMetadata::RocksDBMetadata()
9898
*
9999
* @param trxId The identifier for the active transaction
100100
* @param seq The sequence number immediately prior to call
101-
* @return May return error if we fail to allocate and place blocker
102101
*/
103-
Result RocksDBMetadata::placeBlocker(TransactionId trxId, rocksdb::SequenceNumber& seq) {
104-
return basics::catchToResult([&]() -> Result {
105-
Result res;
106-
WRITE_LOCKER(locker, _blockerLock);
107-
108-
seq = std::max(seq, _maxBlockersSequenceNumber);
102+
rocksdb::SequenceNumber RocksDBMetadata::placeBlocker(TransactionId trxId, rocksdb::SequenceNumber seq) {
103+
WRITE_LOCKER(locker, _blockerLock);
109104

110-
TRI_ASSERT(_blockers.end() == _blockers.find(trxId));
111-
TRI_ASSERT(_blockersBySeq.end() == _blockersBySeq.find(std::make_pair(seq, trxId)));
105+
seq = std::max(seq, _maxBlockersSequenceNumber);
112106

113-
auto insert = _blockers.try_emplace(trxId, seq);
114-
if (!insert.second) {
115-
return res.reset(TRI_ERROR_INTERNAL);
116-
}
117-
try {
118-
auto crosslist = _blockersBySeq.emplace(seq, trxId);
119-
if (!crosslist.second) {
120-
_blockers.erase(trxId);
121-
return res.reset(TRI_ERROR_INTERNAL);
122-
}
123-
LOG_TOPIC("1587a", TRACE, Logger::ENGINES)
124-
<< "[" << this << "] placed blocker (" << trxId.id() << ", " << seq << ")";
125-
} catch (...) {
126-
_blockers.erase(trxId);
127-
throw;
128-
}
107+
TRI_ASSERT(_blockers.end() == _blockers.find(trxId));
108+
TRI_ASSERT(_blockersBySeq.end() == _blockersBySeq.find(std::make_pair(seq, trxId)));
129109

130-
_maxBlockersSequenceNumber = seq;
131-
return res;
132-
});
133-
}
134-
135-
/**
136-
* @brief Update a blocker to allow proper commit/serialize semantics
137-
*
138-
* Should be called after initializing an internal trx.
139-
*
140-
* @param trxId The identifier for the active transaction (should match input
141-
* to earlier `placeBlocker` call)
142-
* @param seq The sequence number from the internal snapshot
143-
* @return May return error if we fail to allocate and place blocker
144-
*/
145-
Result RocksDBMetadata::updateBlocker(TransactionId trxId, rocksdb::SequenceNumber seq) {
146-
return basics::catchToResult([&]() -> Result {
147-
Result res;
148-
WRITE_LOCKER(locker, _blockerLock);
149-
150-
auto previous = _blockers.find(trxId);
151-
if (_blockers.end() == previous ||
152-
_blockersBySeq.end() ==
153-
_blockersBySeq.find(std::make_pair(previous->second, trxId))) {
154-
res.reset(TRI_ERROR_INTERNAL);
155-
}
156-
157-
auto removed = _blockersBySeq.erase(std::make_pair(previous->second, trxId));
158-
if (!removed) {
159-
return res.reset(TRI_ERROR_INTERNAL);
110+
auto insert = _blockers.try_emplace(trxId, seq);
111+
if (!insert.second) {
112+
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, "duplicate sequence number in placeBlocker");
113+
}
114+
try {
115+
if (!_blockersBySeq.emplace(seq, trxId).second) {
116+
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, "duplicate sequence number for crosslist in placeBlocker");
160117
}
118+
} catch (...) {
119+
_blockers.erase(trxId);
120+
throw;
121+
}
161122

162-
TRI_ASSERT(seq >= _blockers[trxId]);
163-
_blockers[trxId] = seq;
164-
auto crosslist = _blockersBySeq.emplace(seq, trxId);
165-
if (!crosslist.second) {
166-
return res.reset(TRI_ERROR_INTERNAL);
167-
}
168-
LOG_TOPIC("1587c", TRACE, Logger::ENGINES)
169-
<< "[" << this << "] updated blocker (" << trxId.id() << ", " << seq << ")";
170-
return res;
171-
});
123+
_maxBlockersSequenceNumber = seq;
124+
125+
LOG_TOPIC("1587a", TRACE, Logger::ENGINES)
126+
<< "[" << this << "] placed blocker (" << trxId.id() << ", " << seq << ")";
127+
return seq;
172128
}
173129

174130
/**
@@ -181,7 +137,7 @@ Result RocksDBMetadata::updateBlocker(TransactionId trxId, rocksdb::SequenceNumb
181137
* @param trxId Identifier for active transaction (should match input to
182138
* earlier `placeBlocker` call)
183139
*/
184-
void RocksDBMetadata::removeBlocker(TransactionId trxId) {
140+
void RocksDBMetadata::removeBlocker(TransactionId trxId) noexcept try {
185141
WRITE_LOCKER(locker, _blockerLock);
186142
auto it = _blockers.find(trxId);
187143

@@ -195,11 +151,11 @@ void RocksDBMetadata::removeBlocker(TransactionId trxId) {
195151
LOG_TOPIC("1587b", TRACE, Logger::ENGINES)
196152
<< "[" << this << "] removed blocker (" << trxId.id() << ")";
197153
}
198-
}
154+
} catch (...) {}
199155

200156
/// @brief check if there is blocker with a seq number lower or equal to
201157
/// the specified number
202-
bool RocksDBMetadata::hasBlockerUpTo(rocksdb::SequenceNumber seq) const {
158+
bool RocksDBMetadata::hasBlockerUpTo(rocksdb::SequenceNumber seq) const noexcept {
203159
READ_LOCKER(locker, _blockerLock);
204160

205161
if (_blockersBySeq.empty()) {
@@ -347,20 +303,11 @@ Result RocksDBMetadata::serializeMeta(rocksdb::WriteBatch& batch,
347303
#ifdef ARANGODB_ENABLE_FAILURE_TESTS
348304
// simulate another transaction coming along and trying to commit while
349305
// we are serializing
350-
TransactionId trxId = TransactionId::none();
306+
RocksDBBlockerGuard blocker(&coll);
351307

352308
TRI_IF_FAILURE("TransactionChaos::blockerOnSync") {
353-
auto& selector = coll.vocbase().server().getFeature<EngineSelectorFeature>();
354-
auto& engine = selector.engine<RocksDBEngine>();
355-
auto blockerSeq = engine.db()->GetLatestSequenceNumber();
356-
trxId = TransactionId(transaction::Context::makeTransactionId());
357-
placeBlocker(trxId, blockerSeq);
358< D306 code class="diff-text syntax-highlighted-line deletion">-
}
359-
auto blockerGuard = scopeGuard([&] { // remove blocker afterwards
360-
if (trxId.isSet()) {
361-
removeBlocker(trxId);
362-
}
363-
});
309+
blocker.placeBlocker();
310+
}
364311
#endif
365312

366313
TRI_ASSERT(maxCommitSeq <= appliedSeq);
@@ -805,3 +752,59 @@ void RocksDBMetadata::loadInitialNumberDocuments() {
805752
}
806753
return Result();
807754
}
755+
756+
RocksDBBlockerGuard::RocksDBBlockerGuard(LogicalCollection* collection)
757+
: _collection(collection),
758+
_trxId(TransactionId::none()) {
759+
TRI_ASSERT(_collection != nullptr);
760+
}
761+
762+
RocksDBBlockerGuard::~RocksDBBlockerGuard() {
763+
releaseBlocker();
764+
}
765+
766+
RocksDBBlockerGuard::RocksDBBlockerGuard(RocksDBBlockerGuard&& other) noexcept
767+
: _collection(other._collection),
768+
_trxId(other._trxId) {
769+
other._trxId = TransactionId::none();
770+
}
771+
772+
RocksDBBlockerGuard& RocksDBBlockerGuard::operator=(RocksDBBlockerGuard&& other) noexcept {
773+
releaseBlocker();
774+
775+
_collection = other._collection;
776+
_trxId = other._trxId;
777+
other._trxId = TransactionId::none();
778+
return *this;
779+
}
780+
781+
rocksdb::SequenceNumber RocksDBBlockerGuard::placeBlocker() {
782+
TransactionId trxId = TransactionId(transaction::Context::makeTransactionId());
783+
// generated trxId must be > 0
784+
TRI_ASSERT(trxId.isSet());
785+
return placeBlocker(trxId);
786+
}
787+
788+
rocksdb::SequenceNumber RocksDBBlockerGuard::placeBlocker(TransactionId trxId) {
789+
// note: input trxId can be 0 during unit tests, so we cannot assert trxId.isSet() here!
790+
791+
TRI_ASSERT(!_trxId.isSet());
792+
793+
auto& engine = _collection->vocbase().server().getFeature<EngineSelectorFeature>().engine<RocksDBEngine>();
794+
rocksdb::SequenceNumber blockerSeq = engine.db()->GetLatestSequenceNumber();
795+
796+
auto* rcoll = static_cast<RocksDBMetaCollection*>(_collection->getPhysical());
797+
// placeBlocker() may increase the blockerSeq
798+
blockerSeq = rcoll->meta().placeBlocker(trxId, blockerSeq);
799+
// only set _trxId if placing the blocker succeeded
800+
_trxId = trxId;
801+
return blockerSeq;
802+
}
803+
804+
void RocksDBBlockerGuard::releaseBlocker() noexcept {
805+
if (_trxId.isSet()) {
806+
auto* rcoll = static_cast<RocksDBMetaCollection*>(_collection->getPhysical());
807+
rcoll->meta().removeBlocker(_trxId);
808+
_trxId = TransactionId::none();
809+
}
810+
}

arangod/RocksDBEngine/RocksDBMetadata.h

Lines changed: 41 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,8 @@
2323

2424
#pragma once
2525

26-
#include <mutex>
2726
#include <map>
27+
#include <mutex>
2828
#include <set>
2929

3030
#include <rocksdb/types.h>
@@ -79,21 +79,8 @@ struct RocksDBMetadata final {
7979
*
8080
* @param trxId The identifier for the active transaction
8181
* @param seq The sequence number immediately prior to call
82-
* @return May return error if we fail to allocate and place blocker
83-
*/
84-
Result placeBlocker(TransactionId trxId, rocksdb::SequenceNumber& seq);
85-
86-
/**
87-
* @brief Update a blocker to allow proper commit/serialize semantics
88-
*
89-
* Should be called after initializing an internal trx.
90-
*
91-
* @param trxId The identifier for the active transaction (should match input
92-
* to earlier `placeBlocker` call)
93-
* @param seq The sequence number from the internal snapshot
94-
* @return May return error if we fail to allocate and place blocker
9582
*/
96-
Result updateBlocker(TransactionId trxId, rocksdb::SequenceNumber seq);
83+
rocksdb::SequenceNumber placeBlocker(TransactionId trxId, rocksdb::SequenceNumber seq);
9784

9885
/**
9986
* @brief Removes an existing transaction blocker
@@ -105,7 +92,11 @@ struct RocksDBMetadata final {
10592
* @param trxId Identifier for active transaction (should match input to
10693
* earlier `placeBlocker` call)
10794
*/
108-
void removeBlocker(TransactionId trxId);
95+
void removeBlocker(TransactionId trxId) noexcept;
96+
97+
/// @brief check if there is blocker with a seq number lower or equal to
98+
/// the specified number
99+
bool hasBlockerUpTo(rocksdb::SequenceNumber seq) const noexcept;
109100

110101
/// @brief returns the largest safe seq to squash updates against
111102
rocksdb::SequenceNumber committableSeq(rocksdb::SequenceNumber maxCommitSeq) const;
@@ -150,10 +141,6 @@ struct RocksDBMetadata final {
150141
/// @brief remove collection index estimate
151142
static Result deleteIndexEstimate(rocksdb::DB*, uint64_t objectId);
152143

153-
/// @brief check if there is blocker with a seq number lower or equal to
154-
/// the specified number
155-
bool hasBlockerUpTo(rocksdb::SequenceNumber seq) const;
156-
157144
private:
158145
/// @brief apply counter adjustments, only call from sync thread
159146
bool applyAdjustments(rocksdb::SequenceNumber commitSeq);
@@ -185,5 +172,38 @@ struct RocksDBMetadata final {
185172
std::atomic<uint64_t> _numberDocuments;
186173
std::atomic<RevisionId> _revisionId;
187174
};
188-
} // namespace arangodb
189175

176+
/// helper class for acquiring and releasing a blocker.
177+
/// constructing an object of this class will do nothing, but once
178+
/// placeBlocker() is called, the object takes care of releasing the
179+
/// blocker upon destruction. An acquired blocker can also be released
180+
/// prematurely by calling releaseBlocker().
181+
class RocksDBBlockerGuard {
182+
public:
183+
explicit RocksDBBlockerGuard(LogicalCollection* collection);
184+
~RocksDBBlockerGuard();
185+
RocksDBBlockerGuard(RocksDBBlockerGuard const&) = delete;
186+
RocksDBBlockerGuard& operator=(RocksDBBlockerGuard const&) = delete;
187+
RocksDBBlockerGuard(RocksDBBlockerGuard&&) noexcept;
188+
RocksDBBlockerGuard& operator=(RocksDBBlockerGuard&&) noexcept;
189+
190+
/// @brief place a blocker without prescribing a transaction id.
191+
/// it is not allowed to call placeBlocker() if a blocker is already
192+
/// acquired by the object.
193+
rocksdb::SequenceNumber placeBlocker();
194+
195+
/// @brief place a blocker for a specific transaction id.
196+
/// it is not allowed to call placeBlocker() if a blocker is already
197+
/// acquired by the object.
198+
rocksdb::SequenceNumber placeBlocker(TransactionId id);
199+
200+
/// @brief releases an acquired blocker. will do nothing if no
201+
/// blocker is currently acquired by the object.
202+
void releaseBlocker() noexcept;
203+
204+
private:
205+
LogicalCollection* _collection;
206+
TransactionId _trxId;
207+
};
208+
209+
} // namespace arangodb

0 commit comments

Comments
 (0)
0