From e8e551b36b614bacf859b8c6fee7dd9fc621fa27 Mon Sep 17 00:00:00 2001 From: Max Neunhoeffer Date: Fri, 25 Jun 2021 14:33:35 +0200 Subject: [PATCH 01/14] First stab at following term id. --- arangod/Cluster/FollowerInfo.cpp | 23 ++++++++++++ arangod/Cluster/FollowerInfo.h | 36 +++++++++++++++++++ arangod/Cluster/SynchronizeShard.cpp | 26 ++++++++++---- arangod/Cluster/SynchronizeShard.h | 1 + .../RestHandler/RestReplicationHandler.cpp | 4 +++ arangod/Transaction/Methods.cpp | 26 ++++++++++---- arangod/Transaction/Methods.h | 4 ++- lib/Basics/StaticStrings.cpp | 1 + lib/Basics/StaticStrings.h | 1 + 9 files changed, 108 insertions(+), 14 deletions(-) diff --git a/arangod/Cluster/FollowerInfo.cpp b/arangod/Cluster/FollowerInfo.cpp index 16c7f0c2eb1c..9532d3d5ce0c 100644 --- a/arangod/Cluster/FollowerInfo.cpp +++ b/arangod/Cluster/FollowerInfo.cpp @@ -33,6 +33,7 @@ #include "Logger/LogMacros.h" #include "Logger/Logger.h" #include "Logger/LoggerStream.h" +#include "Random/RandomGenerator.h" #include "VocBase/LogicalCollection.h" using namespace arangodb; @@ -542,3 +543,25 @@ VPackBuilder FollowerInfo::newShardEntry(VPackSlice oldValue) const { } return newValue; } + +uint64_t FollowerInfo::newFollowingTermId(ServerID const& s) noexcept { + uint64_t i = RandomGenerator::interval(UINT64_MAX); + try { + _followingTermId[s] = i; + } catch(std::bad_alloc const& exc) { + i = 0; // I assume here that I do not get bad_alloc if the key is + // already in the map, since it then only has to overwrite + // an integer, if the key is not in the map, we default to 0. + } + return i; +} + +uint64_t FollowerInfo::getFollowingTermId(ServerID const& s) noexcept { + // Note that we assume that find() does not throw! 
+ auto it = _followingTermId.find(s); + if (it == _followingTermId.end()) { + return 0; + } + return it->second; +} + diff --git a/arangod/Cluster/FollowerInfo.h b/arangod/Cluster/FollowerInfo.h index 507e79ab2c36..dc475096ddc8 100644 --- a/arangod/Cluster/FollowerInfo.h +++ b/arangod/Cluster/FollowerInfo.h @@ -54,6 +54,20 @@ class FollowerInfo { // soon as we can guarantee at least so many followers locally. std::shared_ptr> _failoverCandidates; + // The following map holds a random number for each follower, this + // random number is sent given to the follower when it gets in sync + // (actually, when it acquires the hard lock to get in sync), and is + // then subsequently sent alongside every synchronous replication + // request. If the number does not match, the follower will refuse the + // replication request. This is to ensure that replication requests cannot + // be delayed into the "next" leader/follower relationship. + // And here is the proof that this works: The exclusive lock divides the + // write operations between "before" and "after". The id is changed + // when the exclusive lock is established. Therefore, it is OK for the + // replication requests to send along the "current" id, directly + // from this map. + std::unordered_map _followingTermId; + // The agencyMutex is used to synchronise access to the agency. // the _dataLock is used to sync the access to local data. // The _canWriteLock is used to protect flag if we do have enough followers @@ -135,6 +149,28 @@ class FollowerInfo { Result remove(ServerID const& s); + ////////////////////////////////////////////////////////////////////////////// + /// @brief for each run of the "get-in-sync" protocol we generate a + /// random number to identify this "following term". This is created + /// when the follower fetches the exclusive lock to finally get in sync + /// and is stored in _followingTermId, so that it can be forwarded with + /// each synchronous replication request. 
The follower can then decline + /// the replication in case it is not "in the same term". + ////////////////////////////////////////////////////////////////////////////// + + uint64_t newFollowingTermId(ServerID const& s) noexcept; + + ////////////////////////////////////////////////////////////////////////////// + /// @brief for each run of the "get-in-sync" protocol we generate a + /// random number to identify this "following term". This is created + /// when the follower fetches the exclusive lock to finally get in sync + /// and is stored in _followingTermId, so that it can be forwarded with + /// each synchronous replication request. The follower can then decline + /// the replication in case it is not "in the same term". + ////////////////////////////////////////////////////////////////////////////// + + uint64_t getFollowingTermId(ServerID const& s) noexcept; + ////////////////////////////////////////////////////////////////////////////// /// @brief clear follower list, no changes in agency necesary ////////////////////////////////////////////////////////////////////////////// diff --git a/arangod/Cluster/SynchronizeShard.cpp b/arangod/Cluster/SynchronizeShard.cpp index c0c4a7ffe7ed..5e09fda6b8ed 100644 --- a/arangod/Cluster/SynchronizeShard.cpp +++ b/arangod/Cluster/SynchronizeShard.cpp @@ -92,7 +92,8 @@ using namespace std::chrono; SynchronizeShard::SynchronizeShard(MaintenanceFeature& feature, ActionDescription const& desc) : ActionBase(feature, desc), ShardDefinition(desc.get(DATABASE), desc.get(SHARD)), - _leaderInfo(arangodb::replutils::LeaderInfo::createEmpty()) { + _leaderInfo(arangodb::replutils::LeaderInfo::createEmpty()), + _followingTermId(0) { std::stringstream error; if (!desc.has(COLLECTION)) { @@ -416,7 +417,7 @@ arangodb::Result SynchronizeShard::getReadLock( // nullptr only happens during controlled shutdown if (pool == nullptr) { return arangodb::Result(TRI_ERROR_SHUTTING_DOWN, - "cancelReadLockOnLeader: Shutting down"); + "getReadLock: 
Shutting down"); } VPackBuilder body; @@ -445,6 +446,16 @@ arangodb::Result SynchronizeShard::getReadLock( if (res.ok()) { // Habemus clausum, we have a lock + if (!soft) { + // Now store the random followingTermId: + VPackSlice body = response.response().slice(); + if (body.isObject()) { + VPackSlice followingTermIdSlice = body.get(StaticStrings::FollowingTermId); + if (followingTermIdSlice.isNumber()) { + _followingTermId = followingTermIdSlice.getNumber(); + } + } + } return arangodb::Result(); } @@ -913,10 +924,6 @@ bool SynchronizeShard::first() { return false; } - // This is necessary to accept replications from the leader which can - // happen as soon as we are in sync. - collection->followers()->setTheLeader(leader); - startTime = system_clock::now(); VPackBuilder config; @@ -1169,6 +1176,13 @@ Result SynchronizeShard::catchupWithExclusiveLock( } }); + // Now we have got a unique id for this following term and have stored it + // in _followingTermId, so we can use it to set the leader: + + // This is necessary to accept replications from the leader which can + // happen as soon as we are in sync. 
+ collection.followers()->setTheLeader(leader + "_" + basics::StringUtils::itoa(_followingTermId)); + LOG_TOPIC("d76cb", DEBUG, Logger::MAINTENANCE) << "lockJobId: " << lockJobId; builder.clear(); diff --git a/arangod/Cluster/SynchronizeShard.h b/arangod/Cluster/SynchronizeShard.h index 6bd413db9e81..f321295ee5bc 100644 --- a/arangod/Cluster/SynchronizeShard.h +++ b/arangod/Cluster/SynchronizeShard.h @@ -91,6 +91,7 @@ class SynchronizeShard : public ActionBase, public ShardDefinition { /// @brief information about the leader, reused across multiple replication steps arangodb::replutils::LeaderInfo _leaderInfo; + uint64_t _followingTermId; }; } // namespace maintenance diff --git a/arangod/RestHandler/RestReplicationHandler.cpp b/arangod/RestHandler/RestReplicationHandler.cpp index 854036189a41..a432301c5fa9 100644 --- a/arangod/RestHandler/RestReplicationHandler.cpp +++ b/arangod/RestHandler/RestReplicationHandler.cpp @@ -2708,6 +2708,10 @@ void RestReplicationHandler::handleCommandHoldReadLockCollection() { { VPackObjectBuilder bb(&b); b.add(StaticStrings::Error, VPackValue(false)); + if (!serverId.empty()) { + b.add(StaticStrings::FollowingTermId, + VPackValue(col->followers()->newFollowingTermId(serverId))); + } } LOG_TOPIC("61a9d", DEBUG, Logger::REPLICATION) diff --git a/arangod/Transaction/Methods.cpp b/arangod/Transaction/Methods.cpp index 22d5d54edfcb..5e27da5984c8 100644 --- a/arangod/Transaction/Methods.cpp +++ b/arangod/Transaction/Methods.cpp @@ -1221,7 +1221,8 @@ Future transaction::Methods::insertLocal(std::string const& cna // Now replicate the good operations on all followers: return replicateOperations(collection.get(), followers, options, value, - TRI_VOC_DOCUMENT_OPERATION_INSERT, resDocs) + TRI_VOC_DOCUMENT_OPERATION_INSERT, resDocs, + *collection->followers()) .thenValue([options, errs = std::move(errorCounter), resDocs](Result res) mutable { if (!res.ok()) { return OperationResult{std::move(res), options}; @@ -1493,7 +1494,7 @@ Future 
transaction::Methods::modifyLocal(std::string const& col // Now replicate the good operations on all followers: return replicateOperations(collection.get(), followers, options, newValue, - operation, resDocs) + operation, resDocs, *collection->followers()) .thenValue([options, errs = std::move(errorCounter), resDocs](Result&& res) mutable { if (!res.ok()) { return OperationResult{std::move(res), options}; @@ -1704,7 +1705,8 @@ Future transaction::Methods::removeLocal(std::string const& col // Now replicate the good operations on all followers: return replicateOperations(collection.get(), followers, options, value, - TRI_VOC_DOCUMENT_OPERATION_REMOVE, resDocs) + TRI_VOC_DOCUMENT_OPERATION_REMOVE, resDocs, + *collection->followers()) .thenValue([options, errs = std::move(errorCounter), resDocs](Result res) mutable { if (!res.ok()) { return OperationResult{std::move(res), options}; @@ -1892,10 +1894,15 @@ Future transaction::Methods::truncateLocal(std::string const& c network::RequestOptions reqOpts; reqOpts.database = vocbase().name(); reqOpts.timeout = network::Timeout(600); - reqOpts.param(StaticStrings::IsSynchronousReplicationString, ServerState::instance()->getId()); reqOpts.param(StaticStrings::Compact, (options.truncateCompact ? "true" : "false")); for (auto const& f : *followers) { + reqOpts.param(StaticStrings::IsSynchronousReplicationString, + ServerState::instance()->getId() + "_" + + basics::StringUtils::itoa( + collection->followers()->getFollowingTermId(f))); + // reqOpts is copied deep in sendRequestRetry, so we are OK to + // change it in the loop! 
network::Headers headers; ClusterTrxMethods::addTransactionHeader(*this, f, headers); auto future = network::sendRequestRetry(pool, "server:" + f, fuerte::RestVerb::Put, @@ -2311,7 +2318,8 @@ Future Methods::replicateOperations( std::shared_ptr> const& followerList, OperationOptions const& options, VPackSlice const value, TRI_voc_document_operation_e const operation, - std::shared_ptr> const& ops) { + std::shared_ptr> const& ops, + FollowerInfo& followerInfo) { TRI_ASSERT(followerList != nullptr); if (followerList->empty()) { @@ -2323,8 +2331,6 @@ Future Methods::replicateOperations( network::RequestOptions reqOpts; reqOpts.database = vocbase().name(); reqOpts.param(StaticStrings::IsRestoreString, "true"); - reqOpts.param(StaticStrings::IsSynchronousReplicationString, ServerState::instance()->getId()); - std::string url = "/_api/document/"; url.append(arangodb::basics::StringUtils::urlEncode(collection->name())); if (operation != TRI_VOC_DOCUMENT_OPERATION_INSERT && !value.isArray()) { @@ -2417,6 +2423,12 @@ Future Methods::replicateOperations( auto* pool = vocbase().server().getFeature().pool(); for (auto const& f : *followerList) { + reqOpts.param(StaticStrings::IsSynchronousReplicationString, + ServerState::instance()->getId() + "_" + + basics::StringUtils::itoa( + collection->followers()->getFollowingTermId(f))); + // reqOpts is copied deep in sendRequestRetry, so we are OK to + // change it in the loop! 
network::Headers headers; ClusterTrxMethods::addTransactionHeader(*this, f, headers); futures.emplace_back(network::sendRequestRetry(pool, "server:" + f, requestType, diff --git a/arangod/Transaction/Methods.h b/arangod/Transaction/Methods.h index 69cc9c45c546..e570db22730b 100644 --- a/arangod/Transaction/Methods.h +++ b/arangod/Transaction/Methods.h @@ -27,6 +27,7 @@ #include "Basics/Common.h" #include "Basics/Exceptions.h" #include "Basics/Result.h" +#include "Cluster/FollowerInfo.h" #include "Futures/Future.h" #include "Indexes/IndexIterator.h" #include "Rest/CommonDefines.h" @@ -492,7 +493,8 @@ class Methods { LogicalCollection* collection, std::shared_ptr> const& followers, OperationOptions const& options, VPackSlice value, TRI_voc_document_operation_e operation, - std::shared_ptr> const& ops); + std::shared_ptr> const& ops, + FollowerInfo& followerInfo); private: /// @brief transaction hints diff --git a/lib/Basics/StaticStrings.cpp b/lib/Basics/StaticStrings.cpp index 419fefb1fe8a..3925922ed5bb 100644 --- a/lib/Basics/StaticStrings.cpp +++ b/lib/Basics/StaticStrings.cpp @@ -336,6 +336,7 @@ std::string const StaticStrings::RevisionTreeInitialRangeMin("initialRangeMin"); std::string const StaticStrings::RevisionTreeRanges("ranges"); std::string const StaticStrings::RevisionTreeResume("resume"); std::string const StaticStrings::RevisionTreeVersion("version"); +std::string const StaticStrings::FollowingTermId("followingTermId"); // Generic attribute names std::string const StaticStrings::AttrCoordinator("coordinator"); diff --git a/lib/Basics/StaticStrings.h b/lib/Basics/StaticStrings.h index a1b640d3d7f0..69950603065c 100644 --- a/lib/Basics/StaticStrings.h +++ b/lib/Basics/StaticStrings.h @@ -312,6 +312,7 @@ class StaticStrings { static std::string const RevisionTreeRanges; static std::string const RevisionTreeResume; static std::string const RevisionTreeVersion; + static std::string const FollowingTermId; // generic attribute names static std::string const 
AttrCoordinator; From 79fbdf4fbee387330dfb137a953d38dcaf5bfda9 Mon Sep 17 00:00:00 2001 From: Max Neunhoeffer Date: Fri, 25 Jun 2021 16:04:01 +0200 Subject: [PATCH 02/14] Also handle the planned leader change case. --- arangod/Cluster/FollowerInfo.cpp | 11 +- arangod/Cluster/Maintenance.cpp | 1 - arangod/Cluster/SynchronizeShard.cpp | 7 +- arangod/Cluster/TakeoverShardLeadership.cpp | 138 +++++++----------- .../RestHandler/RestReplicationHandler.cpp | 11 +- tests/Maintenance/MaintenanceTest.cpp | 4 - 6 files changed, 79 insertions(+), 93 deletions(-) diff --git a/arangod/Cluster/FollowerInfo.cpp b/arangod/Cluster/FollowerInfo.cpp index 9532d3d5ce0c..e4833cd57beb 100644 --- a/arangod/Cluster/FollowerInfo.cpp +++ b/arangod/Cluster/FollowerInfo.cpp @@ -545,11 +545,15 @@ VPackBuilder FollowerInfo::newShardEntry(VPackSlice oldValue) const { } uint64_t FollowerInfo::newFollowingTermId(ServerID const& s) noexcept { - uint64_t i = RandomGenerator::interval(UINT64_MAX); + uint64_t i = 0; + // We want the random number to be non-zero: + do { + i = RandomGenerator::interval(UINT64_MAX); + } while (i == 0); try { _followingTermId[s] = i; } catch(std::bad_alloc const& exc) { - i = 0; // I assume here that I do not get bad_alloc if the key is + i = 1; // I assume here that I do not get bad_alloc if the key is // already in the map, since it then only has to overwrite // an integer, if the key is not in the map, we default to 0. } @@ -560,7 +564,8 @@ uint64_t FollowerInfo::getFollowingTermId(ServerID const& s) noexcept { // Note that we assume that find() does not throw! 
auto it = _followingTermId.find(s); if (it == _followingTermId.end()) { - return 0; + // If not found, we use the default from above: + return 1; } return it->second; } diff --git a/arangod/Cluster/Maintenance.cpp b/arangod/Cluster/Maintenance.cpp index 77b78902e02e..4b48930d6183 100644 --- a/arangod/Cluster/Maintenance.cpp +++ b/arangod/Cluster/Maintenance.cpp @@ -320,7 +320,6 @@ void handlePlanShard(StorageEngine& engine, uint64_t planIndex, VPackSlice const {DATABASE, dbname}, {COLLECTION, colname}, {SHARD, shname}, - {THE_LEADER, std::string()}, {LOCAL_LEADER, std::string(localLeader)}, {OLD_CURRENT_COUNTER, "0"}, // legacy, no longer used {PLAN_RAFT_INDEX, std::to_string(planIndex)}}, diff --git a/arangod/Cluster/SynchronizeShard.cpp b/arangod/Cluster/SynchronizeShard.cpp index 5e09fda6b8ed..93739c16cc58 100644 --- a/arangod/Cluster/SynchronizeShard.cpp +++ b/arangod/Cluster/SynchronizeShard.cpp @@ -1181,8 +1181,13 @@ Result SynchronizeShard::catchupWithExclusiveLock( // This is necessary to accept replications from the leader which can // happen as soon as we are in sync. - collection.followers()->setTheLeader(leader + "_" + basics::StringUtils::itoa(_followingTermId)); + if (_followingTermId != 0) { + collection.followers()->setTheLeader(leader + "_" + basics::StringUtils::itoa(_followingTermId)); + } else { + // This is the case for a leader before the upgrade, we tolerate this: + collection.followers()->setTheLeader(leader); + } LOG_TOPIC("d76cb", DEBUG, Logger::MAINTENANCE) << "lockJobId: " << lockJobId; builder.clear(); diff --git a/arangod/Cluster/TakeoverShardLeadership.cpp b/arangod/Cluster/TakeoverShardLeadership.cpp index 6797fc5d932d..10a732e11d9f 100644 --- a/arangod/Cluster/TakeoverShardLeadership.cpp +++ b/arangod/Cluster/TakeoverShardLeadership.cpp @@ -86,11 +86,6 @@ TakeoverShardLeadership::TakeoverShardLeadership(MaintenanceFeature& feature, error << "database and shard must be specified. 
"; } - if (!desc.has(THE_LEADER)) { - error << "leader must be specified. "; - } - TRI_ASSERT(desc.has(THE_LEADER)); - if (!desc.has(LOCAL_LEADER)) { error << "local leader must be specified. "; } @@ -112,7 +107,7 @@ static void sendLeaderChangeRequests(network::ConnectionPool* pool, std::vector const& currentServers, std::shared_ptr>& realInsyncFollowers, std::string const& databaseName, - ShardID const& shardID, std::string const& oldLeader) { + LogicalCollection& shard, std::string const& oldLeader) { if (pool == nullptr) { // nullptr happens only during controlled shutdown return; @@ -120,15 +115,6 @@ static void sendLeaderChangeRequests(network::ConnectionPool* pool, std::string const& sid = ServerState::instance()->getId(); - VPackBufferUInt8 buffer; - VPackBuilder bodyBuilder(buffer); - { - VPackObjectBuilder ob(&bodyBuilder); - bodyBuilder.add("leaderId", VPackValue(sid)); - bodyBuilder.add("oldLeaderId", VPackValue(oldLeader)); - bodyBuilder.add("shard", VPackValue(shardID)); - } - network::RequestOptions options; options.database = databaseName; options.timeout = network::Timeout(3.0); @@ -143,6 +129,17 @@ static void sendLeaderChangeRequests(network::ConnectionPool* pool, if (srv == sid) { continue; // ignore ourself } + uint64_t followingTermId = shard.followers()->newFollowingTermId(srv); + VPackBufferUInt8 buffer; + VPackBuilder bodyBuilder(buffer); + { + VPackObjectBuilder ob(&bodyBuilder); + bodyBuilder.add("leaderId", VPackValue(sid)); + bodyBuilder.add("oldLeaderId", VPackValue(oldLeader)); + bodyBuilder.add("shard", VPackValue(shard.name())); + bodyBuilder.add(StaticStrings::FollowingTermId, VPackValue(followingTermId)); + } + LOG_TOPIC("42516", DEBUG, Logger::MAINTENANCE) << "Sending " << bodyBuilder.toJson() << " to " << srv; auto f = network::sendRequest(pool, "server:" + srv, fuerte::RestVerb::Put, @@ -165,78 +162,54 @@ static void sendLeaderChangeRequests(network::ConnectionPool* pool, } static void handleLeadership(uint64_t planIndex, 
LogicalCollection& collection, - std::string const& localLeader, std::string const& plannedLeader, + std::string const& localLeader, std::string const& databaseName, MaintenanceFeature& feature) { auto& followers = collection.followers(); - if (plannedLeader.empty()) { // Planned to lead - if (!localLeader.empty()) { // We were not leader, assume leadership - LOG_TOPIC("5632f", DEBUG, Logger::MAINTENANCE) - << "handling leadership of shard '" << databaseName << "/" - << collection.name() << ": becoming leader"; - - auto& ci = collection.vocbase().server().getFeature().clusterInfo(); - // This will block the thread until our ClusterInfo cache fetched a - // Current version in background thread which is at least as new as the - // Plan which brought us here. This is important for the assertion - // below where we check that we are in the list of failoverCandidates! - ci.waitForCurrent(planIndex); - auto currentInfo = - ci.getCollectionCurrent(databaseName, - std::to_string(collection.planId().id())); - if (currentInfo == nullptr) { - // Collection has been dropped. we cannot continue here. 
- return; - } - TRI_ASSERT(currentInfo != nullptr); - std::vector currentServers = currentInfo->servers(collection.name()); - std::shared_ptr> realInsyncFollowers; - - if (!currentServers.empty()) { - std::string& oldLeader = currentServers[0]; - // Check if the old leader has resigned and stopped all write - // (if so, we can assume that all servers are still in sync) - if (!oldLeader.empty() && oldLeader[0] == '_') { - // remove the underscore from the list as it is useless anyway - oldLeader = oldLeader.substr(1); - - // Update all follower and tell them that we are the leader now - NetworkFeature& nf = - collection.vocbase().server().getFeature(); - network::ConnectionPool* pool = nf.pool(); - sendLeaderChangeRequests(pool, currentServers, realInsyncFollowers, - databaseName, collection.name(), oldLeader); - } - } - - std::vector failoverCandidates = - currentInfo->failoverCandidates(collection.name()); - followers->takeOverLeadership(failoverCandidates, realInsyncFollowers); - transaction::cluster::abortFollowerTransactionsOnShard(collection.id()); + if (!localLeader.empty()) { // We were not leader, assume leadership + LOG_TOPIC("5632f", DEBUG, Logger::MAINTENANCE) + << "handling leadership of shard '" << databaseName << "/" + << collection.name() << ": becoming leader"; + + auto& ci = collection.vocbase().server().getFeature().clusterInfo(); + // This will block the thread until our ClusterInfo cache fetched a + // Current version in background thread which is at least as new as the + // Plan which brought us here. This is important for the assertion + // below where we check that we are in the list of failoverCandidates! + ci.waitForCurrent(planIndex); + auto currentInfo = + ci.getCollectionCurrent(databaseName, + std::to_string(collection.planId().id())); + if (currentInfo == nullptr) { + // Collection has been dropped. we cannot continue here. 
+ return; } - } else { // Planned to follow - if (localLeader.empty() || localLeader == LEADER_NOT_YET_KNOWN) { - // Note that the following does not delete the follower list - // and that this is crucial, because in the planned leader - // resign case, updateCurrentForCollections will report the - // resignation together with the old in-sync list to the - // agency. If this list would be empty, then the supervision - // would be very angry with us! - - LOG_TOPIC("5632e", DEBUG, Logger::MAINTENANCE) - << "handling leadership of shard '" << databaseName << "/" - << collection.name() << ": following " << plannedLeader; - - followers->setTheLeader(plannedLeader); - transaction::cluster::abortLeaderTransactionsOnShard(collection.id()); + TRI_ASSERT(currentInfo != nullptr); + std::vector currentServers = currentInfo->servers(collection.name()); + std::shared_ptr> realInsyncFollowers; + + if (!currentServers.empty()) { + std::string& oldLeader = currentServers[0]; + // Check if the old leader has resigned and stopped all write + // (if so, we can assume that all servers are still in sync) + if (!oldLeader.empty() && oldLeader[0] == '_') { + // remove the underscore from the list as it is useless anyway + oldLeader = oldLeader.substr(1); + + // Update all follower and tell them that we are the leader now + NetworkFeature& nf = + collection.vocbase().server().getFeature(); + network::ConnectionPool* pool = nf.pool(); + sendLeaderChangeRequests(pool, currentServers, realInsyncFollowers, + databaseName, collection, oldLeader); + } } - // Note that if we have been a follower to some leader - // we do not immediately adjust the leader here, even if - // the planned leader differs from what we have set locally. - // The setting must only be adjusted once we have - // synchronized with the new leader and negotiated - // a leader/follower relationship! 
+ + std::vector failoverCandidates = + currentInfo->failoverCandidates(collection.name()); + followers->takeOverLeadership(failoverCandidates, realInsyncFollowers); + transaction::cluster::abortFollowerTransactionsOnShard(collection.id()); } } @@ -244,7 +217,6 @@ bool TakeoverShardLeadership::first() { std::string const& database = getDatabase(); std::string const& collection = _description.get(COLLECTION); std::string const& shard = getShard(); - std::string const& plannedLeader = _description.get(THE_LEADER); std::string const& localLeader = _description.get(LOCAL_LEADER); std::string const& planRaftIndex = _description.get(PLAN_RAFT_INDEX); uint64_t planIndex = basics::StringUtils::uint64(planRaftIndex); @@ -265,7 +237,7 @@ bool TakeoverShardLeadership::first() { // resignation case is not handled here, since then // ourselves does not appear in shards[shard] but only // "_" + ourselves. - handleLeadership(planIndex, *coll, localLeader, plannedLeader, vocbase.name(), + handleLeadership(planIndex, *coll, localLeader, vocbase.name(), feature()); } else { std::stringstream error; diff --git a/arangod/RestHandler/RestReplicationHandler.cpp b/arangod/RestHandler/RestReplicationHandler.cpp index a432301c5fa9..03abe9f086c1 100644 --- a/arangod/RestHandler/RestReplicationHandler.cpp +++ b/arangod/RestHandler/RestReplicationHandler.cpp @@ -2561,6 +2561,10 @@ void RestReplicationHandler::handleCommandSetTheLeader() { VPackSlice const leaderIdSlice = body.get("leaderId"); VPackSlice const oldLeaderIdSlice = body.get("oldLeaderId"); VPackSlice const shard = body.get("shard"); + VPackSlice const followingTermId = body.get(StaticStrings::FollowingTermId); + // Note that we tolerate if followingTermId is not present or not a number + // for upgrade scenarios. If the new leader does not send it, it will also + // not send it in the option IsSynchronousReplication. 
if (!leaderIdSlice.isString() || !shard.isString() || !oldLeaderIdSlice.isString()) { generateError(rest::ResponseCode::BAD, TRI_ERROR_HTTP_BAD_PARAMETER, "'leaderId' and 'shard' attributes must be strings"); @@ -2594,7 +2598,12 @@ void RestReplicationHandler::handleCommandSetTheLeader() { return; } - col->followers()->setTheLeader(leaderId); + if (followingTermId.isNumber()) { + col->followers()->setTheLeader(leaderId + "_" + + StringUtils::itoa(followingTermId.getNumber())); + } else { + col->followers()->setTheLeader(leaderId); + } } VPackBuilder b; diff --git a/tests/Maintenance/MaintenanceTest.cpp b/tests/Maintenance/MaintenanceTest.cpp index c43e0f18b57e..15a6da4c31de 100644 --- a/tests/Maintenance/MaintenanceTest.cpp +++ b/tests/Maintenance/MaintenanceTest.cpp @@ -740,7 +740,6 @@ class MaintenanceTestActionPhaseOne : public SharedMaintenanceTest { ASSERT_TRUE(action.has(DATABASE)); ASSERT_TRUE(action.has(COLLECTION)); ASSERT_TRUE(action.has(SHARD)); - ASSERT_TRUE(action.has(THE_LEADER)); ASSERT_TRUE(action.has(LOCAL_LEADER)); ASSERT_TRUE(action.has(PLAN_RAFT_INDEX)); ASSERT_EQ(action.get(DATABASE), dbName); @@ -1285,7 +1284,6 @@ TEST_F(MaintenanceTestActionPhaseOne, leader_behaviour_plan_self_local_other) { auto shardName = action->get(SHARD); auto removed = relevantShards.erase(shardName); EXPECT_EQ(removed, 1) << "We created a JOB for a shard we do not expect " << shardName; - EXPECT_EQ(action->get(THE_LEADER), ""); EXPECT_EQ(action->get(LOCAL_LEADER), unusedServer()); } } @@ -1383,7 +1381,6 @@ TEST_F(MaintenanceTestActionPhaseOne, leader_behaviour_plan_self_local_resigned) auto shardName = action->get(SHARD); auto removed = relevantShards.erase(shardName); EXPECT_EQ(removed, 1) << "We created a JOB for a shard we do not expect " << shardName; - EXPECT_EQ(action->get(THE_LEADER), ""); EXPECT_EQ(action->get(LOCAL_LEADER), ResignShardLeadership::LeaderNotYetKnownString); } } @@ -1477,7 +1474,6 @@ TEST_F(MaintenanceTestActionPhaseOne, 
leader_behaviour_plan_self_local_reboot) { auto shardName = action->get(SHARD); auto removed = relevantShards.erase(shardName); EXPECT_EQ(removed, 1) << "We created a JOB for a shard we do not expect " << shardName; - EXPECT_EQ(action->get(THE_LEADER), ""); EXPECT_EQ(action->get(LOCAL_LEADER), LEADER_NOT_YET_KNOWN); } } From 4fc887c62c62001e0bb762ae24ddd39e2d882f58 Mon Sep 17 00:00:00 2001 From: Max Neunhoeffer Date: Fri, 25 Jun 2021 16:18:01 +0200 Subject: [PATCH 03/14] CHANGELOG. --- CHANGELOG | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/CHANGELOG b/CHANGELOG index 29b300e56872..f6bc8f122201 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,6 +1,12 @@ devel ----- +* Add following term ids, which prevents old synchronous replication requests + to be accepted after a follower was dropped and has gotten in sync again. + This makes the chaos tests which delay synchronous replication requests + more reliable and prevent inconsistent shard replicas under bad network + conditions. + * Web UI: Disables the hover tooltip within the statistics view of the memory consumption chart. From 979a956c12a5c50b2bb9d665877a5e1c54e2f41d Mon Sep 17 00:00:00 2001 From: Max Neunhoeffer Date: Mon, 28 Jun 2021 10:44:17 +0200 Subject: [PATCH 04/14] Remove bug introduced by merge. --- arangod/Cluster/SynchronizeShard.cpp | 4 ---- 1 file changed, 4 deletions(-) diff --git a/arangod/Cluster/SynchronizeShard.cpp b/arangod/Cluster/SynchronizeShard.cpp index 03a2a8ac1cce..b76421ccb0d9 100644 --- a/arangod/Cluster/SynchronizeShard.cpp +++ b/arangod/Cluster/SynchronizeShard.cpp @@ -1211,10 +1211,6 @@ Result SynchronizeShard::catchupWithExclusiveLock( return {TRI_ERROR_INTERNAL, errorMessage}; } - // This is necessary to accept replications from the leader which can - // happen as soon as we are in sync. 
- collection.followers()->setTheLeader(leader); - NetworkFeature& nf = _feature.server().getFeature(); network::ConnectionPool* pool = nf.pool(); res = addShardFollower(pool, ep, getDatabase(), getShard(), lockJobId, clientId, From df192815713a334159e317ad33be49f6fbccd88d Mon Sep 17 00:00:00 2001 From: Max Neunhoeffer Date: Mon, 28 Jun 2021 10:47:05 +0200 Subject: [PATCH 05/14] Fix windows compilation issue. --- arangod/Cluster/FollowerInfo.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/arangod/Cluster/FollowerInfo.cpp b/arangod/Cluster/FollowerInfo.cpp index e4833cd57beb..05773d7999fb 100644 --- a/arangod/Cluster/FollowerInfo.cpp +++ b/arangod/Cluster/FollowerInfo.cpp @@ -552,7 +552,7 @@ uint64_t FollowerInfo::newFollowingTermId(ServerID const& s) noexcept { } while (i == 0); try { _followingTermId[s] = i; - } catch(std::bad_alloc const& exc) { + } catch(std::bad_alloc const&) { i = 1; // I assume here that I do not get bad_alloc if the key is // already in the map, since it then only has to overwrite // an integer, if the key is not in the map, we default to 0. From cc931fb4f490e304427802fd1f57943951aaa347 Mon Sep 17 00:00:00 2001 From: Max Neunhoeffer Date: Mon, 28 Jun 2021 11:42:14 +0200 Subject: [PATCH 06/14] Fix SyncCollectionFinalize. --- arangod/Cluster/SynchronizeShard.cpp | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/arangod/Cluster/SynchronizeShard.cpp b/arangod/Cluster/SynchronizeShard.cpp index b76421ccb0d9..525e86e1cb9a 100644 --- a/arangod/Cluster/SynchronizeShard.cpp +++ b/arangod/Cluster/SynchronizeShard.cpp @@ -1181,13 +1181,14 @@ Result SynchronizeShard::catchupWithExclusiveLock( // This is necessary to accept replications from the leader which can // happen as soon as we are in sync. 
+ std::string leaderIdWithTerm{leader}; if (_followingTermId != 0) { - collection.followers()->setTheLeader(leader + "_" + basics::StringUtils::itoa(_followingTermId)); - - } else { - // This is the case for a leader before the upgrade, we tolerate this: - collection.followers()->setTheLeader(leader); + leaderIdWithTerm += "_"; + leaderIdWithTerm += basics::StringUtils::itoa(_followingTermId); } + // If _followingTermId is 0, then this is a leader before the upgrade, + // we tolerate this and simply use its ID without a term in this case. + collection.followers()->setTheLeader(leader); LOG_TOPIC("d76cb", DEBUG, Logger::MAINTENANCE) << "lockJobId: " << lockJobId; builder.clear(); @@ -1196,7 +1197,7 @@ Result SynchronizeShard::catchupWithExclusiveLock( builder.add(ENDPOINT, VPackValue(ep)); builder.add(DATABASE, VPackValue(getDatabase())); builder.add(COLLECTION, VPackValue(getShard())); - builder.add(LEADER_ID, VPackValue(leader)); + builder.add(LEADER_ID, VPackValue(leaderIdWithTerm)); builder.add("from", VPackValue(lastLogTick)); builder.add("requestTimeout", VPackValue(600.0)); builder.add("connectTimeout", VPackValue(60.0)); From df4d679c6c5c9ab91ce320ca15cc0163fc72a535 Mon Sep 17 00:00:00 2001 From: Max Neunhoeffer Date: Mon, 28 Jun 2021 15:34:03 +0200 Subject: [PATCH 07/14] Fix all setTheLeader places.
--- arangod/Cluster/CreateCollection.cpp | 2 +- arangod/Cluster/SynchronizeShard.cpp | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/arangod/Cluster/CreateCollection.cpp b/arangod/Cluster/CreateCollection.cpp index d58500c902d0..72d03085d238 100644 --- a/arangod/Cluster/CreateCollection.cpp +++ b/arangod/Cluster/CreateCollection.cpp @@ -165,7 +165,7 @@ bool CreateCollection::first() { std::vector noFollowers; col->followers()->takeOverLeadership(noFollowers, nullptr); } else { - col->followers()->setTheLeader(leader); + col->followers()->setTheLeader(LEADER_NOT_YET_KNOWN); } } diff --git a/arangod/Cluster/SynchronizeShard.cpp b/arangod/Cluster/SynchronizeShard.cpp index 525e86e1cb9a..9cf660ac7c7f 100644 --- a/arangod/Cluster/SynchronizeShard.cpp +++ b/arangod/Cluster/SynchronizeShard.cpp @@ -1188,7 +1188,7 @@ Result SynchronizeShard::catchupWithExclusiveLock( } // If _followingTermId is 0, then this is a leader before the upgrade, // we tolerate this and simply use its ID without a term in this case. - collection.followers()->setTheLeader(leader); + collection.followers()->setTheLeader(leaderIdWithTerm); LOG_TOPIC("d76cb", DEBUG, Logger::MAINTENANCE) << "lockJobId: " << lockJobId; builder.clear(); From 6aa7a8a2f2c128020d34fa23ebe1b3912820be6d Mon Sep 17 00:00:00 2001 From: Max Neunhoeffer Date: Mon, 28 Jun 2021 15:50:01 +0200 Subject: [PATCH 08/14] Fix getting in sync.
--- arangod/Cluster/SynchronizeShard.cpp | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/arangod/Cluster/SynchronizeShard.cpp b/arangod/Cluster/SynchronizeShard.cpp index 9cf660ac7c7f..23527b0778f4 100644 --- a/arangod/Cluster/SynchronizeShard.cpp +++ b/arangod/Cluster/SynchronizeShard.cpp @@ -521,10 +521,12 @@ static arangodb::ResultT replicationSynchronize( auto& vocbase = col->vocbase(); auto database = vocbase.name(); +#if 0 std::string leaderId; if (config.hasKey(LEADER_ID)) { leaderId = config.get(LEADER_ID).copyString(); } +#endif ReplicationApplierConfiguration configuration = ReplicationApplierConfiguration::fromVelocyPack(vocbase.server(), config, database); @@ -534,9 +536,11 @@ static arangodb::ResultT replicationSynchronize( // database-specific synchronization auto syncer = DatabaseInitialSyncer::create(vocbase, configuration); +#if 0 if (!leaderId.empty()) { syncer->setLeaderId(leaderId); } +#endif SyncerId syncerId{syncer->syncerId()}; @@ -597,7 +601,9 @@ static arangodb::Result replicationSynchronizeCatchup( auto const database = conf.get(DATABASE).copyString(); auto const collection = conf.get(COLLECTION).copyString(); +#if 0 auto const leaderId = conf.get(LEADER_ID).copyString(); +#endif auto const fromTick = conf.get("from").getNumber(); ReplicationApplierConfiguration configuration = @@ -609,9 +615,11 @@ static arangodb::Result replicationSynchronizeCatchup( DatabaseGuard guard(df, database); auto syncer = DatabaseTailingSyncer::create(guard.database(), configuration, fromTick, /*useTick*/true); +#if 0 if (!leaderId.empty()) { syncer->setLeaderId(leaderId); } +#endif Result r; try { From b66b3619b3d6404b24b952de0692e9842f45a588 Mon Sep 17 00:00:00 2001 From: Max Neunhoeffer Date: Tue, 29 Jun 2021 10:47:50 +0200 Subject: [PATCH 09/14] Another try to get configuration of following in shard right. 
--- arangod/Cluster/SynchronizeShard.cpp | 48 +++++++++++++++++++++------- 1 file changed, 37 insertions(+), 11 deletions(-) diff --git a/arangod/Cluster/SynchronizeShard.cpp b/arangod/Cluster/SynchronizeShard.cpp index 23527b0778f4..5d621be29b55 100644 --- a/arangod/Cluster/SynchronizeShard.cpp +++ b/arangod/Cluster/SynchronizeShard.cpp @@ -89,6 +89,33 @@ std::string const TTL("ttl"); using namespace std::chrono; +// Overview of the code in this file: +// The main method being called is "first", it does: +// first: +// - wait until leader has created shard +// - lookup local shard +// - call `replicationSynchronize` +// - call `catchupWithReadLock` +// - call `catchupWithExclusiveLock` +// replicationSynchronize: +// - set local shard to follow leader (without a following term id) +// - use a `DatabaseInitialSyncer` to synchronize to a certain state, +// (configure leaderId for it to go through) +// catchupWithReadLock: +// - start a read lock on leader +// - keep configuration for shard to follow the leader without term id +// - call `replicationSynchronizeCatchup` (WAL tailing, configure leaderId +// for it to go through) +// - cancel read lock on leader +// catchupWithExclusiveLock: +// - start an exclusive lock on leader, acquire unique following term id +// - set local shard to follow the leader (with new following term id) +// - call `replicationSynchronizeFinalize` (WAL tailing) +// - do a final check by comparing counts on leader and follower +// - add us as official follower on the leader +// - release exclusive lock on leader +// + SynchronizeShard::SynchronizeShard(MaintenanceFeature& feature, ActionDescription const& desc) : ActionBase(feature, desc), ShardDefinition(desc.get(DATABASE), desc.get(SHARD)), @@ -521,12 +548,10 @@ static arangodb::ResultT replicationSynchronize( auto& vocbase = col->vocbase(); auto database = vocbase.name(); -#if 0 std::string leaderId; if (config.hasKey(LEADER_ID)) { leaderId = config.get(LEADER_ID).copyString(); } -#endif
ReplicationApplierConfiguration configuration = ReplicationApplierConfiguration::fromVelocyPack(vocbase.server(), config, database); @@ -536,11 +561,10 @@ static arangodb::ResultT replicationSynchronize( // database-specific synchronization auto syncer = DatabaseInitialSyncer::create(vocbase, configuration); -#if 0 if (!leaderId.empty()) { + // In this phase we use the normal leader ID without following term id: syncer->setLeaderId(leaderId); } -#endif SyncerId syncerId{syncer->syncerId()}; @@ -601,9 +625,7 @@ static arangodb::Result replicationSynchronizeCatchup( auto const database = conf.get(DATABASE).copyString(); auto const collection = conf.get(COLLECTION).copyString(); -#if 0 auto const leaderId = conf.get(LEADER_ID).copyString(); -#endif auto const fromTick = conf.get("from").getNumber(); ReplicationApplierConfiguration configuration = @@ -615,11 +637,11 @@ static arangodb::Result replicationSynchronizeCatchup( DatabaseGuard guard(df, database); auto syncer = DatabaseTailingSyncer::create(guard.database(), configuration, fromTick, /*useTick*/true); -#if 0 if (!leaderId.empty()) { + // In this phase we still use the normal leaderId without a following + // term id: syncer->setLeaderId(leaderId); } -#endif Result r; try { @@ -643,10 +665,10 @@ static arangodb::Result replicationSynchronizeCatchup( static arangodb::Result replicationSynchronizeFinalize(SynchronizeShard const& job, application_features::ApplicationServer& server, - VPackSlice const& conf) { + VPackSlice const& conf, + std::string const& leaderId) { auto const database = conf.get(DATABASE).copyString(); auto const collection = conf.get(COLLECTION).copyString(); - auto const leaderId = conf.get(LEADER_ID).copyString(); auto const fromTick = conf.get("from").getNumber(); ReplicationApplierConfiguration configuration = @@ -954,6 +976,10 @@ bool SynchronizeShard::first() { auto details = std::make_shared(); + // Configure the shard to follow the leader without any following + // term id: + 
collection->followers()->setTheLeader(leader); + ResultT syncRes = replicationSynchronize(*this, collection, config.slice(), details); @@ -1211,7 +1237,7 @@ Result SynchronizeShard::catchupWithExclusiveLock( builder.add("connectTimeout", VPackValue(60.0)); } - res = replicationSynchronizeFinalize(*this, feature().server(), builder.slice()); + res = replicationSynchronizeFinalize(*this, feature().server(), builder.slice(), leaderIdWithTerm); if (!res.ok()) { std::string errorMessage( From 304cc67edefadf1586525d134c682efd0eaaca01 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Max=20Neunh=C3=B6ffer?= Date: Tue, 29 Jun 2021 10:59:11 +0200 Subject: [PATCH 10/14] Apply suggestions from code review Co-authored-by: Jan --- arangod/Cluster/FollowerInfo.cpp | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/arangod/Cluster/FollowerInfo.cpp b/arangod/Cluster/FollowerInfo.cpp index 05773d7999fb..e3bc5ebb12fe 100644 --- a/arangod/Cluster/FollowerInfo.cpp +++ b/arangod/Cluster/FollowerInfo.cpp @@ -555,12 +555,12 @@ uint64_t FollowerInfo::newFollowingTermId(ServerID const& s) noexcept { } catch(std::bad_alloc const&) { i = 1; // I assume here that I do not get bad_alloc if the key is // already in the map, since it then only has to overwrite - // an integer, if the key is not in the map, we default to 0. + // an integer, if the key is not in the map, we default to 1. } return i; } -uint64_t FollowerInfo::getFollowingTermId(ServerID const& s) noexcept { +uint64_t FollowerInfo::getFollowingTermId(ServerID const& s) const noexcept { // Note that we assume that find() does not throw! auto it = _followingTermId.find(s); if (it == _followingTermId.end()) { @@ -569,4 +569,3 @@ uint64_t FollowerInfo::getFollowingTermId(ServerID const& s) noexcept { } return it->second; } - From ebf60127b3e520278b6d559429930fe08e88f8a6 Mon Sep 17 00:00:00 2001 From: Max Neunhoeffer Date: Tue, 29 Jun 2021 12:06:56 +0200 Subject: [PATCH 11/14] Fix compilation. 
--- arangod/Cluster/FollowerInfo.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/arangod/Cluster/FollowerInfo.h b/arangod/Cluster/FollowerInfo.h index dc475096ddc8..12c0ab775c6a 100644 --- a/arangod/Cluster/FollowerInfo.h +++ b/arangod/Cluster/FollowerInfo.h @@ -169,7 +169,7 @@ class FollowerInfo { /// the replication in case it is not "in the same term". ////////////////////////////////////////////////////////////////////////////// - uint64_t getFollowingTermId(ServerID const& s) noexcept; + uint64_t getFollowingTermId(ServerID const& s) const noexcept; ////////////////////////////////////////////////////////////////////////////// /// @brief clear follower list, no changes in agency necesary From 0b774280eb261ae750f90e5bdb101f6b37b0fef6 Mon Sep 17 00:00:00 2001 From: Max Neunhoeffer Date: Tue, 29 Jun 2021 13:57:12 +0200 Subject: [PATCH 12/14] Improve comments. --- arangod/Cluster/FollowerInfo.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/arangod/Cluster/FollowerInfo.h b/arangod/Cluster/FollowerInfo.h index 12c0ab775c6a..4e1b16c6fdf3 100644 --- a/arangod/Cluster/FollowerInfo.h +++ b/arangod/Cluster/FollowerInfo.h @@ -55,8 +55,8 @@ class FollowerInfo { std::shared_ptr> _failoverCandidates; // The following map holds a random number for each follower, this - // random number is sent given to the follower when it gets in sync - // (actually, when it acquires the hard lock to get in sync), and is + // random number is given and sent to the follower when it gets in sync + // (actually, when it acquires the exclusive lock to get in sync), and is // then subsequently sent alongside every synchronous replication // request. If the number does not match, the follower will refuse the // replication request. This is to ensure that replication requests cannot From aaccae53c643e335fbdac44727d148154f12bb30 Mon Sep 17 00:00:00 2001 From: Max Neunhoeffer Date: Thu, 1 Jul 2021 07:47:55 +0200 Subject: [PATCH 13/14] Concurrency fixes. 
--- arangod/Cluster/FollowerInfo.cpp | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/arangod/Cluster/FollowerInfo.cpp b/arangod/Cluster/FollowerInfo.cpp index e3bc5ebb12fe..d4a3da2f4bde 100644 --- a/arangod/Cluster/FollowerInfo.cpp +++ b/arangod/Cluster/FollowerInfo.cpp @@ -545,11 +545,17 @@ VPackBuilder FollowerInfo::newShardEntry(VPackSlice oldValue) const { } uint64_t FollowerInfo::newFollowingTermId(ServerID const& s) noexcept { + WRITE_LOCKER(guard, _dataLock); uint64_t i = 0; - // We want the random number to be non-zero: + uint64_t prev = 0; + auto it = _followingTermId.find(s); + if (it != _followingTermId.end()) { + prev = it->second; + } + // We want the random number to be non-zero and different from a previous one: do { i = RandomGenerator::interval(UINT64_MAX); - } while (i == 0); + } while (i == 0 || i == prev); try { _followingTermId[s] = i; } catch(std::bad_alloc const&) { @@ -561,6 +567,7 @@ uint64_t FollowerInfo::newFollowingTermId(ServerID const& s) noexcept { } uint64_t FollowerInfo::getFollowingTermId(ServerID const& s) const noexcept { + READ_LOCKER(guard, _dataLock); // Note that we assume that find() does not throw! auto it = _followingTermId.find(s); if (it == _followingTermId.end()) { From cd3715c11bae933aec6abc9733ff404adaf9b947 Mon Sep 17 00:00:00 2001 From: Max Neunhoeffer Date: Thu, 1 Jul 2021 08:27:48 +0200 Subject: [PATCH 14/14] Add test. 
--- .../shell/shell-following-term-id-cluster.js | 119 ++++++++++++++++++ 1 file changed, 119 insertions(+) create mode 100644 tests/js/client/shell/shell-following-term-id-cluster.js diff --git a/tests/js/client/shell/shell-following-term-id-cluster.js b/tests/js/client/shell/shell-following-term-id-cluster.js new file mode 100644 index 000000000000..cc717e819274 --- /dev/null +++ b/tests/js/client/shell/shell-following-term-id-cluster.js @@ -0,0 +1,119 @@ +/* jshint globalstrict:false, strict:false, maxlen: 200 */ +/* global fail, assertEqual, assertTrue, assertFalse, arango */ + +// ////////////////////////////////////////////////////////////////////////////// +// / @brief Test following term id behaviour. +// / +// / DISCLAIMER +// / +// / Copyright 2021 ArangoDB GmbH, Cologne, Germany +// / +// / Licensed under the Apache License, Version 2.0 (the "License") +// / you may not use this file except in compliance with the License. +// / You may obtain a copy of the License at +// / +// / http://www.apache.org/licenses/LICENSE-2.0 +// / +// / Unless required by applicable law or agreed to in writing, software +// / distributed under the License is distributed on an "AS IS" BASIS, +// / WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// / See the License for the specific language governing permissions and +// / limitations under the License. 
+// / +// / Copyright holder is triAGENS GmbH, Cologne, Germany +// / +// / @author Max Neunhoeffer +// ////////////////////////////////////////////////////////////////////////////// + +const _ = require('lodash'); +let jsunity = require('jsunity'); +let internal = require('internal'); +let arangodb = require('@arangodb'); +let db = arangodb.db; + +function createCollectionWithKnownLeaderAndFollower(cn) { + db._create(cn, {numberOfShards:1, replicationFactor:2}); + // Get dbserver names first: + let health = arango.GET("/_admin/cluster/health").Health; + let endpointMap = {}; + let idMap = {}; + for (let sid in health) { + endpointMap[health[sid].ShortName] = health[sid].Endpoint; + idMap[health[sid].ShortName] = sid; + } + let plan = arango.GET("/_admin/cluster/shardDistribution").results[cn].Plan; + let shard = Object.keys(plan)[0]; + let coordinator = "Coordinator0001"; + let leader = plan[shard].leader; + let follower = plan[shard].followers[0]; + return { endpointMap, idMap, coordinator, leader, follower, shard }; +} + +function switchConnectionToCoordinator(collInfo) { + arango.reconnect(collInfo.endpointMap[collInfo.coordinator], "_system", "root", ""); +} + +function switchConnectionToLeader(collInfo) { + arango.reconnect(collInfo.endpointMap[collInfo.leader], "_system", "root", ""); +} + +function switchConnectionToFollower(collInfo) { + arango.reconnect(collInfo.endpointMap[collInfo.follower], "_system", "root", ""); +} + +function followingTermIdSuite() { + 'use strict'; + const cn = 'UnitTestsFollowingTermId'; + let collInfo = {}; + + return { + + setUp: function () { + db._drop(cn); + collInfo = createCollectionWithKnownLeaderAndFollower(cn); + }, + + tearDown: function () { + db._drop(cn); + }, + + testFollowingTermIdSuite: function() { + // We have a shard whose leader and follower is known to us. 
+ + // Let's insert some documents: + let c = db._collection(cn); + for (let i = 0; i < 100; ++i) { + c.insert({Hallo:i}); + } + + // Now check that both leader and follower have 100 documents: + switchConnectionToLeader(collInfo); + assertEqual(100, db._collection(collInfo.shard).count()); + switchConnectionToFollower(collInfo); + assertEqual(100, db._collection(collInfo.shard).count()); + + // Try to insert a document with the leaderId: + let res = arango.POST(`/_api/document/${collInfo.shard}?isSynchronousReplication=${collInfo.idMap[collInfo.leader]}`, {Hallo:101}); + assertTrue(res.error); + assertEqual(406, res.code); + assertEqual(1490, res.errorNum); + + switchConnectionToCoordinator(collInfo); + + // Now insert another document: + c.insert({Hallo:101}); + + // And check that both leader and follower have 101 documents: + switchConnectionToLeader(collInfo); + assertEqual(101, db._collection(collInfo.shard).count()); + switchConnectionToFollower(collInfo); + assertEqual(101, db._collection(collInfo.shard).count()); + + switchConnectionToCoordinator(collInfo); + }, + + }; +} + +jsunity.run(followingTermIdSuite); +return jsunity.done();