8000 Fix invalid assertion and always remove blocker object on replication failure by jsteemann · Pull Request #14498 · arangodb/arangodb · GitHub
[go: up one dir, main page]

Skip to content

Fix invalid assertion and always remove blocker object on replication failure #14498

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Jul 14, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
v3.8.0 (XXXX-XX-XX)
-------------------

* Always remove blocker object for revision trees in case of replication
failures.

* Fix invalid assertion for insert/removal buffers positioning and internals of
`hasBlockerUpTo` function.

* Updated ArangoDB Starter to 0.15.0-1.

* Updated arangosync to 2.4.0.
Expand Down
8 changes: 7 additions & 1 deletion arangod/Replication/DatabaseInitialSyncer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1364,6 +1364,10 @@ Result DatabaseInitialSyncer::fetchCollectionSyncByRevisions(arangodb::LogicalCo
auto context = arangodb::transaction::StandaloneContext::Create(coll->vocbase());
TransactionId blockerId = context->generateId();
physical->placeRevisionTreeBlocker(blockerId);

auto blockerGuard = scopeGuard([&] { // remove blocker afterwards
physical->removeRevisionTreeBlocker(blockerId);
});
std::unique_ptr<arangodb::SingleCollectionTransaction> trx;
transaction::Options options;
TRI_IF_FAILURE("IncrementalReplicationFrequentIntermediateCommit") {
Expand Down Expand Up @@ -1416,7 +1420,9 @@ Result DatabaseInitialSyncer::fetchCollectionSyncByRevisions(arangodb::LogicalCo
guard.fire();
return fetchCollectionSyncByKeys(coll, leaderColl, maxTick);
}
physical->removeRevisionTreeBlocker(blockerId);
// make sure revision tree blocker is removed
blockerGuard.fire();

std::vector<std::pair<std::uint64_t, std::uint64_t>> ranges =
treeLeader->diff(*treeLocal);
if (ranges.empty()) {
Expand Down
42 changes: 32 additions & 10 deletions arangod/RocksDBEngine/RocksDBMetaCollection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -850,7 +850,10 @@ void RocksDBMetaCollection::bufferUpdates(rocksdb::SequenceNumber seq,
.inRecovery());
return;
}


TRI_IF_FAILURE("TransactionChaos::randomSleep") {
std::this_thread::sleep_for(std::chrono::milliseconds(RandomGenerator::interval(uint32_t(5))));
}

TRI_ASSERT(!inserts.empty() || !removals.empty());

Expand Down Expand Up @@ -939,12 +942,23 @@ void RocksDBMetaCollection::applyUpdates(rocksdb::SequenceNumber commitSeq) {
}
}
};

TRI_IF_FAILURE("TransactionChaos::randomSleep") {
std::this_thread::sleep_for(std::chrono::milliseconds(RandomGenerator::interval(uint32_t(5))));
}

std::unique_lock<std::mutex> guard(_revisionBufferLock);

auto insertIt = _revisionInsertBuffers.begin();
auto removeIt = _revisionRemovalBuffers.begin();

auto checkIterators = [&]() {
TRI_ASSERT(insertIt == _revisionInsertBuffers.begin() ||
_revisionInsertBuffers.empty() || _revisionInsertBuffers.begin()->first > commitSeq);
TRI_ASSERT(removeIt == _revisionRemovalBuffers.begin() ||
_revisionRemovalBuffers.empty() || _revisionRemovalBuffers.begin()->first > commitSeq);
};

auto it = _revisionTruncateBuffer.begin(); // sorted ASC

