8000 Make async registry data structure generic by jvolmer · Pull Request #21699 · arangodb/arangodb · GitHub
[go: up one dir, main page]

Skip to content

Make async registry data structure generic #21699

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Apr 28, 2025
Prev Previous commit
Next Next commit
Make metrics type dynamic
  • Loading branch information
jvolmer committed Apr 14, 2025
commit e5f47f1ae6f6de4091e5da990f330e92e8480ef1
18 changes: 15 additions & 3 deletions lib/Containers/Concurrent/ListOfThreadOwnedLists.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#pragma once

#include "Containers/Concurrent/snapshot.h"
#include "Containers/Concurrent/metrics.h"

#include <memory>
#include <vector>
Expand All @@ -37,17 +38,24 @@ template<typename List, typename Item, typename F>
concept IteratorOverSnapshots =
IteratorOverNodes<List, F> && std::invocable<F, typename Item::Snapshot>;

template<typename List, HasSnapshot Item>
template<UpdatesMetrics List, HasSnapshot Item>
struct ListOfLists {
std::shared_ptr<Metrics> metrics;

private:
std::vector<std::weak_ptr<List>> _lists;
std::mutex _mutex;
// std::shared_ptr<const Metrics> _metrics; // TODO

public:
// ListOfLists();
auto add(std::shared_ptr<List> list) -> void {
auto guard = std::lock_guard(_mutex);
// make sure that list uses our metrics
list->set_metrics(metrics);
if (metrics) {
metrics->increment_total_lists();
metrics->increment_existing_lists();
}
// make sure that expired nodes are deleted
std::erase_if(_lists, [&](auto const& list) { return list.expired(); });
_lists.emplace_back(list);
Expand All @@ -67,7 +75,11 @@ struct ListOfLists {
}
}
}
// auto set_metrics(std::shared_ptr<const Metrics> metrics) -> void; // TODO

auto set_metrics(std::shared_ptr<Metrics> new_metrics) -> void {
auto guard = std::lock_guard(_mutex);
metrics = new_metrics;
}
// void run_external_cleanup() noexcept;
};

Expand Down
67 changes: 43 additions & 24 deletions lib/Containers/Concurrent/ThreadOwnedList.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

#include "Containers/Concurrent/thread.h"
#include "Containers/Concurrent/snapshot.h"
#include "Containers/Concurrent/metrics.h"
#include "Inspection/Format.h"
#include "fmt/core.h"

Expand All @@ -32,15 +33,6 @@

