8000 Make async registry data structure generic by jvolmer · Pull Request #21699 · arangodb/arangodb · GitHub
[go: up one dir, main page]

Skip to content

Make async registry data structure generic #21699

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Apr 28, 2025
Next Next commit
Add ThreadOwnedList
  • Loading branch information
jvolmer committed Apr 14, 2025
commit 26345e2ab9cecb96f0f47aedf054c469d2188be0
1 change: 1 addition & 0 deletions lib/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -440,6 +440,7 @@ endif()

add_subdirectory(Actor)
add_subdirectory(Async)
add_subdirectory(Containers)
add_subdirectory(Futures)
add_subdirectory(Geo)
if (USE_V8)
Expand Down
1 change: 1 addition & 0 deletions lib/Containers/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
add_subdirectory(Concurrent)
7 changes: 7 additions & 0 deletions lib/Containers/Concurrent/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
add_library(arango_thread_owning_list STATIC
thread.cpp)
target_include_directories(arango_thread_owning_list PUBLIC ${CMAKE_CURRENT_SOURCE_DIR})
target_link_libraries(arango_thread_owning_list
PRIVATE
arango_basic_utils
arango_inspection)
179 changes: 179 additions & 0 deletions lib/Containers/Concurrent/ThreadOwnedList.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2014-2024 ArangoDB GmbH, Cologne, Germany
/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany
///
/// Licensed under the Business Source License 1.1 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// https://github.com/arangodb/arangodb/blob/devel/LICENSE
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Julia Volmer
////////////////////////////////////////////////////////////////////////////////
#include "ThreadOwnedList.h"

#include "fmt/core.h"

using namespace arangodb::containers;

template<HasSnapshot T, DefinesMetrics M>
auto ThreadOwnedList<T, M>::make() noexcept
-> std::shared_ptr<ThreadOwnedList<T, M>> {
struct MakeShared : ThreadOwnedList {
MakeShared() : ThreadOwnedList() {}
};
return std::make_shared<MakeShared>();
}

template<HasSnapshot T, DefinesMetrics M>
ThreadOwnedList<T, M>::ThreadOwnedList() noexcept
: thread{basics::ThreadId::current()} {}

template<HasSnapshot T, DefinesMetrics M>
ThreadOwnedList<T, M>::~ThreadOwnedList() noexcept {
cleanup();
}

template<HasSnapshot T, DefinesMetrics M>
auto ThreadOwnedList<T, M>::add(T data) noexcept -> Node<T>* {
auto current_thread = basics::ThreadId::current();
ADB_PROD_ASSERT(current_thread == thread) << fmt::format(
"ThreadOwnedList::add_promise was called from thread {} but needs to be "
"called from ThreadOwnedList's owning thread {}. {}",
current_thread, thread, this);
auto current_head = _head.load(std::memory_order_relaxed);
auto node = new Node{.data = std::move(data),
.next = current_head,
.list = this->shared_from_this()};
if (current_head != nullptr) {
current_head->previous.store(node); // TODO memory order
}
// (1) - this store synchronizes with load in (2)
_head.store(node, std::memory_order_release);
metrics.increment_registered_nodes();
metrics.increment_total_nodes();
return node;
}

template<HasSnapshot T, DefinesMetrics M>
auto ThreadOwnedList<T, M>::mark_for_deletion(Node<T>* node) noexcept -> void {
// makes sure that promise is really in this list
ADB_PROD_ASSERT(node->registry.get() == this);

// TODO needs to be done in Promise::mark_for_deletion instead
// promise->state.store(State::Deleted);

// keep a local copy of the shared pointer. This promise might be the
// last of the registry.
auto self = std::move(node->registry);

auto current_head = _free_head.load(std::memory_order_relaxed);
do {
node->next_to_free = current_head;
// (4) - this compare_exchange_weak synchronizes with exchange in (5)
} while (not _free_head.compare_exchange_weak(current_head, node,
std::memory_order_release,
std::memory_order_acquire));
// DO NOT access promise after this line. The owner thread might already
// be running a cleanup and promise might be deleted.

metrics.decrement_registered_nodes();
metrics.increment_ready_for_deletion_nodes();

// self destroyed here. registry might be destroyed here as well.
}

template<HasSnapshot T, DefinesMetrics M>
auto ThreadOwnedList<T, M>::garbage_collect() noexcept -> void {
auto current_thread = basics::ThreadId::current();
ADB_PROD_ASSERT(current_thread == thread) << fmt::format(
"ThreadOwnedList::garbage_collect was called from thread {} but needs to "
"be called from ThreadOwnedList's owning thread {}. {}",
current_thread, thread, this);
auto guard = std::lock_guard(_mutex);
cleanup();
}

template<HasSnapshot T, DefinesMetrics M>
auto ThreadOwnedList<T, M>::garbage_collect_external() noexcept -> void {
// acquire the lock. This prevents the owning thread and the observer
// from accessing promises. Note that the owing thread only adds new
// promises to the head of the list.
auto guard = std::lock_guard(_mutex);
// we can make the following observation. Once a promise is enqueued in the
// list, it previous and next pointer is never updated, except for the current
// head element. Also, Promises are only removed, after the mutex has been
// acquired. This implies that we can clean up all promises, that are not
// in head position right 8000 now.
Node<T>* maybe_head_ptr = nullptr;
Node<T>*current,
*next = _free_head.exchange(
nullptr, std::memory_order_acquire); // TODO check memory order
while (next != nullptr) {
current = next;
next = next->next_to_free;
if (current->previous != nullptr) { // TODO memory order
metrics.decrement_ready_for_deletion_nodes();
remove(current);
delete current;
} else {
// if this is the head of the promise list, we cannot delete it because
// additional promises could have been added in the meantime
// (if these new promises would have been marked in the meantime, they
// would be in the new free list due to the exchange earlier)
ADB_PROD_ASSERT(maybe_head_ptr == nullptr);
maybe_head_ptr = current;
}
}
// After the clean up we have to add the potential head back into the free
// list.
if (maybe_head_ptr) {
auto current_head = _free_head.load(std::memory_order_relaxed);
do {
maybe_head_ptr->next_to_free = current_head;
// (4) - this compare_exchange_weak synchronizes with exchange in (5)
// TODO check memory order
} while (not _free_head.compare_exchange_weak(current_head, maybe_head_ptr,
std::memory_order_release,
std::memory_order_acquire));
}
}

template<HasSnapshot T, DefinesMetrics M>
auto ThreadOwnedList<T, M>::cleanup() noexcept -> void {
// (5) - this exchange synchronizes with compare_exchange_weak in (4)
Node<T>*current,
*next = _free_head.exchange(nullptr, std::memory_order_acquire);
while (next != nullptr) {
current = next;
next = next->next_to_free;
metrics.decrement_ready_for_deletion_nodes();
remove(current);
delete current;
}
}

template<HasSnapshot T, DefinesMetrics M>
auto ThreadOwnedList<T, M>::remove(Node<T>* node) -> void {
auto* next = node->next;
auto* previous = node->previous.load(); // TODO memory order
if (previous == nullptr) { // promise is current head
// (3) - this store synchronizes with the load in (2)
_head.store(next, std::memory_order_release);
} else {
previous->next = next;
}
if (next != nullptr) {
next->previous = previous; // TODO memory order
}
}
Loading
0