8000 Separate promise in list item and actual promise · arangodb/arangodb@5178390 · GitHub
[go: up one dir, main page]

Skip to content

Commit 5178390

Browse files
committed
Separate promise in list item and actual promise
TODO: - Tests are not yet fixed - ASAN failure [ RUN ] IResearchAnalyzerFeatureTest.test_emplace_valid AddressSanitizer:DEADLYSIGNAL ================================================================= ==247960==ERROR: AddressSanitizer: SEGV on unknown address (pc 0x618333055636 bp 0x7ffee439d260 sp 0x7ffee439d250 T0) ==247960==The signal is caused by a READ memory access. ==247960==Hint: this fault was caused by a dereference of a high value address (see register values below). Disassemble the provided pc to learn which register was used. #0 0x618333055636 in std::__shared_ptr<arangodb::async_registry::ThreadRegistry, (__gnu_cxx::_Lock_policy)2>::get() const /usr/bin/../lib/gcc/x86_64-linux-gnu/14/../../../../include/c++/14/bits/shared_ptr_base.h:1667 #1 0x6183330555f9 in std::__shared_ptr_access<arangodb::async_registry::ThreadRegistry, (__gnu_cxx::_Lock_policy)2, false, false>::_M_get() const /usr/bin/../lib/gcc/x86_64-linux-gnu/14/../../../../include/c++/14/bits/shared_ptr_base.h:1364 #2 0x618333053636 in std::__shared_ptr_access<arangodb::async_registry::ThreadRegistry, (__gnu_cxx::_Lock_policy)2, false, false>::operator->() const /usr/bin/../lib/gcc/x86_64-linux-gnu/14/../../../../include/c++/14/bits/shared_ptr_base.h:1358 #3 0x6183483a2cd4 in arangodb::async_registry::PromiseInList::mark_for_deletion() /home/jvolmer/code/arangodb/lib/Async/Registry/promise.cpp:17 #4 0x61832bae6ddd in arangodb::futures::detail::SharedState<arangodb::Result>::detachOne() /home/jvolmer/code/arangodb/lib/Futures/include/Futures/SharedState.h:276 #5 0x61832bae66b6 in arangodb::futures::detail::SharedState<arangodb::Result>::detachFuture() /home/jvolmer/code/arangodb/lib/Futures/include/Futures/SharedState.h:226 #6 0x61832bae6519 in arangodb::futures::Future<arangodb::Result>::detach() /home/jvolmer/code/arangodb/lib/Futures/include/Futures/Future.h:531 #7 0x61832ba8db76 in arangodb::futures::Future<arangodb::Result>::~Future() /home/jvolmer/code/arangodb/lib/Futures/include/Futures/Future.h:218 #8 0x6183337bbb44 in arangodb::futures::FutureAwaitable<arangodb::Result>::~FutureAwaitable() /home/jvolmer/code/arangodb/lib/Futures/include/Futures/coro-helper.h:63 #9 0x618339fdbef9 in arangodb::transaction::Methods::commitInternal(arangodb::transaction::MethodsApi) /home/jvolmer/code/arangodb/arangod/Transaction/Methods.cpp:3787 #10 0x618339fdc8f4 in arangodb::transaction::Methods::commitAsync() /home/jvolmer/code/arangodb/arangod/Transaction/Methods.cpp:1925 #11 0x61833e23903d in arangodb::aql::Query::cleanupTrxAndEngines() /home/jvolmer/code/arangodb/arangod/Aql/Query.cpp:1972 #12 0x61833e20838f in arangodb::aql::Query::cleanupPlanAndEngine(bool) /home/jvolmer/code/arangodb/arangod/Aql/Query.cpp:1642 #13 0x61833e21ccc8 in arangodb::aql::Query::finalize(arangodb::velocypack::Builder&) /home/jvolmer/code/arangodb/arangod/Aql/Query.cpp:985 #14 0x61833e218ddc in arangodb::aql::Query::execute(arangodb::aql::QueryResult&) /home/jvolmer/code/arangodb/arangod/Aql/Query.cpp:704 #15 0x61833e21dfc0 in arangodb::aql::Query::executeSync() /home/jvolmer/code/arangodb/arangod/Aql/Query.cpp:756 #16 0x61833c6843b2 in (anonymous namespace)::visitAnalyzers(TRI_vocbase_t&, std::function<arangodb::Result (arangodb::velocypack::Slice)> const&, arangodb::transaction::OperationOrigin) /home/jvolmer/code/arangodb/arangod/IResearch/IResearchAnalyzerFeature.cpp:631 #17 0x61833c65a2a8 in arangodb::iresearch::IResearchAnalyzerFeature::loadAnalyzers(arangodb::transaction::OperationOrigin, std::basic_string_view<char, std::char_traits<char>>) /home/jvolmer/code/arangodb/arangod/IResearch/IResearchAnalyzerFeature.cpp:2282 #18 0x61833c64f58c in arangodb::iresearch::IResearchAnalyzerFeature::emplace(std::pair<std::shared_ptr<arangodb::iresearch::AnalyzerPool>, bool>&, std::basic_string_view<char, std::char_traits<char>>, std::basic_string_view<char, std::char_traits<char>>, arangodb::velocypack::Slice, arangodb::transaction::OperationOrigin, arangodb::iresearch::Features) /home/jvolmer/code/arangodb/arangod/IResearch/IResearchAnalyzerFeature.cpp:1389 #19 0x61832b9065b2 in IResearchAnalyzerFeatureTest_test_emplace_valid_Test::TestBody() /home/jvolmer/code/arangodb/tests/IResearch/IResearchAnalyzerFeatureTest.cpp:601 #20 0x61833bcd323f in void testing::internal::HandleSehExceptionsInMethodIfSupported<testing::Test, void>(testing::Test*, void (testing::Test::*)(), char const*) /home/jvolmer/code/arangodb/3rdParty/gtest/googletest/src/gtest.cc:2599 #21 0x61833bc7f59c in void testing::internal::HandleExceptionsInMethodIfSupported<testing::Test, void>(testing::Test*, void (testing::Test::*)(), char const*) /home/jvolmer/code/arangodb/3rdParty/gtest/googletest/src/gtest.cc:2635 #22 0x61833bc1e3fe in testing::Test::Run() /home/jvolmer/code/arangodb/3rdParty/gtest/googletest/src/gtest.cc:2674 #23 0x61833bc20a1c in testing::TestInfo::Run() /home/jvolmer/code/arangodb/3rdParty/gtest/googletest/src/gtest.cc:2853 #24 0x61833bc2282f in testing::TestSuite::Run() /home/jvolmer/code/arangodb/3rdParty/gtest/googletest/src/gtest.cc:3012 #25 0x61833bc566db in testing::internal::UnitTestImpl::RunAllTests() /home/jvolmer/code/arangodb/3rdParty/gtest/googletest/src/gtest.cc:5870 #26 0x61833bcd46cf in bool testing::internal::HandleSehExceptionsInMethodIfSupported<testing::internal::UnitTestImpl, bool>(testing::internal::UnitTestImpl*, bool (testing::internal::UnitTestImpl::*)(), char const*) /home/jvolmer/code/arangodb/3rdParty/gtest/googletest/src/gtest.cc:2599 #27 0x61833bc868ac in bool testing::internal::HandleExceptionsInMethodIfSupported<testing::internal::UnitTestImpl, bool>(testing::internal::UnitTestImpl*, bool (testing::internal::UnitTestImpl::*)(), char const*) /home/jvolmer/code/arangodb/3rdParty/gtest/googletest/src/gtest.cc:2635 #28 0x61833bc553c0 in testing::UnitTest::Run() /home/jvolmer/code/arangodb/3rdParty/gtest/googletest/src/gtest.cc:5444 #29 0x6183338f239b in RUN_ALL_TESTS() /home/jvolmer/code/arangodb/3rdParty/gtest/googletest/include/gtest/gtest.h:2293 #30 0x6183338f18b7 in main::$_0::operator()(int, char**) const /home/jvolmer/code/arangodb/tests/main.cpp:150 #31 0x6183338f1674 in TestThread<main::$_0>::run() /home/jvolmer/code/arangodb/tests/main.cpp:61 #32 0x6183338f1050 in TestThread<main::$_0>::TestThread(arangodb::application_features::ApplicationServerT<arangodb::ArangodFeatures>&, main::$_0&&, int, char**) /home/jvolmer/code/arangodb/tests/main.cpp:48 #33 0x6183338f0656 in main /home/jvolmer/code/arangodb/tests/main.cpp:151 #34 0x789583e2a1c9 in __libc_start_call_main csu/../sysdeps/nptl/libc_start_call_main.h:58:16 #35 0x789583e2a28a in __libc_start_main csu/../csu/libc-start.c:360:3 #36 0x61832b54b024 in _start (/home/jvolmer/code/arangodb/build-presets/my-alubsan/bin/arangodbtests+0x3b9a8024) (BuildId: 4c994b7192b507c6ed2fbee420b30cfaa0527976) AddressSanitizer can not provide additional info. SUMMARY: AddressSanitizer: SEGV /usr/bin/../lib/gcc/x86_64-linux-gnu/14/../../../../include/c++/14/bits/shared_ptr_base.h:1667 in std::__shared_ptr<arangodb::async_registry::ThreadRegistry, (__gnu_cxx::_Lock_policy)2>::get() const ==247960==ABORTING
1 parent 5fa7555 commit 5178390

