8000 Refactor stuff, add async batch extension task (#6875) (#6880) · mnemosdev/arangodb@6628a4e · GitHub
[go: up one dir, main page]

Skip to content

Commit 6628a4e

Browse files
graetzerjsteemann
authored andcommitted
Refactor stuff, add async batch extension task (arangodb#6875) (arangodb#6880)
1 parent 239771b commit 6628a4e

19 files changed

+258
-256
lines changed

arangod/MMFiles/MMFilesIncrementalSync.h

Lines changed: 27 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -258,27 +258,17 @@ Result handleSyncKeysMMFiles(arangodb::DatabaseInitialSyncer& syncer,
258258
"' from " + url;
259259
syncer.setProgress(progress);
260260

261-
std::unique_ptr<httpclient::SimpleHttpResult> response(
262-
syncer._state.connection.client->retryRequest(rest::RequestType::GET, url,
263-
nullptr, 0));
264-
265-
if (response == nullptr || !response->isComplete()) {
266-
return Result(TRI_ERROR_REPLICATION_NO_RESPONSE,
267-
std::string("could not connect to master at ") +
268-
syncer._state.master.endpoint + ": " +
269-
syncer._state.connection.client->getErrorMessage());
270-
}
271-
272-
TRI_ASSERT(response != nullptr);
261+
std::unique_ptr<httpclient::SimpleHttpResult> response;
262+
syncer._state.connection.lease([&](httpclient::SimpleHttpClient* client) {
263+
response.reset(client->retryRequest(rest::RequestType::GET, url, nullptr, 0));
264+
});
273265

274-
if (response->wasHttpError()) {
275-
return Result(TRI_ERROR_REPLICATION_MASTER_ERROR,
276-
std::string("got invalid response from master at ") +
277-
syncer._state.master.endpoint + ": HTTP " +
278-
basics::StringUtils::itoa(response->getHttpReturnCode()) +
279-
": " + response->getHttpReturnMessage());
266+
if (replutils::hasFailed(response.get())) {
267+
return buildHttpError(response.get(), url, syncer._state.connection);
280268
}
281269

270+
TRI_ASSERT(response != nullptr);
271+
282272
VPackBuilder builder;
283273
Result r = replutils::parseResponse(builder, response.get());
284274

@@ -486,29 +476,18 @@ Result handleSyncKeysMMFiles(arangodb::DatabaseInitialSyncer& syncer,
486476
progress = "fetching keys chunk " + std::to_string(currentChunkId) +
487477
" for collection '" + coll->name() + "' from " + url;
488478
syncer.setProgress(progress);
489-
490-
std::unique_ptr<httpclient::SimpleHttpResult> response(
491-
syncer._state.connection.client->retryRequest(rest::RequestType::PUT,
492-
url, nullptr, 0));
493-
494-
if (response == nullptr || !response->isComplete()) {
495-
return Result(TRI_ERROR_REPLICATION_NO_RESPONSE,
496-
std::string("could not connect to master at ") +
497-
syncer._state.master.endpoint + ": " +
498-
syncer._state.connection.client->getErrorMessage());
479+
480+
std::unique_ptr<httpclient::SimpleHttpResult> response;
481+
syncer._state.connection.lease([&](httpclient::SimpleHttpClient* client) {
482+
response.reset(client->retryRequest(rest::RequestType::PUT, url, nullptr, 0));
483+
});
484+
485+
if (replutils::hasFailed(response.get())) {
486+
return buildHttpError(response.get(), url, syncer._state.connection);
499487
}
500488

501489
TRI_ASSERT(response != nullptr);
502-
503-
if (response->wasHttpError()) {
504-
return Result(
505-
TRI_ERROR_REPLICATION_MASTER_ERROR,
506-
std::string("got invalid response from master at ") +
507-
syncer._state.master.endpoint + ": HTTP " +
508-
basics::StringUtils::itoa(response->getHttpReturnCode()) +
509-
": " + response->getHttpReturnMessage());
510-
}
511-
490+
512491
VPackBuilder builder;
513492
Result r = replutils::parseResponse(builder, response.get());
514493

@@ -679,31 +658,19 @@ Result handleSyncKeysMMFiles(arangodb::DatabaseInitialSyncer& syncer,
679658
coll->name() + "' from " + url;
680659

681660
syncer.setProgress(progress);
682-
683-
std::unique_ptr<httpclient::SimpleHttpResult> response(
684-
syncer._state.connection.client->retryRequest(
685-
rest::RequestType::PUT, url, keyJsonString.c_str(),
686-
keyJsonString.size()));
687-
688-
if (response == nullptr || !response->isComplete()) {
689-
return Result(
690-
TRI_ERROR_REPLICATION_NO_RESPONSE,
691-
std::string("could not connect to master at ") +
692-
syncer._state.master.endpoint + ": " +
693-
syncer._state.connection.client->getErrorMessage());
661+
662+
std::unique_ptr<httpclient::SimpleHttpResult> response;
663+
syncer._state.connection.lease([&](httpclient::SimpleHttpClient* client) {
664+
response.reset(client->retryRequest(rest::RequestType::PUT, url, keyJsonString.c_str(),
665+
keyJsonString.size()));
666+
});
667+
668+
if (replutils::hasFailed(response.get())) {
669+
return buildHttpError(response.get(), url, syncer._state.connection);
694670
}
695-
671+
696672
TRI_ASSERT(response != nullptr);
697673

698-
if (response->wasHttpError()) {
699-
return Result(
700-
TRI_ERROR_REPLICATION_MASTER_ERROR,
701-
std::string("got invalid response from master at ") +
702-
syncer._state.master.endpoint + ": HTTP " +
703-
basics::StringUtils::itoa(response->getHttpReturnCode()) +
704-
": " + response->getHttpReturnMessage());
705-
}
706-
707674
VPackBuilder builder;
708675
Result r = replutils::parseResponse(builder, response.get());
709676

arangod/Pregel/Conductor.cpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -421,10 +421,10 @@ void Conductor::startRecovery() {
421421
TRI_ASSERT(SchedulerFeature::SCHEDULER != nullptr);
422422

423423
// let's wait for a final state in the cluster
424-
_boost_timer.reset(SchedulerFeature::SCHEDULER->newDeadlineTimer(
425-
boost::posix_time::seconds(2)));
426-
_boost_timer->async_wait([this](const asio::error_code& error) {
427-
_boost_timer.reset();
424+
_steady_timer.reset(SchedulerFeature::SCHEDULER->newSteadyTimer());
425+
_steady_timer->expires_after(std::chrono::seconds(2));
426+
_steady_timer->async_wait([this](const asio::error_code& error) {
427+
_steady_timer.reset();
428428

429429
if (error == asio::error::operation_aborted ||
430430
_state != ExecutionState::RECOVERING) {

arangod/Pregel/Conductor.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ class Conductor {
9191
double _startTimeSecs = 0;
9292
double _computationStartTimeSecs = 0;
9393
double _endTimeSecs = 0;
94-
std::unique_ptr<asio::deadline_timer> _boost_timer;
94+
std::unique_ptr<asio::steady_timer> _steady_timer;
9595

9696
bool _startGlobalStep();
9797
int _initializeWorkers(std::string const& path, VPackSlice additional);

arangod/Pregel/Worker.cpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -582,9 +582,9 @@ void Worker<V, E, M>::_continueAsync() {
582582
int64_t milli =
583583
_writeCache->containedMessageCount() < _messageBatchSize ? 50 : 5;
584584
// start next iteration in $milli mseconds.
585-
_boost_timer.reset(SchedulerFeature::SCHEDULER->newDeadlineTimer(
586-
boost::posix_time::millisec(milli)));
587-
_boost_timer->async_wait([this](const asio::error_code& error) {
585+
_steady_timer.reset(SchedulerFeature::SCHEDULER->newSteadyTimer());
586+
_steady_timer->expires_after(std::chrono::milliseconds(milli));
587+
_steady_timer->async_wait([this](const asio::error_code& error) {
588588
if (error != asio::error::operation_aborted) {
589589
{ // swap these pointers atomically
590590
MY_WRITE_LOCKER(guard, _cacheRWLock);
@@ -599,7 +599,7 @@ void Worker<V, E, M>::_continueAsync() {
599599
_conductorAggregators->aggregateValues(*_workerAggregators.get());
600600
_workerAggregators->resetValues();
601601
_startProcessing();
602-
_boost_timer.reset();
602+
_steady_timer.reset();
603603
}
604604
});
605605
}

arangod/Pregel/Worker.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ class Worker : public IWorker {
136136
std::atomic<uint64_t> _nextGSSSendMessageCount;
137137
/// if the worker has started sendng messages to the next GSS
138138
std::atomic<bool> _requestedNextGSS;
139-
std::unique_ptr<asio::deadline_timer> _boost_timer;
139+
std::unique_ptr<asio::steady_timer> _steady_timer;
140140

141141
void _initializeMessageCaches();
142142
void _initializeVertexContext(VertexContext<V, E, M>* ctx);

arangod/Replication/DatabaseInitialSyncer.cpp

Lines changed: 37 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,8 @@ Result DatabaseInitialSyncer::runWithInventory(bool incremental,
198198
if (r.fail()) {
199199
return r;
200200
}
201+
202+
startRecurringBatchExtension();
201203
}
202204

203205
VPackSlice collections, views;
@@ -259,8 +261,8 @@ Result DatabaseInitialSyncer::runWithInventory(bool incremental,
259261
}
260262
}
261263

262-
/// @brief returns the inventory
263-
Result DatabaseInitialSyncer::inventory(VPackBuilder& builder) {
264+
/// @brief fetch the server's inventory, public method for TailingSyncer
265+
Result DatabaseInitialSyncer::getInventory(VPackBuilder& builder) {
264266
if (!_state.connection.valid()) {
265267
return Result(TRI_ERROR_INTERNAL, "invalid endpoint");
266268
}
@@ -331,9 +333,11 @@ Result DatabaseInitialSyncer::sendFlush() {
331333
// send request
332334
_config.progress.set("sending WAL flush command to url " + url);
333335

334-
std::unique_ptr<httpclient::SimpleHttpResult> response(
335-
_config.connection.client->retryRequest(rest::RequestType::PUT, url,
336-
body.c_str(), body.size()));
336+
std::unique_ptr<httpclient::SimpleHttpResult> response;
337+
_config.connection.lease([&](httpclient::SimpleHttpClient* client) {
338+
response.reset(client->retryRequest(rest::RequestType::PUT, url,
339+
body.c_str(), body.size()));
340+
});
337341

338342
if (replutils::hasFailed(response.get())) {
339343
return replutils::buildHttpError(response.get(), url, _config.connection);
@@ -530,9 +534,12 @@ void DatabaseInitialSyncer::fetchDumpChunk(std::shared_ptr<Syncer::JobSynchroniz
530534
double t = TRI_microtime();
531535

532536
// send request
533-
std::unique_ptr<httpclient::SimpleHttpResult> response(
534-
_config.connection.client->retryRequest(rest::RequestType::GET, url,
535-
nullptr, 0, headers));
537+
std::unique_ptr<httpclient::SimpleHttpResult> response;
538+
_config.connection.lease([&](httpclient::SimpleHttpClient* client) {
539+
response.reset(client->retryRequest(rest::RequestType::GET, url,
540+
nullptr, 0, headers));
541+
});
542+
536543

537544
if (replutils::hasFailed(response.get())) {
538545
stats.waitedForDump += TRI_microtime() - t;
@@ -564,8 +571,9 @@ void DatabaseInitialSyncer::fetchDumpChunk(std::shared_ptr<Syncer::JobSynchroniz
564571
}
565572

566573
std::string const jobUrl = "/_api/job/" + jobId;
567-
response.reset(_config.connection.client->request(
568-
rest::RequestType::PUT, jobUrl, nullptr, 0));
574+
_config.connection.lease([&](httpclient::SimpleHttpClient* client) {
575+
response.reset(client->request(rest::RequestType::PUT, jobUrl, nullptr, 0));
576+
});
569577

570578
if (response != nullptr && response->isComplete()) {
571579
if (response->hasHeaderField("x-arango-async-id")) {
@@ -841,9 +849,12 @@ Result DatabaseInitialSyncer::fetchCollectionSync(
841849
// so we're sending the x-arango-async header here
842850
auto headers = replutils::createHeaders();
843851
headers[StaticStrings::Async] = "store";
844-
std::unique_ptr<httpclient::SimpleHttpResult> response(
845-
_config.connection.client->retryRequest(rest::RequestType::POST, url,
846-
nullptr, 0, headers));
852+
853+
std::unique_ptr<httpclient::SimpleHttpResult> response;
854+
_config.connection.lease([&](httpclient::SimpleHttpClient* client) {
855+
response.reset(client->retryRequest(rest::RequestType::POST, url,
856+
nullptr, 0, headers));
857+
});
847858

848859
if (replutils::hasFailed(response.get())) {
849860
return replutils::buildHttpError(response.get(), url, _config.connection);
@@ -868,8 +879,9 @@ Result DatabaseInitialSyncer::fetchCollectionSync(
868879
}
869880

870881
std::string const jobUrl = "/_api/job/" + jobId;
871-
response.reset(_config.connection.client->request(rest::RequestType::PUT,
872-
jobUrl, nullptr, 0));
882+
_config.connection.lease([&](httpclient::SimpleHttpClient* client) {
883+
response.reset(client->request(rest::RequestType::PUT, jobUrl, nullptr, 0));
884+
});
873885

874886
if (response != nullptr && response->isComplete()) {
875887
if (response->hasHeaderField("x-arango-async-id")) {
@@ -940,9 +952,11 @@ Result DatabaseInitialSyncer::fetchCollectionSync(
940952
_config.progress.set(msg);
941953

942954
// now delete the keys we ordered
943-
std::unique_ptr<httpclient::SimpleHttpResult> response(
944-
_config.connection.client->retryRequest(rest::RequestType::DELETE_REQ,
945-
url, nullptr, 0));
955+
std::unique_ptr<httpclient::SimpleHttpResult> response;
956+
_config.connection.lease([&](httpclient::SimpleHttpClient* client) {
957+
response.reset(client->retryRequest(rest::RequestType::DELETE_REQ,
958+
url, nullptr, 0));
959+
});
946960
};
947961

948962
TRI_DEFER(shutdown());
@@ -1364,9 +1378,11 @@ arangodb::Result DatabaseInitialSyncer::fetchInventory(VPackBuilder& builder) {
13641378

13651379
// send request
13661380
_config.progress.set("fetching master inventory from " + url);
1367-
std::unique_ptr<httpclient::SimpleHttpResult> response(
1368-
_config.connection.client->retryRequest(rest::RequestType::GET, url,
1369-
nullptr, 0));
1381+
std::unique_ptr<httpclient::SimpleHttpResult> response;
1382+
_config.connection.lease([&](httpclient::SimpleHttpClient* client) {
1383+
response.reset(client->retryRequest(rest::RequestType::GET, url, nullptr, 0));
1384+
});
1385+
13701386
if (replutils::hasFailed(response.get())) {
13711387
if (!_config.isChild()) {
13721388
_config.batch.finish(_config.connection, _config.progress);

arangod/Replication/DatabaseInitialSyncer.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
#include "Cluster/ServerState.h"
2929
#include "Replication/InitialSyncer.h"
3030
#include "Replication/utilities.h"
31+
#include "Utils/SingleCollectionTransaction.h"
3132

3233
struct TRI_vocbase_t;
3334

@@ -161,7 +162,7 @@ class DatabaseInitialSyncer final : public InitialSyncer {
161162
double batchUpdateTime() const { return _config.batch.updateTime; }
162163

163164
/// @brief fetch the server's inventory, public method
164-
Result inventory(arangodb::velocypack::Builder& builder);
165+
Result getInventory(arangodb::velocypack::Builder& builder);
165166

166167
private:
167168
/// @brief order a new chunk from the /dump API

arangod/Replication/DatabaseTailingSyncer.cpp

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -133,9 +133,10 @@ Result DatabaseTailingSyncer::syncCollectionFinalize(
133133
"&collection=" + StringUtils::urlEncode(collectionName);
134134

135135
// send request
136-
std::unique_ptr<SimpleHttpResult> response(
137-
_state.connection.client->request(rest::RequestType::GET, url, nullptr,
138-
0));
136+
std::unique_ptr<httpclient::SimpleHttpResult> response;
137+
_state.connection.lease([&](httpclient::SimpleHttpClient* client) {
138+
response.reset(client->request(rest::RequestType::GET, url, nullptr, 0));
139+
});
139140

140141
if (replutils::hasFailed(response.get())) {
141142
return replutils::buildHttpError(response.get(), url, _state.connection);
@@ -234,7 +235,7 @@ bool DatabaseTailingSyncer::skipMarker(VPackSlice const& slice) {
234235
VPackBuilder inventoryResponse;
235236

236237
auto init = std::make_shared<DatabaseInitialSyncer>(*_vocbase, _state.applier);
237-
Result res = init->inventory(inventoryResponse);
238+
Result res = init->getInventory(inventoryResponse);
238239
_queriedTranslations = true;
239240
if (res.fail()) {
240241
LOG_TOPIC(ERR, Logger::REPLICATION) << "got error while fetching master inventory for collection name translations: " << res.errorMessage();

arangod/Replication/GlobalInitialSyncer.cpp

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -128,8 +128,11 @@ Result GlobalInitialSyncer::runInternal(bool incremental) {
128128
if (r.fail()) {
129129
return r;
130130
}
131+
132+
startRecurringBatchExtension();
131133
}
132134
TRI_DEFER(if (!_state.isChildSyncer) {
135+
_batchPingTimer->cancel();
133136
_batch.finish(_state.connection, _progress);
134137
});
135138
LOG_TOPIC(DEBUG, Logger::REPLICATION) << "sending start batch done";
@@ -198,7 +201,7 @@ Result GlobalInitialSyncer::runInternal(bool incremental) {
198201
DatabaseGuard guard(nameSlice.copyString());
199202

200203
// change database name in place
201-
auto configurationCopy = _state.applier;
204+
ReplicationApplierConfiguration configurationCopy = _state.applier;
202205
configurationCopy._database = nameSlice.copyString();
203206

204207
auto syncer = std::make_shared<DatabaseInitialSyncer>(*vocbase, configurationCopy);
@@ -364,8 +367,8 @@ Result GlobalInitialSyncer::updateServerInventory(
364367
return TRI_ERROR_NO_ERROR;
365368
}
366369

367-
/// @brief returns the inventory
368-
Result GlobalInitialSyncer::inventory(VPackBuilder& builder) {
370+
/// @brief fetch the server's inventory, public method for TailingSyncer
371+
Result GlobalInitialSyncer::getInventory(VPackBuilder& builder) {
369372
if (!_state.connection.valid()) {
370373
return Result(TRI_ERROR_INTERNAL, "invalid endpoint");
371374
} else if (application_features::ApplicationServer::isStopping()) {
@@ -392,9 +395,10 @@ Result GlobalInitialSyncer::fetchInventory(VPackBuilder& builder) {
392395
}
393396

394397
// send request
395-
std::unique_ptr<SimpleHttpResult> response(
396-
_state.connection.client->retryRequest(rest::RequestType::GET, url,
397-
nullptr, 0));
398+
std::unique_ptr<httpclient::SimpleHttpResult> response;
399+
_state.connection.lease([&](httpclient::SimpleHttpClient* client) {
400+
response.reset(client->retryRequest(rest::RequestType::GET, url, nullptr, 0));
401+
});
398402

399403
if (replutils::hasFailed(response.get())) {
400404
if (!_state.isChildSyncer) {

arangod/Replication/GlobalInitialSyncer.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ class GlobalInitialSyncer final : public InitialSyncer {
4141
arangodb::Result run(bool incremental) override;
4242

4343
/// @brief fetch the server's inventory, public method
44-
Result inventory(arangodb::velocypack::Builder& builder);
44+
Result getInventory(arangodb::velocypack::Builder& builder);
4545

4646
private:
4747

0 commit comments

Comments
 (0)
0