10BC0 [R2] Remove Dead WaitForQueue Code by maierlars · Pull Request #14858 · arangodb/arangodb · GitHub
[go: up one dir, main page]

Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 0 additions & 54 deletions arangod/Replication2/ReplicatedLog/ILogParticipant.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,57 +87,3 @@ replicated_log::WaitForResult::WaitForResult(velocypack::Slice s) {
currentCommitIndex = s.get(StaticStrings::CommitIndex).extract<LogIndex>();
quorum = std::make_shared<QuorumData>(s.get("quorum"));
}

auto ::arangodb::replication2::replicated_log::assertQueueNotEmptyOrTryToClear(
TryToClearParticipant participant, LoggerContext const& loggerContext,
std::multimap<LogIndex, futures::Promise<WaitForResult>>& queue) noexcept -> TryToClearResult {
auto const [lcParticipant, ucParticipant, resignError] = std::invoke(
[participant]() -> std::tuple<std::string_view, std::string_view, ErrorCode> {
switch (participant) {
case TryToClearParticipant::Leader:
return {"leader", "Leader", TRI_ERROR_REPLICATION_REPLICATED_LOG_LEADER_RESIGNED};
case TryToClearParticipant::Follower:
return {"follower", "Follower",
TRI_ERROR_REPLICATION_REPLICATED_LOG_FOLLOWER_RESIGNED};
}
FATAL_ERROR_ABORT();
});
auto result = TryToClearResult::NoProgress;
try {
// The queue cannot be empty: resign() clears it while under the mutex,
// and waitFor also holds the mutex, but refuses to add entries after
// the leader resigned.
// This means it should never happen in production code.
// It used to happen in the C++ unit tests, but shouldn't any more since
// ~ReplicatedLog resigns the participant (if it wasn't dropped already).
TRI_ASSERT(queue.empty());

if (!queue.empty()) {
LOG_CTX("c1138", ERR, loggerContext)
<< ucParticipant << " destroyed, but queue isn't empty!";
for (auto it = queue.begin(); it != queue.end(); ) {
if (!it->second.isFulfilled()) {
it->second.setException(basics::Exception(resignError, __FILE__, __LINE__));
} else {
LOG_CTX("a1db0", ERR, loggerContext)
<< "Fulfilled promise in replication queue!";
}
it = queue.erase(it);
result = TryToClearResult::Partial;
}
}
result = TryToClearResult::Cleared;
} catch (basics::Exception const& exception) {
LOG_CTX("c546f", ERR, loggerContext)
<< "Caught exception while destroying a log " << lcParticipant << ": "
<< exception.message();
} catch (std::exception const& exception) {
LOG_CTX("61f0d", ERR, loggerContext)
<< "Caught exception while destroying a log " << lcParticipant << ": "
<< exception.what();
} catch (...) {
LOG_CTX("338c9", ERR, loggerContext)
<< "Caught unknown exception while destroying a log " << lcParticipant << "!";
}
return result;
}
15 changes: 0 additions & 15 deletions arangod/Replication2/ReplicatedLog/ILogParticipant.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,19 +101,4 @@ struct LogUnconfiguredParticipant final
std::unique_ptr<LogCore> _logCore;
std::shared_ptr<ReplicatedLogMetrics> const _logMetrics;
};

