8000 Show running thread in async registry by jvolmer · Pull Request #21776 · arangodb/arangodb · GitHub
[go: up one dir, main page]

Skip to content

Show running thread in async registry #21776

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: devel
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode characters
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
devel
-----

* Fix a possible crash when requesting the async registry:
Does now not request the thread name of a potentially destroyed thread.

* Fix arangorestore failing to restore the dump containing a
vector index.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,13 @@ class Thread:
# TODO is there a way to get the thread name?

@classmethod
def from_gdb(cls, value: gdb.Value):
def from_gdb(cls, value: gdb.Value | None):
if not value:
return None
return cls(value['posix_id'], value['kernel_id'])

def __str__(self):
return "thread " + str(self.lwpid)
return f"LWPID {self.lwpid} (pthread {self.posix_id})"

@dataclass
class SourceLocation:
Expand Down Expand Up @@ -79,7 +81,7 @@ def __str__(self):
@dataclass
class Promise:
id: PromiseId
thread: Thread
thread: Optional[Thread]
source_location: SourceLocation
requester: Requester
state: State
Expand All @@ -88,7 +90,7 @@ class Promise:
def from_gdb(cls, ptr: gdb.Value, value: gdb.Value):
return cls(
PromiseId(ptr),
Thread.from_gdb(value["thread"]),
Thread.from_gdb(GdbOptional.from_gdb(value["running_thread"])._value),
SourceLocation.from_gdb(value["source_location"]),
Requester.from_gdb(value["requester"]["_M_i"]),
State.from_gdb(value["state"])
Expand All @@ -98,8 +100,22 @@ def is_valid(self):
return not self.state.is_deleted()

def __str__(self):
return str(self.source_location) + ", " + str(self.thread) + ", " + str(self.state)
thread_str = f" on {self.thread}" if self.thread else ""
return str(self.source_location) + ", " + str(self.state) + thread_str

@dataclass
class GdbOptional:
_value: Optional[gdb.Value]

@classmethod
def from_gdb(cls, value: gdb.Value):
payload = value["_M_i"]["_M_payload"]
engaged = payload["_M_engaged"]
if not engaged:
return cls(None)
internal_value = payload["_M_payload"]["_M_value"]
return cls(internal_value)

@dataclass
class GdbAtomicList:
_head_ptr: gdb.Value
Expand Down
17 changes: 9 additions & 8 deletions arangod/AsyncRegistryServer/PrettyPrinter/src/pretty-printer.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@
from asyncregistry.stacktrace import Stacktrace

class Thread(object):
def __init__(self, name: str, id: int):
self.name = name
def __init__(self, id: int, posix_id: int):
self.id = id
self.posix_id = posix_id
@classmethod
def from_json(cls, blob: dict):
return cls(blob["name"], blob["LWPID"])
return cls(blob["LWPID"], blob["posix_id"])
def __str__(self):
return self.name + "(" + str(self.id) + ")"
return f"LWPID {self.id} (pthread {self.posix_id})"

class SourceLocation(object):
def __init__(self, file_name: str, line: int, function_name: str):
Expand Down Expand Up @@ -58,18 +58,19 @@ def __str__(self):
return ""

class Data(object):
def __init__(self, owning_thread: Thread, source_location: SourceLocation, id: int, state: str, requester: Requester):
self.owning_thread = owning_thread
def __init__(self, running_thread: Optional[Thread], source_location: SourceLocation, id: int, state: str, requester: Requester):
self.running_thread = running_thread
self.source_location = source_location
self.id = id
self.waiter = requester
self.state = state
@classmethod
def from_json(cls, blob: dict):
return cls(Thread.from_json(blob["owning_thread"]), SourceLocation.from_json(blob["source_location"]), blob["id"], blob["state"], Requester.from_json(blob["requester"]))
return cls(Thread.from_json(blob["running_thread"]) if "running_thread" in blob else None, SourceLocation.from_json(blob["source_location"]), blob["id"], blob["state"], Requester.from_json(blob["requester"]))
def __str__(self):
waiter_str = str(self.waiter) if self.waiter != None else ""
return str(self.source_location) + ", " + str(self.owning_thread) + ", " + self.state + waiter_str
thread_str = f" on {self.running_thread}" if self.running_thread else ""
return str(self.source_location) + ", " + self.state + thread_str + waiter_str

class Promise(object):
def __init__(self, hierarchy: int, data: Data):
Expand Down
15 changes: 12 additions & 3 deletions lib/Async/Registry/promise.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,12 @@
using namespace arangodb::async_registry;

Promise::Promise(Requester requester, std::source_location entry_point)
: thread{basics::ThreadId::current()},
: owning_thread{basics::ThreadId::current()},
requester{requester},
state{State::Running},
running_thread{basics::ThreadId::current()},
source_location{entry_point.file_name(), entry_point.function_name(),
entry_point.line()},
requester{requester} {}
entry_point.line()} {}

