8000 Following term id by neunhoef · Pull Request #14405 · arangodb/arangodb · GitHub
[go: up one dir, main page]

Skip to content

Following term id #14405

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 23 commits into from
Jul 7, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
e8e551b
First stab at following term id.
neunhoef Jun 25, 2021
79fbdf4
Also handle the planned leader change case.
neunhoef Jun 25, 2021
4fc887c
CHANGELOG.
neunhoef Jun 25, 2021
7c67605
Merge remote-tracking branch 'origin/devel' into bug-fix/following-te…
neunhoef Jun 28, 2021
5be1fd3
Merge remote-tracking branch 'origin/devel' into bug-fix/following-te…
neunhoef Jun 28, 2021
979a956
Remove bug introduced by merge.
neunhoef Jun 28, 2021
df19281
Fix windows compilation issue.
neunhoef Jun 28, 2021
cc931fb
Fix SyncCollectionFinalize.
neunhoef Jun 28, 2021
df4d679
Fix all setTheLeader places.
neunhoef Jun 28, 2021
6aa7a8a
Fix getting in sync.
neunhoef Jun 28, 2021
b66b361
Another try to get configuration of following in shard right.
neunhoef Jun 29, 2021
de5ff72
Merge remote-tracking branch 'origin/devel' into bug-fix/following-te…
neunhoef Jun 29, 2021
304cc67
Apply suggestions from code review
neunhoef Jun 29, 2021
ebf6012
Fix compilation.
neunhoef Jun 29, 2021
0b77428
Improve comments.
neunhoef Jun 29, 2021
45a9336
Merge remote-tracking branch 'origin/devel' into bug-fix/following-te…
neunhoef Jun 30, 2021
10000
8b3e335
Merge remote-tracking branch 'origin/devel' into bug-fix/following-te…
neunhoef Jun 30, 2021
9051010
Merge remote-tracking branch 'origin/devel' into bug-fix/following-te…
neunhoef Jul 1, 2021
aaccae5
Concurrency fixes.
neunhoef Jul 1, 2021
30370c5
Merge branch 'bug-fix/following-term-id' of ssh://github.com/arangodb…
neunhoef Jul 1, 2021
cd3715c
Add test.
neunhoef Jul 1, 2021
e389c23
Merge remote-tracking branch 'origin/devel' into bug-fix/following-te…
neunhoef Jul 5, 2021
d3985e9
Merge branch 'devel' into bug-fix/following-term-id
neunhoef Jul 7, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
First stab at following term id.
  • Loading branch information
neunhoef committed Jun 25, 2021
commit e8e551b36b614bacf859b8c6fee7dd9fc621fa27
23 changes: 23 additions & 0 deletions arangod/Cluster/FollowerInfo.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include "Logger/LogMacros.h"
#include "Logger/Logger.h"
#include "Logger/LoggerStream.h"
#include "Random/RandomGenerator.h"
#include "VocBase/LogicalCollection.h"

using namespace arangodb;
Expand Down Expand Up @@ -542,3 +543,25 @@ VPackBuilder FollowerInfo::newShardEntry(VPackSlice oldValue) const {
}
return newValue;
}

uint64_t FollowerInfo::newFollowingTermId(ServerID const& s) noexcept {
uint64_t i = RandomGenerator::interval(UINT64_MAX);
try {
_followingTermId[s] = i;
} catch(std::bad_alloc const& exc) {
i = 0; // I assume here that I do not get bad_alloc if the key is
// already in the map, since it then only has to overwrite
// an integer, if the key is not in the map, we default to 0.
}
return i;
}

uint64_t FollowerInfo::getFollowingTermId(ServerID const& s) noexcept {
// Note that we assume that find() does not throw!
auto it = _followingTermId.find(s);
if (it == _followingTermId.end()) {
return 0;
}
return it->second;
}

