@@ -98,77 +98,33 @@ RocksDBMetadata::RocksDBMetadata()
9898 *
9999 * @param trxId The identifier for the active transaction
100100 * @param seq The sequence number immediately prior to call
101- * @return May return error if we fail to allocate and place blocker
102101 */
103- Result RocksDBMetadata::placeBlocker (TransactionId trxId, rocksdb::SequenceNumber& seq) {
104- return basics::catchToResult ([&]() -> Result {
105- Result res;
106- WRITE_LOCKER (locker, _blockerLock);
107-
108- seq = std::max (seq, _maxBlockersSequenceNumber);
102+ rocksdb::SequenceNumber RocksDBMetadata::placeBlocker (TransactionId trxId, rocksdb::SequenceNumber seq) {
103+ WRITE_LOCKER (locker, _blockerLock);
109104
110- TRI_ASSERT (_blockers.end () == _blockers.find (trxId));
111- TRI_ASSERT (_blockersBySeq.end () == _blockersBySeq.find (std::make_pair (seq, trxId)));
105+ seq = std::max (seq, _maxBlockersSequenceNumber);
112106
113- auto insert = _blockers.try_emplace (trxId, seq);
114- if (!insert.second ) {
115- return res.reset (TRI_ERROR_INTERNAL);
116- }
117- try {
118- auto crosslist = _blockersBySeq.emplace (seq, trxId);
119- if (!crosslist.second ) {
120- _blockers.erase (trxId);
121- return res.reset (TRI_ERROR_INTERNAL);
122- }
123- LOG_TOPIC (" 1587a" , TRACE, Logger::ENGINES)
124- << " [" << this << " ] placed blocker (" << trxId.id () << " , " << seq << " )" ;
125- } catch (...) {
126- _blockers.erase (trxId);
127- throw ;
128- }
107+ TRI_ASSERT (_blockers.end () == _blockers.find (trxId));
108+ TRI_ASSERT (_blockersBySeq.end () == _blockersBySeq.find (std::make_pair (seq, trxId)));
129109
130- _maxBlockersSequenceNumber = seq;
131- return res;
132- });
133- }
134-
135- /* *
136- * @brief Update a blocker to allow proper commit/serialize semantics
137- *
138- * Should be called after initializing an internal trx.
139- *
140- * @param trxId The identifier for the active transaction (should match input
141- * to earlier `placeBlocker` call)
142- * @param seq The sequence number from the internal snapshot
143- * @return May return error if we fail to allocate and place blocker
144- */
145- Result RocksDBMetadata::updateBlocker (TransactionId trxId, rocksdb::SequenceNumber seq) {
146- return basics::catchToResult ([&]() -> Result {
147- Result res;
148- WRITE_LOCKER (locker, _blockerLock);
149-
150- auto previous = _blockers.find (trxId);
151- if (_blockers.end () == previous ||
152- _blockersBySeq.end () ==
153- _blockersBySeq.find (std::make_pair (previous->second , trxId))) {
154- res.reset (TRI_ERROR_INTERNAL);
155- }
156-
157- auto removed = _blockersBySeq.erase (std::make_pair (previous->second , trxId));
158- if (!removed) {
159- return res.reset (TRI_ERROR_INTERNAL);
110+ auto insert = _blockers.try_emplace (trxId, seq);
111+ if (!insert.second ) {
112+ THROW_ARANGO_EXCEPTION_MESSAGE (TRI_ERROR_INTERNAL, " duplicate sequence number in placeBlocker" );
113+ }
114+ try {
115+ if (!_blockersBySeq.emplace (seq, trxId).second ) {
116+ THROW_ARANGO_EXCEPTION_MESSAGE (TRI_ERROR_INTERNAL, " duplicate sequence number for crosslist in placeBlocker" );
160117 }
118+ } catch (...) {
119+ _blockers.erase (trxId);
120+ throw ;
121+ }
161122
162- TRI_ASSERT (seq >= _blockers[trxId]);
163- _blockers[trxId] = seq;
164- auto crosslist = _blockersBySeq.emplace (seq, trxId);
165- if (!crosslist.second ) {
166- return res.reset (TRI_ERROR_INTERNAL);
167- }
168- LOG_TOPIC (" 1587c" , TRACE, Logger::ENGINES)
169- << " [" << this << " ] updated blocker (" << trxId.id () << " , " << seq << " )" ;
170- return res;
171- });
123+ _maxBlockersSequenceNumber = seq;
124+
125+ LOG_TOPIC (" 1587a" , TRACE, Logger::ENGINES)
126+ << " [" << this << " ] placed blocker (" << trxId.id () << " , " << seq << " )" ;
127+ return seq;
172128}
173129
174130/* *
@@ -181,7 +137,7 @@ Result RocksDBMetadata::updateBlocker(TransactionId trxId, rocksdb::SequenceNumb
181137 * @param trxId Identifier for active transaction (should match input to
182138 * earlier `placeBlocker` call)
183139 */
184- void RocksDBMetadata::removeBlocker (TransactionId trxId) {
140+ void RocksDBMetadata::removeBlocker (TransactionId trxId) noexcept try {
185141 WRITE_LOCKER (locker, _blockerLock);
186142 auto it = _blockers.find (trxId);
187143
@@ -195,11 +151,11 @@ void RocksDBMetadata::removeBlocker(TransactionId trxId) {
195151 LOG_TOPIC (" 1587b" , TRACE, Logger::ENGINES)
196152 << " [" << this << " ] removed blocker (" << trxId.id () << " )" ;
197153 }
198- }
154+ } catch (...) {}
199155
200156// / @brief check if there is blocker with a seq number lower or equal to
201157// / the specified number
202- bool RocksDBMetadata::hasBlockerUpTo (rocksdb::SequenceNumber seq) const {
158+ bool RocksDBMetadata::hasBlockerUpTo (rocksdb::SequenceNumber seq) const noexcept {
203159 READ_LOCKER (locker, _blockerLock);
204160
205161 if (_blockersBySeq.empty ()) {
@@ -347,20 +303,11 @@ Result RocksDBMetadata::serializeMeta(rocksdb::WriteBatch& batch,
347303#ifdef ARANGODB_ENABLE_FAILURE_TESTS
348304 // simulate another transaction coming along and trying to commit while
349305 // we are serializing
350- TransactionId trxId = TransactionId::none ( );
306+ RocksDBBlockerGuard blocker (&coll );
351307
352308 TRI_IF_FAILURE (" TransactionChaos::blockerOnSync" ) {
353- auto & selector = coll.vocbase ().server ().getFeature <EngineSelectorFeature>();
354- auto & engine = selector.engine <RocksDBEngine>();
355- auto blockerSeq = engine.db ()->GetLatestSequenceNumber ();
356- trxId = TransactionId (transaction::Context::makeTransactionId ());
357- placeBlocker (trxId, blockerSeq);
358<
D306
code class="diff-text syntax-highlighted-line deletion">- }
359- auto blockerGuard = scopeGuard ([&] { // remove blocker afterwards
360- if (trxId.isSet ()) {
361- removeBlocker (trxId);
362- }
363- });
309+ blocker.placeBlocker ();
310+ }
364311#endif
365312
366313 TRI_ASSERT (maxCommitSeq <= appliedSeq);
@@ -805,3 +752,59 @@ void RocksDBMetadata::loadInitialNumberDocuments() {
805752 }
806753 return Result ();
807754}
755+
756+ RocksDBBlockerGuard::RocksDBBlockerGuard (LogicalCollection* collection)
757+ : _collection(collection),
758+ _trxId(TransactionId::none()) {
759+ TRI_ASSERT (_collection != nullptr );
760+ }
761+
762+ RocksDBBlockerGuard::~RocksDBBlockerGuard () {
763+ releaseBlocker ();
764+ }
765+
766+ RocksDBBlockerGuard::RocksDBBlockerGuard (RocksDBBlockerGuard&& other) noexcept
767+ : _collection(other._collection),
768+ _trxId(other._trxId) {
769+ other._trxId = TransactionId::none ();
770+ }
771+
772+ RocksDBBlockerGuard& RocksDBBlockerGuard::operator =(RocksDBBlockerGuard&& other) noexcept {
773+ releaseBlocker ();
774+
775+ _collection = other._collection ;
776+ _trxId = other._trxId ;
777+ other._trxId = TransactionId::none ();
778+ return *this ;
779+ }
780+
781+ rocksdb::SequenceNumber RocksDBBlockerGuard::placeBlocker () {
782+ TransactionId trxId = TransactionId (transaction::Context::makeTransactionId ());
783+ // generated trxId must be > 0
784+ TRI_ASSERT (trxId.isSet ());
785+ return placeBlocker (trxId);
786+ }
787+
788+ rocksdb::SequenceNumber RocksDBBlockerGuard::placeBlocker (TransactionId trxId) {
789+ // note: input trxId can be 0 during unit tests, so we cannot assert trxId.isSet() here!
790+
791+ TRI_ASSERT (!_trxId.isSet ());
792+
793+ auto & engine = _collection->vocbase ().server ().getFeature <EngineSelectorFeature>().engine <RocksDBEngine>();
794+ rocksdb::SequenceNumber blockerSeq = engine.db ()->GetLatestSequenceNumber ();
795+
796+ auto * rcoll = static_cast <RocksDBMetaCollection*>(_collection->getPhysical ());
797+ // placeBlocker() may increase the blockerSeq
798+ blockerSeq = rcoll->meta ().placeBlocker (trxId, blockerSeq);
799+ // only set _trxId if placing the blocker succeeded
800+ _trxId = trxId;
801+ return blockerSeq;
802+ }
803+
804+ void RocksDBBlockerGuard::releaseBlocker () noexcept {
805+ if (_trxId.isSet ()) {
806+ auto * rcoll = static_cast <RocksDBMetaCollection*>(_collection->getPhysical ());
807+ rcoll->meta ().removeBlocker (_trxId);
808+ _trxId = TransactionId::none ();
809+ }
810+ }
0 commit comments