auto arangodb::async_registry::get_current_coroutine() noexcept -> Requester* {
struct Guard {
Expand Down Expand Up @@ -73,6 +75,13 @@ auto AddToAsyncRegistry::update_source_location(std::source_location loc)
}
auto AddToAsyncRegistry::update_state(State state) -> std::optional<State> {
if (node_in_registry != nullptr) {
if (state == State::Running) {
node_in_registry->data.running_thread.store(basics::ThreadId::current(),
std::memory_order_release);
} else {
node_in_registry->data.running_thread.store(std::nullopt,
std::memory_order_release);
}
return node_in_registry->data.state.exchange(state);
} else {
return std::nullopt;
Expand Down
29 changes: 15 additions & 14 deletions lib/Async/Registry/promise.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,19 +102,18 @@ auto inspect(Inspector& f, Requester& x) {

struct PromiseSnapshot {
void* id;
basics::ThreadId thread;
basics::SourceLocationSnapshot source_location;
Requester requester;
State state;
std::optional<basics::ThreadId> thread;
basics::SourceLocationSnapshot source_location;
bool operator==(PromiseSnapshot const&) const = default;
};
template<typename Inspector>
auto inspect(Inspector& f, PromiseSnapshot& x) {
return f.object(x).fields(f.field("owning_thread", x.thread),
f.field("source_location", x.source_location),
f.field("id", fmt::format("{}", x.id)),
f.field("requester", x.requester),
f.field("state", x.state));
return f.object(x).fields(
f.field("id", fmt::format("{}", x.id)), f.field("requester", x.requester),
f.field("state", x.state), f.field("running_thread", x.thread),
f.field("source_location", x.source_location));
}

/**
Expand All @@ -127,20 +126,22 @@ struct Promise {

auto id() -> void* { return this; }
auto snapshot() -> Snapshot {
return PromiseSnapshot{.id = id(),
.thread = thread,
.source_location = source_location.snapshot(),
.requester = requester.load(),
.state = state.load()};
return PromiseSnapshot{
.id = id(),
.requester = requester.load(),
.state = state.load(),
.thread = running_thread.load(std::memory_order_acquire),
.source_location = source_location.snapshot()};
}
auto set_to_deleted() -> void {
state.store(State::Deleted, std::memory_order_relaxed);
}

basics::ThreadId thread;
basics::VariableSourceLocation source_location;
basics::ThreadId owning_thread;
std::atomic<Requester> requester;
std::atomic<State> state = State::Running;
std::atomic<std::optional<basics::ThreadId>> running_thread;
basics::VariableSourceLocation source_location;
};

/**
Expand Down
2 changes: 1 addition & 1 deletion lib/Containers/Concurrent/thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ struct ThreadId {
template<typename Inspector>
auto inspect(Inspector& f, ThreadId& x) {
return f.object(x).fields(f.field("LWPID", x.kernel_id),
f.field("name", x.name()));
f.field("posix_id", x.posix_id));
}

} // namespace arangodb::basics
Expand Down
46 changes: 43 additions & 3 deletions tests/Async/Registry/RegistryTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@
////////////////////////////////////////////////////////////////////////////////
#include "Async/Registry/promise.h"
#include "Async/Registry/registry_variable.h"
#include "thread.h"

#include <gtest/gtest.h>
#include <optional>
#include <source_location>
#include <thread>

Expand All @@ -48,10 +50,10 @@ struct MyPromise : public AddToAsyncRegistry {
thread{basics::ThreadId::current()} {}
auto snapshot(State state = State::Running) -> PromiseSnapshot {
return PromiseSnapshot{.id = id(),
.thread = thread,
.source_location = source_location,
.requester = {thread},
.state = state};
.state = state,
.thread = thread,
.source_location = source_location};
}
};

Expand Down Expand Up @@ -132,3 +134,41 @@ TEST_F(
get_thread_registry().garbage_collect();
EXPECT_EQ(promises_in_registry(), (std::vector<PromiseSnapshot>{}));
}

TEST_F(AsyncRegistryTest, sets_running_thread_to_current_thread_when_running) {
auto promise = MyPromise{};
auto all_promises = promises_in_registry();
EXPECT_EQ(all_promises.size(), 1);
EXPECT_EQ(all_promises[0].state, State::Running);
EXPECT_EQ(all_promises[0].thread, basics::ThreadId::current());

promise.update_state(State::Suspended);
all_promises = promises_in_registry();
EXPECT_EQ(all_promises[0].state, State::Suspended);
EXPECT_EQ(all_promises[0].thread, std::nullopt);

promise.update_state(State::Running);
all_promises = promises_in_registry();
EXPECT_EQ(all_promises[0].state, State::Running);
EXPECT_EQ(all_promises[0].thread, basics::ThreadId::current());

promise.update_state(State::Resolved);
all_promises = promises_in_registry();
EXPECT_EQ(all_promises[0].state, State::Resolved);
EXPECT_EQ(all_promises[0].thread, std::nullopt);

promise.update_state(State::Running);
all_promises = promises_in_registry();
EXPECT_EQ(all_promises[0].state, State::Running);
EXPECT_EQ(all_promises[0].thread, basics::ThreadId::current());

promise.update_state(State::Deleted);
all_promises = promises_in_registry();
EXPECT_EQ(all_promises[0].state, State::Deleted);
EXPECT_EQ(all_promises[0].thread, std::nullopt);

promise.update_state(State::Running);
all_promises = promises_in_registry();
EXPECT_EQ(all_promises[0].state, State::Running);
EXPECT_EQ(all_promises[0].thread, basics::ThreadId::current());
}
0