8000 More visibility to catch a sleeper by neunhoef · Pull Request #21774 · arangodb/arangodb · GitHub
[go: up one dir, main page]

Skip to content

More visibility to catch a sleeper #21774

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
May 20, 2025
7 changes: 7 additions & 0 deletions CHANGELOG
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
devel
-----

* Add detection of two threads working on an ExecutionBlock concurrently.

* Add detection of two threads waiting for a PrefetchTask concurrently.

* Add visibility into the state if a thread waits for more than a second
on a PrefetchTask (and does not see InProgress).

* Fix a memory leak in LogContext, which could leak entries which are still, and
only, used by a thread-local LogContext when that thread is destroyed.

Expand Down
7 changes: 7 additions & 0 deletions arangod/Aql/ExecutionBlock.h
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,13 @@ class ExecutionBlock {
/// this would harm our implementation.
std::atomic<bool> _isBlockInUse{false};
#endif

/// @brief The following is always 0 or 1, if our assumptions are correct.
/// The `execute` method as well as the destructor increment it at their
/// start and decrement it at their end. If we detect a double use, we
/// log the stack traces.
std::atomic<uint64_t> _numberOfUsers{0};
std::atomic<bool> _logStacktrace{false};
};

} // namespace aql
Expand Down
2 changes: 2 additions & 0 deletions arangod/Aql/ExecutionBlockImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,8 @@ class ExecutionBlockImpl final : public ExecutionBlock {

ExecutionBlockImpl& _block;
AqlCallStack _stack;
mutable std::atomic<uint64_t> _numberWaiters{0};
mutable std::atomic<bool> _logStacktrace{false};
};

/**
Expand Down
91 changes: 79 additions & 12 deletions arangod/Aql/ExecutionBlockImpl.tpp
Original file line number Diff line number Diff line change
Expand Up @@ -355,11 +355,32 @@ ExecutionBlockImpl<Executor>::~ExecutionBlockImpl() {

template<class Executor>
void ExecutionBlockImpl<Executor>::stopAsyncTasks() {
if (_prefetchTask && !_prefetchTask->isConsumed() &&
!_prefetchTask->tryClaim()) {
// some thread is still working on our prefetch task
// -> we need to wait for that task to finish first!
_prefetchTask->waitFor();
if (_prefetchTask) {
// Double use diagnostics:
uint64_t userCount = _numberOfUsers.fetch_add(1, std::memory_order_relaxed);
if (userCount > 0) {
_logStacktrace.store(true, std::memory_order_relaxed);
LOG_TOPIC("52637", ERR, Logger::AQL)
<< "ALERT: Double use of ExecutionBlock detected, "
<< " Blockinfo: " << printBlockInfo()
<< " Query ID: " << getQuery().id() << ", stacktrace:";
CrashHandler::logBacktrace();
}
auto guard = scopeGuard([&]() noexcept {
_numberOfUsers.fetch_sub(1, std::memory_order_relaxed);
if (_logStacktrace.load(std::memory_order_relaxed)) {
LOG_TOPIC("52638", WARN, Logger::AQL)
<< "ALERT: Found _logStacktrace:"
<< " Blockinfo: " << printBlockInfo()
<< " Query ID: " << getQuery().id() << ", stacktrace:";
CrashHandler::logBacktrace();
}
});
if (!_prefetchTask->isConsumed() && !_prefetchTask->tryClaim()) {
// some thread is still working on our prefetch task
// -> we need to wait for that task to finish first!
_prefetchTask->waitFor();
}
}
}

Expand Down Expand Up @@ -496,6 +517,27 @@ void ExecutionBlockImpl<Executor>::collectExecStats(ExecutionStats& stats) {
template<class Executor>
std::tuple<ExecutionState, SkipResult, SharedAqlItemBlockPtr>
ExecutionBlockImpl<Executor>::execute(AqlCallStack const& stack) {
// Double use diagnostics:
uint64_t userCount = _numberOfUsers.fetch_add(1, std::memory_order_relaxed);
if (userCount > 0) {
_logStacktrace.store(true, std::memory_order_relaxed);
LOG_TOPIC("52635", WARN, Logger::AQL)
<< "ALERT: Double use of ExecutionBlock detected, "
<< " Blockinfo: " << printBlockInfo()
<< " Query ID: " << getQuery().id() << ", stacktrace:";
CrashHandler::logBacktrace();
}
auto waechter = scopeGuard([&]() noexcept {
_numberOfUsers.fetch_sub(1, std::memory_order_relaxed);
if (_logStacktrace.load(std::memory_order_relaxed)) {
LOG_TOPIC("52636", WARN, Logger::AQL)
<< "ALERT: Found _logStacktrace:"
<< " Blockinfo: " << printBlockInfo()
<< " Query ID: " << getQuery().id() << ", stacktrace:";
CrashHandler::logBacktrace();
}
});

#ifdef ARANGODB_ENABLE_MAINTAINER_MODE
bool old = false;
TRI_ASSERT(_isBlockInUse.compare_exchange_strong(old, true));
Expand Down Expand Up @@ -2582,16 +2624,41 @@ bool ExecutionBlockImpl<Executor>::PrefetchTask::rearmForNextCall(

template<class Executor>
void ExecutionBlockImpl<Executor>::PrefetchTask::waitFor() const noexcept {
uint64_t count = _numberWaiters.fetch_add(1, std::memory_order_relaxed);
if (count > 0) {
LOG_TOPIC("62515", WARN, Logger::AQL)
<< "ALERT: Detected " << count + 1 << " waiters for a PrefetchTask, "
<< " Blockinfo: " << _block.printBlockInfo()
<< " Query ID: " << _block.getQuery().id() << ", stacktrace:";
CrashHandler::logBacktrace();
_logStacktrace.store(true, std::memory_order_relaxed);
}
std::unique_lock<std::mutex> guard(_lock);
// (1) - this acquire-load synchronizes with the release-store (3)
if (_state.load(std::memory_order_acquire).status == Status::Finished) {
return;
while (_state.load(std::memory_order_acquire).status != Status::Finished) {
std::cv_status s = _bell.wait_for(guard, std::chrono::milliseconds(1000));
if (s == std::cv_status::timeout) {
auto state = _state.load(std::memory_order_relaxed);
// We only log this if the status is not "InProgress", since
// this is the only one we expect when a timeout occurs!
if (state.status != Status::InProgress) {
LOG_TOPIC("62514", INFO, Logger::AQL)
<< "Have waited for a second on an async prefetch task, "
"state is "
<< (int)state.status << " abandoned: " << state.abandoned
<< " Blockinfo: " << _block.printBlockInfo()
<< " Query ID: " << _block.getQuery().id();
}
}
}
count = _numberWaiters.fetch_sub(1, std::memory_order_relaxed);
if (_logStacktrace.load(std::memory_order_relaxed) == true) {
LOG_TOPIC("62516", WARN, Logger::AQL) << "ALERT: Found logStacktrace:";
CrashHandler::logBacktrace();
if (count == 0) {
_logStacktrace.store(false, std::memory_order_relaxed);
}
}

_bell.wait(guard, [this]() {
// (2) - this acquire-load synchronizes with the release-store (3)
return _state.load(std::memory_order_acquire).status == Status::Finished;
});
}

template<class Executor>
Expand Down
0