8000 Cluster overwhelm countermeasures (#13108) · arangodb/arangodb@b9a5b52 · GitHub
[go: up one dir, main page]

Skip to content

Commit b9a5b52

Browse files
neunhoefjsteemannDan Larkin-YorkkvahedDan Larkin-York
authored
Cluster overwhelm countermeasures (#13108)
Co-authored-by: Jan <jsteemann@users.noreply.github.com> Co-authored-by: Dan Larkin-York <dan@arangodb.com> Co-authored-by: Kaveh Vahedipour <kaveh@arangodb.com> Co-authored-by: Dan Larkin-York <danielhlarkin@users.noreply.github.com> Co-authored-by: jsteemann <jan@arangodb.com>
1 parent 95d75ee commit b9a5b52

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

53 files changed

+1203
-630
lines changed

3rdParty/fuerte/include/fuerte/message.h

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@ struct ResponseHeader final : public MessageHeader {
133133
// from (Response) a server.
134134
class Message {
135135
protected:
136-
Message() = default;
136+
Message() : _timestamp(std::chrono::steady_clock::now()) {}
137137
virtual ~Message() = default;
138138

139139
public:
@@ -174,6 +174,13 @@ class Message {
174174
bool isContentTypeVPack() const;
175175
bool isContentTypeHtml() const;
176176
bool isContentTypeText() const;
177+
178+
std::chrono::steady_clock::time_point timestamp() const { return _timestamp; }
179+
// set timestamp when it was sent
180+
void timestamp(std::chrono::steady_clock::time_point t) { _timestamp = t; }
181+
182+
private:
183+
std::chrono::steady_clock::time_point _timestamp;
177184
};
178185

179186
// Request contains the message send to a server in a request.

CHANGELOG

Lines changed: 35 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,34 @@
11
devel
22
-----
33

4+
* The scheduler will now run a minimum of 4 threads at all times, and the
5+
default and minimal value for `--server.maximal-threads` has been lowered from
6+
64 to the greater of 32 and twice the number of detected cores.
7+
8+
* Throttle work coming from low priority queue, according to a constant
9+
and to an estimate taking into account fanout for multi-shard operations.
10+
11+
* Move to 4 priority levels "low", "medium", "high" and "maintenance" in
12+
scheduler to ensure that maintenance work and diagnostics is always
13+
possible, even in the case of RocksDB throttles. Do not allow any
14+
RocksDB work on "maintenance".
15+
16+
* Commit replications on high priority queue.
17+
18+
* Essentially get rid of timeout in replication to drop followers. This
19+
is now entirely handled via reboot and failure tracking. The timeout
20+
has now a default minimum of 15 minutes but can still be configured via
21+
options.
22+
23+
* Additional metrics for all queue lengths and low prio ongoing work.
24+
25+
* New metric for number and total time of replication operations.
26+
27+
* New metrics for number of internal requests in flight, internal request
28+
duration, and internal request timeouts
29+
30+
* Fix `Gauge` assignment operators.
31+
432
* Fixed and extended LDAP log messages.
533

634
* Added LDAP_OFF if referrals and restart are false.
@@ -132,11 +160,11 @@ devel
132160
* Added metrics for the system CPU usage:
133161
- `arangodb_server_statistics_user_percent`: Percentage of time that the
134162
system CPUs have spent in user mode
135-
- `arangodb_server_statistics_system_percent`: Percentage of time that
163+
- `arangodb_server_statistics_system_percent`: Percentage of time that
136164
the system CPUs have spent in kernel mode
137-
- `arangodb_server_statistics_idle_percent`: Percentage of time that the
165+
- `arangodb_server_statistics_idle_percent`: Percentage of time that the
138166
system CPUs have been idle
139-
- `arangodb_server_statistics_iowait_percent`: Percentage of time that
167+
- `arangodb_server_statistics_iowait_percent`: Percentage of time that
140168
the system CPUs have been waiting for I/O
141169

142170
These metrics resemble the overall CPU usage metrics in `top`.
@@ -150,19 +178,19 @@ devel
150178
configured value was effectively clamped to a value of `1`.
151179

152180
* Improvements for the Pregel distributed graph processing feature:
153-
- during the loading/startup phase, the in-memory edge cache is now
154-
intentionally bypassed. The reason for this is that any edges are
181+
- during the loading/startup phase, the in-memory edge cache is now
182+
intentionally bypassed. The reason for this is that any edges are
155183
looked up exactly once, so caching them is not beneficial, but would
156184
only lead to cache pollution.
157185
- the loading/startup phase can now load multiple collections in parallel,
158186
whereas previously it was only loading multiple shards of the same
159187
collection in parallel. This change helps to reduce load times in case
160188
there are many collections with few shards, and on single server.
161-
- the loading and result storage phases code has been overhauled so that
189+
- the loading and result storage phases code has been overhauled so that
162190
it runs slightly faster.
163191
- for Pregel runs that are based on named graphs (in contrast to explicit
164192
naming of the to-be-used vertex and edge collections), only those edge
165-
collections are considered that, according to the graph definition, can
193+
collections are considered that, according to the graph definition, can
166194
have connections with the vertex. This change can reduce the loading
167195
time substantially in case the graph contains many edge definitions.
168196
- the number of executed rounds for the underlying Pregel algorithm now

arangod/Agency/AsyncAgencyComm.cpp

Lines changed: 53 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -195,28 +195,27 @@ arangodb::AsyncAgencyComm::FutureResult agencyAsyncInquiry(AsyncAgencyCommManage
195195
return std::move(future).thenValue(
196196
[meta = std::move(meta), endpoint = std::move(endpoint), &man,
197197
body = std::move(body)](network::Response&& result) mutable {
198-
auto& resp = result.response;
199-
200-
switch (result.error) {
198+
199+
switch (result.error) {
201200
case Error::NoError:
202201
// handle inquiry response
203-
if (resp->statusCode() == fuerte::StatusNotFound) {
202+
if (result.statusCode() == fuerte::StatusNotFound) {
204203
return ::agencyAsyncSend(man, std::move(meta), std::move(body));
205204
}
206205

207-
if (resp->statusCode() == fuerte::StatusTemporaryRedirect) {
206+
if (result.statusCode() == fuerte::StatusTemporaryRedirect) {
208207
// get the Location header
209208
std::string const& location =
210-
resp->header.metaByKey(arangodb::StaticStrings::Location);
209+
result.response().header.metaByKey(arangodb::StaticStrings::Location);
211210
redirectOrError(man, endpoint, location);
212211
return ::agencyAsyncInquiry(man, std::move(meta), std::move(body));
213212
}
214-
return futures::makeFuture(AsyncAgencyCommResult{result.error,
215-
std::move(resp)}); // otherwise return error as is
213+
// otherwise return error as is
214+
return futures::makeFuture(AsyncAgencyCommResult{result.error, result.stealResponse()});
216215
case Error::ConnectionCanceled:
217216
if (man.server().isStopping()) {
218217
return futures::makeFuture(
219-
AsyncAgencyCommResult{result.error, std::move(resp)});
218+
AsyncAgencyCommResult{result.error, result.stealResponse()});
220219
}
221220
[[fallthrough]];
222221
case Error::CouldNotConnect:
@@ -236,8 +235,9 @@ arangodb::AsyncAgencyComm::FutureResult agencyAsyncInquiry(AsyncAgencyCommManage
236235
return agencyAsyncInquiry(man, std::move(meta), std::move(body));
237236

238237
case Error::VstUnauthorized:
238+
// unrecoverable error
239239
return futures::makeFuture(AsyncAgencyCommResult{result.error,
240-
std::move(resp)}); // unrecoverable error
240+
result.stealResponse()});
241241
}
242242

243243
ADB_UNREACHABLE;
@@ -295,88 +295,97 @@ arangodb::AsyncAgencyComm::FutureResult agencyAsyncSend(AsyncAgencyCommManager&
295295
<< "agencyAsyncSend [" << meta.requestId
296296
<< "] request done, result: " << to_string(result.error);
297297

298-
auto& req = result.request;
299-
auto& resp = result.response;
300-
auto& body = *req;
301-
302298
switch (result.error) {
303-
case Error::NoError:
299+
case Error::NoError: {
300+
TRI_ASSERT(result.hasRequest());
304301

305302
LOG_TOPIC("aac88", TRACE, Logger::AGENCYCOMM)
306303
<< "agencyAsyncSend [" << meta.requestId
307-
<< "] communication successful, statusCode: " << resp->statusCode();
304+
<< "] communication successful, statusCode: " << result.statusCode();
308305

309306
// success
310-
if ((resp->statusCode() >= 200 && resp->statusCode() <= 299)) {
307+
if ((result.statusCode() >= 200 && result.statusCode() <= 299)) {
311308
return futures::makeFuture(
312-
AsyncAgencyCommResult{result.error, std::move(resp)});
309+
AsyncAgencyCommResult{result.error, result.stealResponse()});
313310
}
314311
// user error
315-
if ((400 <= resp->statusCode() && resp->statusCode() <= 499)) {
312+
if ((400 <= result.statusCode() && result.statusCode() <= 499)) {
316313
return futures::makeFuture(
317-
AsyncAgencyCommResult{result.error, std::move(resp)});
314+
AsyncAgencyCommResult{result.error, result.stealResponse()});
318315
}
319316

320317
// 503 redirect
321-
if (resp->statusCode() == StatusTemporaryRedirect) {
318+
if (result.statusCode() == StatusTemporaryRedirect) {
322319
// get the Location header
323320
std::string const& location =
324-
resp->header.metaByKey(arangodb::StaticStrings::Location);
321+
result.response().header.metaByKey(arangodb::StaticStrings::Location);
325322
redirectOrError(man, endpoint, location);
326323

327324
// send again
328-
return ::agencyAsyncSend(man, std::move(meta), std::move(body).moveBuffer());
325+
return ::agencyAsyncSend(man, std::move(meta), std::move(result.request()).moveBuffer());
329326
}
330327

331328
// if we only did reads return here
332329
if (meta.type == RequestType::READ) {
333330
return futures::makeFuture(
334-
AsyncAgencyCommResult{result.error, std::move(resp)});
331+
AsyncAgencyCommResult{result.error, result.stealResponse()});
335332
}
336-
337-
[[fallthrough]];
338-
/* fallthrough */
339-
case Error::ConnectionCanceled:
340-
if (man.server().isStopping()) {
333+
}
334+
[[fallthrough]];
335+
336+
case Error::ConnectionCanceled: {
337+
if (man.server().isStopping() || !result.hasRequest()) {
341338
return futures::makeFuture(
342-
AsyncAgencyCommResult{result.error, std::move(resp)});
339+
AsyncAgencyCommResult{result.error, result.stealResponse()});
343340
}
344-
[[fallthrough]];
341+
342+
TRI_ASSERT(result.hasRequest());
343+
}
344+
[[fallthrough]];
345+
345346
case Error::RequestTimeout:
346347
case Error::ConnectionClosed:
347348
case Error::ProtocolError:
348349
case Error::WriteError:
349350
case Error::ReadError:
350-
case Error::CloseRequested:
351-
// inquiry the request
351+
case Error::CloseRequested: {
352+
TRI_ASSERT(result.hasRequest());
353+
354+
// inquire the request
352355
man.reportError(endpoint);
353356
// in case of a write transaction we have to do inquiry
354357
if (meta.isInquiryOnNoResponse()) {
355358
return ::agencyAsyncInquiry(man, std::move(meta),
356-
std::move(body).moveBuffer());
359+
std::move(result.request()).moveBuffer());
357360
} else if (!meta.isRetryOnNoResponse()) {
358361
// return error
359362
return futures::makeFuture(
360-
AsyncAgencyCommResult{result.error, std::move(resp)});
363+
AsyncAgencyCommResult{result.error, result.stealResponse()});
361364
}
362-
// otherwise just send again
363-
[[fallthrough]];
365+
}
366+
// otherwise just send again
367+
[[fallthrough]];
364368

365-
case Error::CouldNotConnect:
369+
case Error::CouldNotConnect: {
366370
LOG_TOPIC("aac90", TRACE, Logger::AGENCYCOMM)
367371
<< "agencyAsyncSend [" << meta.requestId << "] retry request soon";
368372
// retry to send the request
369373
man.reportError(endpoint);
370-
[[fallthrough]];
374+
}
375+
[[fallthrough]];
371376

372-
case Error::QueueCapacityExceeded:
373-
return ::agencyAsyncSend(man, std::move(meta),
374-
std::move(body).moveBuffer()); // retry always
377+
case Error::QueueCapacityExceeded: {
378+
TRI_ASSERT(result.hasRequest());
379+
return ::agencyAsyncSend(man, std::move(meta),
380+
std::move(result.request()).moveBuffer()); // retry always
381+
}
375382

376-
case Error::VstUnauthorized:
383+
case Error::VstUnauthorized: {
384+
// unrecoverable error
377385
return futures::makeFuture(
378386
AsyncAgencyCommResult{result.error,
379-
std::move(resp)}); // unrecoverable error
387+
result.stealResponse()});
388+
}
380389
}
381390

382391
ADB_UNREACHABLE;

arangod/Agency/Inception.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ void handleGossipResponse(arangodb::network::Response const& r,
4646
std::string newLocation;
4747

4848
if (r.ok()) {
49-
velocypack::Slice payload = r.response->slice();
49+
velocypack::Slice payload = r.slice();
5050

5151
switch (r.statusCode()) {
5252
case 200: // Digest other configuration
@@ -58,7 +58,7 @@ void handleGossipResponse(arangodb::network::Response const& r,
5858

5959
case 307: // Add new endpoint to gossip peers
6060
bool found;
61-
newLocation = r.response->header.metaByKey("location", found);
61+
newLocation = r.response().header.metaByKey("location", found);
6262

6363
if (found) {
6464
if (newLocation.compare(0, 5, "https") == 0) {

arangod/Aql/EngineInfoContainerDBServerServerBased.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -372,7 +372,7 @@ Result EngineInfoContainerDBServerServerBased::buildEngines(
372372
return {code, message};
373373
}
374374

375-
VPackSlice response = res.response->slice();
375+
VPackSlice response = res.slice();
376376
if (response.isNone()) {
377377
return {TRI_ERROR_INTERNAL, "malformed response while building engines"};
378378
}

arangod/Aql/Query.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1179,7 +1179,8 @@ futures::Future<Result> finishDBServerParts(Query& query, int errorCode) {
11791179
network::RequestOptions options;
11801180
options.database = query.vocbase().name();
11811181
options.timeout = network::Timeout(60.0); // Picked arbitrarily
1182-
// options.skipScheduler = true;
1182+
options.continuationLane = RequestLane::CLUSTER_AQL_CONTINUATION;
1183+
// options.skipScheduler = true;
11831184

11841185
VPackBuffer<uint8_t> body;
11851186
VPackBuilder builder(body);

arangod/Aql/SharedQueryState.cpp

Lines changed: 30 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -128,37 +128,36 @@ void SharedQueryState::queueHandler() {
128128
// We are shutting down
129129
return;
130130
}
131-
132-
bool queued = scheduler->queue(RequestLane::CLUSTER_AQL,
133-
[self = shared_from_this(),
134-
cb = _wakeupCb,
135-
v = _cbVersion]() {
136-
137-
std::unique_lock<std::mutex> lck(self->_mutex, std::defer_lock);
138-
139-
do {
140-
bool cntn = false;
141-
try {
142-
cntn = cb();
143-
} catch (...) {}
144-
145-
lck.lock();
146-
if (v == self->_cbVersion) {
147-
unsigned c = self->_numWakeups--;
148-
TRI_ASSERT(c > 0);
149-
if (c == 1 || !cntn || !self->_valid) {
150-
break;
151-
}
152-
} else {
153-
return;
154-
}
155-
lck.unlock();
156-
} while (true);
157-
158-
TRI_ASSERT(lck);
159-
self->queueHandler();
160-
});
161-
131+
132+
bool queued =
133+
scheduler->queue(RequestLane::CLUSTER_AQL_CONTINUATION,
134+
[self = shared_from_this(), cb = _wakeupCb, v = _cbVersion]() {
135+
std::unique_lock<std::mutex> lck(self->_mutex, std::defer_lock);
136+
137+
do {
138+
bool cntn = false;
139+
try {
140+
cntn = cb();
141+
} catch (...) {
142+
}
143+
144+
lck.lock();
145+
if (v == self->_cbVersion) {
146+
unsigned c = self->_numWakeups--;
147+
TRI_ASSERT(c > 0);
148+
if (c == 1 || !cntn || !self->_valid) {
149+
break;
150+
}
151+
} else {
152+
return;
153+
}
154+
lck.unlock();
155+
} while (true);
156+
157+
TRI_ASSERT(lck);
158+
self->queueHandler();
159+
});
160+
162161
if (!queued) { // just invalidate
163162
_wakeupCb = nullptr;
164163
_valid = false;

0 commit comments

Comments
 (0)
0