8000 Make async registry data structure generic by jvolmer · Pull Request #21699 · arangodb/arangodb · GitHub
[go: up one dir, main page]

Skip to content

Make async registry data structure generic #21699

New i 8000 ssue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Apr 28, 2025
Merged
Prev Previous commit
Next Next commit
Use new, more general types for async registry
  • Loading branch information
jvolmer committed Apr 14, 2025
commit b33dae7c5732eb8db0eff65e14ee6bb275a402d1
33 changes: 30 additions & 3 deletions arangod/AsyncRegistryServer/Feature.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,15 +49,40 @@ DECLARE_GAUGE(
arangodb_async_existing_thread_registries, std::uint64_t,
"Number of threads that started currently existing asyncronous operations");

auto Feature::RegistryMetrics::increment_total_nodes() -> void {
promises_total->count();
}
auto Feature::RegistryMetrics::increment_registered_nodes() -> void {
existing_promises->fetch_add(1);
}
auto Feature::RegistryMetrics::decrement_registered_nodes() -> void {
existing_promises->fetch_sub(1);
}
auto Feature::RegistryMetrics::increment_ready_for_deletion_nodes() -> void {
existing_promises->fetch_add(1);
}
auto Feature::RegistryMetrics::decrement_ready_for_deletion_nodes() -> void {
existing_promises->fetch_sub(1);
}
auto Feature::RegistryMetrics::increment_total_lists() -> void {
thread_registries_total->count();
}
auto Feature::RegistryMetrics::increment_existing_lists() -> void {
existing_thread_registries->fetch_add(1);
}
auto Feature::RegistryMetrics::decrement_existing_lists() -> void {
existing_thread_registries->fetch_sub(1);
}

Feature::Feature(Server& server)
: ArangodFeature{server, *this}, _async_mutex{_schedulerWrapper} {
startsAfter<metrics::MetricsFeature>();
startsAfter<SchedulerFeature>();
}

