[R2] Log Multiplexer by maierlars · Pull Request #14667 · arangodb/arangodb · GitHub
[go: up one dir, main page]

Skip to content

[R2] Log Multiplexer #14667

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Fixing recursive waitFor calls.
  • Loading branch information
maierlars committed Aug 19, 2021
commit f939ee88334dc98abe7f577a5a1c6b1d775562e9
107 changes: 61 additions & 46 deletions arangod/Replication2/ReplicatedLog/LogFollower.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@

#include "Replication2/ReplicatedLog/Algorithms.h"
#include "Replication2/ReplicatedLog/LogContextKeys.h"
#include "Replication2/ReplicatedLog/ReplicatedLogIterator.h"
#include "Replication2/ReplicatedLog/LogStatus.h"
#include "Replication2/ReplicatedLog/NetworkMessages.h"
#include "Replication2/ReplicatedLog/PersistedLog.h"
#include "Replication2/ReplicatedLog/ReplicatedLogIterator.h"
#include "Replication2/ReplicatedLog/ReplicatedLogMetrics.h"
#include "RestServer/Metrics.h"

Expand Down Expand Up @@ -95,8 +95,7 @@ auto LogFollower::appendEntriesPreFlightChecks(GuardedFollowerData const& data,

// It is always allowed to replace the log entirely
if (req.prevLogEntry.index > LogIndex{0}) {
if (auto conflict =
algorithms::detectConflict(data._inMemoryLog, req.prevLogEntry);
if (auto conflict = algorithms::detectConflict(data._inMemoryLog, req.prevLogEntry);
conflict.has_value()) {
auto [reason, next] = *conflict;

Expand All @@ -115,7 +114,6 @@ auto replicated_log::LogFollower::appendEntries(AppendEntriesRequest req)

auto self = _guardedFollowerData.getLockedGuard();


{
// Preflight checks - does the leader, log and other stuff match?
// This code block should not modify the local state, only check values.
Expand All @@ -133,7 +131,8 @@ auto replicated_log::LogFollower::appendEntries(AppendEntriesRequest req)
// as a copy, then modify the log on disk. This is an atomic operation. If
// it fails, we forget the new state. Otherwise we replace the old in memory
// state with the new value.
auto newInMemoryLog = self->_inMemoryLog.takeSnapshotUpToAndIncluding(req.prevLogEntry.index);
auto newInMemoryLog =
self->_inMemoryLog.takeSnapshotUpToAndIncluding(req.prevLogEntry.index);

if (self->_inMemoryLog.getLastIndex() != req.prevLogEntry.index) {
auto res = self->_logCore->removeBack(req.prevLogEntry.index + 1);
Expand All @@ -149,30 +148,16 @@ auto replicated_log::LogFollower::appendEntries(AppendEntriesRequest req)
self->_inMemoryLog = std::move(newInMemoryLog);
}

struct WaitForQueueResolve {
using QueueGuard = Guarded<WaitForQueue, basics::UnshackledMutex>::mutex_guard_type;

WaitForQueueResolve(QueueGuard guard, LogIndex commitIndex) noexcept
: _guard(std::move(guard)),
begin(_guard->begin()),
end(_guard->upper_bound(commitIndex)) {}

QueueGuard _guard;
WaitForQueue::iterator begin;
WaitForQueue::iterator end;
};

// Allocations
auto newInMemoryLog = self->_inMemoryLog.append(_loggerContext, req.entries);
auto iter = std::make_unique<InMemoryPersistedLogIterator>(req.entries);
auto toBeResolvedPtr = std::make_unique<std::optional<WaitForQueueResolve>>();
auto toBeResolved = std::make_unique<WaitForQueue>();

auto* core = self->_logCore.get();
static_assert(std::is_nothrow_move_constructible_v<decltype(newInMemoryLog)>);
auto commitToMemoryAndResolve =
[selfGuard = std::move(self), req = std::move(req),
newInMemoryLog = std::move(newInMemoryLog),
toBeResolvedPtr = std::move(toBeResolvedPtr)](
newInMemoryLog = std::move(newInMemoryLog), toBeResolved = std::move(toBeResolved)](
futures::Try<Result>&& tryRes) mutable -> std::pair<AppendEntriesResult, DeferredAction> {
// We have to release the guard after this lambda is finished.
// Otherwise it would be released when the lambda is destroyed, which
Expand Down Expand Up @@ -203,26 +188,52 @@ auto replicated_log::LogFollower::appendEntries(AppendEntriesRequest req)
<< req.prevLogEntry.index << ", leader commit index = " << req.leaderCommit;
}

auto action = std::invoke([&]() noexcept -> DeferredAction {
if (self->_commitIndex < req.leaderCommit && !self->_inMemoryLog.empty()) {
self->_commitIndex =
std::min(req.leaderCommit, self->_inMemoryLog.back().entry().logIndex());
LOG_CTX("1641d", TRACE, self->_follower._loggerContext)
<< "increment commit index: " << self->_commitIndex;
auto const generateToBeResolved = [&] {
try {
auto waitForQueue = self->_waitForQueue.getLockedGuard();

auto toBeResolved = std::optional<WaitForQueueResolve>{std::in_place, self->_waitForQueue.getLockedGuard(), self->_commitIndex};
static_assert(std::is_nothrow_move_assignable_v<std::optional<WaitForQueueResolve>>);
*toBeResolvedPtr = std::move(toBeResolved);
auto const end = waitForQueue->upper_bound(self->_commitIndex);
for (auto it = waitForQueue->begin(); it != end;) {
LOG_CTX("37d9c", TRACE, self->_follower._loggerContext)
<< "resolving promise for index " << it->first;
toBeResolved->insert(waitForQueue->extract(it++));
}
return DeferredAction([commitIndex = self->_commitIndex,
toBeResolved = std::move(toBeResolvedPtr)]() noexcept {
auto& resolve = toBeResolved->value();
for (auto it = resolve.begin; it != resolve.end; it = resolve._guard->erase(it)) {
if (!it->second.isFulfilled()) {
toBeResolved = std::move(toBeResolved)]() noexcept {
for (auto& it : *toBeResolved) {
if (!it.second.isFulfilled()) {
// This only throws if promise was fulfilled earlier.
it->second.setValue(WaitForResult{commitIndex, std::shared_ptr<QuorumData>{}});
it.second.setValue(WaitForResult{commitIndex, std::shared_ptr<QuorumData>{}});
}
}
});
} catch (std::exception const& e) {
// If those promises are not fulfilled, we cannot continue.
// Note that the move constructor of std::multimap is not noexcept.
LOG_CTX("e7a4d", FATAL, self->_follower._loggerContext)
<< "failed to fulfill replication promises due to exception; "
"system "
"can not continue. message: "
<< e.what();
FATAL_ERROR_EXIT();
} catch (...) {
// If those promises are not fulfilled, we cannot continue.
// Note that the move constructor of std::multimap is not noexcept.
LOG_CTX("c0bbb", FATAL, self->_follower._loggerContext)
<< "failed to fulfill replication promises due to exception; "
"system "
"can not continue";
FATAL_ERROR_EXIT();
}
};

auto action = std::invoke([&]() noexcept -> DeferredAction {
if (self->_commitIndex < req.leaderCommit && !self->_inMemoryLog.empty()) {
self->_commitIndex =
std::min(req.leaderCommit, self->_inMemoryLog.back().entry().logIndex());
LOG_CTX("1641d", TRACE, self->_follower._loggerContext)
<< "increment commit index: " << self->_commitIndex;
return generateToBeResolved();
}

return {};
Expand All @@ -241,17 +252,19 @@ auto replicated_log::LogFollower::appendEntries(AppendEntriesRequest req)
.then(std::move(commitToMemoryAndResolve))
.then([measureTime = std::move(measureTimeGuard)](auto&& res) mutable {
measureTime.fire();
auto&& [result, toBeResolved] = res.get();
// It is okay to fire here, because commitToMemoryAndResolve has released
// the guard already.
toBeResolved.fire();
auto&& [result, action] = res.get();
// It is okay to fire here, because commitToMemoryAndResolve has
// released the guard already.
action.fire();
return std::move(result);
});
}

replicated_log::LogFollower::GuardedFollowerData::GuardedFollowerData(
LogFollower const& self, std::unique_ptr<LogCore> logCore, InMemoryLog inMemoryLog)
: _follower(self), _inMemoryLog(std::move(inMemoryLog)), _logCore(std::move(logCore)) {}
: _follower(self),
_inMemoryLog(std::move(inMemoryLog)),
_logCore(std::move(logCore)) {}

auto replicated_log::LogFollower::getStatus() const -> LogStatus {
return _guardedFollowerData.doUnderLock([this](auto const& followerData) {
Expand Down Expand Up @@ -336,17 +349,19 @@ auto replicated_log::LogFollower::waitFor(LogIndex idx)
auto replicated_log::LogFollower::waitForIterator(LogIndex index)
-> replicated_log::ILogParticipant::WaitForIteratorFuture {
if (index == LogIndex{0}) {
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_BAD_PARAMETER, "invalid parameter; log index 0 is invalid");
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_BAD_PARAMETER,
"invalid parameter; log index 0 is invalid");
}
return waitFor(index).thenValue([this, self = shared_from_this(), index](auto&& quorum) -> WaitForIteratorFuture {
return waitFor(index).thenValue([this, self = shared_from_this(),
index](auto&& quorum) -> WaitForIteratorFuture {
auto [fromIndex, iter] = _guardedFollowerData.doUnderLock(
[&](GuardedFollowerData& followerData) -> std::pair<LogIndex, std::unique_ptr<LogRangeIterator>> {
TRI_ASSERT(index <= followerData._commitIndex);

/*
* This code here ensures that if only private log entries are present
* we do not reply with an empty iterator but instead wait for the next
* entry containing payload.
* we do not reply with an empty iterator but instead wait for the
* next entry containing payload.
*/

auto actualIndex = index;
Expand Down Expand Up @@ -378,7 +393,7 @@ auto replicated_log::LogFollower::waitForIterator(LogIndex index)
}

auto replicated_log::LogFollower::getLogIterator(LogIndex firstIndex) const
-> std::unique_ptr<LogIterator> {
-> std::unique_ptr<LogIterator> {
return _guardedFollowerData.doUnderLock(
[&](GuardedFollowerData const& data) -> std::unique_ptr<LogIterator> {
auto const endIdx = data._inMemoryLog.getLastTermIndexPair().index + 1;
Expand All @@ -388,7 +403,7 @@ auto replicated_log::LogFollower::getLogIterator(LogIndex firstIndex) const
}

auto replicated_log::LogFollower::getCommittedLogIterator(LogIndex firstIndex) const
-> std::unique_ptr<LogIterator> {
-> std::unique_ptr<LogIterator> {
return _guardedFollowerData.doUnderLock(
[&](GuardedFollowerData const& data) -> std::unique_ptr<LogIterator> {
return data.getCommittedLogIterator(firstIndex);
Expand Down
5 changes: 1 addition & 4 deletions tests/Replication2/Streams/LogDemultiplexerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ TEST_F(LogDemuxTest, leader_follower_test) {

auto mux = LogMultiplexer<MyTestSpecification>::construct(leader);
auto demux = LogDemultiplexer<MyTestSpecification>::construct(follower);
demux->listen();

auto leaderStreamA = mux->getStreamBaseById<my_int_stream_id>();
auto leaderStreamB = mux->getStreamBaseById<my_string_stream_id>();
Expand All @@ -102,10 +103,6 @@ TEST_F(LogDemuxTest, leader_follower_test) {

auto futureA = followerStreamA->waitFor(LogIndex{2});
auto futureB = followerStreamB->waitFor(LogIndex{1});
ASSERT_FALSE(futureA.isReady());
ASSERT_FALSE(futureB.isReady());

demux->listen();
ASSERT_TRUE(futureA.isReady());
ASSERT_TRUE(futureB.isReady());

Expand Down
0