8000 Bug fix/agency restart after compaction and holes in log by kvahed · Pull Request #3413 · arangodb/arangodb · GitHub
[go: up one dir, main page]

Skip to content

Bug fix/agency restart after compaction and holes in log #3413

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions arangod/Agency/Agent.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1257,6 +1257,19 @@ bool Agent::prepareLead() {
/// Becoming leader
void Agent::lead() {

{
// We cannot start sendAppendentries before first log index.
// Any missing indices before _commitIndex were compacted.
// DO NOT EDIT without understanding the consequences for sendAppendEntries!
CONDITION_LOCKER(guard, _waitForCV);
MUTEX_LOCKER(tiLocker, _tiLock);
for (auto& i : _confirmed) {
if (i.first != id()) {
i.second = _commitIndex;
}
}
}

// Wake up run
wakeupMainLoop();

Expand Down
4 changes: 2 additions & 2 deletions arangod/Agency/Agent.h
Original file line number Diff line number Diff line change
Expand Up @@ -390,8 +390,8 @@ class Agent : public arangodb::Thread,

/// Rules for the locks: This covers the following locks:
/// _ioLock (here)
/// _logLock (in State)
/// _tiLock (here)
/// _logLock (in State) _waiForCV (here)
/// _tiLock (here) _tiLock (here)
/// One may never acquire a log in this list whilst holding another one
/// that appears further down on this list. This is to prevent deadlock.
/// For _logLock: This is local to State and we make sure that the few
Expand Down
57 changes: 45 additions & 12 deletions arangod/Agency/State.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -865,6 +865,9 @@ bool State::loadRemaining() {

TRI_ASSERT(_log.empty()); // was cleared in loadCompacted
std::string clientId;
// We know that _cur has been set in loadCompacted to the index of the
// snapshot that was loaded or to 0 if there is no snapshot.
index_t lastIndex = _cur;
for (auto const& i : VPackArrayIterator(result)) {

buffer_t tmp = std::make_shared<arangodb::velocypack::Buffer<uint8_t>>();
Expand All @@ -876,19 +879,49 @@ bool State::loadRemaining() {
clientId = req.hasKey("clientId") ?
req.get("clientId").copyString() : std::string();

try {
_log.push_back(
log_t(
basics::StringUtils::uint64(
ii.get(StaticStrings::KeyString).copyString()),
ii.get("term").getNumber<uint64_t>(), tmp, clientId));
} catch (std::exception const& e) {
LOG_TOPIC(ERR, Logger::AGENCY)
<< "Failed to convert " +
ii.get(StaticStrings::KeyString).copyString() +
" to integer."
<< e.what();
// Dummy fill missing entries (Not good at all.)
index_t index(basics::StringUtils::uint64(
ii.get(StaticStrings::KeyString).copyString()));
term_t term(ii.get("term").getNumber<uint64_t>());

// Ignore log entries, which are older than lastIndex:
if (index >= lastIndex) {

// Empty patches :
if (index > lastIndex + 1) {
std::shared_ptr<Buffer<uint8_t>> buf =
std::make_shared<Buffer<uint8_t>>();
VPackSlice value = arangodb::basics::VelocyPackHelper::EmptyObjectValue();
buf->append(value.startAs<char const>(), value.byteSize());
for (index_t i = lastIndex+1; i < index; ++i) {
LOG_TOPIC(WARN, Logger::AGENCY) << "Missing index " << i << " in RAFT log.";
_log.push_back(log_t(i, term, buf, std::string()));
lastIndex = i;
}
// After this loop, index will be lastIndex + 1
}

if (index == lastIndex + 1 ||
(index == lastIndex && _log.empty())) {
// Real entries
try {
_log.push_back(
log_t(
basics::StringUtils::uint64(
ii.get(StaticStrings::KeyString).copyString()),
ii.get("term").getNumber<uint64_t>(), tmp, clientId));
} catch (std::exception const& e) {
LOG_TOPIC(ERR, Logger::AGENCY)
<< "Failed to convert " +
ii.get(StaticStrings::KeyString).copyString() +
" to integer."
<< e.what();
}

lastIndex = index;
}
}

}
}
if (_log.empty()) {
Expand Down
0