Fix invalid assertion and always remove blocker object on replication failure by jsteemann · Pull Request #14498 · arangodb/arangodb · GitHub
[go: up one dir, main page]

Skip to content

Fix invalid assertion and always remove blocker object on replication failure #14498

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Jul 14, 2021
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion arangod/Replication/DatabaseInitialSyncer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1364,6 +1364,10 @@ Result DatabaseInitialSyncer::fetchCollectionSyncByRevisions(arangodb::LogicalCo
auto context = arangodb::transaction::StandaloneContext::Create(coll->vocbase());
TransactionId blockerId = context->generateId();
physical->placeRevisionTreeBlocker(blockerId);

auto blockerGuard = scopeGuard([&] { // remove blocker afterwards
physical->removeRevisionTreeBlocker(blockerId);
});
std::unique_ptr<arangodb::SingleCollectionTransaction> trx;
transaction::Options options;
TRI_IF_FAILURE("IncrementalReplicationFrequentIntermediateCommit") {
Expand Down Expand Up @@ -1416,7 +1420,9 @@ Result DatabaseInitialSyncer::fetchCollectionSyncByRevisions(arangodb::LogicalCo
guard.fire();
return fetchCollectionSyncByKeys(coll, leaderColl, maxTick);
}
physical->removeRevisionTreeBlocker(blockerId);
// make sure revision tree blocker is removed
blockerGuard.fire();

std::vector<std::pair<std::uint64_t, std::uint64_t>> ranges =
treeLeader->diff(*treeLocal);
if (ranges.empty()) {
Expand Down
44 changes: 34 additions & 10 deletions arangod/RocksDBEngine/RocksDBMetaCollection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -850,7 +850,10 @@ void RocksDBMetaCollection::bufferUpdates(rocksdb::SequenceNumber seq,
.inRecovery());
return;
}


TRI_IF_FAILURE("TransactionChaos::randomSleep") {
std::this_thread::sleep_for(std::chrono::milliseconds(RandomGenerator::interval(uint32_t(5))));
}

TRI_ASSERT(!inserts.empty() || !removals.empty());

Expand Down Expand Up @@ -939,12 +942,25 @@ void RocksDBMetaCollection::applyUpdates(rocksdb::SequenceNumber commitSeq) {
}
}
};

TRI_IF_FAILURE("TransactionChaos::randomSleep") {
std::this_thread::sleep_for(std::chrono::milliseconds(RandomGenerator::interval(uint32_t(5))));
}

std::unique_lock<std::mutex> guard(_revisionBufferLock);

auto insertIt = _revisionInsertBuffers.begin();
auto removeIt = _revisionRemovalBuffers.begin();

auto checkIterators = [&]() {
#ifdef ARANGODB_ENABLE_MAINTAINER_MODE
TRI_ASSERT(insertIt == _revisionInsertBuffers.begin() ||
_revisionInsertBuffers.empty() || _revisionInsertBuffers.begin()->first > commitSeq);
TRI_ASSERT(removeIt == _revisionRemovalBuffers.begin() ||
_revisionRemovalBuffers.empty() || _revisionRemovalBuffers.begin()->first > commitSeq);
#endif
};

auto it = _revisionTruncateBuffer.begin(); // sorted ASC