// whether the assertQueueNotEmptyOrTryToClear is called from a leader or follower,
// to give more precise information in log messages and error codes only.
enum class TryToClearParticipant { Leader, Follower };
// How much progress was made clearing the queue.
// NoProgress means what it says, and Cleared means the queue was completely
// cleared. Partial means that at least one entry could be erased.
enum class TryToClearResult { NoProgress, Partial, Cleared };
// Asserts that the queue is not empty in maintainer mode. Otherwise, tries
// to clear it and reports the progress. Will log and return on error, which
// will result in either NoProgress or Partial return value.
extern auto assertQueueNotEmptyOrTryToClear(
TryToClearParticipant participant, LoggerContext const& loggerContext,
std::multimap<LogIndex, futures::Promise<WaitForResult>>& queue) noexcept -> TryToClearResult;

} // namespace arangodb::replication2::replicated_log
36 changes: 7 additions & 29 deletions arangod/Replication2/ReplicatedLog/LogFollower.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -444,35 +444,13 @@ auto replicated_log::LogFollower::GuardedFollowerData::getCommittedLogIterator(L

replicated_log::LogFollower::~LogFollower() {
_logMetrics->replicatedLogFollowerNumber->fetch_sub(1);
tryHardToClearQueue();
}

auto replicated_log::LogFollower::tryHardToClearQueue() noexcept -> void {
bool finished = false;
auto consecutiveTriesWithoutProgress = 0;
do {
++consecutiveTriesWithoutProgress;
auto followerDataGuard = _guardedFollowerData.getLockedGuard();
auto queueGuard = followerDataGuard->_waitForQueue.getLockedGuard();
auto& queue = queueGuard.get();
switch (assertQueueNotEmptyOrTryToClear(TryToClearParticipant::Follower,
_loggerContext, queue)) {
case TryToClearResult::NoProgress:
break;
case TryToClearResult::Partial:
consecutiveTriesWithoutProgress = 0;
break;
case TryToClearResult::Cleared:
finished = true;
break;
}
if (!finished && consecutiveTriesWithoutProgress > 10) {
LOG_CTX("a27a7", FATAL, _loggerContext)
<< "We keep failing at destroying a log follower instance. Giving up "
"now.";
FATAL_ERROR_EXIT();
}
} while (!finished);
if (auto queueEmpty =
_guardedFollowerData.getLockedGuard()->_waitForQueue.getLockedGuard()->empty();
!queueEmpty) {
TRI_ASSERT(false) << "expected wait-for-queue to be empty";
LOG_CTX("ce7f8", ERR, _loggerContext)
<< "expected wait-for-queue to be empty";
}
}

auto LogFollower::release(LogIndex doneWithIdx) -> Result {
Expand Down
2 changes: 0 additions & 2 deletions arangod/Replication2/ReplicatedLog/LogFollower.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,6 @@ class LogFollower final : public ILogParticipant,
[[nodiscard]] auto appendEntriesPreFlightChecks(GuardedFollowerData const&,
AppendEntriesRequest const&) const noexcept
-> std::optional<AppendEntriesResult>;

auto tryHardToClearQueue() noexcept -> void;
};

} // namespace arangodb::replication2::replicated_log
31 changes: 4 additions & 27 deletions arangod/Replication2/ReplicatedLog/LogLeader.cpp
2 changes: 0 additions & 2 deletions arangod/Replication2/ReplicatedLog/LogLeader.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,33 +98,10 @@ replicated_log::LogLeader::LogLeader(LoggerContext logContext,

replicated_log::LogLeader::~LogLeader() {
_logMetrics->replicatedLogLeaderNumber->fetch_sub(1);
tryHardToClearQueue();
}

auto replicated_log::LogLeader::tryHardToClearQueue() noexcept -> void {
bool finished = false;
auto consecutiveTriesWithoutProgress = 0;
do {
++consecutiveTriesWithoutProgress;
auto leaderDataGuard = acquireMutex();
auto& queue = leaderDataGuard->_waitForQueue;
switch (assertQueueNotEmptyOrTryToClear(TryToClearParticipant::Leader, _logContext, queue)) {
case TryToClearResult::NoProgress:
break;
case TryToClearResult::Partial:
consecutiveTriesWithoutProgress = 0;
break;
case TryToClearResult::Cleared:
finished = true;
break;
}
if (!finished && consecutiveTriesWithoutProgress > 10) {
LOG_CTX("d5d25", FATAL, _logContext)
<< "We keep failing at destroying a log leader instance. Giving up "
"now.";
FATAL_ERROR_EXIT();
}
} while (!finished);
if (auto queueEmpty = _guardedLeaderData.getLockedGuard()->_waitForQueue.empty(); !queueEmpty) {
TRI_ASSERT(false) << "expected wait-for-queue to be empty";
LOG_CTX("ce7f7", ERR, _logContext) << "expected wait-for-queue to be empty";
}
}

auto replicated_log::LogLeader::instantiateFollowers(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -294,8 +294,6 @@ class LogLeader : public std::enable_shared_from_this<LogLeader>, public ILogPar
std::shared_ptr<ReplicatedLogMetrics> const& logMetrics);
static void handleResolvedPromiseSet(ResolvedPromiseSet set,
std::shared_ptr<ReplicatedLogMetrics> const& logMetrics);

auto tryHardToClearQueue() noexcept -> void;
};

} // namespace arangodb::replication2::replicated_log
8 changes: 3 additions & 5 deletions arangod/Replication2/Streams/StreamInformationBlock.tpp
Original file line number Diff line number Diff line change
Expand Up @@ -125,12 +125,10 @@ auto StreamInformationBlock<stream_descriptor<Id, Type, Tags>>::getIteratorRange
explicit Iterator(ContainerType log, LogIndex start, LogIndex stop)
: _log(std::move(log)),
_current(std::lower_bound(std::begin(_log), std::end(_log), start,
[](StreamEntry<Type> const& left, LogIndex index) {
return left.first < index;
})),
// cppcheck-suppress selfInitialization
[](StreamEntry<Type> const& left, LogIndex index) {
return left.first < index;
})),
_start(start),
// cppcheck-suppress selfInitialization
_stop(stop) {}
};
return std::make_unique<Iterator>(std::move(log), start, stop);
Expand Down
0