diff --git a/CHANGELOG b/CHANGELOG index b938ce22c63a..fd21cba429f0 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,6 +1,9 @@ devel ----- +* Fix a possible crash when requesting the async registry: + Does now not request the thread name of a potentially destroyed thread. + * Add detection of two threads working on an ExecutionBlock concurrently. * Add detection of two threads waiting for a PrefetchTask concurrently. diff --git a/arangod/SystemMonitor/AsyncRegistry/PrettyPrinter/src/asyncregistry/gdb_data.py b/arangod/SystemMonitor/AsyncRegistry/PrettyPrinter/src/asyncregistry/gdb_data.py index 0162a3de8e69..46c6332e53af 100644 --- a/arangod/SystemMonitor/AsyncRegistry/PrettyPrinter/src/asyncregistry/gdb_data.py +++ b/arangod/SystemMonitor/AsyncRegistry/PrettyPrinter/src/asyncregistry/gdb_data.py @@ -26,11 +26,13 @@ class Thread: # TODO is there a way to get the thread name? @classmethod - def from_gdb(cls, value: gdb.Value): + def from_gdb(cls, value: gdb.Value | None): + if not value: + return None return cls(value['posix_id'], value['kernel_id']) def __str__(self): - return "thread " + str(self.lwpid) + return f"LWPID {self.lwpid} (pthread {self.posix_id})" @dataclass class SourceLocation: @@ -79,7 +81,7 @@ def __str__(self): @dataclass class Promise: id: PromiseId - thread: Thread + thread: Optional[Thread] source_location: SourceLocation requester: Requester state: State @@ -88,7 +90,7 @@ class Promise: def from_gdb(cls, ptr: gdb.Value, value: gdb.Value): return cls( PromiseId(ptr), - Thread.from_gdb(value["thread"]), + Thread.from_gdb(GdbOptional.from_gdb(value["running_thread"])._value), SourceLocation.from_gdb(value["source_location"]), Requester.from_gdb(value["requester"]["_M_i"]), State.from_gdb(value["state"]) @@ -98,8 +100,22 @@ def is_valid(self): return not self.state.is_deleted() def __str__(self): - return str(self.source_location) + ", " + str(self.thread) + ", " + str(self.state) + thread_str = f" on {self.thread}" if self.thread else "" + return str(self.source_location) + ", " + str(self.state) + thread_str + +@dataclass +class GdbOptional: + _value: Optional[gdb.Value] + @classmethod + def from_gdb(cls, value: gdb.Value): + payload = value["_M_i"]["_M_payload"] + engaged = payload["_M_engaged"] + if not engaged: + return cls(None) + internal_value = payload["_M_payload"]["_M_value"] + return cls(internal_value) + @dataclass class GdbAtomicList: _head_ptr: gdb.Value diff --git a/arangod/SystemMonitor/AsyncRegistry/PrettyPrinter/src/pretty-printer.py b/arangod/SystemMonitor/AsyncRegistry/PrettyPrinter/src/pretty-printer.py index 0166253b0b24..1e71c97b66d5 100755 --- a/arangod/SystemMonitor/AsyncRegistry/PrettyPrinter/src/pretty-printer.py +++ b/arangod/SystemMonitor/AsyncRegistry/PrettyPrinter/src/pretty-printer.py @@ -15,14 +15,14 @@ from asyncregistry.stacktrace import Stacktrace class Thread(object): - def __init__(self, name: str, id: int): - self.name = name + def __init__(self, id: int, posix_id: int): self.id = id + self.posix_id = posix_id @classmethod def from_json(cls, blob: dict): - return cls(blob["name"], blob["LWPID"]) + return cls(blob["LWPID"], blob["posix_id"]) def __str__(self): - return self.name + "(" + str(self.id) + ")" + return f"LWPID {self.id} (pthread {self.posix_id})" class SourceLocation(object): def __init__(self, file_name: str, line: int, function_name: str): @@ -58,18 +58,19 @@ def __str__(self): return "" class Data(object): - def __init__(self, owning_thread: Thread, source_location: SourceLocation, id: int, state: str, requester: Requester): - self.owning_thread = owning_thread + def __init__(self, running_thread: Optional[Thread], source_location: SourceLocation, id: int, state: str, requester: Requester): + self.running_thread = running_thread self.source_location = source_location self.id = id self.waiter = requester self.state = state @classmethod def from_json(cls, blob: dict): - return cls(Thread.from_json(blob["owning_thread"]), SourceLocation.from_json(blob["source_location"]), blob["id"], blob["state"], Requester.from_json(blob["requester"])) + return cls(Thread.from_json(blob["running_thread"]) if "running_thread" in blob else None, SourceLocation.from_json(blob["source_location"]), blob["id"], blob["state"], Requester.from_json(blob["requester"])) def __str__(self): waiter_str = str(self.waiter) if self.waiter != None else "" - return str(self.source_location) + ", " + str(self.owning_thread) + ", " + self.state + waiter_str + thread_str = f" on {self.running_thread}" if self.running_thread else "" + return str(self.source_location) + ", " + self.state + thread_str + waiter_str class Promise(object): def __init__(self, hierarchy: int, data: Data): diff --git a/lib/Async/Registry/promise.cpp b/lib/Async/Registry/promise.cpp index 501bfe5deadf..ada0a3b3d364 100644 --- a/lib/Async/Registry/promise.cpp +++ b/lib/Async/Registry/promise.cpp @@ -29,10 +29,12 @@ using namespace arangodb::async_registry; Promise::Promise(Requester requester, std::source_location entry_point) - : thread{basics::ThreadId::current()}, + : owning_thread{basics::ThreadId::current()}, + requester{requester}, + state{State::Running}, + running_thread{basics::ThreadId::current()}, source_location{entry_point.file_name(), entry_point.function_name(), - entry_point.line()}, - requester{requester} {} + entry_point.line()} {} auto arangodb::async_registry::get_current_coroutine() noexcept -> Requester* { struct Guard { @@ -73,6 +75,13 @@ auto AddToAsyncRegistry::update_source_location(std::source_location loc) } auto AddToAsyncRegistry::update_state(State state) -> std::optional { if (node_in_registry != nullptr) { + if (state == State::Running) { + node_in_registry->data.running_thread.store(basics::ThreadId::current(), + std::memory_order_release); + } else { + node_in_registry->data.running_thread.store(std::nullopt, + std::memory_order_release); + } return node_in_registry->data.state.exchange(state); } else { return std::nullopt; diff --git a/lib/Async/Registry/promise.h b/lib/Async/Registry/promise.h index ddec2fff4761..f077799dad49 100644 --- a/lib/Async/Registry/promise.h +++ b/lib/Async/Registry/promise.h @@ -102,19 +102,18 @@ auto inspect(Inspector& f, Requester& x) { struct PromiseSnapshot { void* id; - basics::ThreadId thread; - basics::SourceLocationSnapshot source_location; Requester requester; State state; + std::optional thread; + basics::SourceLocationSnapshot source_location; bool operator==(PromiseSnapshot const&) const = default; }; template auto inspect(Inspector& f, PromiseSnapshot& x) { - return f.object(x).fields(f.field("owning_thread", x.thread), - f.field("source_location", x.source_location), - f.field("id", fmt::format("{}", x.id)), - f.field("requester", x.requester), - f.field("state", x.state)); + return f.object(x).fields( + f.field("id", fmt::format("{}", x.id)), f.field("requester", x.requester), + f.field("state", x.state), f.field("running_thread", x.thread), + f.field("source_location", x.source_location)); } /** @@ -127,20 +126,22 @@ struct Promise { auto id() -> void* { return this; } auto snapshot() -> Snapshot { - return PromiseSnapshot{.id = id(), - .thread = thread, - .source_location = source_location.snapshot(), - .requester = requester.load(), - .state = state.load()}; + return PromiseSnapshot{ + .id = id(), + .requester = requester.load(), + .state = state.load(), + .thread = running_thread.load(std::memory_order_acquire), + .source_location = source_location.snapshot()}; } auto set_to_deleted() -> void { state.store(State::Deleted, std::memory_order_relaxed); } - basics::ThreadId thread; - basics::VariableSourceLocation source_location; + basics::ThreadId owning_thread; std::atomic requester; std::atomic state = State::Running; + std::atomic> running_thread; + basics::VariableSourceLocation source_location; }; /** diff --git a/lib/Containers/Concurrent/thread.h b/lib/Containers/Concurrent/thread.h index 45a7ad290444..6652ebb7dae3 100644 --- a/lib/Containers/Concurrent/thread.h +++ b/lib/Containers/Concurrent/thread.h @@ -37,7 +37,7 @@ struct ThreadId { template auto inspect(Inspector& f, ThreadId& x) { return f.object(x).fields(f.field("LWPID", x.kernel_id), - f.field("name", x.name())); + f.field("posix_id", x.posix_id)); } } // namespace arangodb::basics diff --git a/tests/Async/Registry/RegistryTest.cpp b/tests/Async/Registry/RegistryTest.cpp index b9a7258d34df..b04224e7b6be 100644 --- a/tests/Async/Registry/RegistryTest.cpp +++ b/tests/Async/Registry/RegistryTest.cpp @@ -22,8 +22,10 @@ //////////////////////////////////////////////////////////////////////////////// #include "Async/Registry/promise.h" #include "Async/Registry/registry_variable.h" +#include "thread.h" #include +#include #include #include @@ -48,10 +50,10 @@ struct MyPromise : public AddToAsyncRegistry { thread{basics::ThreadId::current()} {} auto snapshot(State state = State::Running) -> PromiseSnapshot { return PromiseSnapshot{.id = id(), - .thread = thread, - .source_location = source_location, .requester = {thread}, - .state = state}; + .state = state, + .thread = thread, + .source_location = source_location}; } }; @@ -132,3 +134,53 @@ TEST_F( get_thread_registry().garbage_collect(); EXPECT_EQ(promises_in_registry(), (std::vector{})); } + +TEST_F(AsyncRegistryTest, sets_running_thread_to_current_thread_when_running) { + auto promise = MyPromise{}; + auto all_promises = promises_in_registry(); + EXPECT_EQ(all_promises.size(), 1); + EXPECT_EQ(all_promises[0].state, State::Running); + EXPECT_EQ(all_promises[0].thread, basics::ThreadId::current()); + + promise.update_state(State::Suspended); + all_promises = promises_in_registry(); + EXPECT_EQ(all_promises[0].state, State::Suspended); + EXPECT_EQ(all_promises[0].thread, std::nullopt); + + promise.update_state(State::Running); + all_promises = promises_in_registry(); + EXPECT_EQ(all_promises[0].state, State::Running); + EXPECT_EQ(all_promises[0].thread, basics::ThreadId::current()); + + promise.update_state(State::Resolved); + all_promises = promises_in_registry(); + EXPECT_EQ(all_promises[0].state, State::Resolved); + EXPECT_EQ(all_promises[0].thread, std::nullopt); + + promise.update_state(State::Running); + all_promises = promises_in_registry(); + EXPECT_EQ(all_promises[0].state, State::Running); + EXPECT_EQ(all_promises[0].thread, basics::ThreadId::current()); + + promise.update_state(State::Deleted); + all_promises = promises_in_registry(); + EXPECT_EQ(all_promises[0].state, State::Deleted); + EXPECT_EQ(all_promises[0].thread, std::nullopt); + + promise.update_state(State::Running); + all_promises = promises_in_registry(); + EXPECT_EQ(all_promises[0].state, State::Running); + EXPECT_EQ(all_promises[0].thread, basics::ThreadId::current()); +} + +TEST_F(AsyncRegistryTest, inpection_works_on_after_thread_was_deleted) { + PromiseSnapshot promise_snapshot; + std::ignore = std::jthread([&promise_snapshot]() { + auto promise = MyPromise{}; + promise_snapshot = promise.snapshot(); + }); + + // we just make sure that we can still inspect the promise (and it does not + // crash the system), although the thread the promise was created on is gone + EXPECT_NE(fmt::format("{}", inspection::json(promise_snapshot)), ""); +}