diff --git a/CHANGELOG b/CHANGELOG
index 7ef25adf9d71..664bc93bf3ac 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,12 @@
 devel
 -----
 
+* Add following term ids, which prevent old synchronous replication requests
+  from being accepted after a follower has been dropped and has gotten back
+  in sync. This makes the chaos tests which delay synchronous replication
+  requests more reliable and prevents inconsistent shard replicas under bad
+  network conditions.
+
 * Enable process metrics on agent instances by default. Previously, some
   metrics (including the metrics starting with `arangodb_process` prefix)
   were not returned by agent instances.
diff --git a/arangod/Cluster/CreateCollection.cpp b/arangod/Cluster/CreateCollection.cpp
index d58500c902d0..72d03085d238 100644
--- a/arangod/Cluster/CreateCollection.cpp
+++ b/arangod/Cluster/CreateCollection.cpp
@@ -165,7 +165,7 @@ bool CreateCollection::first() {
       std::vector<std::string> noFollowers;
       col->followers()->takeOverLeadership(noFollowers, nullptr);
     } else {
-      col->followers()->setTheLeader(leader);
+      col->followers()->setTheLeader(LEADER_NOT_YET_KNOWN);
     }
   }
diff --git a/arangod/Cluster/FollowerInfo.cpp b/arangod/Cluster/FollowerInfo.cpp
index 16c7f0c2eb1c..d4a3da2f4bde 100644
--- a/arangod/Cluster/FollowerInfo.cpp
+++ b/arangod/Cluster/FollowerInfo.cpp
@@ -33,6 +33,7 @@
 #include "Logger/LogMacros.h"
 #include "Logger/Logger.h"
 #include "Logger/LoggerStream.h"
+#include "Random/RandomGenerator.h"
 #include "VocBase/LogicalCollection.h"
 
 using namespace arangodb;
@@ -542,3 +543,36 @@ VPackBuilder FollowerInfo::newShardEntry(VPackSlice oldValue) const {
   }
   return newValue;
 }
+
+uint64_t FollowerInfo::newFollowingTermId(ServerID const& s) noexcept {
+  WRITE_LOCKER(guard, _dataLock);
+  uint64_t i = 0;
+  uint64_t prev = 0;
+  auto it = _followingTermId.find(s);
+  if (it != _followingTermId.end()) {
+    prev = it->second;
+  }
+  // We want the random number to be non-zero and different from a previous one:
+  do {
+    i = RandomGenerator::interval(UINT64_MAX);
+  } while (i == 0 || i == prev);
+  try {
+    _followingTermId[s] = i;
+  } catch (std::bad_alloc const&) {
+    i = 1;  // I assume here that I do not get bad_alloc if the key is
+            // already in the map, since it then only has to overwrite
+            // an integer; if the key is not in the map, we default to 1.
+  }
+  return i;
+}
+
+uint64_t FollowerInfo::getFollowingTermId(ServerID const& s) const noexcept {
+  READ_LOCKER(guard, _dataLock);
+  // Note that we assume that find() does not throw!
+  auto it = _followingTermId.find(s);
+  if (it == _followingTermId.end()) {
+    // If not found, we use the default from above:
+    return 1;
+  }
+  return it->second;
+}
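For illustration, here is a small self-contained sketch, ours and not part of the patch, of the contract these two functions implement: term ids are non-zero, every new get-in-sync run mints a fresh id for that follower, and followers that never negotiated one default to 1.

```cpp
#include <cassert>
#include <cstdint>
#include <random>
#include <string>
#include <unordered_map>

// Toy stand-in for FollowerInfo's term-id bookkeeping (ours):
struct TermIds {
  std::unordered_map<std::string, uint64_t> map;

  uint64_t newTermId(std::string const& follower) {
    static std::mt19937_64 rng{std::random_device{}()};
    uint64_t prev = map.count(follower) ? map[follower] : 0;
    uint64_t id;
    do {
      id = rng();
    } while (id == 0 || id == prev);  // non-zero and different from before
    return map[follower] = id;
  }

  uint64_t termId(std::string const& follower) const {
    auto it = map.find(follower);
    return it == map.end() ? 1 : it->second;  // default is 1
  }
};

int main() {
  TermIds t;
  assert(t.termId("F") == 1);         // never negotiated: default 1
  uint64_t old = t.newTermId("F");    // follower F gets in sync
  uint64_t fresh = t.newTermId("F");  // F was dropped and re-synced
  assert(old != fresh);               // a delayed request still carrying
  assert(t.termId("F") == fresh);     // `old` no longer matches
  return 0;
}
```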
diff --git a/arangod/Cluster/FollowerInfo.h b/arangod/Cluster/FollowerInfo.h
index 507e79ab2c36..4e1b16c6fdf3 100644
--- a/arangod/Cluster/FollowerInfo.h
+++ b/arangod/Cluster/FollowerInfo.h
@@ -54,6 +54,20 @@ class FollowerInfo {
   // soon as we can guarantee at least so many followers locally.
   std::shared_ptr<std::vector<ServerID> const> _failoverCandidates;
 
+  // The following map holds a random number for each follower. This
+  // random number is handed to the follower when it gets in sync
+  // (actually, when it acquires the exclusive lock to get in sync), and is
+  // subsequently sent along with every synchronous replication request.
+  // If the number does not match, the follower will refuse the replication
+  // request. This is to ensure that replication requests cannot be delayed
+  // into the "next" leader/follower relationship.
+  // And here is the proof that this works: The exclusive lock divides the
+  // write operations into "before" and "after". The id is changed when the
+  // exclusive lock is established, so a "before" request still carries the
+  // old id and will be refused. Therefore, it is OK for the replication
+  // requests to send along the "current" id, directly from this map.
+  std::unordered_map<ServerID, uint64_t> _followingTermId;
+
   // The agencyMutex is used to synchronise access to the agency.
   // the _dataLock is used to sync the access to local data.
   // The _canWriteLock is used to protect flag if we do have enough followers
@@ -135,6 +149,28 @@ class FollowerInfo {
 
   Result remove(ServerID const& s);
 
+  //////////////////////////////////////////////////////////////////////////////
+  /// @brief for each run of the "get-in-sync" protocol we generate a
+  /// random number to identify this "following term". This is created
+  /// when the follower fetches the exclusive lock to finally get in sync
+  /// and is stored in _followingTermId, so that it can be forwarded with
+  /// each synchronous replication request. The follower can then decline
+  /// the replication in case it is not "in the same term".
+  //////////////////////////////////////////////////////////////////////////////
+
+  uint64_t newFollowingTermId(ServerID const& s) noexcept;
+
+  //////////////////////////////////////////////////////////////////////////////
+  /// @brief look up the current "following term" id for the given follower,
+  /// as generated by newFollowingTermId() on the last run of the
+  /// "get-in-sync" protocol. If no term id has been generated for the
+  /// follower yet, a default of 1 is returned, matching the fallback used
+  /// in newFollowingTermId() on allocation failure.
+  //////////////////////////////////////////////////////////////////////////////
+
+  uint64_t getFollowingTermId(ServerID const& s) const noexcept;
+
   //////////////////////////////////////////////////////////////////////////////
   /// @brief clear follower list, no changes in agency necesary
   //////////////////////////////////////////////////////////////////////////////
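The proof sketched in this comment can also be played through mechanically. The following toy program, our illustration with made-up values rather than code from the patch, shows why delivery order stops mattering once the term changes at the exclusive-lock point:

```cpp
#include <cassert>
#include <cstdint>
#include <deque>
#include <utility>

// Sketch (ours) of the argument above: the exclusive lock is a single
// point in time that both changes the term id and orders every write as
// "before" or "after" it. Requests queued before the lock carry the old
// id and can never be accepted once the follower has adopted the new
// one, no matter how long the network delays them.
int main() {
  uint64_t leaderTerm = 7;                        // current term on the leader
  std::deque<std::pair<int, uint64_t>> inFlight;  // delayed (doc, term) requests

  inFlight.push_back({1, leaderTerm});  // write from "before"

  leaderTerm = 8;             // exclusive lock: the term changes...
  uint64_t followerTerm = 8;  // ...and the follower adopts the new term

  inFlight.push_back({2, leaderTerm});  // write from "after"

  // Delivery order no longer matters; only the "after" write passes:
  for (auto const& [doc, term] : inFlight) {
    bool accepted = (term == followerTerm);
    assert(accepted == (doc == 2));
  }
  return 0;
}
```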
diff --git a/arangod/Cluster/Maintenance.cpp b/arangod/Cluster/Maintenance.cpp
index 77b78902e02e..4b48930d6183 100644
--- a/arangod/Cluster/Maintenance.cpp
+++ b/arangod/Cluster/Maintenance.cpp
@@ -320,7 +320,6 @@ void handlePlanShard(StorageEngine& engine, uint64_t planIndex, VPackSlice const
              {DATABASE, dbname},
              {COLLECTION, colname},
              {SHARD, shname},
-             {THE_LEADER, std::string()},
              {LOCAL_LEADER, std::string(localLeader)},
              {OLD_CURRENT_COUNTER, "0"},  // legacy, no longer used
              {PLAN_RAFT_INDEX, std::to_string(planIndex)}},
diff --git a/arangod/Cluster/SynchronizeShard.cpp b/arangod/Cluster/SynchronizeShard.cpp
index caa3268049ba..5d621be29b55 100644
--- a/arangod/Cluster/SynchronizeShard.cpp
+++ b/arangod/Cluster/SynchronizeShard.cpp
@@ -89,10 +89,38 @@ std::string const TTL("ttl");
 
 using namespace std::chrono;
 
+// Overview of the code in this file:
+// The main method being called is `first`, which does the following:
+// first:
+//  - wait until the leader has created the shard
+//  - look up the local shard
+//  - call `replicationSynchronize`
+//  - call `catchupWithReadLock`
+//  - call `catchupWithExclusiveLock`
+// replicationSynchronize:
+//  - set the local shard to follow the leader (without a following term id)
+//  - use a `DatabaseInitialSyncer` to synchronize to a certain state
+//    (configure leaderId for it to go through)
+// catchupWithReadLock:
+//  - start a read lock on the leader
+//  - keep the configuration for the shard to follow the leader without
+//    a term id
+//  - call `replicationSynchronizeCatchup` (WAL tailing, configure leaderId
+//    for it to go through)
+//  - cancel the read lock on the leader
+// catchupWithExclusiveLock:
+//  - start an exclusive lock on the leader, acquire a unique following term id
+//  - set the local shard to follow the leader (with the new following term id)
+//  - call `replicationSynchronizeFinalize` (WAL tailing)
+//  - do a final check by comparing counts on leader and follower
+//  - add us as an official follower on the leader
+//  - release the exclusive lock on the leader
+//
+
 SynchronizeShard::SynchronizeShard(MaintenanceFeature& feature, ActionDescription const& desc)
     : ActionBase(feature, desc),
       ShardDefinition(desc.get(DATABASE), desc.get(SHARD)),
-      _leaderInfo(arangodb::replutils::LeaderInfo::createEmpty()) {
+      _leaderInfo(arangodb::replutils::LeaderInfo::createEmpty()),
+      _followingTermId(0) {
   std::stringstream error;
 
   if (!desc.has(COLLECTION)) {
@@ -416,7 +444,7 @@ arangodb::Result SynchronizeShard::getReadLock(
   // nullptr only happens during controlled shutdown
   if (pool == nullptr) {
     return arangodb::Result(TRI_ERROR_SHUTTING_DOWN,
-                            "cancelReadLockOnLeader: Shutting down");
+                            "getReadLock: Shutting down");
   }
 
   VPackBuilder body;
@@ -445,6 +473,16 @@
   if (res.ok()) {
     // Habemus clausum, we have a lock
+    if (!soft) {
+      // Now store the random followingTermId:
+      VPackSlice body = response.response().slice();
+      if (body.isObject()) {
+        VPackSlice followingTermIdSlice = body.get(StaticStrings::FollowingTermId);
+        if (followingTermIdSlice.isNumber()) {
+          _followingTermId = followingTermIdSlice.getNumber<uint64_t>();
+        }
+      }
+    }
     return arangodb::Result();
   }
@@ -524,6 +562,7 @@ static arangodb::ResultT<SyncerId> replicationSynchronize(
   auto syncer = DatabaseInitialSyncer::create(vocbase, configuration);
 
   if (!leaderId.empty()) {
+    // In this phase we use the normal leader ID without a following term id:
     syncer->setLeaderId(leaderId);
   }
@@ -599,6 +638,8 @@ static arangodb::Result replicationSynchronizeCatchup(
   auto syncer = DatabaseTailingSyncer::create(guard.database(), configuration,
                                               fromTick, /*useTick*/ true);
 
   if (!leaderId.empty()) {
+    // In this phase we still use the normal leaderId without a following
+    // term id:
     syncer->setLeaderId(leaderId);
   }
@@ -624,10 +665,10 @@
 static arangodb::Result replicationSynchronizeFinalize(SynchronizeShard const& job,
                                                        application_features::ApplicationServer& server,
-                                                       VPackSlice const& conf) {
+                                                       VPackSlice const& conf,
+                                                       std::string const& leaderId) {
   auto const database = conf.get(DATABASE).copyString();
   auto const collection = conf.get(COLLECTION).copyString();
-  auto const leaderId = conf.get(LEADER_ID).copyString();
   auto const fromTick = conf.get("from").getNumber<uint64_t>();
 
   ReplicationApplierConfiguration configuration =
@@ -935,6 +976,10 @@ bool SynchronizeShard::first() {
 
     auto details = std::make_shared<VPackBuilder>();
 
+    // Configure the shard to follow the leader without any following
+    // term id:
+    collection->followers()->setTheLeader(leader);
+
     ResultT<SyncerId> syncRes =
         replicationSynchronize(*this, collection, config.slice(), details);
@@ -1165,6 +1210,19 @@ Result SynchronizeShard::catchupWithExclusiveLock(
     }
   });
 
+  // Now we have got a unique id for this following term and have stored it
+  // in _followingTermId, so we can use it to set the leader:
+
+  // This is necessary to accept replications from the leader, which can
+  // happen as soon as we are in sync:
+  std::string leaderIdWithTerm{leader};
+  if (_followingTermId != 0) {
+    leaderIdWithTerm += "_";
+    leaderIdWithTerm += basics::StringUtils::itoa(_followingTermId);
+  }
+  // If _followingTermId is 0, then the leader is from before the upgrade;
+  // we tolerate this and simply use its ID without a term in this case.
+  collection.followers()->setTheLeader(leaderIdWithTerm);
   LOG_TOPIC("d76cb", DEBUG, Logger::MAINTENANCE) << "lockJobId: " << lockJobId;
 
   builder.clear();
@@ -1173,13 +1231,13 @@
     builder.add(ENDPOINT, VPackValue(ep));
     builder.add(DATABASE, VPackValue(getDatabase()));
     builder.add(COLLECTION, VPackValue(getShard()));
-    builder.add(LEADER_ID, VPackValue(leader));
+    builder.add(LEADER_ID, VPackValue(leaderIdWithTerm));
     builder.add("from", VPackValue(lastLogTick));
     builder.add("requestTimeout", VPackValue(600.0));
     builder.add("connectTimeout", VPackValue(60.0));
   }
 
-  res = replicationSynchronizeFinalize(*this, feature().server(), builder.slice());
+  res = replicationSynchronizeFinalize(*this, feature().server(), builder.slice(), leaderIdWithTerm);
 
   if (!res.ok()) {
     std::string errorMessage(
@@ -1188,10 +1246,6 @@
     return {TRI_ERROR_INTERNAL, errorMessage};
   }
 
-  // This is necessary to accept replications from the leader which can
-  // happen as soon as we are in sync.
-  collection.followers()->setTheLeader(leader);
-
   NetworkFeature& nf = _feature.server().getFeature<NetworkFeature>();
   network::ConnectionPool* pool = nf.pool();
   res = addShardFollower(pool, ep, getDatabase(), getShard(), lockJobId, clientId,
diff --git a/arangod/Cluster/SynchronizeShard.h b/arangod/Cluster/SynchronizeShard.h
index 6bd413db9e81..f321295ee5bc 100644
--- a/arangod/Cluster/SynchronizeShard.h
+++ b/arangod/Cluster/SynchronizeShard.h
@@ -91,6 +91,7 @@ class SynchronizeShard : public ActionBase, public ShardDefinition {
   /// @brief information about the leader, reused across multiple replication steps
   arangodb::replutils::LeaderInfo _leaderInfo;
+  uint64_t _followingTermId;
 };
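The `leaderIdWithTerm` string built in `catchupWithExclusiveLock` fixes the wire format used throughout the patch: the leader's server ID, an underscore, and the decimal term. A minimal sketch of that encoding, with a helper name of our choosing:

```cpp
#include <cstdint>
#include <string>

// Encode a leader ID plus following term id, mirroring the logic in
// catchupWithExclusiveLock above (the helper name is ours). A term of 0
// means the leader predates the feature, so no suffix is appended.
std::string composeLeaderIdWithTerm(std::string const& leader, uint64_t term) {
  return term == 0 ? leader : leader + "_" + std::to_string(term);
}

// e.g. composeLeaderIdWithTerm("PRMR-1234", 42) yields "PRMR-1234_42",
// which is then both stored locally via setTheLeader() and handed to
// replicationSynchronizeFinalize as the leaderId.
```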
"; } @@ -112,7 +107,7 @@ static void sendLeaderChangeRequests(network::ConnectionPool* pool, std::vector const& currentServers, std::shared_ptr>& realInsyncFollowers, std::string const& databaseName, - ShardID const& shardID, std::string const& oldLeader) { + LogicalCollection& shard, std::string const& oldLeader) { if (pool == nullptr) { // nullptr happens only during controlled shutdown return; @@ -120,15 +115,6 @@ static void sendLeaderChangeRequests(network::ConnectionPool* pool, std::string const& sid = ServerState::instance()->getId(); - VPackBufferUInt8 buffer; - VPackBuilder bodyBuilder(buffer); - { - VPackObjectBuilder ob(&bodyBuilder); - bodyBuilder.add("leaderId", VPackValue(sid)); - bodyBuilder.add("oldLeaderId", VPackValue(oldLeader)); - bodyBuilder.add("shard", VPackValue(shardID)); - } - network::RequestOptions options; options.database = databaseName; options.timeout = network::Timeout(3.0); @@ -143,6 +129,17 @@ static void sendLeaderChangeRequests(network::ConnectionPool* pool, if (srv == sid) { continue; // ignore ourself } + uint64_t followingTermId = shard.followers()->newFollowingTermId(srv); + VPackBufferUInt8 buffer; + VPackBuilder bodyBuilder(buffer); + { + VPackObjectBuilder ob(&bodyBuilder); + bodyBuilder.add("leaderId", VPackValue(sid)); + bodyBuilder.add("oldLeaderId", VPackValue(oldLeader)); + bodyBuilder.add("shard", VPackValue(shard.name())); + bodyBuilder.add(StaticStrings::FollowingTermId, VPackValue(followingTermId)); + } + LOG_TOPIC("42516", DEBUG, Logger::MAINTENANCE) << "Sending " << bodyBuilder.toJson() << " to " << srv; auto f = network::sendRequest(pool, "server:" + srv, fuerte::RestVerb::Put, @@ -165,78 +162,54 @@ static void sendLeaderChangeRequests(network::ConnectionPool* pool, } static void handleLeadership(uint64_t planIndex, LogicalCollection& collection, - std::string const& localLeader, std::string const& plannedLeader, + std::string const& localLeader, std::string const& databaseName, MaintenanceFeature& feature) { auto& followers = collection.followers(); - if (plannedLeader.empty()) { // Planned to lead - if (!localLeader.empty()) { // We were not leader, assume leadership - LOG_TOPIC("5632f", DEBUG, Logger::MAINTENANCE) - << "handling leadership of shard '" << databaseName << "/" - << collection.name() << ": becoming leader"; - - auto& ci = collection.vocbase().server().getFeature().clusterInfo(); - // This will block the thread until our ClusterInfo cache fetched a - // Current version in background thread which is at least as new as the - // Plan which brought us here. This is important for the assertion - // below where we check that we are in the list of failoverCandidates! - ci.waitForCurrent(planIndex); - auto currentInfo = - ci.getCollectionCurrent(databaseName, - std::to_string(collection.planId().id())); - if (currentInfo == nullptr) { - // Collection has been dropped. we cannot continue here. 
@@ -165,78 +162,54 @@
 static void handleLeadership(uint64_t planIndex, LogicalCollection& collection,
-                             std::string const& localLeader, std::string const& plannedLeader,
+                             std::string const& localLeader,
                              std::string const& databaseName, MaintenanceFeature& feature) {
   auto& followers = collection.followers();
 
-  if (plannedLeader.empty()) {   // Planned to lead
-    if (!localLeader.empty()) {  // We were not leader, assume leadership
-      LOG_TOPIC("5632f", DEBUG, Logger::MAINTENANCE)
-          << "handling leadership of shard '" << databaseName << "/"
-          << collection.name() << ": becoming leader";
-
-      auto& ci = collection.vocbase().server().getFeature<ClusterFeature>().clusterInfo();
-      // This will block the thread until our ClusterInfo cache fetched a
-      // Current version in background thread which is at least as new as the
-      // Plan which brought us here. This is important for the assertion
-      // below where we check that we are in the list of failoverCandidates!
-      ci.waitForCurrent(planIndex);
-      auto currentInfo =
-          ci.getCollectionCurrent(databaseName,
-                                  std::to_string(collection.planId().id()));
-      if (currentInfo == nullptr) {
-        // Collection has been dropped. we cannot continue here.
-        return;
-      }
-      TRI_ASSERT(currentInfo != nullptr);
-      std::vector<ServerID> currentServers = currentInfo->servers(collection.name());
-      std::shared_ptr<std::vector<ServerID>> realInsyncFollowers;
-
-      if (!currentServers.empty()) {
-        std::string& oldLeader = currentServers[0];
-        // Check if the old leader has resigned and stopped all write
-        // (if so, we can assume that all servers are still in sync)
-        if (!oldLeader.empty() && oldLeader[0] == '_') {
-          // remove the underscore from the list as it is useless anyway
-          oldLeader = oldLeader.substr(1);
-
-          // Update all follower and tell them that we are the leader now
-          NetworkFeature& nf =
-              collection.vocbase().server().getFeature<NetworkFeature>();
-          network::ConnectionPool* pool = nf.pool();
-          sendLeaderChangeRequests(pool, currentServers, realInsyncFollowers,
-                                   databaseName, collection.name(), oldLeader);
-        }
-      }
-
-      std::vector<ServerID> failoverCandidates =
-          currentInfo->failoverCandidates(collection.name());
-      followers->takeOverLeadership(failoverCandidates, realInsyncFollowers);
-      transaction::cluster::abortFollowerTransactionsOnShard(collection.id());
+  if (!localLeader.empty()) {  // We were not leader, assume leadership
+    LOG_TOPIC("5632f", DEBUG, Logger::MAINTENANCE)
+        << "handling leadership of shard '" << databaseName << "/"
+        << collection.name() << ": becoming leader";
+
+    auto& ci = collection.vocbase().server().getFeature<ClusterFeature>().clusterInfo();
+    // This will block the thread until our ClusterInfo cache fetched a
+    // Current version in background thread which is at least as new as the
+    // Plan which brought us here. This is important for the assertion
+    // below where we check that we are in the list of failoverCandidates!
+    ci.waitForCurrent(planIndex);
+    auto currentInfo =
+        ci.getCollectionCurrent(databaseName,
+                                std::to_string(collection.planId().id()));
+    if (currentInfo == nullptr) {
+      // Collection has been dropped, we cannot continue here.
+      return;
+    }
-  } else {  // Planned to follow
-    if (localLeader.empty() || localLeader == LEADER_NOT_YET_KNOWN) {
-      // Note that the following does not delete the follower list
-      // and that this is crucial, because in the planned leader
-      // resign case, updateCurrentForCollections will report the
-      // resignation together with the old in-sync list to the
-      // agency. If this list would be empty, then the supervision
-      // would be very angry with us!
-
-      LOG_TOPIC("5632e", DEBUG, Logger::MAINTENANCE)
-          << "handling leadership of shard '" << databaseName << "/"
-          << collection.name() << ": following " << plannedLeader;
-
-      followers->setTheLeader(plannedLeader);
-      transaction::cluster::abortLeaderTransactionsOnShard(collection.id());
+    TRI_ASSERT(currentInfo != nullptr);
+    std::vector<ServerID> currentServers = currentInfo->servers(collection.name());
+    std::shared_ptr<std::vector<ServerID>> realInsyncFollowers;
+
+    if (!currentServers.empty()) {
+      std::string& oldLeader = currentServers[0];
+      // Check if the old leader has resigned and stopped all writes
+      // (if so, we can assume that all servers are still in sync)
+      if (!oldLeader.empty() && oldLeader[0] == '_') {
+        // remove the underscore from the list as it is useless anyway
+        oldLeader = oldLeader.substr(1);
+
+        // Update all followers and tell them that we are the leader now
+        NetworkFeature& nf =
+            collection.vocbase().server().getFeature<NetworkFeature>();
+        network::ConnectionPool* pool = nf.pool();
+        sendLeaderChangeRequests(pool, currentServers, realInsyncFollowers,
+                                 databaseName, collection, oldLeader);
+      }
     }
-    // Note that if we have been a follower to some leader
-    // we do not immediately adjust the leader here, even if
-    // the planned leader differs from what we have set locally.
-    // The setting must only be adjusted once we have
-    // synchronized with the new leader and negotiated
-    // a leader/follower relationship!
+
+    std::vector<ServerID> failoverCandidates =
+        currentInfo->failoverCandidates(collection.name());
+    followers->takeOverLeadership(failoverCandidates, realInsyncFollowers);
+    transaction::cluster::abortFollowerTransactionsOnShard(collection.id());
   }
 }
@@ -244,7 +217,6 @@ bool TakeoverShardLeadership::first() {
   std::string const& database = getDatabase();
   std::string const& collection = _description.get(COLLECTION);
   std::string const& shard = getShard();
-  std::string const& plannedLeader = _description.get(THE_LEADER);
   std::string const& localLeader = _description.get(LOCAL_LEADER);
   std::string const& planRaftIndex = _description.get(PLAN_RAFT_INDEX);
   uint64_t planIndex = basics::StringUtils::uint64(planRaftIndex);
@@ -265,7 +237,7 @@
       // resignation case is not handled here, since then
       // ourselves does not appear in shards[shard] but only
       // "_" + ourselves.
-      handleLeadership(planIndex, *coll, localLeader, plannedLeader, vocbase.name(),
+      handleLeadership(planIndex, *coll, localLeader, vocbase.name(),
                        feature());
     } else {
      std::stringstream error;
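The takeover path relies on the pre-existing convention that a resigned leader is recorded in Current with a leading underscore. A short sketch of that convention, with helper names of our choosing:

```cpp
#include <string>

// In Current, a leader that has resigned (and stopped all writes) is
// listed as "_<serverId>". Sketch of the check and strip performed by
// handleLeadership above (helper names are ours):
bool hasResigned(std::string const& currentLeader) {
  return !currentLeader.empty() && currentLeader.front() == '_';
}

std::string stripResignMarker(std::string const& currentLeader) {
  return hasResigned(currentLeader) ? currentLeader.substr(1) : currentLeader;
}

// Only in the resigned case may the new leader assume that all listed
// servers are still in sync, and push fresh term ids to them via
// sendLeaderChangeRequests instead of forcing a full resync.
```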
diff --git a/arangod/RestHandler/RestReplicationHandler.cpp b/arangod/RestHandler/RestReplicationHandler.cpp
index 57690376f2e0..5cd4e97fd060 100644
--- a/arangod/RestHandler/RestReplicationHandler.cpp
+++ b/arangod/RestHandler/RestReplicationHandler.cpp
@@ -2567,6 +2567,10 @@ void RestReplicationHandler::handleCommandSetTheLeader() {
   VPackSlice const leaderIdSlice = body.get("leaderId");
   VPackSlice const oldLeaderIdSlice = body.get("oldLeaderId");
   VPackSlice const shard = body.get("shard");
+  VPackSlice const followingTermId = body.get(StaticStrings::FollowingTermId);
+  // Note that we tolerate it if followingTermId is not present or not a
+  // number; this is for upgrade scenarios. If the new leader does not send
+  // it, it will also not send it in the option IsSynchronousReplication.
   if (!leaderIdSlice.isString() || !shard.isString() || !oldLeaderIdSlice.isString()) {
     generateError(rest::ResponseCode::BAD, TRI_ERROR_HTTP_BAD_PARAMETER,
                   "'leaderId' and 'shard' attributes must be strings");
@@ -2600,7 +2604,12 @@
     return;
   }
 
-  col->followers()->setTheLeader(leaderId);
+  if (followingTermId.isNumber()) {
+    col->followers()->setTheLeader(
+        leaderId + "_" + StringUtils::itoa(followingTermId.getNumber<uint64_t>()));
+  } else {
+    col->followers()->setTheLeader(leaderId);
+  }
 }
 
 VPackBuilder b;
@@ -2729,6 +2738,10 @@ void RestReplicationHandler::handleCommandHoldReadLockCollection() {
   {
     VPackObjectBuilder bb(&b);
     b.add(StaticStrings::Error, VPackValue(false));
+    if (!serverId.empty()) {
+      b.add(StaticStrings::FollowingTermId,
+            VPackValue(col->followers()->newFollowingTermId(serverId)));
+    }
   }
 
   LOG_TOPIC("61a9d", DEBUG, Logger::REPLICATION)
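Taken together, a follower now stores either `<leaderId>` (pre-upgrade leader) or `<leaderId>_<termId>`, and every synchronous replication request carries the same string in its `isSynchronousReplication` parameter. Assuming the follower compares these strings verbatim, which is our reading and matches the 406/1490 refusal asserted in the new shell test at the end of this patch, the acceptance rule looks like this:

```cpp
#include <string>

// The follower stored e.g. "PRMR-1234_42" when the term was negotiated
// (or plain "PRMR-1234" for a pre-upgrade leader). A request is only
// accepted if it carries exactly that string, so anything delayed from
// an earlier leader/follower relationship is refused (sketch, ours):
bool acceptsReplication(std::string const& storedLeader,
                        std::string const& requestedBy) {
  return storedLeader == requestedBy;
}

// acceptsReplication("PRMR-1234_42", "PRMR-1234_42") -> true
// acceptsReplication("PRMR-1234_42", "PRMR-1234_41") -> false (stale term)
// acceptsReplication("PRMR-1234_42", "PRMR-1234")    -> false (no term)
```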
diff --git a/arangod/Transaction/Methods.cpp b/arangod/Transaction/Methods.cpp
index 0e43c93c6717..f593e067cbe4 100644
--- a/arangod/Transaction/Methods.cpp
+++ b/arangod/Transaction/Methods.cpp
@@ -1257,7 +1257,8 @@ Future<OperationResult> transaction::Methods::insertLocal(std::string const& cna
       // Now replicate the good operations on all followers:
       return replicateOperations(collection.get(), followers, options, value,
                                  TRI_VOC_DOCUMENT_OPERATION_INSERT, resDocs,
-                                 excludePositions)
+                                 excludePositions,
+                                 *collection->followers())
           .thenValue([options, errs = std::move(errorCounter), resDocs](Result res) mutable {
             if (!res.ok()) {
               return OperationResult{std::move(res), options};
@@ -1530,7 +1531,7 @@ Future<OperationResult> transaction::Methods::modifyLocal(std::string const& col
       // Now replicate the good operations on all followers:
       return replicateOperations(collection.get(), followers, options, newValue,
-                                 operation, resDocs, {})
+                                 operation, resDocs, {}, *collection->followers())
           .thenValue([options, errs = std::move(errorCounter), resDocs](Result&& res) mutable {
             if (!res.ok()) {
               return OperationResult{std::move(res), options};
@@ -1745,7 +1746,8 @@ Future<OperationResult> transaction::Methods::removeLocal(std::string const& col
       // Now replicate the good operations on all followers:
       return replicateOperations(collection.get(), followers, options, value,
-                                 TRI_VOC_DOCUMENT_OPERATION_REMOVE, resDocs, {})
+                                 TRI_VOC_DOCUMENT_OPERATION_REMOVE, resDocs,
+                                 {}, *collection->followers())
           .thenValue([options, errs = std::move(errorCounter), resDocs](Result res) mutable {
             if (!res.ok()) {
               return OperationResult{std::move(res), options};
@@ -1927,10 +1929,15 @@ Future<OperationResult> transaction::Methods::truncateLocal(std::string const& c
   network::RequestOptions reqOpts;
   reqOpts.database = vocbase().name();
   reqOpts.timeout = network::Timeout(600);
-  reqOpts.param(StaticStrings::IsSynchronousReplicationString, ServerState::instance()->getId());
   reqOpts.param(StaticStrings::Compact, (options.truncateCompact ? "true" : "false"));
 
   for (auto const& f : *followers) {
+    reqOpts.param(StaticStrings::IsSynchronousReplicationString,
+                  ServerState::instance()->getId() + "_" +
+                      basics::StringUtils::itoa(
+                          collection->followers()->getFollowingTermId(f)));
+    // reqOpts is copied deeply in sendRequestRetry, so we are OK to
+    // change it in the loop!
     network::Headers headers;
     ClusterTrxMethods::addTransactionHeader(*this, f, headers);
     auto future = network::sendRequestRetry(pool, "server:" + f, fuerte::RestVerb::Put,
@@ -2355,7 +2362,8 @@ Future<Result> Methods::replicateOperations(
     OperationOptions const& options, VPackSlice const value,
     TRI_voc_document_operation_e const operation,
     std::shared_ptr<VPackBufferUInt8> const& ops,
-    std::unordered_set<size_t> const& excludePositions) {
+    std::unordered_set<size_t> const& excludePositions,
+    FollowerInfo& followerInfo) {
   TRI_ASSERT(followerList != nullptr);
 
   if (followerList->empty()) {
@@ -2367,8 +2375,6 @@
   network::RequestOptions reqOpts;
   reqOpts.database = vocbase().name();
   reqOpts.param(StaticStrings::IsRestoreString, "true");
-  reqOpts.param(StaticStrings::IsSynchronousReplicationString, ServerState::instance()->getId());
-
   std::string url = "/_api/document/";
   url.append(arangodb::basics::StringUtils::urlEncode(collection->name()));
   if (operation != TRI_VOC_DOCUMENT_OPERATION_INSERT && !value.isArray()) {
@@ -2491,6 +2497,12 @@
   auto* pool = vocbase().server().getFeature<NetworkFeature>().pool();
   for (auto const& f : *followerList) {
+    reqOpts.param(StaticStrings::IsSynchronousReplicationString,
+                  ServerState::instance()->getId() + "_" +
+                      basics::StringUtils::itoa(
+                          collection->followers()->getFollowingTermId(f)));
+    // reqOpts is copied deeply in sendRequestRetry, so we are OK to
+    // change it in the loop!
     network::Headers headers;
     ClusterTrxMethods::addTransactionHeader(*this, f, headers);
     futures.emplace_back(network::sendRequestRetry(pool, "server:" + f, requestType,
diff --git a/arangod/Transaction/Methods.h b/arangod/Transaction/Methods.h
index 54bea8cf1aca..32d207a319c7 100644
--- a/arangod/Transaction/Methods.h
+++ b/arangod/Transaction/Methods.h
@@ -27,6 +27,7 @@
 #include "Basics/Common.h"
 #include "Basics/Exceptions.h"
 #include "Basics/Result.h"
+#include "Cluster/FollowerInfo.h"
 #include "Futures/Future.h"
 #include "Indexes/IndexIterator.h"
 #include "Rest/CommonDefines.h"
@@ -498,7 +499,8 @@ class Methods {
       std::shared_ptr<std::vector<ServerID> const> const& followers,
       OperationOptions const& options, VPackSlice value,
       TRI_voc_document_operation_e operation,
       std::shared_ptr<VPackBufferUInt8> const& ops,
-      std::unordered_set<size_t> const& excludePositions);
+      std::unordered_set<size_t> const& excludePositions,
+      FollowerInfo& followerInfo);
 
  private:
   /// @brief transaction hints
diff --git a/lib/Basics/StaticStrings.cpp b/lib/Basics/StaticStrings.cpp
index 419fefb1fe8a..3925922ed5bb 100644
--- a/lib/Basics/StaticStrings.cpp
+++ b/lib/Basics/StaticStrings.cpp
@@ -336,6 +336,7 @@ std::string const StaticStrings::RevisionTreeInitialRangeMin("initialRangeMin");
 std::string const StaticStrings::RevisionTreeRanges("ranges");
 std::string const StaticStrings::RevisionTreeResume("resume");
 std::string const StaticStrings::RevisionTreeVersion("version");
+std::string const StaticStrings::FollowingTermId("followingTermId");
 
 // Generic attribute names
 std::string const StaticStrings::AttrCoordinator("coordinator");
diff --git a/lib/Basics/StaticStrings.h b/lib/Basics/StaticStrings.h
index a1b640d3d7f0..69950603065c 100644
--- a/lib/Basics/StaticStrings.h
+++ b/lib/Basics/StaticStrings.h
@@ -312,6 +312,7 @@ class StaticStrings {
   static std::string const RevisionTreeRanges;
   static std::string const RevisionTreeResume;
   static std::string const RevisionTreeVersion;
+  static std::string const FollowingTermId;
 
   // generic attribute names
   static std::string const AttrCoordinator;
diff --git a/tests/Maintenance/MaintenanceTest.cpp b/tests/Maintenance/MaintenanceTest.cpp
index c43e0f18b57e..15a6da4c31de 100644
--- a/tests/Maintenance/MaintenanceTest.cpp
+++ b/tests/Maintenance/MaintenanceTest.cpp
@@ -740,7 +740,6 @@ class MaintenanceTestActionPhaseOne : public SharedMaintenanceTest {
     ASSERT_TRUE(action.has(DATABASE));
     ASSERT_TRUE(action.has(COLLECTION));
     ASSERT_TRUE(action.has(SHARD));
-    ASSERT_TRUE(action.has(THE_LEADER));
     ASSERT_TRUE(action.has(LOCAL_LEADER));
     ASSERT_TRUE(action.has(PLAN_RAFT_INDEX));
     ASSERT_EQ(action.get(DATABASE), dbName);
@@ -1285,7 +1284,6 @@ TEST_F(MaintenanceTestActionPhaseOne, leader_behaviour_plan_self_local_other) {
       auto shardName = action->get(SHARD);
       auto removed = relevantShards.erase(shardName);
       EXPECT_EQ(removed, 1) << "We created a JOB for a shard we do not expect " << shardName;
-      EXPECT_EQ(action->get(THE_LEADER), "");
       EXPECT_EQ(action->get(LOCAL_LEADER), unusedServer());
     }
   }
@@ -1383,7 +1381,6 @@ TEST_F(MaintenanceTestActionPhaseOne, leader_behaviour_plan_self_local_resigned) {
       auto shardName = action->get(SHARD);
       auto removed = relevantShards.erase(shardName);
       EXPECT_EQ(removed, 1) << "We created a JOB for a shard we do not expect " << shardName;
-      EXPECT_EQ(action->get(THE_LEADER), "");
       EXPECT_EQ(action->get(LOCAL_LEADER), ResignShardLeadership::LeaderNotYetKnownString);
     }
   }
@@ -1477,7 +1474,6 @@ TEST_F(MaintenanceTestActionPhaseOne, leader_behaviour_plan_self_local_reboot) {
      auto shardName = action->get(SHARD);
       auto removed = relevantShards.erase(shardName);
       EXPECT_EQ(removed, 1) << "We created a JOB for a shard we do not expect " << shardName;
-      EXPECT_EQ(action->get(THE_LEADER), "");
       EXPECT_EQ(action->get(LOCAL_LEADER), LEADER_NOT_YET_KNOWN);
     }
   }
diff --git a/tests/js/client/shell/shell-following-term-id-cluster.js b/tests/js/client/shell/shell-following-term-id-cluster.js
new file mode 100644
index 000000000000..cc717e819274
--- /dev/null
+++ b/tests/js/client/shell/shell-following-term-id-cluster.js
@@ -0,0 +1,119 @@
+/* jshint globalstrict:false, strict:false, maxlen: 200 */
+/* global fail, assertEqual, assertTrue, assertFalse, arango */
+
+// //////////////////////////////////////////////////////////////////////////////
+// / @brief Test following term id behaviour.
+// /
+// / DISCLAIMER
+// /
+// / Copyright 2021 ArangoDB GmbH, Cologne, Germany
+// /
+// / Licensed under the Apache License, Version 2.0 (the "License")
+// / you may not use this file except in compliance with the License.
+// / You may obtain a copy of the License at
+// /
+// /     http://www.apache.org/licenses/LICENSE-2.0
+// /
+// / Unless required by applicable law or agreed to in writing, software
+// / distributed under the License is distributed on an "AS IS" BASIS,
+// / WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// / See the License for the specific language governing permissions and
+// / limitations under the License.
+// /
+// / Copyright holder is triAGENS GmbH, Cologne, Germany
+// /
+// / @author Max Neunhoeffer
+// //////////////////////////////////////////////////////////////////////////////
+
+const _ = require('lodash');
+let jsunity = require('jsunity');
+let internal = require('internal');
+let arangodb = require('@arangodb');
+let db = arangodb.db;
+
+function createCollectionWithKnownLeaderAndFollower(cn) {
+  db._create(cn, {numberOfShards:1, replicationFactor:2});
+  // Get dbserver names first:
+  let health = arango.GET("/_admin/cluster/health").Health;
+  let endpointMap = {};
+  let idMap = {};
+  for (let sid in health) {
+    endpointMap[health[sid].ShortName] = health[sid].Endpoint;
+    idMap[health[sid].ShortName] = sid;
+  }
+  let plan = arango.GET("/_admin/cluster/shardDistribution").results[cn].Plan;
+  let shard = Object.keys(plan)[0];
+  let coordinator = "Coordinator0001";
+  let leader = plan[shard].leader;
+  let follower = plan[shard].followers[0];
+  return { endpointMap, idMap, coordinator, leader, follower, shard };
+}
+
+function switchConnectionToCoordinator(collInfo) {
+  arango.reconnect(collInfo.endpointMap[collInfo.coordinator], "_system", "root", "");
+}
+
+function switchConnectionToLeader(collInfo) {
+  arango.reconnect(collInfo.endpointMap[collInfo.leader], "_system", "root", "");
+}
+
+function switchConnectionToFollower(collInfo) {
+  arango.reconnect(collInfo.endpointMap[collInfo.follower], "_system", "root", "");
+}
+
+function followingTermIdSuite() {
+  'use strict';
+  const cn = 'UnitTestsFollowingTermId';
+  let collInfo = {};
+
+  return {
+
+    setUp: function () {
+      db._drop(cn);
+      collInfo = createCollectionWithKnownLeaderAndFollower(cn);
+    },
+
+    tearDown: function () {
+      db._drop(cn);
+    },
+
+    testFollowingTermIdSuite: function() {
+      // We have a shard whose leader and follower are known to us.
+
+      // Let's insert some documents:
+      let c = db._collection(cn);
+      for (let i = 0; i < 100; ++i) {
+        c.insert({Hallo:i});
+      }
+
+      // Now check that both leader and follower have 100 documents:
+      switchConnectionToLeader(collInfo);
+      assertEqual(100, db._collection(collInfo.shard).count());
+      switchConnectionToFollower(collInfo);
+      assertEqual(100, db._collection(collInfo.shard).count());
+
+      // Try to insert a document directly on the follower, posing as the
+      // leader but without a following term id; this must be refused:
+      let res = arango.POST(`/_api/document/${collInfo.shard}?isSynchronousReplication=${collInfo.idMap[collInfo.leader]}`, {Hallo:101});
+      assertTrue(res.error);
+      assertEqual(406, res.code);
+      assertEqual(1490, res.errorNum);
+
+      switchConnectionToCoordinator(collInfo);
+
+      // Now insert another document:
+      c.insert({Hallo:101});
+
+      // And check that both leader and follower have 101 documents:
+      switchConnectionToLeader(collInfo);
+      assertEqual(101, db._collection(collInfo.shard).count());
+      switchConnectionToFollower(collInfo);
+      assertEqual(101, db._collection(collInfo.shard).count());
+
+      switchConnectionToCoordinator(collInfo);
+    },
+
+  };
+}
+
+jsunity.run(followingTermIdSuite);
+return jsunity.done();