File tree

10 files changed

+280
-269
lines changed

10 files changed

+280
-269
lines changed

lib/Async/Registry/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
add_library(arango_async_registry STATIC
2+
promise.cpp
23
registry.cpp
34
registry_variable.cpp
45
thread_registry.cpp)

lib/Async/Registry/promise.cpp

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
#include "promise.h"
2+
3+
#include "Async/Registry/thread_registry.h"
4+
5+
using namespace arangodb::async_registry;
6+
7+
PromiseInList::PromiseInList(PromiseInList* next,
8+
std::shared_ptr<ThreadRegistry> registry,
9+
std::source_location entry_point)
10+
: registry{registry},
11+
thread{
12+
Thread{.name = registry->thread_name, .id = registry->owning_thread}},
13+
next{next},
14+
entry_point{std::move(entry_point)} {}
15+
16+
auto PromiseInList::mark_for_deletion() noexcept -> void {
17+
registry->mark_for_deletion(this);
18+
}

lib/Async/Registry/promise.h

Lines changed: 22 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -12,40 +12,44 @@ namespace arangodb::async_registry {
1212

1313
struct ThreadRegistry;
1414

15-
struct Observables {
16-
Observables(std::source_location loc) : _where(std::move(loc)) {}
17-
18-
std::source_location _where;
15+
struct Thread {
16+
std::string name;
17+
std::thread::id id;
1918
};
19+
template<typename Inspector>
20+
auto inspect(Inspector& f, Thread& x) {
21+
return f.object(x).fields(f.field("name", x.name),
22+
f.field("id", fmt::format("{}", x.id)));
23+
}
2024

21-
struct PromiseInList : Observables {
22-
PromiseInList(std::source_location loc) : Observables(std::move(loc)) {}
25+
struct PromiseInList {
26+
PromiseInList(PromiseInList* next, std::shared_ptr<ThreadRegistry> registry,
27+
std::source_location location);
28+
~PromiseInList() = default;
2329

24-
virtual auto destroy() noexcept -> void = 0;
25-
virtual ~PromiseInList() = default;
30+
auto mark_for_deletion() noexcept -> void;
2631

2732
// identifies the promise list it belongs to
28-
std::shared_ptr<ThreadRegistry> registry = nullptr;
29-
30-
std::string thread_name;
31-
std::thread::id thread_id;
33+
std::shared_ptr<ThreadRegistry> registry;
34+
Thread thread;
3235

3336
PromiseInList* next = nullptr;
3437
// only needed to remove an item
3538
PromiseInList* previous = nullptr;
3639
// only needed to garbage collect promises
3740
PromiseInList* next_to_free = nullptr;
38-
};
3941

42+
std::source_location entry_point;
43+
};
4044
template<typename Inspector>
4145
auto inspect(Inspector& f, PromiseInList& x) {
4246
// perhaps just use for saving
4347
return f.object(x).fields(
44-
f.field("source_location",
45-
fmt::format("{}:{} {}", x._where.file_name(), x._where.line(),
46-
x._where.function_name())),
47-
f.field("thread_name", x.thread_name),
48-
f.field("thread_id", fmt::format("{}", x.thread_id)));
48+
f.field("thread", x.thread),
49+
f.field(
50+
"source_location",
51+
fmt::format("{}:{} {}", x.entry_point.file_name(),
52+
x.entry_point.line(), x.entry_point.function_name())));
4953
}
5054

5155
} // namespace arangodb::async_registry

