8000 Cleanup in preparation for Replication 2.0 transaction handling (try #2) by goedderz · Pull Request #14907 · arangodb/arangodb · GitHub
[go: up one dir, main page]

Skip to content

Cleanup in preparation for Replication 2.0 transaction handling (try #2) #14907

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 18 commits into from
Nov 4, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Took over some refactoring from a previous branches
  • Loading branch information
goedderz committed Oct 14, 2021
commit 2b27872e472bfc2f326eded7452d543aae2a4a86
8 changes: 3 additions & 5 deletions arangod/Cluster/ClusterTrxMethods.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ Future<Result> commitAbortTransaction(arangodb::TransactionState* state,
Result res;
for (Try<arangodb::network::Response> const& tryRes : responses) {
network::Response const& resp = tryRes.get(); // throws exceptions upwards
Result res = ::checkTransactionResult(tidPlus, status, resp);
res = ::checkTransactionResult(tidPlus, status, resp);
if (res.fail()) {
break;
}
Expand Down Expand Up @@ -345,8 +345,7 @@ Future<Result> commitAbortTransaction(transaction::Methods& trx, transaction::St

} // namespace

namespace arangodb {
namespace ClusterTrxMethods {
namespace arangodb::ClusterTrxMethods {
using namespace arangodb::futures;

bool IsServerIdLessThan::operator()(ServerID const& lhs, ServerID const& rhs) const noexcept {
Expand Down Expand Up @@ -575,5 +574,4 @@ bool isElCheapo(TransactionState const& state) {
state.hasHint(transaction::Hints::Hint::FROM_TOPLEVEL_AQL));
}

} // namespace ClusterTrxMethods
} // namespace arangodb
} // namespace arangodb::ClusterTrxMethods
6 changes: 6 additions & 0 deletions arangod/Cluster/ClusterTypes.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -182,4 +182,10 @@ AnalyzersRevision::Ptr AnalyzersRevision::fromVelocyPack(VPackSlice const& slice
buildingRevisionSlice.getNumber<AnalyzersRevision::Revision>(),
std::move(coordinatorID), rebootID));
}

auto isShardName(std::string_view name) -> bool {
return name.size() > 1 && name[0] == 's' &&
std::all_of(name.cbegin() + 1, name.cend(), static_cast<int(&)(int)>(std::isdigit));
}

} // namespace arangodb
19 changes: 12 additions & 7 deletions arangod/IResearch/IResearchAqlAnalyzer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -185,36 +185,41 @@ bool normalize_slice(VPackSlice const& slice, VPackBuilder& builder) {
class CalculationTransactionState final : public arangodb::TransactionState {
public:
explicit CalculationTransactionState(TRI_vocbase_t& vocbase)
: TransactionState(vocbase, arangodb::TransactionId(0), arangodb::transaction::Options()) {
: TransactionState(vocbase, arangodb::TransactionId(0),
arangodb::transaction::Options()) {
updateStatus(arangodb::transaction::Status::RUNNING); // always running to make ASSERTS happy
}

~CalculationTransactionState() {
~CalculationTransactionState() override {
if (status() == arangodb::transaction::Status::RUNNING) {
updateStatus(arangodb::transaction::Status::ABORTED); // simulate state changes to make ASSERTS happy
}
}
/// @brief begin a transaction
arangodb::Result beginTransaction(arangodb::transaction::Hints) override {
[[nodiscard]] arangodb::Result beginTransaction(arangodb::transaction::Hints) override {
return {};
}

/// @brief commit a transaction
arangodb::Result commitTransaction(arangodb::transaction::Methods*) override {
[[nodiscard]] arangodb::Result commitTransaction(arangodb::transaction::Methods*) override {
updateStatus(arangodb::transaction::Status::COMMITTED); // simulate state changes to make ASSERTS happy
return {};
}

/// @brief abort a transaction
arangodb::Result abortTransaction(arangodb::transaction::Methods*) override {
[[nodiscard]] arangodb::Result abortTransaction(arangodb::transaction::Methods*) override {
updateStatus(arangodb::transaction::Status::ABORTED); // simulate state changes to make ASSERTS happy
return {};
}

bool hasFailedOperations() const override { return false; }
[[nodiscard]] bool hasFailedOperations() const override { return false; }

/// @brief number of commits, including intermediate commits
uint64_t numCommits() const override { return 0; }
[[nodiscard]] uint64_t numCommits() const override { return 0; }

[[nodiscard]] TRI_voc_tick_t lastOperationTick() const noexcept override {
return 0;
}
};

/// @brief Dummy transaction context which just gives dummy state
Expand Down
3 changes: 0 additions & 3 deletions arangod/RocksDBEngine/Methods/RocksDBReadOnlyBaseMethods.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,6 @@
#include <rocksdb/status.h>

using namespace arangodb;

RocksDBReadOnlyBaseMethods::RocksDBReadOnlyBaseMethods(RocksDBTransactionState* state)
: RocksDBTransactionMethods(state) {}

void RocksDBReadOnlyBaseMethods::prepareOperation(DataSourceId cid, RevisionId rid,
TRI_voc_document_operation_e operationType) {
Expand Down
2 changes: 1 addition & 1 deletion arangod/RocksDBEngine/Methods/RocksDBReadOnlyBaseMethods.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ namespace arangodb {

class RocksDBReadOnlyBaseMethods : public RocksDBTransactionMethods {
public:
explicit RocksDBReadOnlyBaseMethods(RocksDBTransactionState* state);
using RocksDBTransactionMethods::RocksDBTransactionMethods;

TRI_voc_tick_t lastOperationTick() const noexcept override { return 0; }

Expand Down
25 changes: 24 additions & 1 deletion arangod/RocksDBEngine/RocksDBTransactionState.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28 A93C 0,7 +280,8 @@ Result RocksDBTransactionState::commitTransaction(transaction::Methods* activeTr
cleanupTransaction(); // deletes trx
++statistics()._transactionsCommitted;
} else {
abortTransaction(activeTrx); // deletes trx
// what if this fails?
std::ignore = abortTransaction(activeTrx); // deletes trx
}
TRI_ASSERT(!_cacheTx);

Expand Down Expand Up @@ -450,6 +451,28 @@ rocksdb::SequenceNumber RocksDBTransactionState::beginSeq() const {
return _rocksMethods->GetSequenceNumber();
}

bool RocksDBTransactionState::hasFailedOperations() const {
return (_status == transaction::Status::ABORTED) && hasOperations();
}
RocksDBTransactionState* RocksDBTransactionState::toState(transaction::Methods* trx) {
TRI_ASSERT(trx != nullptr);
TransactionState* state = trx->state();
TRI_ASSERT(state != nullptr);
return static_cast<RocksDBTransactionState*>(state);
}
RocksDBTransactionMethods* RocksDBTransactionState::toMethods(transaction::Methods* trx) {
TRI_ASSERT(trx != nullptr);
TransactionState* state = trx->state();
TRI_ASSERT(state != nullptr);
return static_cast<RocksDBTransactionState*>(state)->rocksdbMethods();
}
void RocksDBTransactionState::prepareForParallelReads() { _parallel = true; }
bool RocksDBTransactionState::inParallelMode() const { return _parallel; }
RocksDBTransactionMethods* RocksDBTransactionState::rocksdbMethods() {
TRI_ASSERT(_rocksMethods);
return _rocksMethods.get();
}

#ifdef ARANGODB_ENABLE_MAINTAINER_MODE
RocksDBTransactionStateGuard::RocksDBTransactionStateGuard(RocksDBTransactionState* state) noexcept
: _state(state) {
Expand Down
57 changes: 21 additions & 36 deletions arangod/RocksDBEngine/RocksDBTransactionState.h
10000
Original file line number Diff line number Diff line change
Expand Up @@ -66,37 +66,35 @@ class RocksDBTransactionState final : public F438 TransactionState {
public:
RocksDBTransactionState(TRI_vocbase_t& vocbase, TransactionId tid,
transaction::Options const& options);
~RocksDBTransactionState();
~RocksDBTransactionState() override;

/// @brief begin a transaction
Result beginTransaction(transaction::Hints hints) override;
[[nodiscard]] Result beginTransaction(transaction::Hints hints) override;

/// @brief commit a transaction
Result commitTransaction(transaction::Methods* trx) override;
[[nodiscard]] Result commitTransaction(transaction::Methods* trx) override;

/// @brief abort a transaction
Result abortTransaction(transaction::Methods* trx) override;
[[nodiscard]] Result abortTransaction(transaction::Methods* trx) override;

/// @returns tick of last operation in a transaction
/// @note the value is guaranteed to be valid only after
/// transaction is committed
TRI_voc_tick_t lastOperationTick() const noexcept override;
[[nodiscard]] TRI_voc_tick_t lastOperationTick() const noexcept override;

/// @brief number of commits, including intermediate commits
uint64_t numCommits() const override;
[[nodiscard]] uint64_t numCommits() const override;

bool hasOperations() const noexcept;
[[nodiscard]] bool hasOperations() const noexcept;

uint64_t numOperations() const noexcept;
[[nodiscard]] uint64_t numOperations() const noexcept;

bool hasFailedOperations() const override {
return (_status == transaction::Status::ABORTED) && hasOperations();
}
[[nodiscard]] bool hasFailedOperations() const override;

void beginQuery(bool isModificationQuery) override;
void endQuery(bool isModificationQuery) noexcept override;

bool iteratorMustCheckBounds(ReadOwnWrites readOwnWrites) const;
[[nodiscard]] bool iteratorMustCheckBounds(ReadOwnWrites readOwnWrites) const;

void prepareOperation(DataSourceId cid, RevisionId rid,
TRI_voc_document_operation_e operationType);
Expand All @@ -107,41 +105,28 @@ class RocksDBTransactionState final : public TransactionState {
/// @brief add an operation for a transaction collection
/// sets hasPerformedIntermediateCommit to true if an intermediate commit was
/// performed
Result addOperation(DataSourceId collectionId, RevisionId revisionId,
[[nodiscard]] Result addOperation(DataSourceId collectionId, RevisionId revisionId,
TRI_voc_document_operation_e opType,
bool& hasPerformedIntermediateCommit);

/// @brief return wrapper around rocksdb transaction
RocksDBTransactionMethods* rocksdbMethods() {
TRI_ASSERT(_rocksMethods);
return _rocksMethods.get();
}
[[nodiscard]] RocksDBTransactionMethods* rocksdbMethods();

/// @brief acquire a database snapshot if we do not yet have one.
/// Returns true if a snapshot was acquired, otherwise false (i.e., if we already had a snapshot)
bool ensureSnapshot();
[[nodiscard]] bool ensureSnapshot();

static RocksDBTransactionState* toState(transaction::Methods* trx) {
TRI_ASSERT(trx != nullptr);
TransactionState* state = trx->state();
TRI_ASSERT(state != nullptr);
return static_cast<RocksDBTransactionState*>(state);
}

static RocksDBTransactionMethods* toMethods(transaction::Methods* trx) {
TRI_ASSERT(trx != nullptr);
TransactionState* state = trx->state();
TRI_ASSERT(state != nullptr);
return static_cast<RocksDBTransactionState*>(state)->rocksdbMethods();
}
[[nodiscard]] static RocksDBTransactionState* toState(transaction::Methods* trx);

[[nodiscard]] static RocksDBTransactionMethods* toMethods(transaction::Methods* trx);

/// @brief make some internal preparations for accessing this state in
/// parallel from multiple threads. READ-ONLY transactions
void prepareForParallelReads() { _parallel = true; }
void prepareForParallelReads();
/// @brief in parallel mode. READ-ONLY transactions
bool inParallelMode() const { return _parallel; }
[[nodiscard]] bool inParallelMode() const;

RocksDBTransactionCollection::TrackedOperations& trackedOperations(DataSourceId cid);
[[nodiscard]] RocksDBTransactionCollection::TrackedOperations& trackedOperations(DataSourceId cid);

/// @brief Track documents inserted to the collection
/// Used to update the revision tree for replication after commit
Expand All @@ -159,9 +144,9 @@ class RocksDBTransactionState final : public TransactionState {
/// Used to update the estimate after the trx committed
void trackIndexRemove(DataSourceId cid, IndexId idxObjectId, uint64_t hash);

bool isOnlyExclusiveTransaction() const;
[[nodiscard]] bool isOnlyExclusiveTransaction() const;

rocksdb::SequenceNumber beginSeq() const;
[[nodiscard]] rocksdb::SequenceNumber beginSeq() const;

#ifdef ARANGODB_ENABLE_MAINTAINER_MODE
/// @brief only needed for RocksDBTransactionStateGuard
Expand Down
54 changes: 29 additions & 25 deletions arangod/StorageEngine/TransactionState.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include "Basics/DebugRaceController.h"
#include "Basics/Exceptions.h"
#include "Basics/StringUtils.h"
#include "Basics/overload.h"
#include "Logger/LogMacros.h"
#include "Logger/Logger.h"
#include "Logger/LoggerStream.h"
Expand Down Expand Up @@ -84,15 +85,18 @@ TransactionCollection* TransactionState::collection(DataSourceId cid,
TRI_ASSERT(_status == transaction::Status::CREATED ||
_status == transaction::Status::RUNNING);

size_t unused;
TransactionCollection* trxCollection = findCollection(cid, unused);

if (trxCollection == nullptr || !trxCollection->canAccess(accessType)) {
// not found or not accessible in the requested mode
return nullptr;
}

return trxCollection;
auto collectionOrPos = findCollectionOrPos(cid);

return std::visit(overload{
[](CollectionNotFound const&) -> TransactionCollection* {
return nullptr;
},
[&](CollectionFound const& colFound) -> TransactionCollection* {
auto* const col = colFound.collection;
return col->canAccess(accessType) ? col : nullptr;
},
},
collectionOrPos);
}

/// @brief return the collection from a transaction
Expand All @@ -114,7 +118,7 @@ TransactionCollection* TransactionState::collection(std::string const& name,
return (*it);
}

TransactionState::Cookie* TransactionState::cookie(void const* key) noexcept {
TransactionState::Cookie* TransactionState::cookie(void const* key) const noexcept {
auto itr = _cookies.find(key);

return itr == _cookies.end() ? nullptr : itr->second.get();
Expand Down Expand Up @@ -194,10 +198,9 @@ Result TransactionState::addCollectionInternal(DataSourceId cid, std::string con
Result res;

// check if we already got this collection in the _collections vector
size_t position = 0;
TransactionCollection* trxColl = findCollection(cid, position);

if (trxColl != nullptr) {
auto colOrPos = findCollectionOrPos(cid);
if (std::holds_alternative<CollectionFound>(colOrPos)) {
auto* const trxColl = std::get<CollectionFound>(colOrPos).collection;
LOG_TRX("ad6d0", TRACE, this)
<< "updating collection usage " << cid << ": '" << cname << "'";

Expand All @@ -212,6 +215,9 @@ Result TransactionState::addCollectionInternal(DataSourceId cid, std::string con
// collection is already contained in vector
return res.reset(trxColl->updateUsage(accessType));
}
TRI_ASSERT(std::holds_alternative<CollectionNotFound>(colOrPos));
auto const position = std::get<CollectionNotFound>(colOrPos).lowerBound;


// collection not found.

Expand Down Expand Up @@ -242,11 +248,10 @@ Result TransactionState::addCollectionInternal(DataSourceId cid, std::string con
}

// collection was not contained. now create and insert it
TRI_ASSERT(trxColl == nullptr);

StorageEngine& engine = vocbase().server().getFeature<EngineSelectorFeature>().engine();

trxColl = engine.createTransactionCollection(*this, cid, accessType).release();
auto* const trxColl = engine.createTransactionCollection(*this, cid, accessType).release();

TRI_ASSERT(trxColl != nullptr);

Expand Down Expand Up @@ -302,21 +307,22 @@ TransactionCollection* TransactionState::findCollection(DataSourceId cid) const
/// The idea is if a collection is found it will be returned.
/// In this case the position is not used.
/// In case the collection is not found. It will return a
/// nullptr and the position will be set. The position
/// lower bound of its position. The position
/// defines where the collection should be inserted,
/// so whenever we want to insert the collection we
/// have to use this position for insert.
TransactionCollection* TransactionState::findCollection(DataSourceId cid,
size_t& position) const {
auto TransactionState::findCollectionOrPos(DataSourceId cid) const
-> std::variant<CollectionNotFound, CollectionFound> {
size_t const n = _collections.size();
size_t i;

// TODO We could do a binary search here.
for (i = 0; i < n; ++i) {
auto trxCollection = _collections[i];
auto* trxCollection = _collections[i];

if (cid == trxCollection->id()) {
// found
return trxCollection;
return CollectionFound{trxCollection};
}

if (cid < trxCollection->id()) {
Expand All @@ -326,10 +332,8 @@ TransactionCollection* TransactionState::findCollection(DataSourceId cid,
// next
}

// update the insert position if required
position = i;

return nullptr;
// return the insert position if required
return CollectionNotFound{i};
}

void TransactionState::setExclusiveAccessType() {
Expand Down
Loading
1CF5
0