namespace arangodb::containers {

template<typename M>
concept DefinesMetrics = requires(M m) {
m.increment_total_nodes();
m.increment_registered_nodes();
m.decrement_registered_nodes();
m.increment_ready_for_deletion_nodes();
m.decrement_ready_for_deletion_nodes();
};

/**
This list is owned by one thread: nodes can only be added on this thread but
other threads can read the list and mark nodes for deletion.
Expand All @@ -50,9 +42,9 @@ concept DefinesMetrics = requires(M m) {
shared_ptr to this list. Garbage collection can run either on the owning
thread or on another thread (there are two distinct functions for gc).
*/
template<HasSnapshot T, DefinesMetrics M> // T requires a Snapshot type
template<HasSnapshot T>
struct ThreadOwnedList
: public std::enable_shared_from_this<ThreadOwnedList<T, M>> {
: public std::enable_shared_from_this<ThreadOwnedList<T>> {
basics::ThreadId const thread;

struct Node {
Expand All @@ -61,7 +53,7 @@ struct ThreadOwnedList
std::atomic<Node*> previous =
nullptr; // needs to be atomic for gc from different thread
Node* next_to_free = nullptr;
std::shared_ptr<ThreadOwnedList<T, M>>
std::shared_ptr<ThreadOwnedList<T>>
list; // to be able to mark for deletion
};

Expand All @@ -70,17 +62,25 @@ struct ThreadOwnedList
std::atomic<Node*> _free_head = nullptr;
std::mutex _mutex; // gc and reading cannot happen at same time (perhaps I
// can get rid of this with epoch based reclamation)
M metrics;
std::shared_ptr<Metrics> _metrics;

public:
static auto make() noexcept -> std::shared_ptr<ThreadOwnedList> {
static auto make(
std::shared_ptr<Metrics> metrics = std::make_shared<Metrics>()) noexcept
-> std::shared_ptr<ThreadOwnedList> {
struct MakeShared : ThreadOwnedList {
MakeShared() : ThreadOwnedList() {}
MakeShared(std::shared_ptr<Metrics> shared_metrics)
: ThreadOwnedList(shared_metrics) {}
};
return std::make_shared<MakeShared>();
return std::make_shared<MakeShared>(metrics);
}

~ThreadOwnedList() noexcept { cleanup(); }
~ThreadOwnedList() noexcept {
if (_metrics) {
_metrics->decrement_existing_lists();
}
cleanup();
}

template<typename F>
requires std::invocable<F, typename T::Snapshot>
Expand Down Expand Up @@ -110,8 +110,10 @@ struct ThreadOwnedList
}
// (1) - this store synchronizes with load in (2)
_head.store(node, std::memory_order_release);
metrics.increment_registered_nodes();
metrics.increment_total_nodes();
if (_metrics) {
_metrics->increment_registered_nodes();
_metrics->increment_total_nodes();
}
return node;
}

Expand All @@ -136,8 +138,10 @@ struct ThreadOwnedList
// DO NOT access promise after this line. The owner thread might already
// be running a cleanup and promise might be deleted.

metrics.decrement_registered_nodes();
metrics.increment_ready_for_deletion_nodes();
if (_metrics) {
_metrics->decrement_registered_nodes();
_metrics->increment_ready_for_deletion_nodes();
}

// self destroyed here. registry might be destroyed here as well.
}
Expand Down Expand Up @@ -172,7 +176,9 @@ struct ThreadOwnedList
current = next;
next = next->next_to_free;
if (current->previous != nullptr) { // TODO memory order
metrics.decrement_ready_for_deletion_nodes();
if (_metrics) {
_metrics->decrement_ready_for_deletion_nodes();
}
remove(current);
delete current;
} else {
Expand All @@ -198,8 +204,19 @@ struct ThreadOwnedList
}
}

auto set_metrics(std::shared_ptr<Metrics> metrics) -> void {
_metrics = metrics;
}

private:
ThreadOwnedList() noexcept : thread{basics::ThreadId::current()} {}
ThreadOwnedList(std::shared_ptr<Metrics> metrics) noexcept
: thread{basics::ThreadId::current()}, _metrics{metrics} {
// is now done in ListOfLists
// if (_metrics) {
// _metrics->increment_total_lists();
// _metrics->increment_existing_lists();
// }
}