36 changes: 36 additions & 0 deletions arangod/Cluster/FollowerInfo.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,20 @@ class FollowerInfo {
// soon as we can guarantee at least so many followers locally.
std::shared_ptr<std::vector<ServerID>> _failoverCandidates;

// The following map holds a random number for each follower, this
// random number is sent given to the follower when it gets in sync
// (actually, when it acquires the hard lock to get in sync), and is
// then subsequently sent alongside every synchronous replication
// request. If the number does not match, the follower will refuse the
// replication request. This is to ensure that replication requests cannot
// be delayed into the "next" leader/follower relationship.
// And here is the proof that this works: The exclusive lock divides the
// write operations between "before" and "after". The id is changed
// when the exclusive lock is established. Therefore, it is OK for the
// replication requests to send along the "current" id, directly
// from this map.
std::unordered_map<ServerID, uint64_t> _followingTermId;

// The agencyMutex is used to synchronise access to the agency.
// the _dataLock is used to sync the access to local data.
// The _canWriteLock is used to protect flag if we do have enough followers
Expand Down Expand Up @@ -135,6 +149,28 @@ class FollowerInfo {

Result remove(ServerID const& s);

//////////////////////////////////////////////////////////////////////////////
/// @brief for each run of the "get-in-sync" protocol we generate a
/// random number to identify this "following term". This is created
/// when the follower fetches the exclusive lock to finally get in sync
/// and is stored in _followingTermId, so that it can be forwarded with
/// each synchronous replication request. The follower can then decline
/// the replication in case it is not "in the same term".
//////////////////////////////////////////////////////////////////////////////

uint64_t newFollowingTermId(ServerID const& s) noexcept;

//////////////////////////////////////////////////////////////////////////////
/// @brief for each run of the "get-in-sync" protocol we generate a
/// random number to identify this "following term". This is created
/// when the follower fetches the exclusive lock to finally get in sync
/// and is stored in _followingTermId, so that it can be forwarded with
/// each synchronous replication request. The follower can then decline
/// the replication in case it is not "in the same term".
//////////////////////////////////////////////////////////////////////////////

uint64_t getFollowingTermId(ServerID const& s) noexcept;

//////////////////////////////////////////////////////////////////////////////
/// @brief clear follower list, no changes in agency necesary
//////////////////////////////////////////////////////////////////////////////
Expand Down
26 changes: 20 additions & 6 deletions arangod/Cluster/SynchronizeShard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,8 @@ using namespace std::chrono;
SynchronizeShard::SynchronizeShard(MaintenanceFeature& feature, ActionDescription const& desc)
: ActionBase(feature, desc),
ShardDefinition(desc.get(DATABASE), desc.get(SHARD)),
_leaderInfo(arangodb::replutils::LeaderInfo::createEmpty()) {
_leaderInfo(arangodb::replutils::LeaderInfo::createEmpty()),
_followingTermId(0) {
std::stringstream error;

if (!desc.has(COLLECTION)) {
Expand Down Expand Up @@ -416,7 +417,7 @@ arangodb::Result SynchronizeShard::getReadLock(
// nullptr only happens during controlled shutdown
if (pool == nullptr) {
return arangodb::Result(TRI_ERROR_SHUTTING_DOWN,
"cancelReadLockOnLeader: Shutting down");
"getReadLock: Shutting down");
}

VPackBuilder body;
Expand Down Expand Up @@ -445,6 +446,16 @@ arangodb::Result SynchronizeShard::getReadLock(

if (res.ok()) {
// Habemus clausum, we have a lock
if (!soft) {
// Now store the random followingTermId:
VPackSlice body = response.response().slice();
if (body.isObject()) {
VPackSlice followingTermIdSlice = body.get(StaticStrings::FollowingTermId);
if (followingTermIdSlice.isNumber()) {
_followingTermId = followingTermIdSlice.getNumber<uint64_t>();
}
}
}
return arangodb::Result();
}

Expand Down Expand Up @@ -913,10 +924,6 @@ bool SynchronizeShard::first() {
return false;
}

// This is necessary to accept replications from the leader which can
// happen as soon as we are in sync.
collection->followers()->setTheLeader(leader);

startTime = system_clock::now();

VPackBuilder config;
Expand Down Expand Up @@ -1169,6 +1176,13 @@ Result SynchronizeShard::catchupWithExclusiveLock(
}
});

// Now we have got a unique id for this following term and have stored it
// in _followingTermId, so we can use it to set the leader:

// This is necessary to accept replications from the leader which can
// happen as soon as we are in sync.
collection.followers()->setTheLeader(leader + "_" + basics::StringUtils::itoa(_followingTermId));

LOG_TOPIC("d76cb", DEBUG, Logger::MAINTENANCE) << "lockJobId: " << lockJobId;

builder.clear();
Expand Down
1 change: 1 addition & 0 deletions arangod/Cluster/SynchronizeShard.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ class SynchronizeShard : public ActionBase, public ShardDefinition {

/// @brief information about the leader, reused across multiple replication steps
arangodb::replutils::LeaderInfo _leaderInfo;
uint64_t _followingTermId;
};

} // namespace maintenance
Expand Down
4 changes: 4 additions & 0 deletions arangod/RestHandler/RestReplicationHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2708,6 +2708,10 @@ void RestReplicationHandler::handleCommandHoldReadLockCollection() {
{
VPackObjectBuilder bb(&b);
b.add(StaticStrings::Error, VPackValue(false));
if (!serverId.empty()) {
b.add(StaticStrings::FollowingTermId,
VPackValue(col->followers()->newFollowingTermId(serverId)));
}
}

LOG_TOPIC("61a9d", DEBUG, Logger::REPLICATION)
Expand Down
26 changes: 19 additions & 7 deletions arangod/Transaction/Methods.cpp
10000
Original file line number Diff line number Diff line change
Expand Up @@ -1221,7 +1221,8 @@ Future<OperationResult> transaction::Methods::insertLocal(std::string const& cna

// Now replicate the good operations on all followers:
return replicateOperations(collection.get(), followers, options, value,
TRI_VOC_DOCUMENT_OPERATION_INSERT, resDocs)
TRI_VOC_DOCUMENT_OPERATION_INSERT, resDocs,
*collection->followers())
.thenValue([options, errs = std::move(errorCounter), resDocs](Result res) mutable {
if (!res.ok()) {
return OperationResult{std::move(res), options};
Expand Down Expand Up @@ -1493,7 +1494,7 @@ Future<OperationResult> transaction::Methods::modifyLocal(std::string const& col

// Now replicate the good operations on all followers:
return replicateOperations(collection.get(), followers, options, newValue,
operation, resDocs)
operation, resDocs, *collection->followers())
.thenValue([options, errs = std::move(errorCounter), resDocs](Result&& res) mutable {
if (!res.ok()) {
return OperationResult{std::move(res), options};
Expand Down Expand Up @@ -1704,7 +1705,8 @@ Future<OperationResult> transaction::Methods::removeLocal(std::string const& col

// Now replicate the good operations on all followers:
return replicateOperations(collection.get(), followers, options, value,
TRI_VOC_DOCUMENT_OPERATION_REMOVE, resDocs)
TRI_VOC_DOCUMENT_OPERATION_REMOVE, resDocs,
*collection->followers())
.thenValue([options, errs = std::move(errorCounter), resDocs](Result res) mutable {
if (!res.ok()) {
return OperationResult{std::move(res), options};
Expand Down Expand Up @@ -1892,10 +1894,15 @@ Future<OperationResult> transaction::Methods::truncateLocal(std::string const& c
network::RequestOptions reqOpts;
reqOpts.database = vocbase().name();
reqOpts.timeout = network::Timeout(600);
reqOpts.param(StaticStrings::IsSynchronousReplicationString, ServerState::instance()->getId());
reqOpts.param(StaticStrings::Compact, (options.truncateCompact ? "true" : "false"));

for (auto const& f : *followers) {
reqOpts.param(StaticStrings::IsSynchronousReplicationString,
ServerState::instance()->getId() + "_" +
basics::StringUtils::itoa(
collection->followers()->getFollowingTermId(f)));
// reqOpts is copied deep in sendRequestRetry, so we are OK to
// change it in the loop!
network::Headers headers;
ClusterTrxMethods::addTransactionHeader(*this, f, headers);
auto future = network::sendRequestRetry(pool, "server:" + f, fuerte::RestVerb::Put,
Expand Down Expand Up @@ -2311,7 +2318,8 @@ Future<Result> Methods::replicateOperations(
std::shared_ptr<const std::vector<ServerID>> const& followerList,
OperationOptions const& options, VPackSlice const value,
TRI_voc_document_operation_e const operation,
std::shared_ptr<VPackBuffer<uint8_t>> const& ops) {
std::shared_ptr<VPackBuffer<uint8_t>> const& ops,
FollowerInfo& followerInfo) {
TRI_ASSERT(followerList != nullptr);

if (followerList->empty()) {
Expand All @@ -2323,8 +2331,6 @@ Future<Result> Methods::replicateOperations(
network::RequestOptions reqOpts;
reqOpts.database = vocbase().name();
reqOpts.param(StaticStrings::IsRestoreString, "true");
reqOpts.param(StaticStrings::IsSynchronousReplicationString, ServerState::instance()->getId());

std::string url = "/_api/document/";
url.append(arangodb::basics::StringUtils::urlEncode(collection->name()));
if (operation != TRI_VOC_DOCUMENT_OPERATION_INSERT && !value.isArray()) {
Expand Down Expand Up @@ -2417,6 +2423,12 @@ Future<Result> Methods::replicateOperations(

auto* pool = vocbase().server().getFeature<NetworkFeature>().pool();
for (auto const& f : *followerList) {
reqOpts.param(StaticStrings::IsSynchronousReplicationString,
ServerState::instance()->getId() + "_" +
basics::StringUtils::itoa(
collection->followers()->getFollowingTermId(f)));
// reqOpts is copied deep in sendRequestRetry, so we are OK to
// change it in the loop!
network::Headers headers;
ClusterTrxMethods::addTransactionHeader(*this, f, headers);
futures.emplace_back(network::sendRequestRetry(pool, "server:" + f, requestType,
Expand Down
4 changes: 3 additions & 1 deletion arangod/Transaction/Methods.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "Basics/Common.h"
#include "Basics/Exceptions.h"
#include "Basics/Result.h"
#include "Cluster/FollowerInfo.h"
#include "Futures/Future.h"
#include "Indexes/IndexIterator.h"
#include "Rest/CommonDefines.h"
Expand Down Expand Up @@ -492,7 +493,8 @@ class Methods {
LogicalCollection* collection,
std::shared_ptr<const std::vector<std::string>> const& followers,
OperationOptions const& options, VPackSlice value, TRI_voc_document_operation_e operation,
std::shared_ptr<velocypack::Buffer<uint8_t>> const& ops);
std::shared_ptr<velocypack::Buffer<uint8_t>> const& ops,
FollowerInfo& followerInfo);

private:
/// @brief transaction hints
Expand Down
1 change: 1 addition & 0 deletions lib/Basics/StaticStrings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,7 @@ std::string const StaticStrings::RevisionTreeInitialRangeMin("initialRangeMin");
std::string const StaticStrings::RevisionTreeRanges("ranges");
std::string const StaticStrings::RevisionTreeResume("resume");
std::string const StaticStrings::RevisionTreeVersion("version");
std::string const StaticStrings::FollowingTermId("followingTermId");

// Generic attribute names
std::string const StaticStrings::AttrCoordinator("coordinator");
Expand Down
1 change: 1 addition & 0 deletions lib/Basics/StaticStrings.h
59F8
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,7 @@ class StaticStrings {
static std::string const RevisionTreeRanges;
static std::string const RevisionTreeResume;
static std::string const RevisionTreeVersion;
static std::string const FollowingTermId;

// generic attribute names
static std::string const AttrCoordinator;
Expand Down
0