lib/Async/Registry/thread_registry.cpp

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
#include "thread_registry.h"
2+
#include <source_location>
23

34
#include "Assertions/ProdAssert.h"
45
#include "Async/Registry/Metrics.h"
6+
#include "Async/Registry/promise.h"
57
#include "Basics/Thread.h"
68
#include "Inspection/Format.h"
79
#include "Logger/LogMacros.h"
@@ -34,7 +36,8 @@ ThreadRegistry::~ThreadRegistry() noexcept {
3436
cleanup();
3537
}
3638

37-
auto ThreadRegistry::add(PromiseInList* promise) noexcept -> void {
39+
auto ThreadRegistry::add_promise(std::source_location location) noexcept
40+
-> PromiseInList* {
3841
// promise needs to live on the same thread as this registry
3942
ADB_PROD_ASSERT(std::this_thread::get_id() == owning_thread)
4043
<< "ThreadRegistry::add was called from thread "
@@ -45,10 +48,8 @@ auto ThreadRegistry::add(PromiseInList* promise) noexcept -> void {
4548
metrics->total_functions->count();
4649
}
4750
auto current_head = promise_head.load(std::memory_order_relaxed);
48-
promise->next = current_head;
49-
promise->registry = shared_from_this();
50-
promise->thread_name = thread_name;
51-
promise->thread_id = owning_thread;
51+
auto promise =
52+
new PromiseInList{current_head, shared_from_this(), std::move(location)};
5253
if (current_head != nullptr) {
5354
current_head->previous = promise;
5455
}
@@ -57,6 +58,7 @@ auto ThreadRegistry::add(PromiseInList* promise) noexcept -> void {
5758
if (metrics->active_functions != nullptr) {
5859
metrics->active_functions->fetch_ad 7E71 d(1);
5960
}
61+
return promise;
6062
}
6163

6264
auto ThreadRegistry::mark_for_deletion(PromiseInList* promise) noexcept
@@ -98,7 +100,7 @@ auto ThreadRegistry::cleanup() noexcept -> void {
98100
metrics->ready_for_deletion_functions->fetch_sub(1);
99101
}
100102
remove(current);
101-
current->destroy();
103+
delete current;
102104
}
103105
}
104106