{
Expand All @@ -968,19 +982,17 @@ void RocksDBMetaCollection::applyUpdates(rocksdb::SequenceNumber commitSeq) {
while (removeIt != _revisionRemovalBuffers.end() && removeIt->first <= ignoreSeq) {
removeIt = _revisionRemovalBuffers.erase(removeIt);
}

TRI_ASSERT(insertIt == _revisionInsertBuffers.begin() || (insertIt == _revisionInsertBuffers.end() && (_revisionInsertBuffers.empty() || _revisionInsertBuffers.begin()->first > commitSeq)));
TRI_ASSERT(removeIt == _revisionRemovalBuffers.begin() || (removeIt == _revisionRemovalBuffers.end() && (_revisionRemovalBuffers.empty() || _revisionRemovalBuffers.begin()->first > commitSeq)));

checkIterators();

// we can clear the revision tree without holding the mutex here
guard.unlock();
// clear out any revision structure, now empty
_revisionTree->clear();

guard.lock();

TRI_ASSERT(insertIt == _revisionInsertBuffers.begin() || (insertIt == _revisionInsertBuffers.end() && (_revisionInsertBuffers.empty() || _revisionInsertBuffers.begin()->first > commitSeq)));
TRI_ASSERT(removeIt == _revisionRemovalBuffers.begin() || (removeIt == _revisionRemovalBuffers.end() && (_revisionRemovalBuffers.empty() || _revisionRemovalBuffers.begin()->first > commitSeq)));

checkIterators();

// we have applied all changes up to here
bumpSequence(ignoreSeq);
Expand All @@ -990,10 +1002,9 @@ void RocksDBMetaCollection::applyUpdates(rocksdb::SequenceNumber commitSeq) {
// still holding the mutex here

while (true) {
// find out if we still have buffers to apply
TRI_ASSERT(insertIt == _revisionInsertBuffers.begin() || (insertIt == _revisionInsertBuffers.end() && (_revisionInsertBuffers.empty() || _revisionInsertBuffers.begin()->first > commitSeq)));
TRI_ASSERT(removeIt == _revisionRemovalBuffers.begin() || (removeIt == _revisionRemovalBuffers.end() && (_revisionRemovalBuffers.empty() || _revisionRemovalBuffers.begin()->first > commitSeq)));
checkIterators();

// find out if we still have buffers to apply
bool haveInserts = insertIt != _revisionInsertBuffers.end() &&
insertIt->first <= commitSeq;
bool haveRemovals = removeIt != _revisionRemovalBuffers.end() &&
Expand All @@ -1006,6 +1017,9 @@ void RocksDBMetaCollection::applyUpdates(rocksdb::SequenceNumber commitSeq) {
// no inserts or removals left to apply, drop out of loop
if (!applyInserts && !applyRemovals) {
// we have applied all changes up to including commitSeq
TRI_IF_FAILURE("TransactionChaos::randomSleep") {
std::this_thread::sleep_for(std::chrono::milliseconds(RandomGenerator::interval(uint32_t(5))));
}
bumpSequence(commitSeq);
return;
}
Expand Down Expand Up @@ -1048,6 +1062,10 @@ void RocksDBMetaCollection::applyUpdates(rocksdb::SequenceNumber commitSeq) {
// it is safe to retry it next time.
throw;
}

TRI_IF_FAILURE("TransactionChaos::randomSleep") {
std::this_thread::sleep_for(std::chrono::milliseconds(RandomGenerator::interval(uint32_t(5))));
}

// move iterator forward, we need the mutex for this
guard.lock();
Expand Down Expand Up @@ -1087,6 +1105,10 @@ void RocksDBMetaCollection::applyUpdates(rocksdb::SequenceNumber commitSeq) {
// if an exception escapes from here, the same remove will be retried next time.
throw;
}

TRI_IF_FAILURE("TransactionChaos::randomSleep") {
std::this_thread::sleep_for(std::chrono::milliseconds(RandomGenerator::interval(uint32_t(5))));
}

// move iterator forward, we need the mutex for this
guard.lock();
Expand Down
43 changes: 38 additions & 5 deletions arangod/RocksDBEngine/RocksDBMetadata.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include "Basics/ReadLocker.h"
#include "Basics/WriteLocker.h"
#include "Basics/system-compiler.h"
#include "Random/RandomGenerator.h"
#include "RocksDBEngine/RocksDBCollection.h"
#include "RocksDBEngine/RocksDBColumnFamilyManager.h"
#include "RocksDBEngine/RocksDBCuckooIndexEstimator.h"
Expand All @@ -41,9 +42,13 @@
#include "RocksDBEngine/RocksDBIndex.h"
#include "RocksDBEngine/RocksDBSettingsManager.h"
#include "StorageEngine/EngineSelectorFeature.h"
#include "Transaction/Context.h"
#include "VocBase/KeyGenerator.h"
#include "VocBase/LogicalCollection.h"

#include <chrono>
#include <thread>

using namespace arangodb;

RocksDBMetadata::DocCount::DocCount(VPackSlice const& slice)
Expand Down Expand Up @@ -112,6 +117,7 @@ Result RocksDBMetadata::placeBlocker(TransactionId trxId, rocksdb::SequenceNumbe
try {
auto crosslist = _blockersBySeq.emplace(seq, trxId);
if (!crosslist.second) {
_blockers.erase(trxId);
return res.reset(TRI_ERROR_INTERNAL);
}
LOG_TOPIC("1587a", TRACE, Logger::ENGINES)
Expand Down Expand Up @@ -196,14 +202,16 @@ void RocksDBMetadata::removeBlocker(TransactionId trxId) {
bool RocksDBMetadata::hasBlockerUpTo(rocksdb::SequenceNumber seq) const {
READ_LOCKER(locker, _blockerLock);

auto it = _blockersBySeq.lower_bound(std::make_pair(seq, TransactionId(0)));
if (it == _blockersBySeq.end()) {
if (_blockersBySeq.empty()) {
return false;
}
// here, it->first can only be greater or equal to seq, but anyway.
return it->first <= seq;
}

// _blockersBySeq is sorted by sequence number first, then transaction id
// if the seq no in the first item is already less equal to our search
// value, we can abort the search. all following items in _blockersBySeq
// will only have the same or higher sequence numbers.
return _blockersBySeq.begin()->first <= seq;
}

/// @brief returns the largest safe seq to squash updates against
rocksdb::SequenceNumber RocksDBMetadata::committableSeq(rocksdb::SequenceNumber maxCommitSeq) const {
Expand Down Expand Up @@ -335,9 +343,34 @@ Result RocksDBMetadata::serializeMeta(rocksdb::WriteBatch& batch,
}

const rocksdb::SequenceNumber maxCommitSeq = committableSeq(appliedSeq);

#ifdef ARANGODB_ENABLE_FAILURE_TESTS
// simulate another transaction coming along and trying to commit while
// we are serializing
TransactionId trxId = TransactionId::none();

TRI_IF_FAILURE("TransactionChaos::blockerOnSync") {
auto& selector = coll.vocbase().server().getFeature<EngineSelectorFeature>();
auto& engine = selector.engine<RocksDBEngine>();
auto blockerSeq = engine.db()->GetLatestSequenceNumber();
trxId = TransactionId(transaction::Context::makeTransactionId());
placeBlocker(trxId, blockerSeq);
}
auto blockerGuard = scopeGuard([&] { // remove blocker afterwards
if (trxId.isSet()) {
removeBlocker(trxId);
}
});
#endif

TRI_ASSERT(maxCommitSeq <= appliedSeq);
TRI_ASSERT(maxCommitSeq != UINT64_MAX);
TRI_ASSERT(maxCommitSeq > 0);

TRI_IF_FAILURE("TransactionChaos::randomSleep") {
std::this_thread::sleep_for(std::chrono::milliseconds(RandomGenerator::interval(uint32_t(5))));
}

bool didWork = applyAdjustments(maxCommitSeq);
appliedSeq = maxCommitSeq;

Expand Down
3 changes: 3 additions & 0 deletions arangod/RocksDBEngine/RocksDBReplicationContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,7 @@ RocksDBReplicationContext::DumpResult RocksDBReplicationContext::dumpJson(
<< vocbase.name() << "/" << cIter->logical->name()
<< ", an offet of " << adjustment << " will be applied";
auto adjustSeq = _engine.db()->GetLatestSequenceNumber();
TRI_ASSERT(adjustSeq >= blockerSeq);
if (adjustSeq <= blockerSeq) {
adjustSeq = ::forceWrite(_engine);
TRI_ASSERT(adjustSeq > blockerSeq);
Expand Down Expand Up @@ -476,6 +477,7 @@ RocksDBReplicationContext::DumpResult RocksDBReplicationContext::dumpVPack(
<< vocbase.name() << "/" << cIter->logical->name()
<< ", an offet of " << adjustment << " will be applied";
auto adjustSeq = _engine.db()->GetLatestSequenceNumber();
TRI_ASSERT(adjustSeq >= blockerSeq);
if (adjustSeq <= blockerSeq) {
adjustSeq = ::forceWrite(_engine);
TRI_ASSERT(adjustSeq > blockerSeq);
Expand Down Expand Up @@ -615,6 +617,7 @@ arangodb::Result RocksDBReplicationContext::dumpKeyChunks(TRI_vocbase_t& vocbase
<< vocbase.name() << "/" << cIter->logical->name()
<< ", an offet of " << adjustment << " will be applied";
auto adjustSeq = _engine.db()->GetLatestSequenceNumber();
TRI_ASSERT(adjustSeq >= blockerSeq);
if (adjustSeq <= blockerSeq) {
adjustSeq = ::forceWrite(_engine);
TRI_ASSERT(adjustSeq > blockerSeq);
Expand Down
5 changes: 5 additions & 0 deletions arangod/RocksDBEngine/RocksDBTransactionCollection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,11 @@ void RocksDBTransactionCollection::addOperation(TRI_voc_document_operation_e ope
void RocksDBTransactionCollection::prepareTransaction(TransactionId trxId, uint64_t beginSeq) {
TRI_ASSERT(_collection != nullptr);
if (hasOperations() || !_trackedOperations.empty() || !_trackedIndexOperations.empty()) {

TRI_IF_FAILURE("TransactionChaos::randomSleep") {
std::this_thread::sleep_for(std::chrono::milliseconds(RandomGenerator::interval(uint32_t(5))));
}

auto* coll = static_cast<RocksDBMetaCollection*>(_collection->getPhysical());
TRI_ASSERT(beginSeq > 0);
coll->meta().placeBlocker(trxId, beginSeq);
Expand Down
14 changes: 14 additions & 0 deletions arangod/RocksDBEngine/RocksDBTransactionState.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,14 @@
#include "Logger/LogMacros.h"
#include "Logger/Logger.h"
#include "Logger/LoggerStream.h"
#include "Random/RandomGenerator.h"
#include "RestServer/MetricsFeature.h"
#include "RocksDBEngine/RocksDBCollection.h"
#include "RocksDBEngine/RocksDBCommon.h"
#include "RocksDBEngine/RocksDBEngine.h"
#include "RocksDBEngine/RocksDBLogValue.h"
#include "RocksDBEngine/RocksDBMethods.h"
#include "RocksDBEngine/RocksDBSettingsManager.h"
#include "RocksDBEngine/RocksDBSyncThread.h"
#include "RocksDBEngine/RocksDBTransactionCollection.h"
#include "Statistics/ServerStatistics.h"
Expand Down Expand Up @@ -247,6 +249,7 @@ void RocksDBTransactionState::prepareCollections() {
auto& engine = selector.engine<RocksDBEngine>();
rocksdb::TransactionDB* db = engine.db();
rocksdb::SequenceNumber preSeq = db->GetLatestSequenceNumber();

for (auto& trxColl : _collections) {
auto* coll = static_cast<RocksDBTransactionCollection*>(trxColl);
coll->prepareTransaction(id(), preSeq);
Expand Down Expand Up @@ -355,6 +358,17 @@ arangodb::Result RocksDBTransactionState::internalCommit() {

prepareCollections();

TRI_IF_FAILURE("TransactionChaos::randomSync") {
if (RandomGenerator::interval(uint32_t(1000)) > 950) {
auto& selector = vocbase().server().getFeature<EngineSelectorFeature>();
auto& engine = selector.engine<RocksDBEngine>();
auto* sm = engine.settingsManager();
if (sm) {
sm->sync(true); // force
}
}
}

// if we fail during commit, make sure we remove blockers, etc.
auto cleanupCollTrx = scopeGuard([this]() { cleanupCollections(); });

Expand Down
14 changes: 12 additions & 2 deletions tests/js/client/chaos/test-chaos-load-common.inc
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,16 @@ function BaseChaosSuite(testOpts) {
debugSetFailAt(getEndpointById(server.id), "Query::setupTimeout");
}
}
if (testOpts.withDelays) {
let servers = getDBServers();
assertTrue(servers.length > 0);
for (const server of servers) {
debugSetFailAt(getEndpointById(server.id), "TransactionChaos::blockerOnSync");
debugSetFailAt(getEndpointById(server.id), "TransactionChaos::randomSleep");
debugSetFailAt(getEndpointById(server.id), "TransactionChaos::randomSync");
debugSetFailAt(getEndpointById(server.id), "RocksDBMetaCollection::forceSerialization");
}
}
},

tearDown: function () {
Expand All @@ -178,7 +188,7 @@ function BaseChaosSuite(testOpts) {
const docs = () => {
let result = [];
const max = 2000;
const r = Math.floor(Math.random() * max) + 1;
let r = Math.floor(Math.random() * max) + 1;
if (r > (max * 0.8)) {
// we want ~20% of all requests to be single document operations
r = 1;
Expand Down Expand Up @@ -291,7 +301,7 @@ function BaseChaosSuite(testOpts) {
}

// truncate is disabled because it does not work reliably ATM
const params = ["IntermediateCommits", "FailurePoints", /*"Truncate",*/ "VaryingOverwriteMode", "StreamingTransactions"];
const params = ["IntermediateCommits", "FailurePoints", "Delays", /*"Truncate",*/ "VaryingOverwriteMode", "StreamingTransactions"];

const makeConfig = (paramValues) => {
let suffix = "";
Expand Down
0