8000 Refactor stuff, add async batch extension task (#6875) by graetzer · Pull Request #6880 · arangodb/arangodb · GitHub
[go: up one dir, main page]

Skip to content

Refactor stuff, add async batch extension task (#6875) #6880

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Oct 15, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
8000
Diff view
Diff view
87 changes: 27 additions & 60 deletions arangod/MMFiles/MMFilesIncrementalSync.h
Original file line number Diff line number Diff line change
Expand Up @@ -258,27 +258,17 @@ Result handleSyncKeysMMFiles(arangodb::DatabaseInitialSyncer& syncer,
"' from " + url;
syncer.setProgress(progress);

std::unique_ptr<httpclient::SimpleHttpResult> response(
syncer._state.connection.client->retryRequest(rest::RequestType::GET, url,
nullptr, 0));

if (response == nullptr || !response->isComplete()) {
return Result(TRI_ERROR_REPLICATION_NO_RESPONSE,
std::string("could not connect to master at ") +
syncer._state.master.endpoint + ": " +
syncer._state.connection.client->getErrorMessage());
}

TRI_ASSERT(response != nullptr);
std::unique_ptr<httpclient::SimpleHttpResult> response;
syncer._state.connection.lease([&](httpclient::SimpleHttpClient* client) {
response.reset(client->retryRequest(rest::RequestType::GET, url, nullptr, 0));
});

if (response->wasHttpError()) {
return Result(TRI_ERROR_REPLICATION_MASTER_ERROR,
std::string("got invalid response from master at ") +
syncer._state.master.endpoint + ": HTTP " +
basics::StringUtils::itoa(response->getHttpReturnCode()) +
": " + response->getHttpReturnMessage());
if (replutils::hasFailed(response.get())) {
return buildHttpError(response.get(), url, syncer._state.connection);
}

TRI_ASSERT(response != nullptr);

VPackBuilder builder;
Result r = replutils::parseResponse(builder, response.get());

Expand Down Expand Up @@ -486,29 +476,18 @@ Result handleSyncKeysMMFiles(arangodb::DatabaseInitialSyncer& syncer,
progress = "fetching keys chunk " + std::to_string(currentChunkId) +
" for collection '" + coll->name() + "' from " + url;
syncer.setProgress(progress);

std::unique_ptr<httpclient::SimpleHttpResult> response(
syncer._state.connection.client->retryRequest(rest::RequestType::PUT,
url, nullptr, 0));

if (response == nullptr || !response->isComplete()) {
return Result(TRI_ERROR_REPLICATION_NO_RESPONSE,
std::string("could not connect to master at ") +
syncer._state.master.endpoint + ": " +
syncer._state.connection.client->getErrorMessage());

std::unique_ptr<httpclient::SimpleHttpResult> response;
syncer._state.connection.lease([&](httpclient::SimpleHttpClient* client) {
response.reset(client->retryRequest(rest::RequestType::PUT, url, nullptr, 0));
});

if (replutils::hasFailed(response.get())) {
return buildHttpError(response.get(), url, syncer._state.connection);
}

TRI_ASSERT(response != nullptr);

if (response->wasHttpError()) {
return Result(
TRI_ERROR_REPLICATION_MASTER_ERROR,
std::string("got invalid response from master at ") +
syncer._state.master.endpoint + ": HTTP " +
basics::StringUtils::itoa(response->getHttpReturnCode()) +
": " + response->getHttpReturnMessage());
}


VPackBuilder builder;
Result r = replutils::parseResponse(builder, response.get());

Expand Down Expand Up @@ -679,31 +658,19 @@ Result handleSyncKeysMMFiles(arangodb::DatabaseInitialSyncer& syncer,
coll->name() + "' from " + url;

syncer.setProgress(progress);

std::unique_ptr<httpclient::SimpleHttpResult> response(
syncer._state.connection.client->retryRequest(
rest::RequestType::PUT, url, keyJsonString.c_str(),
keyJsonString.size()));

if (response == nullptr || !response->isComplete()) {
return Result(
TRI_ERROR_REPLICATION_NO_RESPONSE,
std::string("could not connect to master at ") +
syncer._state.master.endpoint + ": " +
syncer._state.connection.client->getErrorMessage());

std::unique_ptr<httpclient::SimpleHttpResult> response;
syncer._state.connection.lease([&](httpclient::SimpleHttpClient* client) {
response.reset(client->retryRequest(rest::RequestType::PUT, url, keyJsonString.c_str(),
keyJsonString.size()));
});

if (replutils::hasFailed(response.get())) {
return buildHttpError(response.get(), url, syncer._state.connection);
}

TRI_ASSERT(response != nullptr);

if (response->wasHttpError()) {
return Result(
TRI_ERROR_REPLICATION_MASTER_ERROR,
std::string("got invalid response from master at ") +
syncer._state.master.endpoint + ": HTTP " +
basics::StringUtils::itoa(response->getHttpReturnCode()) +
": " + response->getHttpReturnMessage());
}

VPackBuilder builder;
Result r = replutils::parseResponse(builder, response.get());

Expand Down
8 changes: 4 additions & 4 deletions arangod/Pregel/Conductor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -421,10 +421,10 @@ void Conductor::startRecovery() {
TRI_ASSERT(SchedulerFeature::SCHEDULER != nullptr);

// let's wait for a final state in the cluster
_boost_timer.reset(SchedulerFeature::SCHEDULER->newDeadlineTimer(
boost::posix_time::seconds(2)));
_boost_timer->async_wait([this](const asio::error_code& error) {
_boost_timer.reset();
_steady_timer.reset(SchedulerFeature::SCHEDULER->newSteadyTimer());
_steady_timer->expires_after(std::chrono::seconds(2));
_steady_timer->async_wait([this](const asio::error_code& error) {
_steady_timer.reset();

if (error == asio::error::operation_aborted ||
_state != ExecutionState::RECOVERING) {
Expand Down
2 changes: 1 addition & 1 deletion arangod/Pregel/Conductor.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ class Conductor {
double _startTimeSecs = 0;
double _computationStartTimeSecs = 0;
double _endTimeSecs = 0;
std::unique_ptr<asio::deadline_timer> _boost_timer;
std::unique_ptr<asio::steady_timer> _steady_timer;

bool _startGlobalStep();
int _initializeWorkers(std::string const& path, VPackSlice additional);
Expand Down
8 changes: 4 additions & 4 deletions arangod/Pregel/Worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -582,9 +582,9 @@ void Worker<V, E, M>::_continueAsync() {
int64_t milli =
_writeCache->containedMessageCount() < _messageBatchSize ? 50 : 5;
// start next iteration in $milli mseconds.
_boost_timer.reset(SchedulerFeature::SCHEDULER->newDeadlineTimer(
boost::posix_time::millisec(milli)));
_boost_timer->async_wait([this](const asio::error_code& error) {
_steady_timer.reset(SchedulerFeature::SCHEDULER->newSteadyTimer());
_steady_timer->expires_after(std::chrono::milliseconds(milli));
_steady_timer->async_wait([this](const asio::error_code& error) {
if (error != asio::error::operation_aborted) {
{ // swap these pointers atomically
MY_WRITE_LOCKER(guard, _cacheRWLock);
Expand All @@ -599,7 +599,7 @@ void Worker<V, E, M>::_continueAsync() {
_conductorAggregators->aggregateValues(*_workerAggregators.get());
_workerAggregators->resetValues();
_startProcessing();
_boost_timer.reset();
_steady_timer.reset();
}
});
}
Expand Down
2 changes: 1 addition & 1 deletion arangod/Pregel/Worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ class Worker : public IWorker {
std::atomic<uint64_t> _nextGSSSendMessageCount;
/// if the worker has started sendng messages to the next GSS
std::atomic<bool> _requestedNextGSS;
std::unique_ptr<asio::deadline_timer> _boost_timer;
std::unique_ptr<asio::steady_timer> _steady_timer;

void _initializeMessageCaches();
void _initializeVertexContext(VertexContext<V, E, M>* ctx);
Expand Down
58 changes: 37 additions & 21 deletions arangod/Replication/DatabaseInitialSyncer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,8 @@ Result DatabaseInitialSyncer::runWithInventory(bool incremental,
if (r.fail()) {
return r;
}

startRecurringBatchExtension();
}

VPackSlice collections, views;
Expand Down Expand Up @@ -259,8 +261,8 @@ Result DatabaseInitialSyncer::runWithInventory(bool incremental,
}
}

/// @brief returns the inventory
Result DatabaseInitialSyncer::inventory(VPackBuilder& builder) {
/// @brief fetch the server's inventory, public method for TailingSyncer
Result DatabaseInitialSyncer::getInventory(VPackBuilder& builder) {
if (!_state.connection.valid()) {
return Result(TRI_ERROR_INTERNAL, "invalid endpoint");
}
Expand Down Expand Up @@ -331,9 +333,11 @@ Result DatabaseInitialSyncer::sendFlush() {
// send request
_config.progress.set("sending WAL flush command to url " + url);

std::unique_ptr<httpclient::SimpleHttpResult> response(
_config.connection.client->retryRequest(rest::RequestType::PUT, url,
body.c_str(), body.size()));
std::unique_ptr<httpclient::SimpleHttpResult> response;
_config.connection.lease([&](httpclient::SimpleHttpClient* client) {
response.reset(client->retryRequest(rest::RequestType::PUT, url,
body.c_str(), body.size()));
});

if (replutils::hasFailed(response.get())) {
return replutils::buildHttpError(response.get(), url, _config.connection);
Expand Down Expand Up @@ -530,9 +534,12 @@ void DatabaseInitialSyncer::fetchDumpChunk(std::shared_ptr<Syncer::JobSynchroniz
double t = TRI_microtime();

// send request
std::unique_ptr<httpclient::SimpleHttpResult> response(
_config.connection.client->retryRequest(rest::RequestType::GET, url,
nullptr, 0, headers));
std::unique_ptr<httpclient::SimpleHttpResult> response;
_config.connection.lease([&](httpclient::SimpleHttpClient* client) {
response.reset(client->retryRequest(rest::RequestType::GET, url,
nullptr, 0, headers));
});


if (replutils::hasFailed(response.get())) {
stats.waitedForDump += TRI_microtime() - t;
Expand Down Expand Up @@ -564,8 +571,9 @@ void DatabaseInitialSyncer::fetchDumpChunk(std::shared_ptr<Syncer::JobSynchroniz
}

std::string const jobUrl = "/_api/job/" + jobId;
response.reset(_config.connection.client->request(
rest::RequestType::PUT, jobUrl, nullptr, 0));
_config.connection.lease([&](httpclient::SimpleHttpClient* client) {
response.reset(client->request(rest::RequestType::PUT, jobUrl, nullptr, 0));
});

if (response != nullptr && response->isComplete()) {
if (response->hasHeaderField("x-arango-async-id")) {
Expand Down Expand Up @@ -841,9 +849,12 @@ Result DatabaseInitialSyncer::fetchCollectionSync(
// so we're sending the x-arango-async header here
auto headers = replutils::createHeaders();
headers[StaticStrings::Async] = "store";
std::unique_ptr<httpclient::SimpleHttpResult> response(
_config.connection.client->retryRequest(rest::RequestType::POST, url,
nullptr, 0, headers));

std::unique_ptr<httpclient::SimpleHttpResult> response;
_config.connection.lease([&](httpclient::SimpleHttpClient* client) {
response.reset(client->retryRequest(rest::RequestType::POST, url,
nullptr, 0, headers));
});

if (replutils::hasFailed(response.get())) {
return replutils::buildHttpError(response.get(), url, _config.connection);
Expand All @@ -868,8 +879,9 @@ Result DatabaseInitialSyncer::fetchCollectionSync(
}

std::string const jobUrl = "/_api/job/" + jobId;
response.reset(_config.connection.client->request(rest::RequestType::PUT,
jobUrl, nullptr, 0));
_config.connection.lease([&](httpclient::SimpleHttpClient* client) {
response.reset(client->request(rest::RequestType::PUT, jobUrl, nullptr, 0));
});

if (response != nullptr && response->isComplete()) {
if (response->hasHeaderField("x-arango-async-id")) {
Expand Down Expand Up @@ -940,9 +952,11 @@ Result DatabaseInitialSyncer::fetchCollectionSync(
_config.progress.set(msg);

// now delete the keys we ordered
std::unique_ptr<httpclient::SimpleHttpResult> response(
_config.connection.client->retryRequest(rest::RequestType::DELETE_REQ,
url, nullptr, 0));
std::unique_ptr<httpclient::SimpleHttpResult> response;
_config.connection.lease([&](httpclient::SimpleHttpClient* client) {
response.reset(client->retryRequest(rest::RequestType::DELETE_REQ,
url, nullptr, 0));
});
};

TRI_DEFER(shutdown());
Expand Down Expand Up @@ -1364,9 +1378,11 @@ arangodb::Result DatabaseInitialSyncer::fetchInventory(VPackBuilder& builder) {

// send request
_config.progress.set("fetching master inventory from " + url);
std::unique_ptr<httpclient::SimpleHttpResult> response(
_config.connection.client->retryRequest(rest::RequestType::GET, url,
nullptr, 0));
std::unique_ptr<httpclient::SimpleHttpResult> response;
_config.connection.lease([&](httpclient::SimpleHttpClient* client) {
response.reset(client->retryRequest(rest::RequestType::GET, url, nullptr, 0));
});

if (replutils::hasFailed(response.get())) {
if (!_config.isChild()) {
_config.batch.finish(_config.connection, _config.progress);
Expand Down
3 changes: 2 additions & 1 deletion arangod/Replication/DatabaseInitialSyncer.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include "Cluster/ServerState.h"
#include "Replication/InitialSyncer.h"
#include "Replication/utilities.h"
#include "Utils/SingleCollectionTransaction.h"

struct TRI_vocbase_t;

Expand Down Expand Up @@ -161,7 +162,7 @@ class DatabaseInitialSyncer final : public InitialSyncer {
double batchUpdateTime() const { return _config.batch.updateTime; }

/// @brief fetch the server's inventory, public method
Result inventory(arangodb::velocypack::Builder& builder);
Result getInventory(arangodb::velocypack::Builder& builder);

private:
/// @brief order a new chunk from the /dump API
Expand Down
9 changes: 5 additions & 4 deletions arangod/Replication/DatabaseTailingSyncer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,9 +133,10 @@ Result DatabaseTailingSyncer::syncCollectionFinalize(
"&collection=" + StringUtils::urlEncode(collectionName);

// send request
std::unique_ptr<SimpleHttpResult> response(
_state.connection.client->request(rest::RequestType::GET, url, nullptr,
0));
std::unique_ptr<httpclient::SimpleHttpResult> response;
_state.connection.lease([&](httpclient::SimpleHttpClient* client) {
response.reset(client->request(rest::RequestType::GET, url, nullptr, 0));
});

if (replutils::hasFailed(response.get())) {
return replutils::buildHttpError(response.get(), url, _state.connection);
Expand Down Expand Up @@ -234,7 +235,7 @@ bool DatabaseTailingSyncer::skipMarker(VPackSlice const& slice) {
VPackBuilder inventoryResponse;

auto init = std::make_shared<DatabaseInitialSyncer>(*_vocbase, _state.applier);
Result res = init->inventory(inventoryResponse);
Result res = init->getInventory(inventoryResponse);
_queriedTranslations = true;
if (res.fail()) {
LOG_TOPIC(ERR, Logger::REPLICATION) << "got error while fetching master inventory for collection name translations: " << res.errorMessage();
Expand Down
16 changes: 10 additions & 6 deletions arangod/Replication/GlobalInitialSyncer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,11 @@ Result GlobalInitialSyncer::runInternal(bool incremental) {
if (r.fail()) {
return r;
}

startRecurringBatchExtension();
}
TRI_DEFER(if (!_state.isChildSyncer) {
_batchPingTimer->cancel();
_batch.finish(_state.connection, _progress);
});
LOG_TOPIC(DEBUG, Logger::REPLICATION) << "sending start batch done";
Expand Down Expand Up @@ -198,7 +201,7 @@ Result GlobalInitialSyncer::runInternal(bool incremental) {
DatabaseGuard guard(nameSlice.copyString());

// change database name in place
auto configurationCopy = _state.applier;
ReplicationApplierConfiguration configurationCopy = _state.applier;
configurationCopy._database = nameSlice.copyString();

auto syncer = std::make_shared<DatabaseInitialSyncer>(*vocbase, configurationCopy);
Expand Down Expand Up @@ -364,8 +367,8 @@ Result GlobalInitialSyncer::updateServerInventory(
return TRI_ERROR_NO_ERROR;
}

/// @brief returns the inventory
Result GlobalInitialSyncer::inventory(VPackBuilder& builder) {
/// @brief fetch the server's inventory, public method for TailingSyncer
Result GlobalInitialSyncer::getInventory(VPackBuilder& builder) {
if (!_state.connection.valid()) {
return Result(TRI_ERROR_INTERNAL, "invalid endpoint");
} else if (application_features::ApplicationServer::isStopping()) {
Expand All @@ -392,9 +395,10 @@ Result GlobalInitialSyncer::fetchInventory(VPackBuilder& builder) {
}

// send request
std::unique_ptr<SimpleHttpResult> response(
_state.connection.client->retryRequest(rest::RequestType::GET, url,
nullptr, 0));
std::unique_ptr<httpclient::SimpleHttpResult> response;
_state.connection.lease([&](httpclient::SimpleHttpClient* client) {
response.reset(client->retryRequest(rest::RequestType::GET, url, nullptr, 0));
});

if (replutils::hasFailed(response.get())) {
if (!_state.isChildSyncer) {
Expand Down
2 changes: 1 addition & 1 deletion arangod/Replication/GlobalInitialSyncer.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class GlobalInitialSyncer final : public InitialSyncer {
arangodb::Result run(bool incremental) override;

/// @brief fetch the server's inventory, public method
Result inventory(arangodb::velocypack::Builder& builder);
Result getInventory(arangodb::velocypack::Builder& builder);

private:

Expand Down
Loading
0