APM-164: Add basic overload control to arangod. by jsteemann · Pull Request #14796 · arangodb/arangodb · GitHub
APM-164: Add basic overload control to arangod. #14796

Merged: 16 commits, Sep 23, 2021
22 changes: 22 additions & 0 deletions CHANGELOG
@@ -1,6 +1,28 @@
devel
-----

* Add basic overload control to arangod.
This change adds the `x-arango-queue-time-seconds` header to all responses
sent by arangod. This header contains the most recent request dequeuing time
(in seconds) as tracked by the scheduler. This value can be used by client
applications and drivers to detect server overload and react to it.
The new startup option `--http.return-queue-time-header` can be set to
`false` to suppress these headers in responses sent by arangod.

In addition, client applications and drivers can optionally augment their
requests sent to arangod with a header of the same name. If set, the
value of the header should contain the maximum queuing time (in seconds)
that the client is willing to accept. If the header is set in an incoming
request, arangod will compare the current dequeuing time from its scheduler
with the maximum queue time value contained in the request. If the current
dequeuing time exceeds the value set in the header, arangod will reject the
request and return HTTP 412 (precondition failed) with the new error code
21004 (queue time violated).

There is also a new metric `arangodb_scheduler_queue_time_violations_total`
that is increased whenever a request is dropped because the requested
queue time cannot be satisfied.
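
A minimal client-side sketch of how the response header described above might be consumed, assuming the driver already has the raw header value as a string. The names `parseQueueTimeSeconds` and `serverLooksOverloaded` and the threshold parameter are hypothetical; they are not part of arangod or of any ArangoDB driver.

```cpp
#include <cstdlib>
#include <optional>
#include <string>

// Parse the value of an "x-arango-queue-time-seconds" response header.
// Returns std::nullopt if the text is empty, has trailing characters,
// or is negative. (Hypothetical helper, not an ArangoDB API.)
std::optional<double> parseQueueTimeSeconds(std::string const& value) {
  if (value.empty()) {
    return std::nullopt;
  }
  char* end = nullptr;
  double seconds = std::strtod(value.c_str(), &end);
  if (end == value.c_str() || *end != '\0' || seconds < 0.0) {
    return std::nullopt;
  }
  return seconds;
}

// Assumed client policy: consider the server overloaded once the reported
// dequeuing time exceeds a threshold chosen by the application.
bool serverLooksOverloaded(std::string const& headerValue,
                           double thresholdSeconds) {
  auto seconds = parseQueueTimeSeconds(headerValue);
  return seconds.has_value() && *seconds > thresholdSeconds;
}
```

A caller could use the result to slow down, shed its own load, or route requests elsewhere. A missing or unparsable header is treated here as "not overloaded", which is a design choice of this sketch rather than something the change prescribes.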

* Fixed a bug for array indexes on update of documents. See BTS-548.

* Prevent some possible deadlocks under high load regarding transactions and
@@ -0,0 +1,28 @@
name: arangodb_scheduler_queue_time_violations_total
introducedIn: "3.9.0"
help: |
Number of tasks/requests dropped and not added to internal queue
due to the client-specified queue time requirements not being
satisfiable.
unit: number
type: counter
category: Scheduler
complexity: simple
exposedBy:
- coordinator
- dbserver
- agent
- single
description: |
Number of tasks/requests dropped because the client-specified queue time
requirements, as indicated by client applications in the request header
"x-arango-queue-time-seconds" could not be satisfied by the receiving
server instance. This happens when the actual time needed to queue/dequeue
requests on the scheduler queue exceeds the maximum time value that the
client has specified in the request.
Whenever this happens, the client application will get an HTTP 412 error
response back with error code 21004 ("queue time violated").
Although the metric is exposed on all instance types, it will very likely
always be 0 on DB servers, simply because coordinators do not forward the
"x-arango-queue-time-seconds" header when they send internal requests to DB
servers.
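
A hedged sketch of how a client might recognize the rejection described above and schedule a retry. It assumes the client has already extracted the HTTP status code and the numeric error code from the error response; the function names and the backoff parameters (100 ms base, 5 s cap) are hypothetical choices for illustration only.

```cpp
#include <algorithm>
#include <chrono>

// Values taken from the change description: HTTP 412 plus error code 21004.
constexpr int kHttpPreconditionFailed = 412;
constexpr int kErrorQueueTimeViolated = 21004;

// Check both values, since a 412 status on its own may have other causes.
bool isQueueTimeViolation(int httpStatus, int errorNum) {
  return httpStatus == kHttpPreconditionFailed &&
         errorNum == kErrorQueueTimeViolated;
}

// Assumed retry policy: exponential backoff starting at 100 ms, capped at 5 s.
std::chrono::milliseconds backoffDelay(unsigned attempt) {
  constexpr long long baseMs = 100;
  constexpr long long capMs = 5000;
  unsigned shift = std::min(attempt, 16u);  // keep the shift small and safe
  return std::chrono::milliseconds(std::min(baseMs << shift, capMs));
}
```

Retrying immediately would only add load to a server that has just reported being overloaded, which is why the sketch pairs the check with a delay.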
88 changes: 61 additions & 27 deletions arangod/GeneralServer/CommTask.cpp
@@ -65,31 +65,6 @@ inline bool startsWith(std::string const& path, char const* other) {
path.compare(0, size, other, size) == 0);
}

} // namespace

// -----------------------------------------------------------------------------
// --SECTION-- constructors and destructors
// -----------------------------------------------------------------------------

CommTask::CommTask(GeneralServer& server,
ConnectionInfo info)
: _server(server),
_connectionInfo(std::move(info)),
_connectionStatistics(ConnectionStatistics::acquire()),
_auth(AuthenticationFeature::instance()) {
TRI_ASSERT(_auth != nullptr);
_connectionStatistics.SET_START();
}

CommTask::~CommTask() {
_connectionStatistics.SET_END();
}

// -----------------------------------------------------------------------------
// --SECTION-- protected methods
// -----------------------------------------------------------------------------

namespace {
TRI_vocbase_t* lookupDatabaseFromRequest(application_features::ApplicationServer& server,
GeneralRequest& req) {
// get database name from request
@@ -127,8 +102,53 @@ bool resolveRequestContext(application_features::ApplicationServer& server,
// the "true" means the request is the owner of the context
return true;
}

bool queueTimeViolated(GeneralRequest const& req) {
// check if the client sent the "x-arango-queue-time-seconds" header
bool found = false;
std::string const& queueTimeValue = req.header(StaticStrings::XArangoQueueTimeSeconds, found);
if (found) {
// yes, now parse the sent time value. if the value sent by client cannot be
// parsed as a double, then it will be handled as if "0.0" was sent - i.e. no
// queuing time restriction
double requestedQueueTime = StringUtils::doubleDecimal(queueTimeValue);
Review comment (Contributor): What is the outcome if queueTimeValue is not a valid double? If I understand correctly, it will just be ignored. Would it make sense to signal/inform here (e.g., at least on higher log levels)?

Review reply (Contributor): Understood, this is set by ArangoDB and not user-specified.

if (requestedQueueTime > 0.0) {
// value is > 0.0, so now check the last dequeue time that the scheduler reported
double lastDequeueTime = static_cast<double>(
SchedulerFeature::SCHEDULER->getLastLowPriorityDequeueTime()) / 1000.0;

if (lastDequeueTime > requestedQueueTime) {
// the log topic should actually be REQUESTS here, but the default log level
// for the REQUESTS log topic is FATAL, so if we logged here in INFO level,
// it would effectively be suppressed. thus we are using the Scheduler's
// log topic here, which is somewhat related.
SchedulerFeature::SCHEDULER->trackQueueTimeViolation();
LOG_TOPIC("1bbcc", WARN, Logger::THREADS)
<< "dropping incoming request because the client-specified maximum queue time requirement ("
<< requestedQueueTime << "s) would be violated by current queue time (" << lastDequeueTime << "s)";
return true;
}
}
}
return false;
}

} // namespace
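
The decision made by `queueTimeViolated` can be sketched in isolation as follows. This is a standalone approximation, not the arangod code: `std::strtod` stands in for `StringUtils::doubleDecimal` (an assumption; the two may differ on edge cases), and the scheduler's last dequeue time is passed in as a plain millisecond value. The name `wouldViolateQueueTime` is hypothetical.

```cpp
#include <cstdint>
#include <cstdlib>
#include <string>

// Returns true if a request carrying the given header value would be
// rejected, given the scheduler's last dequeue time in milliseconds.
bool wouldViolateQueueTime(std::string const& headerValue,
                           std::uint64_t lastDequeueTimeMs) {
  // std::strtod returns 0.0 when no conversion is possible, which matches
  // the "treated as 0.0, i.e. no restriction" behavior described above.
  double requestedSeconds = std::strtod(headerValue.c_str(), nullptr);
  if (!(requestedSeconds > 0.0)) {
    return false;  // zero, negative or unparsable: no queue time limit
  }
  // The scheduler value is in milliseconds, the header is in seconds.
  double lastDequeueSeconds = static_cast<double>(lastDequeueTimeMs) / 1000.0;
  return lastDequeueSeconds > requestedSeconds;
}
```

Note the strict comparison: a dequeue time exactly equal to the requested maximum is still accepted.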

CommTask::CommTask(GeneralServer& server,
ConnectionInfo info)
: _server(server),
_connectionInfo(std::move(info)),
_connectionStatistics(ConnectionStatistics::acquire()),
_auth(AuthenticationFeature::instance()) {
TRI_ASSERT(_auth != nullptr);
_connectionStatistics.SET_START();
}

CommTask::~CommTask() {
_connectionStatistics.SET_END();
}

/// Must be called before calling executeRequest, will send an error
/// response if execution is supposed to be aborted

@@ -311,6 +331,12 @@ void CommTask::finishExecution(GeneralResponse& res, std::string const& origin)
// use "IfNotSet" to not overwrite an existing response header
res.setHeaderNCIfNotSet(StaticStrings::XContentTypeOptions, StaticStrings::NoSniff);
}

// add "x-arango-queue-time-seconds" header
if (_server.server().getFeature<GeneralServerFeature>().returnQueueTimeHeader()) {
res.setHeaderNC(StaticStrings::XArangoQueueTimeSeconds,
std::to_string(static_cast<double>(SchedulerFeature::SCHEDULER->getLastLowPriorityDequeueTime()) / 1000.0));
}
}
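
A small sketch of the header value formatting used above, assuming only the standard library. The helper name `formatQueueTimeHeader` is hypothetical. With `std::to_string(double)` as specified up to C++23, the value is rendered in fixed notation with six decimals, so clients should parse it as a floating-point number rather than expect an integer.

```cpp
#include <cstdint>
#include <string>

// Convert a dequeue time in milliseconds into the textual seconds value
// that is placed into the "x-arango-queue-time-seconds" response header.
std::string formatQueueTimeHeader(std::uint64_t dequeueTimeMs) {
  return std::to_string(static_cast<double>(dequeueTimeMs) / 1000.0);
}
```

For example, a stored dequeue time of 250 ms corresponds to a header value of 0.25 seconds.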

/// Push this request into the execution pipeline
@@ -336,8 +362,17 @@ void CommTask::executeRequest(std::unique_ptr<GeneralRequest> request,
LOG_TOPIC("2cece", WARN, Logger::REQUESTS)
<< "could not find corresponding request/response";
}

rest::ContentType const respType = request->contentTypeResponse();

// check if the "x-arango-queue-time-seconds" header was set, and whether
// the current dequeuing time exceeds its value
if (::queueTimeViolated(*request)) {
sendErrorResponse(rest::ResponseCode::PRECONDITION_FAILED,
respType, messageId, TRI_ERROR_QUEUE_TIME_REQUIREMENT_VIOLATED);
return;
}

// create a handler, this takes ownership of request and response
auto& server = _server.server();
auto& factory = server.getFeature<GeneralServerFeature>().handlerFactory();
@@ -351,7 +386,6 @@ void CommTask::executeRequest(std::unique_ptr<GeneralRequest> request,
VPackBuffer<uint8_t>());
return;
}

// forward to correct server if necessary
bool forwarded;
auto res = handler->forwardRequest(forwarded);
21 changes: 14 additions & 7 deletions arangod/GeneralServer/GeneralServerFeature.cpp
@@ -139,6 +139,7 @@ GeneralServerFeature::GeneralServerFeature(application_features::ApplicationServ
: ApplicationFeature(server, "GeneralServer"),
_allowMethodOverride(false),
_proxyCheck(true),
_returnQueueTimeHeader(true),
_permanentRootRedirect(true),
_redirectRootTo("/_admin/aardvark/index.html"),
_supportInfoApiPolicy("hardened"),
@@ -219,6 +220,11 @@ void GeneralServerFeature::collectOptions(std::shared_ptr<ProgramOptions> option
"if true, use a permanent redirect. If false, use a temporary",
new BooleanParameter(&_permanentRootRedirect))
.setIntroducedIn(30712);

options->addOption("--http.return-queue-time-header",
"if true, return the 'x-arango-queue-time-seconds' response header",
new BooleanParameter(&_returnQueueTimeHeader))
.setIntroducedIn(30900);

options->addOption("--frontend.proxy-request-check",
"enable proxy request checking",
@@ -282,9 +288,8 @@ void GeneralServerFeature::prepare() {
}

void GeneralServerFeature::start() {
_jobManager.reset(new AsyncJobManager);

_handlerFactory.reset(new RestHandlerFactory());
_jobManager = std::make_unique<AsyncJobManager>();
_handlerFactory = std::make_unique<RestHandlerFactory>();

defineHandlers();
buildServers();
@@ -321,17 +326,19 @@ void GeneralServerFeature::unprepare() {
_jobManager.reset();
}

double GeneralServerFeature::keepAliveTimeout() const {
double GeneralServerFeature::keepAliveTimeout() const noexcept {
return _keepAliveTimeout;
}

bool GeneralServerFeature::proxyCheck() const { return _proxyCheck; }
bool GeneralServerFeature::proxyCheck() const noexcept { return _proxyCheck; }

bool GeneralServerFeature::returnQueueTimeHeader() const noexcept { return _returnQueueTimeHeader; }

std::vector<std::string> GeneralServerFeature::trustedProxies() const {
return _trustedProxies;
}

bool GeneralServerFeature::allowMethodOverride() const {
bool GeneralServerFeature::allowMethodOverride() const noexcept {
return _allowMethodOverride;
}

@@ -350,7 +357,7 @@ Result GeneralServerFeature::reloadTLS() { // reload TLS data from disk
return res;
}

bool GeneralServerFeature::permanentRootRedirect() const {
bool GeneralServerFeature::permanentRootRedirect() const noexcept {
return _permanentRootRedirect;
}

10 changes: 6 additions & 4 deletions arangod/GeneralServer/GeneralServerFeature.h
@@ -45,13 +45,14 @@ class GeneralServerFeature final : public application_features::ApplicationFeatu
void stop() override final;
void unprepare() override final;

double keepAliveTimeout() const;
bool proxyCheck() const;
double keepAliveTimeout() const noexcept;
bool proxyCheck() const noexcept;
bool returnQueueTimeHeader() const noexcept;
std::vector<std::string> trustedProxies() const;
bool allowMethodOverride() const;
bool allowMethodOverride() const noexcept;
std::vector<std::string> const& accessControlAllowOrigins() const;
Result reloadTLS();
bool permanentRootRedirect() const;
bool permanentRootRedirect() const noexcept;
std::string redirectRootTo() const;
std::string const& supportInfoApiPolicy() const noexcept;

@@ -87,6 +88,7 @@ class GeneralServerFeature final : public application_features::ApplicationFeatu
double _keepAliveTimeout = 300.0;
bool _allowMethodOverride;
bool _proxyCheck;
bool _returnQueueTimeHeader;
bool _permanentRootRedirect;
std::vector<std::string> _trustedProxies;
std::vector<std::string> _accessControlAllowOrigins;
3 changes: 3 additions & 0 deletions arangod/Scheduler/Scheduler.h
@@ -233,6 +233,9 @@ class Scheduler {

virtual void toVelocyPack(velocypack::Builder&) const = 0;
virtual QueueStatistics queueStatistics() const = 0;

/// @brief returns the last stored dequeue time [ms]
virtual uint64_t getLastLowPriorityDequeueTime() const noexcept = 0;

/// @brief approximate fill grade of the scheduler's queue (in %)
virtual double approximateQueueFillGrade() const = 0;
35 changes: 26 additions & 9 deletions arangod/Scheduler/SupervisedScheduler.cpp
@@ -183,6 +183,8 @@ DECLARE_GAUGE(
"Total number of ongoing RestHandlers coming from the low prio queue");
DECLARE_COUNTER(arangodb_scheduler_queue_full_failures_total,
"Tasks dropped and not added to internal queue");
DECLARE_COUNTER(arangodb_scheduler_queue_time_violations_total,
"Tasks dropped because the client-requested queue time restriction would be violated");
DECLARE_GAUGE(arangodb_scheduler_queue_length, uint64_t,
"Server's internal queue length");
DECLARE_COUNTER(arangodb_scheduler_threads_started_total,
@@ -238,6 +240,8 @@ SupervisedScheduler::SupervisedScheduler(application_features::ApplicationServer
arangodb_scheduler_threads_stopped_total{})),
_metricsQueueFull(server.getFeature<arangodb::MetricsFeature>().add(
arangodb_scheduler_queue_full_failures_total{})),
_metricsQueueTimeViolations(server.getFeature<arangodb::MetricsFeature>().add(
arangodb_scheduler_queue_time_violations_total{})),
_ongoingLowPriorityGauge(_server.getFeature<arangodb::MetricsFeature>().add(
arangodb_scheduler_ongoing_low_prio{})),
_metricsLastLowPriorityDequeueTime(
@@ -766,7 +770,8 @@ std::unique_ptr<SupervisedScheduler::WorkItemBase> SupervisedScheduler::getWork(
return nullptr;
};

// how often did we check for new work without success
// how often did we check for new work without success (note: this counter
// is used only to reduce the "last dequeue time" metric in case of inactivity)
uint64_t iterations = 0;
uint64_t maxCheckedQueue = 0;

@@ -822,13 +827,16 @@ std::unique_ptr<SupervisedScheduler::WorkItemBase> SupervisedScheduler::getWork(
}

// nothing to do for a long time, but the previously stored dequeue time
// is still set to something > 5ms (note: we use 5 here because a deque time
// of > 0ms is not very unlikely for any request)
if (maxCheckedQueue == LowPriorityQueue &&
iterations >= 10 &&
_metricsLastLowPriorityDequeueTime.load(std::memory_order_relaxed) > 5) {
// set the dequeue time back to 0.
setLastLowPriorityDequeueTime(0);
// is still set to something > 0ms.
// now gradually decrease the stored dequeue time, so that in a period
// of inactivity the dequeue time smoothly goes down back to 0, but not
// abruptly
if (maxCheckedQueue == LowPriorityQueue && iterations % 4 == 0) {
auto old = _metricsLastLowPriorityDequeueTime.load(std::memory_order_relaxed);
if (old > 0) {
// reduce dequeue time to 66%
setLastLowPriorityDequeueTime((old * 2) / 3);
}
}
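
The gradual reduction introduced above can be illustrated with a standalone sketch that repeatedly applies the same `(old * 2) / 3` integer step. This is a simplification under stated assumptions: it ignores that the real code only applies the step on every fourth idle iteration and that `setLastLowPriorityDequeueTime` stores values only probabilistically. The function name `decayStepsToZero` is hypothetical.

```cpp
#include <cstdint>

// Count how many "(old * 2) / 3" reductions are needed until a stored
// dequeue time (in milliseconds) reaches 0. Integer division guarantees
// termination, because every positive value strictly decreases.
unsigned decayStepsToZero(std::uint64_t dequeueTimeMs) {
  unsigned steps = 0;
  while (dequeueTimeMs > 0) {
    dequeueTimeMs = (dequeueTimeMs * 2) / 3;
    ++steps;
  }
  return steps;
}
```

Starting from 1000 ms, the sequence is 1000, 666, 444, 296, and so on, reaching 0 after 16 reductions. The stored value therefore fades out over a period of inactivity instead of dropping to 0 in one step as in the previous code.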

if (state->_sleepTimeout_ms == 0) {
@@ -930,7 +938,7 @@ SupervisedScheduler::WorkerState::WorkerState(SupervisedScheduler& scheduler)
_sleeping(false),
_ready(false),
_lastJobStarted(clock::now()),
_thread(new SupervisedSchedulerWorkerThread(scheduler._server, scheduler)) {}
_thread(std::make_unique<SupervisedSchedulerWorkerThread>(scheduler._server, scheduler)) {}

bool SupervisedScheduler::WorkerState::start() { return _thread->start(); }

@@ -977,6 +985,15 @@ void SupervisedScheduler::trackEndOngoingLowPriorityTask() {
}
}

void SupervisedScheduler::trackQueueTimeViolation() {
++_metricsQueueTimeViolations;
}

/// @brief returns the last stored dequeue time [ms]
uint64_t SupervisedScheduler::getLastLowPriorityDequeueTime() const noexcept {
return _metricsLastLowPriorityDequeueTime.load();
}

void SupervisedScheduler::setLastLowPriorityDequeueTime(uint64_t time) noexcept {
// update only probabilistically, in order to reduce contention on the gauge
if ((_sharedPRNG.rand() & 7) == 0) {
6 changes: 6 additions & 0 deletions arangod/Scheduler/SupervisedScheduler.h
@@ -58,6 +58,11 @@ class SupervisedScheduler final : public Scheduler {
void trackBeginOngoingLowPriorityTask();
void trackEndOngoingLowPriorityTask();

void trackQueueTimeViolation();

/// @brief returns the last stored dequeue time [ms]
uint64_t getLastLowPriorityDequeueTime() const noexcept override;

/// @brief set the time it took for the last low prio item to be dequeued
/// (time between queuing and dequeing) [ms]
void setLastLowPriorityDequeueTime(uint64_t time) noexcept;
@@ -221,6 +226,7 @@ class SupervisedScheduler final : public Scheduler {
Counter& _metricsThreadsStarted;
Counter& _metricsThreadsStopped;
Counter& _metricsQueueFull;
Counter& _metricsQueueTimeViolations;
Gauge<uint64_t>& _ongoingLowPriorityGauge;

/// @brief amount of time it took for the last low prio item to be dequeued
4 changes: 2 additions & 2 deletions js/common/bootstrap/errors.js
@@ -345,10 +345,10 @@
"ERROR_AGENCY_CANNOT_REBUILD_DBS" : { "code" : 20021, "message" : "Cannot rebuild readDB and spearHead" },
"ERROR_AGENCY_MALFORMED_TRANSACTION" : { "code" : 20030, "message" : "malformed agency transaction" },
"ERROR_SUPERVISION_GENERAL_FAILURE" : { "code" : 20501, "message" : "general supervision failure" },
"ERROR_QUEUE_FULL" : { "code" : 21003, "message" : "named queue is full" },
"ERROR_QUEUE_FULL" : { "code" : 21003, "message" : "queue is full" },
"ERROR_QUEUE_TIME_REQUIREMENT_VIOLATED" : { "code" : 21004, "message" : "queue time violated" },
"ERROR_ACTION_OPERATION_UNABORTABLE" : { "code" : 6002, "message" : "this maintenance action cannot be stopped" },
"ERROR_ACTION_UNFINISHED" : { "code" : 6003, "message" : "maintenance action still processing" },
"ERROR_NO_SUCH_ACTION" : { "code" : 6004, "message" : "no such maintenance action" },
"ERROR_HOT_BACKUP_INTERNAL" : { "code" : 7001, "message" : "internal hot backup error" },
"ERROR_HOT_RESTORE_INTERNAL" : { "code" : 7002, "message" : "internal hot restore error" },
"ERROR_BACKUP_TOPOLOGY" : { "code" : 7003, "message" : "backup does not match this topology" },