auto cleanup() noexcept -> void {
// (5) - this exchange synchronizes with compare_exchange_weak in (4)
Expand All @@ -208,7 +225,9 @@ struct ThreadOwnedList
while (next != nullptr) {
current = next;
next = next->next_to_free;
metrics.decrement_ready_for_deletion_nodes();
if (_metrics) {
_metrics->decrement_ready_for_deletion_nodes();
}
remove(current);
delete current;
}
Expand Down
46 changes: 46 additions & 0 deletions lib/Containers/Concurrent/metrics.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2014-2024 ArangoDB GmbH, Cologne, Germany
/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany
///
/// Licensed under the Business Source License 1.1 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// https://github.com/arangodb/arangodb/blob/devel/LICENSE
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Julia Volmer
////////////////////////////////////////////////////////////////////////////////
#pragma once

#include <memory>

namespace arangodb::containers {

struct Metrics {
virtual ~Metrics() = default;
virtual auto increment_total_nodes() -> void {}
virtual auto increment_registered_nodes() -> void {}
virtual auto decrement_registered_nodes() -> void {}
virtual auto increment_ready_for_deletion_nodes() -> void {}
virtual auto decrement_ready_for_deletion_nodes() -> void {}
virtual auto increment_total_lists() -> void {}
virtual auto increment_existing_lists() -> void {}
virtual auto decrement_existing_lists() -> void {}
};

template<typename T>
concept UpdatesMetrics = requires(T t, std::shared_ptr<Metrics> m) {
t.set_metrics(m);
};

} // namespace arangodb::containers
45 changes: 45 additions & 0 deletions tests/Containers/Concurrent/ListOfListsTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,16 +39,37 @@ struct MyData {
auto snapshot() const -> Snapshot { return Snapshot{.number = number}; }
};

struct MyMetrics : Metrics {
size_t lists = 0;
auto increment_existing_lists() -> void override { lists++; }
auto decrement_existing_lists() -> void override { lists--; }
};

struct MyNodeList {
std::vector<MyData> data;
std::shared_ptr<Metrics> metrics;

MyNodeList(std::vector<MyData> data) : data{std::move(data)} {
if (metrics) {
metrics->increment_existing_lists();
}
}
~MyNodeList() {
if (metrics) {
metrics->decrement_existing_lists();
}
}
template<typename F>
requires std::invocable<F, MyData::Snapshot>
auto for_node(F&& function) -> void {
for (auto const& item : data) {
function(item.snapshot());
}
}

auto set_metrics(std::shared_ptr<Metrics> new_metrics) -> void {
metrics = new_metrics;
}
};

using MyList = ListOfLists<MyNodeList, MyData>;
Expand Down Expand Up @@ -90,3 +111,27 @@ TEST(ListOfListsTest, iterates_over_list_items) {
EXPECT_EQ(nodes_in_list(list),
(std::vector<MyData::Snapshot>{{1}, {2}, {3}, {4}, {5}, {6}}));
}

TEST(ListOfListsTest, uses_list_of_lists_metrics_for_all_lists) {
MyList list;
auto inner_list = std::make_shared<MyNodeList>(std::vector<MyData>{1, 3, 4});
list.add(inner_list);

EXPECT_EQ(dynamic_cast<Metrics*>(list.metrics.get()),
nullptr); // uses default empty metrics

auto newMetrics = std::make_shared<MyMetrics>();
list.set_metrics(newMetrics);
EXPECT_NE(dynamic_cast<MyMetrics*>(list.metrics.get()), nullptr);

auto first_inner_list =
std::make_shared<MyNodeList>(std::vector<MyData>{1, 2, 3});
auto second_inner_list =
std::make_shared<MyNodeList>(std::vector<MyData>{4, 5, 6});
list.add(first_inner_list);
list.add(second_inner_list);

EXPECT_NE(dynamic_cast<MyMetrics*>(list.metrics.get()), nullptr);
EXPECT_EQ(newMetrics->lists, 2);
EXPECT_EQ(dynamic_cast<MyMetrics*>(list.metrics.get())->lists, 2);
}
9 changes: 1 addition & 8 deletions tests/Containers/Concurrent/ThreadOwnedListTest.cpp
7031
Original file line number Diff line number Diff line change
Expand Up @@ -59,15 +59,8 @@ struct NodeData : InstanceCounterValue {
};
auto snapshot() -> Snapshot { return Snapshot{.number = number}; }
};
struct Metrics {
auto increment_total_nodes() -> void{};
auto increment_registered_nodes() -> void{};
auto decrement_registered_nodes() -> void{};
auto increment_ready_for_deletion_nodes() -> void{};
auto decrement_ready_for_deletion_nodes() -> void{};
};

using MyList = ThreadOwnedList<NodeData, Metrics>;
using MyList = ThreadOwnedList<NodeData>;

auto nodes_in_registry(std::shared_ptr<MyList> registry)
-> std::vector<NodeData::Snapshot> {
Expand Down
0