8000 Following term id by neunhoef · Pull Request #14405 · arangodb/arangodb · GitHub
[go: up one dir, main page]

Skip to content

Following term id #14405

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 23 commits into from
Jul 7, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
e8e551b
First stab at following term id.
neunhoef Jun 25, 2021
79fbdf4
Also handle the planned leader change case.
neunhoef Jun 25, 2021
4fc887c
CHANGELOG.
neunhoef Jun 25, 2021
7c67605
Merge remote-tracking branch 'origin/devel' into bug-fix/following-te…
neunhoef Jun 28, 2021
5be1fd3
Merge remote-tracking branch 'origin/devel' into bug-fix/following-te…
neunhoef Jun 28, 2021
979a956
Remove bug introduced by merge.
neunhoef Jun 28, 2021
df19281
Fix windows compilation issue.
neunhoef Jun 28, 2021
cc931fb
Fix SyncCollectionFinalize.
neunhoef Jun 28, 2021
df4d679
Fix all setTheLeader places.
neunhoef Jun 28, 2021
6aa7a8a
Fix getting in sync.
neunhoef Jun 28, 2021
b66b361
Another try to get configuration of following in shard right.
neunhoef Jun 29, 2021
de5ff72
Merge remote-tracking branch 'origin/devel' into bug-fix/following-te…
neunhoef Jun 29, 2021
304cc67
Apply suggestions from code review
neunhoef Jun 29, 2021
ebf6012
Fix compilation.
neunhoef Jun 29, 2021
0b77428
Improve comments.
neunhoef Jun 29, 2021
45a9336
Merge remote-tracking branch 'origin/devel' into bug-fix/following-te…
neunhoef Jun 30, 2021
8b3e335
Merge remote-tracking branch 'origin/devel' into bug-fix/following-te…
neunhoef Jun 30, 2021
9051010
Merge remote-tracking branch 'origin/devel' into bug-fix/following-te…
neunhoef Jul 1, 2021
aaccae5
Concurrency fixes.
neunhoef Jul 1, 2021
30370c5
Merge branch 'bug-fix/following-term-id' of ssh://github.com/arangodb…
neunhoef Jul 1, 2021
cd3715c
Add test.
neunhoef Jul 1, 2021
e389c23
Merge remote-tracking branch 'origin/devel' into bug-fix/following-te…
neunhoef Jul 5, 2021
d3985e9
Merge branch 'devel' into bug-fix/following-term-id
neunhoef Jul 7, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
devel
-----

* Add following term ids, which prevents old synchronous replication requests
to be accepted after a follower was dropped and has gotten in sync again.
This makes the chaos tests which delay synchronous replication requests
more reliable and prevent inconsistent shard replicas under bad network
conditions.

* Enable process metrics on agent instances by default. Previously, some
metrics (including the metrics starting with `arangodb_process` prefix) were
not returned by agent instances.
Expand Down
2 changes: 1 addition & 1 deletion arangod/Cluster/CreateCollection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ bool CreateCollection::first() {
std::vector<std::string> noFollowers;
col->followers()->takeOverLeadership(noFollowers, nullptr);
} else {
col->followers()->setTheLeader(leader);
col->followers()->setTheLeader(LEADER_NOT_YET_KNOWN);
}
}

Expand Down
34 changes: 34 additions & 0 deletions arangod/Cluster/FollowerInfo.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include "Logger/LogMacros.h"
#include "Logger/Logger.h"
#include "Logger/LoggerStream.h"
#include "Random/RandomGenerator.h"
#include "VocBase/LogicalCollection.h"

using namespace arangodb;
Expand Down Expand Up @@ -542,3 +543,36 @@ VPackBuilder FollowerInfo::newShardEntry(VPackSlice oldValue) const {
}
return newValue;
}

uint64_t FollowerInfo::newFollowingTermId(ServerID const& s) noexcept {
WRITE_LOCKER(guard, _dataLock);
uint64_t i = 0;
uint64_t prev = 0;
auto it = _followingTermId.find(s);
if (it != _followingTermId.end()) {
prev = it->second;
}
// We want the random number to be non-zero and different from a previous one:
do {
i = RandomGenerator::interval(UINT64_MAX);
} while (i == 0 || i == prev);
try {
_followingTermId[s] = i;
} catch(std::bad_alloc const&) {
i = 1; // I assume here that I do not get bad_alloc if the key is
// already in the map, since it then only has to overwrite
// an integer, if the key is not in the map, we default to 1.
}
return i;
}