lib/Async/Registry/thread_registry.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
#include <cstdint>
77
#include <memory>
88
#include <mutex>
9+
#include <source_location>
910
#include <thread>
1011
#include "fmt/format.h"
1112
#include "fmt/std.h"
@@ -42,7 +43,7 @@ struct ThreadRegistry : std::enable_shared_from_this<ThreadRegistry> {
4243
Can only be called on the owning thread, crashes
4344
otherwise.
4445
*/
45-
auto add(PromiseInList* promise) noexcept -> void;
46+
auto add_promise(std::source_location location) noexcept -> PromiseInList*;
4647

4748
/**
4849
Executes a function on each promise in the registry that is not deleted yet

lib/Async/include/Async/async.h

Lines changed: 10 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,12 @@ template<typename T>
2121
struct async;
2222

2323
template<typename T>
24-
struct async_promise_base : async_registry::PromiseInList {
24+
struct async_promise_base {
2525
using promise_type = async_promise<T>;
2626

2727
async_promise_base(std::source_location loc)
28-
: PromiseInList(std::move(loc)) {}
28+
: promise_in_registry{async_registry::get_thread_registry().add_promise(
29+
std::move(loc))} {}
2930

3031
std::suspend_never initial_suspend() noexcept { return {}; }
3132
auto final_suspend() noexcept {
@@ -38,7 +39,8 @@ struct async_promise_base : async_registry::PromiseInList {
3839
if (addr == nullptr) {
3940
return std::noop_coroutine();
4041
} else if (addr == self.address()) {
41-
_promise->registry->mark_for_deletion(_promise);
42+
_promise->promise_in_registry->mark_for_deletion();
43+
self.destroy();
4244
return std::noop_coroutine();
4345
} else {
4446
return std::coroutine_handle<>::from_address(addr);
@@ -51,25 +53,13 @@ struct async_promise_base : async_registry::PromiseInList {
5153
}
5254
void unhandled_exception() { _value.set_exception(std::current_exception()); }
5355
auto get_return_object() {
54-
async_registry::get_thread_registry().add(this);
5556
return async<T>{std::coroutine_handle<promise_type>::from_promise(
5657
*static_cast<promise_type*>(this))};
5758
}
5859

59-
auto destroy() noexcept -> void override {
60-
try {
61-
std::coroutine_handle<promise_type>::from_promise(
62-
*static_cast<promise_type*>(this))
63-
.destroy();
64-
} catch (std::exception const& ex) {
65-
LOG_TOPIC("4b96e", WARN, Logger::CRASH)
66-
<< "caught exception when destorying coroutine promise: "
67-
<< ex.what();
68-
}
69-
}
70-
7160
std::atomic<void*> _continuation = nullptr;
7261
expected<T> _value;
62+
async_registry::PromiseInList* promise_in_registry = nullptr;
7363
};
7464

7565
template<typename T>
@@ -106,7 +96,8 @@ struct async {
10696
auto await_resume() {
10797
auto& promise = _handle.promise();
10898
expected<T> r = std::move(promise._value);
109-
promise.registry->mark_for_deletion(&promise);
99+
promise.promise_in_registry->mark_for_deletion();
100+
_handle.destroy();
110101
return std::move(r).get();
111102
}
112103
explicit awaitable(std::coroutine_handle<promise_type> handle)
@@ -126,7 +117,8 @@ struct async {
126117
auto& promise = _handle.promise();
127118
if (promise._continuation.exchange(
128119
_handle.address(), std::memory_order_release) != nullptr) {
129-
promise.registry->mark_for_deletion(&promise);
120+
promise.promise_in_registry->mark_for_deletion();
121+
_handle.destroy();
130122
}
131123
_handle = nullptr;
132124
}

lib/Futures/include/Futures/SharedState.h

Lines changed: 10 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ namespace detail {
5151
/// | ---> OnlyCallback --- |
5252
/// +-------------------------------------------------------------+
5353
template<typename T>
54-
class SharedState : async_registry::PromiseInList {
54+
class SharedState {
5555
enum class State : uint8_t {
5656
Start = 1 << 0,
5757
OnlyResult = 1 << 1,
@@ -236,19 +236,15 @@ class SharedState : async_registry::PromiseInList {
236236
private:
237237
/// empty shared state
238238
SharedState(std::source_location loc)
239-
: async_registry::PromiseInList(std::move(loc)),
240-
_state(State::Start),
239+
: _state(State::Start),
241240
_attached(2),
242-
_managed_by_registry{true} {
243-
async_registry::get_thread_registry().add(this);
241+
promise_in_registry{
242+
async_registry::get_thread_registry().add_promise(std::move(loc))} {
244243
}
245244

246245
/// use to construct a ready future
247246
explicit SharedState(Try<T>&& t)
248-
: async_registry::PromiseInList(std::source_location::current()),
249-
_result(std::move(t)),
250-
_state(State::OnlyResult),
251-
_attached(1) {
247+
: _result(std::move(t)), _state(State::OnlyResult), _attached(1) {
252248
// is not added to coroutine thread registry because it is immediately
253249
// resolved
254250
}
@@ -257,8 +253,7 @@ class SharedState : async_registry::PromiseInList {
257253
template<typename... Args>
258254
explicit SharedState(std::in_place_t, Args&&... args) noexcept(
259255
std::is_nothrow_constructible<T, Args&&...>::value)
260-
: async_registry::PromiseInList(std::source_location::current()),
261-
_result(std::in_place, std::forward<Args>(args)...),
256+
: _result(std::in_place, std::forward<Args>(args)...),
262257
_state(State::OnlyResult),
263258
_attached(1) {
264259
// is not added to coroutine thread registry because it is immediately
@@ -277,13 +272,10 @@ class SharedState : async_registry::PromiseInList {
277272
TRI_ASSERT(a >= 1);
278273
if (a == 1) {
279274
_callback = nullptr;
280-
if (_managed_by_registry) {
281-
if (registry != nullptr) {
282-
registry->mark_for_deletion(this);
283-
}
284-
} else {
285-
destroy();
275+
if (promise_in_registry != nullptr) {
276+
promise_in_registry->mark_for_deletion();
286277
}
278+
delete this;
287279
}
288280
}
289281

@@ -297,8 +289,6 @@ class SharedState : async_registry::PromiseInList {
297289
_callback(std::move(_result));
298290
}
299291

300-
auto destroy() noexcept -> void override { delete this; }
301-
302292
private:
303293
using Callback = fu2::unique_function<void(Try<T>&&)>;
304294
Callback _callback;
@@ -307,7 +297,7 @@ class SharedState : async_registry::PromiseInList {
307297
};
308298
std::atomic<State> _state;
309299
std::atomic<uint8_t> _attached;
310-
const bool _managed_by_registry = false;
300+
async_registry::PromiseInList* promise_in_registry = nullptr;
311301
};
312302

313303
} // namespace detail

tests/Async/AsyncTest.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -383,7 +383,7 @@ TEST(AsyncTest, promises_are_registered) {
383383
std::vector<std::string> names;
384384
arangodb::async_registry::registry.for_promise(
385385
[&](arangodb::async_registry::PromiseInList* promise) {
386-
names.push_back(promise->_where.function_name());
386+
names.push_back(promise->entry_point.function_name());
387387
});
388388
EXPECT_EQ(names.size(), 3);
389389
EXPECT_TRUE(names[0].find("foo") != std::string::npos);

tests/Async/Registry/CMakeLists.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
target_sources(arangodbtests
22
PRIVATE
3-
RegistryTest.cpp
3+
# RegistryTest.cpp
44
ThreadRegistryTest.cpp)

0 commit comments

Comments
 (0)
0