8000 [3.11] Detach AQL threads in UPSERT when waiting on replication for too long by goedderz · Pull Request #21094 · arangodb/arangodb · GitHub
[go: up one dir, main page]

Skip to content

[3.11] Detach AQL threads in UPSERT when waiting on replication for too long #21094

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasio 8000 nally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Jun 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
v3.11.10 (XXXX-XX-XX)
---------------------

* Sort out a thread blockage on AQL upsert waiting for replication.

* BTS-1909, MDS-1232: Fixed a bug where COLLECT ... AGGREGATE x = UNIQUE(y)
could miss some results when multiple shards were aggregated in a cluster.

Expand Down
55 changes: 55 additions & 0 deletions arangod/Aql/ModificationExecutorHelpers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,19 @@
#include "Aql/ModificationExecutorInfos.h"
#include "Basics/Result.h"
#include "Basics/StaticStrings.h"
#include "Basics/cpu-relax.h"
#include "Logger/LogLevel.h"
#include "Logger/LogMacros.h"
#include "Random/RandomGenerator.h"
#include "Scheduler/SchedulerFeature.h"
#include "Utils/CollectionNameResolver.h"
#include "Utils/OperationResult.h"

#include <velocypack/Builder.h>
#include <velocypack/Iterator.h>
#include <velocypack/Slice.h>

#include <chrono>
#include <string>

using namespace arangodb;
Expand Down Expand Up @@ -232,3 +238,52 @@ AqlValue ModificationExecutorHelpers::getDocumentOrNull(
}
return AqlValue(AqlValueHintNull());
}

// If we simply wait, it can happen that we get into a blockage in which
// all threads wait in the same place here and none can make progress,
// since the scheduler is full. This means we must detach the thread
// after some time. To avoid that all are detaching at the same time,
// we choose a random timeout for the detaching. But first we spin a
// while to avoid delays:
void ModificationExecutorHelpers::waitAndDetach(
futures::Future<OperationResult>& future) {
if (!future.isReady()) {
{
auto const spinTime = std::chrono::milliseconds(10);
auto const start = std::chrono::steady_clock::now();
while (!future.isReady() &&
std::chrono::steady_clock::now() - start < spinTime) {
basics::cpu_relax();
}
}
if (!future.isReady()) {
auto const detachTime = std::chrono::milliseconds(
1000 + RandomGenerator::interval(uint32_t(100)) * 100);
auto start = std::chrono::steady_clock::now();
while (!future.isReady() &&
std::chrono::steady_clock::now() - start < detachTime) {
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
if (!future.isReady()) {
LOG_TOPIC("afe32", INFO, Logger::THREADS)
<< "Did not get replication response within " << detachTime.count()
<< " milliseconds, detaching scheduler thread.";
uint64_t currentNumberDetached = 0;
uint64_t maximumNumberDetached = 0;
auto res = SchedulerFeature::SCHEDULER->detachThread(
&currentNumberDetached, &maximumNumberDetached);
if (res.is(TRI_ERROR_TOO_MANY_DETACHED_THREADS)) {
LOG_TOPIC("afe33", WARN, Logger::THREADS)
<< "Could not detach scheduler thread (currently detached "
"threads: "
<< currentNumberDetached
<< ", maximal number of detached threads: "
<< maximumNumberDetached
<< "), will continue to wait for replication in scheduler "
"thread, this can potentially lead to blockages!";
}
future.wait();
}
}
}
}
6 changes: 6 additions & 0 deletions arangod/Aql/ModificationExecutorHelpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@
#include <string>

namespace arangodb {
namespace futures {
template<typename T>
class Future;
}
namespace aql {

struct ModificationExecutorInfos;
Expand Down Expand Up @@ -95,6 +99,8 @@ OperationOptions convertOptions(ModificationOptions const& in,

AqlValue getDocumentOrNull(velocypack::Slice elm, std::string const& key);

void waitAndDetach(futures::Future<OperationResult>& future);

} // namespace ModificationExecutorHelpers
} // namespace aql
} // namespace arangodb
45 changes: 1 addition & 44 deletions arangod/Aql/SimpleModifier.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -195,50 +195,7 @@ ExecutionState SimpleModifier<ModifierCompletion, Enable>::transact(
// operations blocking as in previous versions of ArangoDB.
// TODO: fix this and make it truly non-blocking (requires to
// fix some lifecycle issues for AQL queries first).

// If we simply wait, it can happen that we get into a blockage in which
// all threads wait in the same place here and none can make progress,
// since the scheduler is full. This means we must detach the thread
// after some time. To avoid that all are detaching at the same time,
// we choose a random timeout for the detaching. But first we spin a
// while to avoid delays:
if (!result.isReady()) {
auto const spinTime = std::chrono::milliseconds(10);
auto start = std::chrono::steady_clock::now();
while (!result.isReady() &&
std::chrono::steady_clock::now() - start < spinTime) {
basics::cpu_relax();
}
if (!result.isReady()) {
auto const detachTime = std::chrono::milliseconds(
1000 + RandomGenerator::interval(uint32_t(100)) * 100);
auto start = std::chrono::steady_clock::now();
while (!result.isReady() &&
std::chrono::steady_clock::now() - start < detachTime) {
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
if (!result.isReady()) {
LOG_TOPIC("afe32", INFO, Logger::THREADS)
<< "Did not get replication response within " << detachTime.count()
<< " microseconds, detaching scheduler thread.";
uint64_t currentNumberDetached = 0;
uint64_t maximumNumberDetached = 0;
auto res = SchedulerFeature::SCHEDULER->detachThread(
&currentNumberDetached, &maximumNumberDetached);
if (res.is(TRI_ERROR_TOO_MANY_DETACHED_THREADS)) {
LOG_TOPIC("afe33", WARN, Logger::THREADS)
<< "Could not detach scheduler thread (currently detached "
"threads: "
<< currentNumberDetached
<< ", maximal number of detached threads: "
<< maximumNumberDetached
<< "), will continue to wait for replication in scheduler "
"thread, this can potentially lead to blockages!";
}
result.wait();
}
}
}
waitAndDetach(result);

// The following will always be true with this code, but we leave the
// asynchronous code below for the future.
Expand Down
18 changes: 12 additions & 6 deletions arangod/Aql/UpsertModifier.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -297,19 +297,25 @@ ExecutionState UpsertModifier::transact(transaction::Methods& trx) {

auto toInsert = _insertAccumulator.closeAndGetContents();
if (toInsert.isArray() && toInsert.length() > 0) {
_insertResults =
trx.insert(_infos._aqlCollection->name(), toInsert, _infos._options);
auto future = trx.insertAsync(_infos._aqlCollection->name(), toInsert,
_infos._options);
waitAndDetach(future);
_insertResults = std::move(future).get();
throwOperationResultException(_infos, _insertResults);
}

auto toUpdate = _updateAccumulator.closeAndGetContents();
if (toUpdate.isArray() && toUpdate.length() > 0) {
if (_infos._isReplace) {
_updateResults =
trx.replace(_infos._aqlCollection->name(), toUpdate, _infos._options);
auto future = trx.replaceAsync(_infos._aqlCollection->name(), toUpdate,
_infos._options);
waitAndDetach(future);
_updateResults = std::move(future).get();
} else {
_updateResults =
trx.update(_infos._aqlCollection->name(), toUpdate, _infos._options);
auto future = trx.updateAsync(_infos._aqlCollection->name(), toUpdate,
_infos._options);
waitAndDetach(future);
_updateResults = std::move(future).get();
}
throwOperationResultException(_infos, _updateResults);
}
Expand Down
Loading
0