8000 Cleanup in preparation for Replication 2.0 transaction handling (try #2) by goedderz · Pull Request #14907 · arangodb/arangodb · GitHub
[go: up one dir, main page]

Skip to content

Cleanup in preparation for Replication 2.0 transaction handling (try #2) #14907

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 18 commits into from
Nov 4, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 3 additions & 5 deletions arangod/Cluster/ClusterTrxMethods.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ Future<Result> commitAbortTransaction(arangodb::TransactionState* state,
Result res;
for (Try<arangodb::network::Response> const& tryRes : responses) { 8000
network::Response const& resp = tryRes.get(); // throws exceptions upwards
Result res = ::checkTransactionResult(tidPlus, status, resp);
res = ::checkTransactionResult(tidPlus, status, resp);
if (res.fail()) {
break;
}
Expand Down Expand Up @@ -345,8 +345,7 @@ Future<Result> commitAbortTransaction(transaction::Methods& trx, transaction::St

} // namespace

namespace arangodb {
namespace ClusterTrxMethods {
namespace arangodb::ClusterTrxMethods {
using namespace arangodb::futures;

bool IsServerIdLessThan::operator()(ServerID const& lhs, ServerID const& rhs) const noexcept {
Expand Down Expand Up @@ -575,5 +574,4 @@ bool isElCheapo(TransactionState const& state) {
state.hasHint(transaction::Hints::Hint::FROM_TOPLEVEL_AQL));
}

} // namespace ClusterTrxMethods
} // namespace arangodb
} // namespace arangodb::ClusterTrxMethods
6 changes: 6 additions & 0 deletions arangod/Cluster/ClusterTypes.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -182,4 +182,10 @@ AnalyzersRevision::Ptr AnalyzersRevision::fromVelocyPack(VPackSlice const& slice
buildingRevisionSlice.getNumber<AnalyzersRevision::Revision>(),
std::move(coordinatorID), rebootID));
}

auto isShardName(std::string_view name) -> bool {
return name.size() > 1 && name[0] == 's' &&
std::all_of(name.cbegin() + 1, name.cend(), static_cast<int(&)(int)>(std::isdigit));
}

} // namespace arangodb
8 changes: 8 additions & 0 deletions arangod/Cluster/ServerState.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1190,3 +1190,11 @@ Result ServerState::propagateClusterReadOnly(bool mode) {
setReadOnly(mode ? API_TRUE : API_FALSE);
return Result();
}

#ifdef ARANGODB_USE_GOOGLE_TESTS
bool ServerState::isGoogleTest() const noexcept { return _isGoogleTests; }

void ServerState::setGoogleTest(bool isGoogleTests) noexcept {
_isGoogleTests = isGoogleTests;
}
#endif
11 changes: 11 additions & 0 deletions arangod/Cluster/ServerState.h
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,13 @@ class ServerState {
/// file where the server persists its UUID
std::string getUuidFilename() const;

#ifdef ARANGODB_USE_GOOGLE_TESTS
[[nodiscard]] bool isGoogleTest() const noexcept;
void setGoogleTest(bool isGoogleTests) noexcept;
#else
[[nodiscard]] constexpr bool isGoogleTest() const noexcept { return false; }
#endif

private:
/// @brief atomically fetches the server role
inline RoleEnum loadRole() const noexcept {
Expand Down Expand Up @@ -374,6 +381,10 @@ class ServerState {
TRI_voc_tick_t _foxxmasterSince;

bool _foxxmasterQueueupdate;

#ifdef ARANGODB_USE_GOOGLE_TESTS
bool _isGoogleTests = false;
#endif
};
} // namespace arangodb

Expand Down
4 changes: 4 additions & 0 deletions arangod/ClusterEngine/ClusterTransactionState.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,10 @@ uint64_t ClusterTransactionState::numCommits() const {
return _status == transaction::Status::COMMITTED ? 1 : 0;
}

TRI_voc_tick_t ClusterTransactionState::lastOperationTick() const noexcept {
return 0;
}

std::unique_ptr<TransactionCollection> ClusterTransactionState::createTransactionCollection(
DataSourceId cid, AccessMode::Type accessType) {
return std::make_unique<ClusterTransactionCollection>(this, cid, accessType);
Expand Down
25 changes: 17 additions & 8 deletions arangod/ClusterEngine/ClusterTransactionState.h
F438
Original file line number Diff line number Diff line change
Expand Up @@ -24,34 +24,43 @@
#pragma once

#include "StorageEngine/TransactionState.h"
#include "VocBase/Identifiers/TransactionId.h"

struct TRI_vocbase_t;

namespace arangodb {
class Result;

namespace transaction {
struct Options;
}

/// @brief transaction type
class ClusterTransactionState final : public TransactionState {
public:
ClusterTransactionState(TRI_vocbase_t& vocbase, TransactionId tid,
transaction::Options const& options);
~ClusterTransactionState() = default;
~ClusterTransactionState() override = default;

/// @brief begin a transaction
Result beginTransaction(transaction::Hints hints) override;
[[nodiscard]] Result beginTransaction(transaction::Hints hints) override;

/// @brief commit a transaction
Result commitTransaction(transaction::Methods* trx) override;
[[nodiscard]] Result commitTransaction(transaction::Methods* trx) override;

/// @brief abort a transaction
Result abortTransaction(transaction::Methods* trx) override;
[[nodiscard]] Result abortTransaction(transaction::Methods* trx) override;

/// @brief return number of commits, including intermediate commits
uint64_t numCommits() const override;
[[nodiscard]] uint64_t numCommits() const override;

bool hasFailedOperations() const override { return false; }
[[nodiscard]] bool hasFailedOperations() const override { return false; }

[[nodiscard]] TRI_voc_tick_t lastOperationTick() const noexcept override;

protected:
std::unique_ptr<TransactionCollection> createTransactionCollection(
DataSourceId cid, AccessMode::Type accessType) override;
};

} // namespace arangodb

19 changes: 12 additions & 7 deletions arangod/IResearch/IResearchAqlAnalyzer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -185,36 +185,41 @@ bool normalize_slice(VPackSlice const& slice, VPackBuilder& builder) {
class CalculationTransactionState final : public arangodb::TransactionState {
public:
explicit CalculationTransactionState(TRI_vocbase_t& vocbase)
: TransactionState(vocbase, arangodb::TransactionId(0), arangodb::transaction::Options()) {
: TransactionState(vocbase, arangodb::TransactionId(0),
arangodb::transaction::Options()) {
updateStatus(arangodb::transaction::Status::RUNNING); // always running to make ASSERTS happy
}

~CalculationTransactionState() {
~CalculationTransactionState() override {
if (status() == arangodb::transaction::Status::RUNNING) {
updateStatus(arangodb::transaction::Status::ABORTED); // simulate state changes to make ASSERTS happy
}
}
/// @brief begin a transaction
arangodb::Result beginTransaction(arangodb::transaction::Hints) override {
[[nodiscard]] arangodb::Result beginTransaction(arangodb::transaction::Hints) override {
return {};
}

/// @brief commit a transaction
arangodb::Result commitTransaction(arangodb::transaction::Methods*) override {
[[nodiscard]] arangodb::Result commitTransaction(arangodb::transaction::Methods*) override {
updateStatus(arangodb::transaction::Status::COMMITTED); // simulate state changes to make ASSERTS happy
return {};
}

/// @brief abort a transaction
arangodb::Result abortTransaction(arangodb::transaction::Methods*) override {
[[nodiscard]] arangodb::Result abortTransaction(arangodb::transaction::Methods*) override {
updateStatus(arangodb::transaction::Status::ABORTED); // simulate state changes to make ASSERTS happy
return {};
}

bool hasFailedOperations() const override { return false; }
[[nodiscard]] bool hasFailedOperations() const override { return false; }

/// @brief number of commits, including intermediate commits
uint64_t numCommits() const override { return 0; }
[[nodiscard]] uint64_t numCommits() const override { return 0; }

[[nodiscard]] TRI_voc_tick_t lastOperationTick() const noexcept override {
return 0;
}

std::unique_ptr<arangodb::TransactionCollection> createTransactionCollection(
arangodb::DataSourceId cid, arangodb::AccessMode::Type accessType) override {
Expand Down
3 changes: 0 additions & 3 deletions arangod/RocksDBEngine/Methods/RocksDBReadOnlyBaseMethods.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,6 @@
#include <rocksdb/status.h>

using namespace arangodb;

RocksDBReadOnlyBaseMethods::RocksDBReadOnlyBaseMethods(RocksDBTransactionState* state)
: RocksDBTransactionMethods(state) {}

void RocksDBReadOnlyBaseMethods::prepareOperation(DataSourceId cid, RevisionId rid,
TRI_voc_document_operation_e operationType) {
Expand Down
2 changes: 1 addition & 1 deletion arangod/RocksDBEngine/Methods/RocksDBReadOnlyBaseMethods.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ namespace arangodb {

class RocksDBReadOnlyBaseMethods : public RocksDBTransactionMethods {
public:
explicit RocksDBReadOnlyBaseMethods(RocksDBTransactionState* state);
using RocksDBTransactionMethods::RocksDBTransactionMethods;

TRI_voc_tick_t lastOperationTick() const noexcept override { return 0; }

Expand Down
35 changes: 27 additions & 8 deletions arangod/RocksDBEngine/RocksDBTransactionState.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,8 @@ Result RocksDBTransactionState::commitTransaction(transaction::Methods* activeTr
cleanupTransaction(); // deletes trx
++statistics()._transactionsCommitted;
} else {
abortTransaction(activeTrx); // deletes trx
// what if this fails?
std::ignore = abortTransaction(activeTrx); // deletes trx
}
TRI_ASSERT(!_cacheTx);

Expand Down Expand Up @@ -330,18 +331,36 @@ void RocksDBTransactionState::trackIndexRemove(DataSourceId cid, IndexId idxId,
}
}

bool RocksDBTransactionState::isOnlyExclusiveTransaction() const {
bool RocksDBTransactionState::isOnlyExclusiveTransaction() const noexcept {
if (!AccessMode::isWriteOrExclusive(_type)) {
return false;
}
for (TransactionCollection* coll : _collections) {
if (AccessMode::isWrite(coll->accessType())) {
return false;
}
}
return true;
return std::none_of(_collections.cbegin(), _collections.cend(), [](auto* coll) {
return AccessMode::isWrite(coll->accessType());
});
}

bool RocksDBTransactionState::hasFailedOperations() const {
return (_status == transaction::Status::ABORTED) && hasOperations();
}

RocksDBTransactionState* RocksDBTransactionState::toState(transaction::Methods* trx) {
TRI_ASSERT(trx != nullptr);
TransactionState* state = trx->state();
TRI_ASSERT(state != nullptr);
return static_cast<RocksDBTransactionState*>(state);
}

RocksDBTransactionMethods* RocksDBTransactionState::toMethods(transaction::Methods* trx, DataSourceId collectionId) {
TRI_ASSERT(trx != nullptr);
TransactionState* state = trx->state();
TRI_ASSERT(state != nullptr);
return static_cast<RocksDBTransactionState*>(state)->rocksdbMethods(collectionId);
}

void RocksDBTransactionState::prepareForParallelReads() { _parallel = true; }
bool RocksDBTransactionState::inParallelMode() const { return _parallel; }

#ifdef ARANGODB_ENABLE_MAINTAINER_MODE
RocksDBTransactionStateGuard::RocksDBTransactionStateGuard(RocksDBTransactionState* state) noexcept
: _state(state) {
Expand Down
55 changes: 22 additions & 33 deletions arangod/RocksDBEngine/RocksDBTransactionState.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,65 +66,53 @@ class RocksDBTransactionState : public TransactionState {
public:
RocksDBTransactionState(TRI_vocbase_t& vocbase, TransactionId tid,
transaction::Options const& options);
~RocksDBTransactionState();
~RocksDBTransactionState() override;

/// @brief begin a transaction
Result beginTransaction(transaction::Hints hints) override;
[[nodiscard]] Result beginTransaction(transaction::Hints hints) override;

/// @brief commit a transaction
Result commitTransaction(transaction::Methods* trx) override;
[[nodiscard]] Result commitTransaction(transaction::Methods* trx) override;

/// @brief abort a transaction
Result abortTransaction(transaction::Methods* trx) override;
[[nodiscard]] Result abortTransaction(transaction::Methods* trx) override;

virtual bool hasOperations() const noexcept = 0;
[[nodiscard]] virtual bool hasOperations() const noexcept = 0;

virtual uint64_t numOperations() const noexcept = 0;
[[nodiscard]] virtual uint64_t numOperations() const noexcept = 0;

bool hasFailedOperations() const override {
return (_status == transaction::Status::ABORTED) && hasOperations();
}
[[nodiscard]] bool hasFailedOperations() const override;

bool iteratorMustCheckBounds(DataSourceId cid, ReadOwnWrites readOwnWrites) const;
[[nodiscard]] bool iteratorMustCheckBounds(DataSourceId cid, ReadOwnWrites readOwnWrites) const;

void prepareOperation(DataSourceId cid, RevisionId rid,
TRI_voc_document_operation_e operationType);

/// @brief add an operation for a transaction collection
/// sets hasPerformedIntermediateCommit to true if an intermediate commit was
/// performed
Result addOperation(DataSourceId collectionId, RevisionId revisionId,
[[nodiscard]] Result addOperation(DataSourceId collectionId, RevisionId revisionId,
TRI_voc_document_operation_e opType,
bool& hasPerformedIntermediateCommit);

/// @brief return wrapper around rocksdb transaction
virtual RocksDBTransactionMethods* rocksdbMethods(DataSourceId collectionId) const = 0;
[[nodiscard]] virtual RocksDBTransactionMethods* rocksdbMethods(DataSourceId collectionId) const = 0;

/// @brief acquire a database snapshot if we do not yet have one.
/// Returns true if a snapshot was acquired, otherwise false (i.e., if we already had a snapshot)
virtual bool ensureSnapshot() = 0;

static RocksDBTransactionState* toState(transaction::Methods* trx) {
TRI_ASSERT(trx != nullptr);
TransactionState* state = trx->state();
TRI_ASSERT(state != nullptr);
return static_cast<RocksDBTransactionState*>(state);
}

static RocksDBTransactionMethods* toMethods(transaction::Methods* trx, DataSourceId collectionId) {
TRI_ASSERT(trx != nullptr);
TransactionState* state = trx->state();
TRI_ASSERT(state != nullptr);
return static_cast<RocksDBTransactionState*>(state)->rocksdbMethods(collectionId);
}
[[nodiscard]] virtual bool ensureSnapshot() = 0;

[[nodiscard]] static RocksDBTransactionState* toState(transaction::Methods* trx);

[[nodiscard]] static RocksDBTransactionMethods* toMethods(transaction::Methods* trx, DataSourceId collectionId);

/// @brief make some internal preparations for accessing this state in
/// parallel from multiple threads. READ-ONLY transactions
void prepareForParallelReads() { _parallel = true; }
void prepareForParallelReads();
/// @brief in parallel mode. READ-ONLY transactions
bool inParallelMode() const { return _parallel; }
[[nodiscard]] bool inParallelMode() const;

RocksDBTransactionCollection::TrackedOperations& trackedOperations(DataSourceId cid);
[[nodiscard]] RocksDBTransactionCollection::TrackedOperations& trackedOperations(DataSourceId cid);

/// @brief Track documents inserted to the collection
/// Used to update the revision tree for replication after commit
Expand All @@ -142,9 +130,10 @@ class RocksDBTransactionState : public TransactionState {
/// Used to update the estimate after the trx committed
void trackIndexRemove(DataSourceId cid, IndexId idxObjectId, uint64_t hash);

bool isOnlyExclusiveTransaction() const;
/// @brief whether or not a transaction only has exclusive or read accesses
bool isOnlyExclusiveTransaction() const noexcept;

virtual rocksdb::SequenceNumber beginSeq() const = 0;
[[nodiscard]] virtual rocksdb::SequenceNumber beginSeq() const = 0;

#ifdef ARANGODB_ENABLE_MAINTAINER_MODE
/// @brief only needed for RocksDBTransactionStateGuard
Expand Down
2 changes: 1 addition & 1 deletion arangod/StorageEngine/TransactionCollection.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ class TransactionCollection {

std::string const& collectionName() const;

AccessMode::Type accessType() const { return _accessType; }
AccessMode::Type accessType() const noexcept { return _accessType; }

Result updateUsage(AccessMode::Type accessType);

Expand Down
Loading
0