uint64_t FollowerInfo::getFollowingTermId(ServerID const& s) const noexcept {
READ_LOCKER(guard, _dataLock);
// Note that we assume that find() does not throw!
auto it = _followingTermId.find(s);
if (it == _followingTermId.end()) {
// If not found, we use the default from above:
return 1;
}
return it->second;
}
36 changes: 36 additions & 0 deletions arangod/Cluster/FollowerInfo.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,20 @@ class FollowerInfo {
// soon as we can guarantee at least so many followers locally.
std::shared_ptr<std::vector<ServerID>> _failoverCandidates;

// The following map holds a random number for each follower, this
// random number is given and sent to the follower when it gets in sync
// (actually, when it acquires the exclusive lock to get in sync), and is
// then subsequently sent alongside every synchronous replication
// request. If the number does not match, the follower will refuse the
// replication request. This is to ensure that replication requests cannot
// be delayed into the "next" leader/follower relationship.
// And here is the proof that this works: The exclusive lock divides the
// write operations between "before" and "after". The id is changed
// when the exclusive lock is established. Therefore, it is OK for the
// replication requests to send along the "current" id, directly
// from this map.
std::unordered_map<ServerID, uint64_t> _followingTermId;

// The agencyMutex is used to synchronise access to the agency.
// the _dataLock is used to sync the access to local data.
// The _canWriteLock is used to protect flag if we do have enough followers
Expand Down Expand Up @@ -135,6 +149,28 @@ class FollowerInfo {

Result remove(ServerID const& s);

//////////////////////////////////////////////////////////////////////////////
/// @brief for each run of the "get-in-sync" protocol we generate a
/// random number to identify this "following term". This is created
/// when the follower fetches the exclusive lock to finally get in sync
/// and is stored in _followingTermId, so that it can be forwarded with
/// each synchronous replication request. The follower can then decline
/// the replication in case it is not "in the same term".
//////////////////////////////////////////////////////////////////////////////

uint64_t newFollowingTermId(ServerID const& s) noexcept;

//////////////////////////////////////////////////////////////////////////////
/// @brief for each run of the "get-in-sync" protocol we generate a
/// random number to identify this "following term". This is created
/// when the follower fetches the exclusive lock to finally get in sync
/// and is stored in _followingTermId, so that it can be forwarded with
/// each synchronous replication request. The follower can then decline
/// the replication in case it is not "in the same term".
//////////////////////////////////////////////////////////////////////////////

uint64_t getFollowingTermId(ServerID const& s) const noexcept;

//////////////////////////////////////////////////////////////////////////////
/// @brief clear follower list, no changes in agency necesary
//////////////////////////////////////////////////////////////////////////////
Expand Down
1 change: 0 additions & 1 deletion arangod/Cluster/Maintenance.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,6 @@ void handlePlanShard(StorageEngine& engine, uint64_t planIndex, VPackSlice const
{DATABASE, dbname},
{COLLECTION, colname},
{SHARD, shname},
{THE_LEADER, std::string()},
{LOCAL_LEADER, std::string(localLeader)},
{OLD_CURRENT_COUNTER, "0"}, // legacy, no longer used
{PLAN_RAFT_INDEX, std::to_string(planIndex)}},
Expand Down
74 changes: 64 additions & 10 deletions arangod/Cluster/SynchronizeShard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,10 +89,38 @@ std::string const TTL("ttl");

using namespace std::chrono;

// Overview over the code in this file:
// The main method being called is "first", it does:
// first:
// - wait until leader has created shard
// - lookup local shard
// - call `replicationSynchronize`
// - call `catchupWithReadLock`
// - call `catchupWithExclusiveLock`
// replicationSynchronize:
// - set local shard to follow leader (without a following term id)
// - use a `DatabaseInitialSyncer` to synchronize to a certain state,
// (configure leaderId for it to go through)
// catchupWithReadLock:
// - start a read lock on leader
// - keep configuration for shard to follow the leader without term id
// - call `replicationSynchronizeCatchup` (WAL tailing, configure leaderId
// for it to go through)
// - cancel read lock on leader
// catchupWithExclusiveLock:
// - start an exclusive lock on leader, acquire unique following term id
// - set local shard to follower leader (with new following term id)
// - call `replicationSynchronizeFinalize` (WAL tailing)
// - do a final check by comparing counts on leader and follower
// - add us as official follower on the leader
// - release exclusive lock on leader
//

SynchronizeShard::SynchronizeShard(MaintenanceFeature& feature, ActionDescription const& desc)
: ActionBase(feature, desc),
ShardDefinition(desc.get(DATABASE), desc.get(SHARD)),
_leaderInfo(arangodb::replutils::LeaderInfo::createEmpty()) {
_leaderInfo(arangodb::replutils::LeaderInfo::createEmpty()),
_followingTermId(0) {
std::stringstream error;

if (!desc.has(COLLECTION)) {
Expand Down Expand Up @@ -416,7 +444,7 @@ arangodb::Result SynchronizeShard::getReadLock(
// nullptr only happens during controlled shutdown
if (pool == nullptr) {
return arangodb::Result(TRI_ERROR_SHUTTING_DOWN,
"cancelReadLockOnLeader: Shutting down");
"getReadLock: Shutting down");
}

VPackBuilder body;
Expand Down Expand Up @@ -445,6 +473,16 @@ arangodb::Result SynchronizeShard::getReadLock(

if (res.ok()) {
// Habemus clausum, we have a lock
if (!soft) {
// Now store the random followingTermId:
VPackSlice body = response.response().slice();
if (body.isObject()) {
VPackSlice followingTermIdSlice = body.get(StaticStrings::FollowingTermId);
if (followingTermIdSlice.isNumber()) {
_followingTermId = followingTermIdSlice.getNumber<uint64_t>();
}
}
}
return arangodb::Result();
}

Expand Down Expand Up @@ -524,6 +562,7 @@ static arangodb::ResultT<SyncerId> replicationSynchronize(
auto syncer = DatabaseInitialSyncer::create(vocbase, configuration);

if (!leaderId.empty()) {
// In this phase we use the normal leader ID without following term id:
syncer->setLeaderId(leaderId);
}

Expand Down Expand Up @@ -599,6 +638,8 @@ static arangodb::Result replicationSynchronizeCatchup(
auto syncer = DatabaseTailingSyncer::create(guard.database(), configuration, fromTick, /*useTick*/true);

if (!leaderId.empty()) {
// In this phase we still use the normal leaderId without a following
// term id:
syncer->setLeaderId(leaderId);
}

Expand All @@ -624,10 +665,10 @@ static arangodb::Result replicationSynchronizeCatchup(

static arangodb::Result replicationSynchronizeFinalize(SynchronizeShard const& job,
10000 application_features::ApplicationServer& server,
VPackSlice const& conf) {
VPackSlice const& conf,
std::string const& leaderId) {
auto const database = conf.get(DATABASE).copyString();
auto const collection = conf.get(COLLECTION).copyString();
auto const leaderId = conf.get(LEADER_ID).copyString();
auto const fromTick = conf.get("from").getNumber<uint64_t>();

ReplicationApplierConfiguration configuration =
Expand Down Expand Up @@ -935,6 +976,10 @@ bool SynchronizeShard::first() {

auto details = std::make_shared<VPackBuilder>();

// Configure the shard to follow the leader without any following
// term id:
collection->followers()->setTheLeader(leader);

ResultT<SyncerId> syncRes =
replicationSynchronize(*this, collection, config.slice(), details);

Expand Down Expand Up @@ -1165,6 +1210,19 @@ Result SynchronizeShard::catchupWithExclusiveLock(
}
});

// Now we have got a unique id for this following term and have stored it
// in _followingTermId, so we can use it to set the leader:

// This is necessary to accept replications from the leader which can
// happen as soon as we are in sync.
std::string leaderIdWithTerm{leader};
if (_followingTermId != 0) {
leaderIdWithTerm += "_";
leaderIdWithTerm += basics::StringUtils::itoa(_followingTermId);
}
// If _followingTermid is 0, then this is a leader before the update,
// we tolerate this and simply use its ID without a term in this case.
collection.followers()->setTheLeader(leaderIdWithTerm);
LOG_TOPIC("d76cb", DEBUG, Logger::MAINTENANCE) << "lockJobId: " << lockJobId;

builder.clear();
Expand All @@ -1173,13 +1231,13 @@ Result SynchronizeShard::catchupWithExclusiveLock(
builder.add(ENDPOINT, VPackValue(ep));
builder.add(DATABASE, VPackValue(getDatabase()));
builder.add(COLLECTION, VPackValue(getShard()));
builder.add(LEADER_ID, VPackValue(leader));
builder.add(LEADER_ID, VPackValue(leaderIdWithTerm));
builder.add("from", VPackValue(lastLogTick));
builder.add("requestTimeout", VPackValue(600.0));
builder.add("connectTimeout", VPackValue(60.0));
}

res = replicationSynchronizeFinalize(*this, feature().server(), builder.slice());
res = replicationSynchronizeFinalize(*this, feature().server(), builder.slice(), leaderIdWithTerm);

if (!res.ok()) {
std::string errorMessage(
Expand All @@ -1188,10 +1246,6 @@ Result SynchronizeShard::catchupWithExclusiveLock(
return {TRI_ERROR_INTERNAL, errorMessage};
}

// This is necessary to accept replications from the leader which can
// happen as soon as we are in sync.
collection.followers()->setTheLeader(leader);

NetworkFeature& nf = _feature.server().getFeature<NetworkFeature>();
network::ConnectionPool* pool = nf.pool();
res = addShardFollower(pool, ep, getDatabase(), getShard(), lockJobId, clientId,
Expand Down
1 change: 1 addition & 0 deletions arangod/Cluster/SynchronizeShard.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ class SynchronizeShard : public ActionBase, public ShardDefinition {

/// @brief information about the leader, reused across multiple replication steps
arangodb::replutils::LeaderInfo _leaderInfo;
uint64_t _followingTermId;
};

} // namespace maintenance
Expand Down
Loading
0