auto Feature::create_metrics(arangodb::metrics::MetricsFeature& metrics_feature)
-> std::shared_ptr<const Metrics> {
return std::make_shared<Metrics>(
-> std::shared_ptr<RegistryMetrics> {
return std::make_shared<RegistryMetrics>(
metrics_feature.addShared(arangodb_async_promises_total{}),
metrics_feature.addShared(arangodb_async_existing_promises{}),
metrics_feature.addShared(arangodb_async_ready_for_deletion_promises{}),
Expand Down Expand Up @@ -114,4 +139,6 @@ void Feature::collectOptions(std::shared_ptr<options::ProgramOptions> options) {
R"(Each thread that is involved in the async-registry needs to garbage collect its finished async function calls regularly. This option controls how often this is done in seconds. This can possibly be performance relevant because each involved thread aquires a lock.)");
}

Feature::~Feature() { registry.set_metrics(std::make_shared<Metrics>()); }
Feature::~Feature() {
registry.set_metrics(std::make_shared<arangodb::containers::Metrics>());
}
40 changes: 37 additions & 3 deletions arangod/AsyncRegistryServer/Feature.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
#pragma once

#include "Async/Registry/registry_variable.h"
#include "Async/Registry/Metrics.h"
#include "Metrics/Fwd.h"
#include "Basics/FutureSharedLock.h"
#include "RestServer/arangod.h"
#include "Scheduler/SchedulerFeature.h"
Expand All @@ -32,8 +32,42 @@ namespace arangodb::async_registry {

class Feature final : public ArangodFeature {
private:
struct RegistryMetrics : arangodb::containers::Metrics {
RegistryMetrics(
std::shared_ptr<metrics::Counter> promises_total,
std::shared_ptr<metrics::Gauge<std::uint64_t>> existing_promises,
std::shared_ptr<metrics::Gauge<std::uint64_t>>
ready_for_deletion_promises,
std::shared_ptr<metrics::Counter> thread_registries_total,
std::shared_ptr<metrics::Gauge<std::uint64_t>>
existing_thread_registries)
: promises_total{promises_total},
existing_promises{existing_promises},
ready_for_deletion_promises{ready_for_deletion_promises},
thread_registries_total{thread_registries_total},
existing_thread_registries{existing_thread_registries} {}
~RegistryMetrics() = default;
auto increment_total_nodes() -> void override;
auto increment_registered_nodes() -> void override;
auto decrement_registered_nodes() -> void override;
auto increment_ready_for_deletion_nodes() -> void override;
auto decrement_ready_for_deletion_nodes() -> void override;
auto increment_total_lists() -> void override;
auto increment_existing_lists() -> void override;
auto decrement_existing_lists() -> void override;

private:
std::shared_ptr<metrics::Counter> promises_total = nullptr;
std::shared_ptr<metrics::Gauge<std::uint64_t>> existing_promises = nullptr;
std::shared_ptr<metrics::Gauge<std::uint64_t>> ready_for_deletion_promises =
nullptr;
std::shared_ptr<metrics::Counter> thread_registries_total = nullptr;
std::shared_ptr<metrics::Gauge<std::uint64_t>> existing_thread_registries =
nullptr;
};

static auto create_metrics(arangodb::metrics::MetricsFeature& metrics_feature)
-> std::shared_ptr<const Metrics>;
-> std::shared_ptr<RegistryMetrics>;
struct SchedulerWrapper {
using WorkHandle = Scheduler::WorkHandle;
template<typename F>
Expand Down Expand Up @@ -68,7 +102,7 @@ class Feature final : public ArangodFeature {
};
Options _options;

std::shared_ptr<const Metrics> metrics;
std::shared_ptr<RegistryMetrics> metrics;

struct PromiseCleanupThread;
std::shared_ptr<PromiseCleanupThread> _cleanupThread;
Expand Down
4 changes: 2 additions & 2 deletions arangod/AsyncRegistryServer/RestHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,13 +68,13 @@ namespace {
auto all_undeleted_promises() -> ForestWithRoots<PromiseSnapshot> {
Forest<PromiseSnapshot> forest;
std::vector<Id> roots;
registry.for_promise([&](PromiseSnapshot promise) {
registry.for_node([&](PromiseSnapshot promise) {
if (promise.state != State::Deleted) {
std::visit(overloaded{
[&](PromiseId async_waiter) {
forest.insert(promise.id, async_waiter, promise);
},
[&](ThreadId sync_waiter_thread) {
[&](basics::ThreadId sync_waiter_thread) {
forest.insert(promise.id, nullptr, promise);
roots.emplace_back(promise.id);
},
Expand Down
5 changes: 2 additions & 3 deletions lib/Async/Registry/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
add_library(arango_async_registry STATIC
promise.cpp
registry.cpp
registry_variable.cpp
thread_registry.cpp)
registry_variable.cpp)
target_include_directories(arango_async_registry PRIVATE
${PROJECT_SOURCE_DIR}/arangod)
target_link_libraries(arango_async_registry
PRIVATE
arango_thread_owning_list
arango_assertions
PUBLIC
arango_metrics_base)
55 changes: 19 additions & 36 deletions lib/Async/Registry/promise.cpp
8000
Original file line number Diff line number Diff line change
Expand Up @@ -21,67 +21,50 @@
/// @author Julia Volmer
////////////////////////////////////////////////////////////////////////////////
#include "promise.h"
#include <optional>

#include "Containers/Concurrent/ThreadOwnedList.h"
#include "Async/Registry/registry_variable.h"
#include "Async/Registry/thread_registry.h"
#include "Basics/Thread.h"
#include "Inspection/Format.h"
#include "Containers/Concurrent/thread.h"

using namespace arangodb::async_registry;

auto ThreadId::current() noexcept -> ThreadId {
return ThreadId{.posix_id = arangodb::Thread::currentThreadId(),
.kernel_id = arangodb::Thread::currentKernelThreadId()};
}
auto ThreadId::name() -> std::string {
return std::string{ThreadNameFetcher{posix_id}.get()};
}

auto Requester::current_thread() -> Requester { return {ThreadId::current()}; }

Promise::Promise(Promise* next, std::shared_ptr<ThreadRegistry> registry,
Requester requester, std::source_location entry_point)
: thread{registry->thread},
Promise::Promise(Requester requester, std::source_location entry_point)
: thread{basics::ThreadId::current()},
source_location{entry_point.file_name(), entry_point.function_name(),
entry_point.line()},
requester{requester},
registry{std::move(registry)},
next{next} {}

auto Promise::mark_for_deletion() noexcept -> void {
registry->mark_for_deletion(this);
}
requester{requester} {}

AddToAsyncRegistry::AddToAsyncRegistry(std::source_location loc)
: promise_in_registry{get_thread_registry().add_promise(
*get_current_coroutine(), std::move(loc))} {}
: node_in_registry{get_thread_registry().add([&]() {
return Promise{*get_current_coroutine(), std::move(loc)};
})} {}

AddToAsyncRegistry::~AddToAsyncRegistry() {
if (promise_in_registry != nullptr) {
promise_in_registry->mark_for_deletion();
if (node_in_registry != nullptr) {
node_in_registry->list->mark_for_deletion(node_in_registry.get());
}
}
auto AddToAsyncRegistry::update_requester(Requester new_requester) -> void {
if (promise_in_registry != nullptr) {
promise_in_registry->requester.store(new_requester);
if (node_in_registry != nullptr) {
node_in_registry->data.requester.store(new_requester);
}
}
auto AddToAsyncRegistry::id() -> void* {
if (promise_in_registry != nullptr) {
return promise_in_registry->id();
if (node_in_registry != nullptr) {
return node_in_registry->data.id();
} else {
return nullptr;
}
}
auto AddToAsyncRegistry::update_source_location(std::source_location loc)
-> void {
if (promise_in_registry != nullptr) {
promise_in_registry->source_location.line.store(loc.line());
if (node_in_registry != nullptr) {
node_in_registry->data.source_location.line.store(loc.line());
}
}
auto AddToAsyncRegistry::update_state(State state) -> std::optional<State> {
if (promise_in_registry != nullptr) {
return promise_in_registry->state.exchange(state);
if (node_in_registry != nullptr) {
return node_in_registry->data.state.exchange(state);
} else {
return std::nullopt;
}
Expand Down
69 changes: 2 103CE 4 additions & 45 deletions lib/Async/Registry/promise.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,15 @@
////////////////////////////////////////////////////////////////////////////////
#pragma once

#include <atomic>
#include <iostream>
#include <memory>
#include <optional>
#include <source_location>
#include <string>
#include <thread>
#include "Basics/threads-posix.h"
#include "Inspection/Format.h"
#include "Inspection/Types.h"
#include "Containers/Concurrent/ThreadOwnedList.h"
#include "Containers/Concurrent/thread.h"
#include "fmt/format.h"
#include "fmt/std.h"

Expand All @@ -43,28 +43,19 @@ struct overloaded : Ts... {
template<class... Ts>
overloaded(Ts...) -> overloaded<Ts...>;
} // namespace

namespace arangodb::async_registry {

struct ThreadRegistry;

struct ThreadId {
static auto current() noexcept -> ThreadId;
auto name() -> std::string;
TRI_tid_t posix_id;
pid_t kernel_id;
bool operator==(ThreadId const&) const = default;
};
template<typename Inspector>
auto inspect(Inspector& f, ThreadId& x) {
return f.object(x).fields(f.field("LWPID", x.kernel_id),
f.field("name", x.name()));
}

struct SourceLocationSnapshot {
std::string_view file_name;
std::string_view function_name;
std::uint_least32_t line;
bool operator==(SourceLocationSnapshot const&) const = default;
static auto from(std::source_location loc) -> SourceLocationSnapshot {
return SourceLocationSnapshot{.file_name=loc.file_name(), .function_name=loc.function_name(), .line=loc.line()};
}
};
template<typename Inspector>
auto inspect(Inspector& f, SourceLocationSnapshot& x) {
Expand Down Expand Up @@ -101,7 +92,7 @@ auto inspect(Inspector& f, PromiseIdWrapper& x) {
return f.object(x).fields(f.field("promise", fmt::format("{}", x.item)));
}
struct ThreadIdWrapper {
ThreadId item;
basics::ThreadId item;
};
template<typename Inspector>
auto inspect(Inspector& f, ThreadIdWrapper& x) {
Expand All @@ -114,8 +105,10 @@ auto inspect(Inspector& f, RequesterWrapper& x) {
inspection::inlineType<PromiseIdWrapper>(),
inspection::inlineType<ThreadIdWrapper>());
}
struct Requester : std::variant<ThreadId, PromiseId> {
static auto current_thread() -> Requester;
struct Requester : std::variant<basics::ThreadId, PromiseId> {
static auto current_thread() -> Requester {
return {basics::ThreadId::current()};
}
};
template<typename Inspector>
auto inspect(Inspector& f, Requester& x) {
Expand All @@ -125,7 +118,7 @@ auto inspect(Inspector& f, Requester& x) {
[&](PromiseId waiter) {
return RequesterWrapper{PromiseIdWrapper{waiter}};
},
[&](ThreadId waiter) {
[&](basics::ThreadId waiter) {
return RequesterWrapper{ThreadIdWrapper{waiter}};
},
},
Expand All @@ -136,7 +129,7 @@ auto inspect(Inspector& f, Requester& x) {

struct PromiseSnapshot {
void* id;
ThreadId thread;
basics::ThreadId thread;
SourceLocationSnapshot source_location;
Requester requester;
State state;
Expand All @@ -150,39 +143,29 @@ auto inspect(Inspector& f, PromiseSnapshot& x) {
f.field("requester", x.requester),
f.field("state", x.state));
}

struct Promise {
Promise(Promise* next, std::shared_ptr<ThreadRegistry> registry,
Requester requester, std::source_location location);
using Snapshot = PromiseSnapshot;
Promise(Requester requester, std::source_location location);
~Promise() = default;

auto mark_for_deletion() noexcept -> void;
auto id() -> void* { return this; }
auto snapshot() -> PromiseSnapshot {
auto snapshot() -> Snapshot {
return PromiseSnapshot{.id = id(),
.thread = thread,
.source_location = source_location.snapshot(),
.requester = requester.load(),
.state = state.load()};
}
auto set_to_deleted() -> void {
state.store(State::Deleted, std::memory_order_relaxed);
}

ThreadId thread;
basics::ThreadId thread;

SourceLocation source_location;
std::atomic<Requester> requester;
std::atomic<State> state = State::Running;
// identifies the promise list it belongs to
std::shared_ptr<ThreadRegistry> registry;
Promise* next = nullptr;
// this needs to be an atomic because it is accessed during garbage
// collection which can happen in a different thread. This thread will
// load the value. Since there is only one transition, i.e. from nullptr
// to non-null ptr, any missed update will result in a pessimistic
// execution and not an error. More precise, the item might not be
// deleted, although it is not in head position and can be deleted. It
// will be deleted next round.
std::atomic<Promise*> previous = nullptr;
// only needed to garbage collect promises
Promise* next_to_free = nullptr;
};

struct AddToAsyncRegistry {
Expand All @@ -203,13 +186,9 @@ struct AddToAsyncRegistry {
struct noop {
void operator()(void*) {}
};

public:
std::unique_ptr<Promise, noop> promise_in_registry = nullptr;
std::unique_ptr<containers::ThreadOwnedList<Promise>::Node, noop>
node_in_registry = nullptr;
};

} // namespace arangodb::async_registry

template<>
struct fmt::formatter<arangodb::async_registry::ThreadId>
: arangodb::inspection::inspection_formatter {};
Loading
0