{
Expand All @@ -968,19 +984,17 @@ void RocksDBMetaCollection::applyUpdates(rocksdb::SequenceNumber commitSeq) {
while (removeIt != _revisionRemovalBuffers.end() && removeIt->first <= ignoreSeq) {
removeIt = _revisionRemovalBuffers.erase(removeIt);
}

TRI_ASSERT(insertIt == _revisionInsertBuffers.begin() || (insertIt == _revisionInsertBuffers.end() && (_revisionInsertBuffers.empty() || _revisionInsertBuffers.begin()->first > commitSeq)));
TRI_ASSERT(removeIt == _revisionRemovalBuffers.begin() || (removeIt == _revisionRemovalBuffers.end() && (_revisionRemovalBuffers.empty() || _revisionRemovalBuffers.begin()->first > commitSeq)));

checkIterators();

// we can clear the revision tree without holding the mutex here
guard.unlock();
// clear out any revision structure, now empty
_revisionTree->clear();

guard.lock();

TRI_ASSERT(insertIt == _revisionInsertBuffers.begin() || (insertIt == _revisionInsertBuffers.end() && (_revisionInsertBuffers.empty() || _revisionInsertBuffers.begin()->first > commitSeq)));
TRI_ASSERT(removeIt == _revisionRemovalBuffers.begin() || (removeIt == _revisionRemovalBuffers.end() && (_revisionRemovalBuffers.empty() || _revisionRemovalBuffers.begin()->first > commitSeq)));

checkIterators();

// we have applied all changes up to here
bumpSequence(ignoreSeq);
Expand All @@ -990,10 +1004,9 @@ void RocksDBMetaCollection::applyUpdates(rocksdb::SequenceNumber commitSeq) {
// still holding the mutex here

while (true) {
// find out if we still have buffers to apply
TRI_ASSERT(insertIt == _revisionInsertBuffers.begin() || (insertIt == _revisionInsertBuffers.end() && (_revisionInsertBuffers.empty() || _revisionInsertBuffers.begin()->first > commitSeq)));
TRI_ASSERT(removeIt == _revisionRemovalBuffers.begin() || (removeIt == _revisionRemovalBuffers.end() && (_revisionRemovalBuffers.empty() || _revisionRemovalBuffers.begin()->first > commitSeq)));
checkIterators();

// find out if we still have buffers to apply
bool haveInserts = insertIt != _revisionInsertBuffers.end() &&
insertIt->first <= commitSeq;
bool haveRemovals = removeIt != _revisionRemovalBuffers.end() &&
Expand All @@ -1006,6 +1019,9 @@ void RocksDBMetaCollection::applyUpdates(rocksdb::SequenceNumber commitSeq) {
// no inserts or removals left to apply, drop out of loop
if (!applyInserts && !applyRemovals) {
// we have applied all changes up to including commitSeq
TRI_IF_FAILURE("TransactionChaos::randomSleep") {
std::this_thread::sleep_for(std::chrono::milliseconds(RandomGenerator::interval(uint32_t(5))));
}
bumpSequence(commitSeq);
return;
}
Expand Down Expand Up @@ -1048,6 +1064,10 @@ void RocksDBMetaCollection::applyUpdates(rocksdb::SequenceNumber commitSeq) {
// it is safe to retry it next time.
throw;
}

TRI_IF_FAILURE("TransactionChaos::randomSleep") {
std::this_thread::sleep_for(std::chrono::milliseconds(RandomGenerator::interval(uint32_t(5))));
}

// move iterator forward, we need the mutex for this
guard.lock();
Expand Down Expand Up @@ -1087,6 +1107,10 @@ void RocksDBMetaCollection::applyUpdates(rocksdb::SequenceNumber commitSeq) {
// if an exception escapes from here, the same remove will be retried next time.
throw;
}

TRI_IF_FAILURE("TransactionChaos::randomSleep") {
std::this_thread::sleep_for(std::chrono::milliseconds(RandomGenerator::interval(uint32_t(5))));
}

// move iterator forward, we need the mutex for this
guard.lock();
Expand Down
43 changes: 38 additions & 5 deletions arangod/RocksDBEngine/RocksDBMetadata.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include "Basics/ReadLocker.h"
#include "Basics/WriteLocker.h"
#include "Basics/system-compiler.h"
#include "Random/RandomGenerator.h"
#include "RocksDBEngine/RocksDBCollection.h"
#include "RocksDBEngine/RocksDBColumnFamilyManager.h"
#include "RocksDBEngine/RocksDBCuckooIndexEstimator.h"
Expand All @@ -41,9 +42,13 @@
#include "RocksDBEngine/RocksDBIndex.h"
#include "RocksDBEngine/RocksDBSettingsManager.h"
#include "StorageEngine/EngineSelectorFeature.h"
#include "Transaction/Context.h"
#include "VocBase/KeyGenerator.h"
#include "VocBase/LogicalCollection.h"

#include <chrono>
#include <thread>

using namespace arangodb;

RocksDBMetadata::DocCount::DocCount(VPackSlice const& slice)
Expand Down Expand Up @@ -112,6 +117,7 @@ Result RocksDBMetadata::placeBlocker(TransactionId trxId, rocksdb::SequenceNumbe
try {
auto crosslist = _blockersBySeq.emplace(seq, trxId);
if (!crosslist.second) {
_blockers.erase(trxId);
return res.reset(TRI_ERROR_INTERNAL);
}
LOG_TOPIC("1587a", TRACE, Logger::ENGINES)
Expand Down Expand Up @@ -196,14 +202,16 @@ void RocksDBMetadata::removeBlocker(TransactionId trxId) {
bool RocksDBMetadata::hasBlockerUpTo(rocksdb::SequenceNumber seq) const {
READ_LOCKER(locker, _blockerLock);

auto it = _blockersBySeq.lower_bound(std::make_pair(seq, TransactionId(0)));
if (it == _blockersBySeq.end()) {
if (_blockersBySeq.empty()) {
return false;
}
// here, it->first can only be greater or equal to seq, but anyway.
return it->first <= seq;
}

// _blockersBySeq is sorted by sequence number first, then transaction id
// if the seq no if the first item is already less equal to our search
// value, we can abort the search. all following items in _blockersBySeq
// will only have the same or higher sequence numbers.
return _blockersBySeq.begin()->first <= seq;
}

/// @brief returns the largest safe seq to squash updates against
rocksdb::SequenceNumber RocksDBMetadata::committableSeq(rocksdb::SequenceNumber maxCommitSeq) const {
Expand Down Expand Up @@ -335,9 +343,34 @@ Result RocksDBMetadata::serializeMeta(rocksdb::WriteBatch& batch,
}

const rocksdb::SequenceNumber maxCommitSeq = committableSeq(appliedSeq);

#ifdef ARANGODB_ENABLE_FAILURE_TESTS
// simulate another transaction coming along and trying to commit while
// we are serializing
TransactionId trxId = TransactionId::none();

TRI_IF_FAILURE("TransactionChaos::randomSleep") {
auto& selector = coll.vocbase().server().getFeature<EngineSelectorFeature>();
auto& engine = selector.engine<RocksDBEngine>();
auto blockerSeq = engine.db()->GetLatestSequenceNumber();
trxId = TransactionId(transaction::Context::makeTransactionId());
placeBlocker(trxId, blockerSeq);
}
auto blockerGuard = scopeGuard([&] { // remove blocker afterwards
if (trxId.isSet()) {
removeBlocker(trxId);
}
});
#endif

TRI_ASSERT(maxCommitSeq <= appliedSeq);
TRI_ASSERT(maxCommitSeq != UINT64_MAX);
TRI_ASSERT(maxCommitSeq > 0);

TRI_IF_FAILURE("TransactionChaos::randomSleep") {
std::this_thread::sleep_for(std::chrono::milliseconds(RandomGenerator::interval(uint32_t(5))));
}

bool didWork = applyAdjustments(maxCommitSeq);
appliedSeq = maxCommitSeq;

Expand Down
3 changes: 3 additions & 0 deletions arangod/RocksDBEngine/RocksDBReplicationContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,7 @@ RocksDBReplicationContext::DumpResult RocksDBReplicationContext::dumpJson(
<< vocbase.name() << "/" << cIter->logical->name()
<< ", an offet of " << adjustment << " will be applied";
auto adjustSeq = _engine.db()->GetLatestSequenceNumber();
TRI_ASSERT(adjustSeq >= blockerSeq);
if (adjustSeq <= blockerSeq) {
adjustSeq = ::forceWrite(_engine);
TRI_ASSERT(adjustSeq > blockerSeq);
Expand Down Expand Up @@ -476,6 +477,7 @@ RocksDBReplicationContext::DumpResult RocksDBReplicationContext::dumpVPack(
<< vocbase.name() << "/" << cIter->logical->name()
<< ", an offet of " << adjustment << " will 10000 be applied";
auto adjustSeq = _engine.db()->GetLatestSequenceNumber();
TRI_ASSERT(adjustSeq >= blockerSeq);
if (adjustSeq <= blockerSeq) {
adjustSeq = ::forceWrite(_engine);
TRI_ASSERT(adjustSeq > blockerSeq);
Expand Down Expand Up @@ -615,6 +617,7 @@ arangodb::Result RocksDBReplicationContext::dumpKeyChunks(TRI_vocbase_t& vocbase
<< vocbase.name() << "/" << cIter->logical->name()
<< ", an offet of " << adjustment << " will be applied";
auto adjustSeq = _engine.db()->GetLatestSequenceNumber();
TRI_ASSERT(adjustSeq >= blockerSeq);
if (adjustSeq <= blockerSeq) {
adjustSeq = ::forceWrite(_engine);
TRI_ASSERT(adjustSeq > blockerSeq);
Expand Down
5 changes: 5 additions & 0 deletions arangod/RocksDBEngine/RocksDBTransactionCollection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,11 @@ void RocksDBTransactionCollection::addOperation(TRI_voc_document_operation_e ope
void RocksDBTransactionCollection::prepareTransaction(TransactionId trxId, uint64_t beginSeq) {
TRI_ASSERT(_collection != nullptr);
if (hasOperations() || !_trackedOperations.empty() || !_trackedIndexOperations.empty()) {

TRI_IF_FAILURE("TransactionChaos::randomSleep") {
std::this_thread::sleep_for(std::chrono::milliseconds(RandomGenerator::interval(uint32_t(5))));
}

auto* coll = static_cast<RocksDBMetaCollection*>(_collection->getPhysical());
TRI_ASSERT(beginSeq > 0);
coll->meta().placeBlocker(trxId, beginSeq);
Expand Down
14 changes: 14 additions & 0 deletions arangod/RocksDBEngine/RocksDBTransactionState.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,14 @@
#include "Logger/LogMacros.h"
#include "Logger/Logger.h"
#include "Logger/LoggerStream.h"
#include "Random/RandomGenerator.h"
#include "RestServer/MetricsFeature.h"
#include "RocksDBEngine/RocksDBCollection.h"
#include "RocksDBEngine/RocksDBCommon.h"
#include "RocksDBEngine/RocksDBEngine.h"
#include "RocksDBEngine/RocksDBLogValue.h"
#include "RocksDBEngine/RocksDBMethods.h"
#include "RocksDBEngine/RocksDBSettingsManager.h"
#include "RocksDBEngine/RocksDBSyncThread.h"
#include "RocksDBEngine/RocksDBTransactionCollection.h"
#include "Statistics/ServerStatistics.h"
Expand Down Expand Up @@ -247,6 +249,7 @@ void RocksDBTransactionState::prepareCollections() {
auto& engine = selector.engine<RocksDBEngine>();
rocksdb::TransactionDB* db = engine.db();
rocksdb::SequenceNumber preSeq = db->GetLatestSequenceNumber();

for (auto& trxColl : _collections) {
auto* coll = static_cast<RocksDBTransactionCollection*>(trxColl);
coll->prepareTransaction(id(), preSeq);
Expand Down Expand Up @@ -355,6 +358,17 @@ arangodb::Result RocksDBTransactionState::internalCommit() {

prepareCollections();

TRI_IF_FAILURE("TransactionChaos::randomSync") {
if (RandomGenerator::interval(uint32_t(1000)) > 950) {
auto& selector = vocbase().server().getFeature<EngineSelectorFeature>();
auto& engine = selector.engine<RocksDBEngine>();
auto* sm = engine.settingsManager();
if (sm) {
sm->sync(true); // force
}
}
}

// if we fail during commit, make sure we remove blockers, etc.
auto cleanupCollTrx = scopeGuard([this]() { cleanupCollections(); });

Expand Down
103 changes: 103 additions & 0 deletions js/client/modules/@arangodb/testsuites/chaos.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/* jshint strict: false, sub: true */
/* global */
'use strict';

////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2014-2021 ArangoDB GmbH, Cologne, Germany
/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License")
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Manuel Pöter
////////////////////////////////////////////////////////////////////////////////

const functionsDocumentation = {
'chaos': 'chaos tests'
};
const optionsDocumentation = [];

const _ = require('lodash');
const tu = require('@arangodb/testutils/test-utils');

const testPaths = {
'chaos': [ tu.pathForTesting('client/chaos') ],
};

function chaos (options) {
  // Maps a test case path to the queue of configurations that still have to
  // be consumed for that test case.
  const configQueues = {};
  let testCases = tu.scanTestPaths(testPaths.chaos, options);

  // The chaos test suite is parameterized; each configuration runs for about
  // 5 minutes. For nightly runs we want to cover many parameter combinations,
  // but a single test file has a 15-minute runtime limit and the test system
  // cannot currently run one file multiple times with different parameters.
  // Workaround: build a map of test cases to their configuration lists and
  // put n copies of the file into the test case list, where n is the number
  // of configurations. The `preRun` handler below is invoked before each
  // test case execution; it pops the next configuration off the queue and
  // publishes it via the global variable `currentTestConfig`, which the
  // tests then read.
  // Test cases that want to provide configurations are written as modules:
  // a jsunity file whose name contains "test-module-" is not executed
  // directly but must export a "run" function, and may optionally export a
  // "getConfigs" function returning an array of configurations.

  testCases = _.flatMap(testCases, (testCase) => {
    if (!testCase.includes("test-module-")) {
      return testCase;
    }
    const configProvider = require(testCase).getConfigs;
    if (!configProvider) {
      return testCase;
    }
    const configs = configProvider();
    if (!Array.isArray(configs)) {
      throw "test case module " + testCase + " does not provide config list";
    }
    configQueues[testCase] = configs;
    // schedule one copy of the file per configuration
    return Array(configs.length).fill(testCase);
  });

  testCases = tu.splitBuckets(options, testCases);

  const handlers = {
    // Called before each test case run; selects the next configuration
    // (if any) for parameterized test modules.
    preRun: (test) => {
      global.currentTestConfig = undefined;
      const queue = configQueues[test];
      if (Array.isArray(queue)) {
        if (queue.length === 0) {
          throw "Unexpected! Have no more configs for test case " + test;
        }
        global.currentTestConfig = queue.shift();
      }
    }
  };

  return tu.performTests(options, testCases, 'chaos', tu.runInLocalArangosh, {}, handlers);
}

exports.setup = function (testFns, defaultFns, opts, fnDocs, optionsDoc, allTestPaths) {
Object.assign(allTestPaths, testPaths);
testFns['chaos'] = chaos;

// intentionall 4E5B y not turned on by default, as the suite may take a lot of time
// defaultFns.push('chaos');

for (var attrname in functionsDocumentation) { fnDocs[attrname] = functionsDocumentation[attrname]; }
for (var i = 0; i < optionsDocumentation.length; i++) { optionsDoc.push(optionsDocumentation[i]); }
};
Loading
0