8000 Convert many uses of ClusterComm to Fuerte by dhly-etc · Pull Request #10154 · arangodb/arangodb · GitHub
[go: up one dir, main page]

Skip to content

Convert many uses of ClusterComm to Fuerte #101 8000 54

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 28 commits into from
Oct 10, 2019
Merged
Show file tree
Hide file tree
Changes from 23 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
9aff2a4
Remove ClusterComm from v8-actions.cpp.
Sep 25, 2019
14d2cd0
Remove ClusterComm from GeneralServer.
Sep 25, 2019
0566cb2
Merge branch 'devel' into feature/convert-clustercomm-use-to-network-…
Sep 30, 2019
0f652d1
Merge branch 'devel' into feature/convert-clustercomm-use-to-network-…
Sep 30, 2019
ec0c1e4
Remove header.
Sep 30, 2019
52f50d9
Remove ClusterComm from Sharding.
Sep 30, 2019
0798b8b
Remove ClusterComm from RestHandler.
Sep 30, 2019
43c2c57
Remove ClusterComm from Graph.
Sep 30, 2019
f9dc2e7
Remove ClusterComm from IResearch.
Sep 30, 2019
3b38d4c
Merge branch 'devel' into feature/convert-clustercomm-use-to-network-…
Oct 1, 2019
8bc65ce
Fix replication trampolining and request forwarding in general.
Oct 2, 2019
c80e7ee
Merge branch 'devel' into feature/convert-clustercomm-use-to-network-…
Oct 2, 2019
e2acc9b
Remove ClusterComm from Transaction.
Oct 2, 2019
e765a25
Remove ClusterComm from Aql.
Oct 2, 2019
ab9b1ee
Remove ClusterComm from several files in Cluster.
Oct 2, 2019
9e1b0e4
Remove ClusterComm from SynchronizeShard.
Oct 2, 2019
ac6fafe
Merge branch 'devel' into feature/convert-clustercomm-use-to-network-…
Oct 2, 2019
4a9c798
Fix wrong status code.
Oct 3, 2019
0788605
Fix handling of server IDs.
Oct 3, 2019
9930d56
Revert logging change.
Oct 3, 2019
cbd8e8e
Merge branch 'devel' into feature/convert-clustercomm-use-to-network-…
Oct 3, 2019
e499939
Minor adjustment.
Oct 3, 2019
511b06f
Add waitForAny futures utility.
Oct 4, 2019
6e4522e
Address some review comments.
Oct 9, 2019
2b122f0
Address review comment.
Oct 9, 2019
1928630
Address more review comments.
Oct 9, 2019
c1179d9
Remove less-than-general method.
Oct 9, 2019
d7420a0
Merge branch 'devel' into feature/convert-clustercomm-use-to-network-…
Oct 9, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion 3rdParty/fuerte/include/fuerte/message.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ struct RequestHeader final : public MessageHeader {
// accept header accessors
ContentType acceptType() const;
void acceptType(ContentType type);
void acceptType(std::string const& type);

// query parameter helpers
void addParameter(std::string const& key, std::string const& value);
Expand Down Expand Up @@ -208,7 +209,7 @@ class Request final : public Message {
};

// Response contains the message resulting from a request to a server.
class Response final : public Message {
class Response : public Message {
public:
Response(ResponseHeader&& reqHeader = ResponseHeader())
: header(std::move(reqHeader)), _payloadOffset(0) {}
Expand Down
4 changes: 4 additions & 0 deletions 3rdParty/fuerte/src/message.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,10 @@ ContentType RequestHeader::acceptType() const { return _acceptType; }

void RequestHeader::acceptType(ContentType type) { _acceptType = type; }

void RequestHeader::acceptType(std::string const& type) {
_acceptType = to_ContentType(type);
}

void RequestHeader::addParameter(std::string const& key,
std::string const& value) {
parameters.emplace(key, value);
Expand Down
2 changes: 0 additions & 2 deletions arangod/Aql/BlocksWithClients.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,6 @@ namespace transaction {
class Methods;
}

struct ClusterCommResult;

namespace aql {
class AqlItemBlock;
struct Collection;
Expand Down
9 changes: 6 additions & 3 deletions arangod/Aql/ClusterNodes.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@
/// @author Max Neunhoeffer
////////////////////////////////////////////////////////////////////////////////

#include <type_traits>

#include <velocypack/Iterator.h>
#include <velocypack/velocypack-aliases.h>

#include "ClusterNodes.h"

#include "Aql/AqlValue.h"
Expand All @@ -45,11 +50,9 @@
#include "Aql/types.h"
#include "Basics/VelocyPackHelper.h"
#include "Cluster/ServerState.h"

#include "Logger/LogMacros.h"
#include "Transaction/Methods.h"

#include <type_traits>

using namespace arangodb;
using namespace arangodb::basics;
using namespace arangodb::aql;
Expand Down
2 changes: 1 addition & 1 deletion arangod/Aql/Collection.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
#ifndef ARANGOD_AQL_COLLECTION_H
#define ARANGOD_AQL_COLLECTION_H 1

#include "Cluster/ClusterComm.h"
#include "Cluster/ClusterTypes.h"
#include "VocBase/AccessMode.h"
#include "VocBase/vocbase.h"

Expand Down
84 changes: 49 additions & 35 deletions arangod/Aql/EngineInfoContainerDBServerServerBased.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,14 @@
#include "Aql/GraphNode.h"
#include "Aql/Query.h"
#include "Aql/QuerySnippet.h"
#include "Cluster/ClusterComm.h"
#include "Basics/StringUtils.h"
#include "Cluster/ClusterInfo.h"
#include "Cluster/ClusterTrxMethods.h"
#include "Graph/BaseOptions.h"
#include "Logger/LogMacros.h"
#include "Network/Methods.h"
#include "Network/NetworkFeature.h"
#include "Network/Utils.h"
#include "StorageEngine/TransactionState.h"
#include "Utils/CollectionNameResolver.h"

Expand Down Expand Up @@ -66,10 +70,6 @@ Result ExtractRemoteAndShard(VPackSlice keySlice, size_t& remoteId, std::string&
return {TRI_ERROR_NO_ERROR};
}

struct NoopCb final : public arangodb::ClusterCommCallback {
bool operator()(ClusterCommResult*) override { return true; }
};

} // namespace

EngineInfoContainerDBServerServerBased::TraverserEngineShardLists::TraverserEngineShardLists(
Expand Down Expand Up @@ -278,8 +278,9 @@ Result EngineInfoContainerDBServerServerBased::buildEngines(
// Otherwise the locking needs to be empty.
TRI_ASSERT(!_closedSnippets.empty() || !_graphNodes.empty());

auto cc = ClusterComm::instance();
if (cc == nullptr) {
NetworkFeature const& nf = _query.vocbase().server().getFeature<NetworkFeature>();
network::ConnectionPool* pool = nf.pool();
if (pool == nullptr) {
// nullptr only happens on controlled shutdown
return {TRI_ERROR_SHUTTING_DOWN};
}
Expand All @@ -290,13 +291,15 @@ Result EngineInfoContainerDBServerServerBased::buildEngines(
"/_db/" + arangodb::basics::StringUtils::urlEncode(_query.vocbase().name()) +
"/_api/aql/setup?ttl=" + std::to_string(ttl));

auto cleanupGuard = scopeGuard([this, &cc, &queryIds]() {
cleanupEngines(cc, TRI_ERROR_INTERNAL, _query.vocbase().name(), queryIds);
auto cleanupGuard = scopeGuard([this, pool, &queryIds]() {
cleanupEngines(pool, TRI_ERROR_INTERNAL, _query.vocbase().name(), queryIds);
});

// Build Lookup Infos
VPackBuilder infoBuilder;
transaction::Methods* trx = _query.trx();
network::RequestOptions options;
options.timeout = network::Timeout(SETUP_TIMEOUT);

for (auto const& server : dbServers) {
std::string const serverDest = "server:" + server;
Expand Down Expand Up @@ -344,22 +347,28 @@ Result EngineInfoContainerDBServerServerBased::buildEngines(
!infoSlice.get("snippets").isEmptyObject()) ||
infoSlice.hasKey("traverserEngines"));

VPackBuffer<uint8_t> buffer(infoSlice.byteSize());
buffer.append(infoSlice.begin(), infoSlice.byteSize());

// add the transaction ID header
std::unordered_map<std::string, std::string> headers;
network::Headers headers;
ClusterTrxMethods::addAQLTransactionHeader(*trx, server, headers);
CoordTransactionID coordTransactionID = TRI_NewTickServer();
auto res = cc->syncRequest(coordTransactionID, serverDest, RequestType::POST,
url, infoSlice.toJson(), headers, SETUP_TIMEOUT);
auto res = network::sendRequest(pool, serverDest, fuerte::RestVerb::Post,
url, std::move(buffer), headers, options)
.get();
_query.incHttpRequests(1);
if (res->getErrorCode() != TRI_ERROR_NO_ERROR) {
LOG_TOPIC("f9a77", DEBUG, Logger::AQL)
<< server << " responded with " << res->getErrorCode() << " -> "
<< res->stringifyErrorMessage();
if (res.fail() || !res.response) {
int code = network::fuerteToArangoErrorCode(res);
LOG_TOPIC("f9a77", DEBUG, Logger::AQL) << server << " responded with " << code
<< " -> " << TRI_errno_string(code);
LOG_TOPIC("41082", TRACE, Logger::AQL) << infoSlice.toJson();
return {res->getErrorCode(), res->stringifyErrorMessage()};
return {code};
}
auto slices = res.response->slices();
if (slices.empty()) {
return {TRI_ERROR_INTERNAL, "malformed response while building engines"};
}
std::shared_ptr<VPackBuilder> builder = res->result->getBodyVelocyPack();
VPackSlice response = builder->slice();
VPackSlice response = slices[0];
auto result = parseResponse(response, queryIds, server, serverDest, didCreateEngine);
if (!result.ok()) {
return result;
Expand Down Expand Up @@ -447,48 +456,53 @@ Result EngineInfoContainerDBServerServerBased::parseResponse(
* they may be leftovers from Coordinator.
* Will also clear the list of queryIds after return.
*
* @param cc The ClusterComm
* @param pool The ConnectionPool
* @param errorCode error Code to be send to DBServers for logging.
* @param dbname Name of the database this query is executed in.
* @param queryIds A map of QueryIds of the format: (remoteNodeId:shardId)
* -> queryid.
*/
void EngineInfoContainerDBServerServerBased::cleanupEngines(
std::shared_ptr<ClusterComm> cc, int errorCode, std::string const& dbname,
network::ConnectionPool* pool, int errorCode, std::string const& dbname,
MapRemoteToSnippet& queryIds) const {
network::RequestOptions options;
options.timeout = network::Timeout(10.0); // Picked arbitrarily
network::Headers headers;

// Shutdown query snippets
std::string url("/_db/" + arangodb::basics::StringUtils::urlEncode(dbname) +
"/_api/aql/shutdown/");
std::vector<ClusterCommRequest> requests;
auto body = std::make_shared<std::string>(
"{\"code\":" + std::to_string(errorCode) + "}");
VPackBuffer<uint8_t> body;
VPackBuilder builder(body);
builder.openObject();
builder.add("code", VPackValue(std::to_string(errorCode)));
builder.close();
for (auto const& it : queryIds) {
// it.first == RemoteNodeId, we don't need this
// it.second server -> [snippets]
for (auto const& serToSnippets : it.second) {
auto server = serToSnippets.first;
for (auto const& shardId : serToSnippets.second) {
requests.emplace_back(server, rest::RequestType::PUT, url + shardId, body);
// fire and forget
network::sendRequest(pool, server, fuerte::RestVerb::Put, url + shardId,
body, headers, options);
}
_query.incHttpRequests(serToSnippets.second.size());
}
}

// Shutdown traverser engines
url = "/_db/" + arangodb::basics::StringUtils::urlEncode(dbname) +
"/_internal/traverser/";
std::unordered_map<std::string, std::string> headers;
std::shared_ptr<std::string> noBody;

CoordTransactionID coordinatorTransactionID = TRI_NewTickServer();
auto cb = std::make_shared<::NoopCb>();
VPackBuffer<uint8_t> noBody;

constexpr double shortTimeout = 10.0; // Picked arbitrarily
for (auto const& gn : _graphNodes) {
auto allEngines = gn->engines();
for (auto const& engine : *allEngines) {
cc->asyncRequest(coordinatorTransactionID, engine.first, rest::RequestType::DELETE_REQ,
url + basics::StringUtils::itoa(engine.second), noBody,
headers, cb, shortTimeout, false, 2.0);
// fire and forget
network::sendRequestRetry(pool, engine.first, fuerte::RestVerb::Delete,
url + basics::StringUtils::itoa(engine.second),
noBody, headers, options);
}
_query.incHttpRequests(allEngines->size());
}
Expand Down
8 changes: 5 additions & 3 deletions arangod/Aql/EngineInfoContainerDBServerServerBased.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@
#include <stack>

namespace arangodb {
class ClusterComm;
namespace network {
class ConnectionPool;
}

namespace velocypack {
class Builder;
Expand Down Expand Up @@ -133,13 +135,13 @@ class EngineInfoContainerDBServerServerBased {
* they may be leftovers from Coordinator.
* Will also clear the list of queryIds after return.
*
* @param cc The ClusterComm
* @param pool The ConnectionPool
* @param errorCode error Code to be send to DBServers for logging.
* @param dbname Name of the database this query is executed in.
* @param queryIds A map of QueryIds of the format: (remoteNodeId:shardId)
* -> queryid.
*/
void cleanupEngines(std::shared_ptr<ClusterComm> cc, int errorCode,
void cleanupEngines(network::ConnectionPool* pool, int errorCode,
std::string const& dbname, MapRemoteToSnippet& queryIds) const;

// Insert a GraphNode that needs to generate TraverserEngines on
Expand Down
2 changes: 0 additions & 2 deletions arangod/Aql/ExecutionBlock.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,6 @@
#include <vector>

namespace arangodb {
struct ClusterCommResult;

namespace transaction {
class Methods;
}
Expand Down
8 changes: 5 additions & 3 deletions arangod/Aql/ExecutionEngine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@
#include "Aql/ReturnExecutor.h"
#include "Aql/WalkerWorker.h"
#include "Basics/ScopeGuard.h"
#include "Cluster/ClusterComm.h"
#include "Cluster/ServerState.h"
#include "Logger/Logger.h"
#include "Network/NetworkFeature.h"

using namespace arangodb;
using namespace arangodb::aql;
Expand Down Expand Up @@ -467,8 +467,10 @@ struct DistributedQueryInstanciator final : public WalkerWorker<ExecutionNode> {
// QueryIds are filled by responses of DBServer parts.
MapRemoteToSnippet queryIds{};

auto cleanupGuard = scopeGuard([this, &queryIds]() {
_dbserverParts.cleanupEngines(ClusterComm::instance(), TRI_ERROR_INTERNAL,
NetworkFeature const& nf = _query.vocbase().server().getFeature<NetworkFeature>();
network::ConnectionPool* pool = nf.pool();
auto cleanupGuard = scopeGuard([this, pool, &queryIds]() {
_dbserverParts.cleanupEngines(pool, TRI_ERROR_INTERNAL,
_query.vocbase().name(), queryIds);
});
std::unordered_map<size_t, size_t> nodeAliases;
Expand Down
1 change: 1 addition & 0 deletions arangod/Aql/ExecutionNode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
#include "Aql/SubqueryStartExecutionNode.h"
#include "Aql/TraversalNode.h"
#include "Aql/WalkerWorker.h"
#include "Basics/VelocyPackHelper.h"
#include "Basics/system-compiler.h"
#include "Cluster/ServerState.h"
#include "Meta/static_assert_size.h"
Expand Down
5 changes: 3 additions & 2 deletions arangod/Aql/ShardLocking.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
/// @author Michael Hackstein
////////////////////////////////////////////////////////////////////////////////

#include <algorithm>

#include "ShardLocking.h"

#include "Aql/Collection.h"
Expand All @@ -30,8 +32,7 @@
#include "Aql/ModificationNodes.h"
#include "Aql/Query.h"
#include "Cluster/ClusterFeature.h"

#include <algorithm>
#include "Logger/LogMacros.h"

using namespace arangodb;
using namespace arangodb::aql;
Expand Down
1 change: 0 additions & 1 deletion arangod/Aql/SingleRemoteModificationExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
#include "Aql/SingleRowFetcher.h"
#include "Basics/Common.h"
#include "Basics/StaticStrings.h"
#include "Cluster/ClusterComm.h"
#include "Cluster/ClusterInfo.h"
#include "Cluster/ServerState.h"
#include "ModificationExecutorTraits.h"
Expand Down
4 changes: 2 additions & 2 deletions arangod/Cluster/ClusterMethods.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -792,11 +792,11 @@ int handleGeneralCommErrors(arangodb::ClusterCommResult const* res) {
/// @brief creates a copy of all HTTP headers to forward
////////////////////////////////////////////////////////////////////////////////

std::unordered_map<std::string, std::string> getForwardableRequestHeaders(arangodb::GeneralRequest* request) {
network::Head D7F4 ers getForwardableRequestHeaders(arangodb::GeneralRequest* request) {
std::unordered_map<std::string, std::string> const& headers = request->headers();
std::unordered_map<std::string, std::string>::const_iterator it = headers.begin();

std::unordered_map<std::string, std::string> result;
network::Headers result;

while (it != headers.end()) {
std::string const& key = (*it).first;
Expand Down
3 changes: 2 additions & 1 deletion arangod/Cluster/ClusterMethods.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include "Cluster/ClusterFeature.h"
#include "Cluster/TraverserEngineRegistry.h"
#include "Futures/Future.h"
#include "Network/types.h"
#include "Rest/CommonDefines.h"
#include "Rest/GeneralResponse.h"
#include "Utils/OperationResult.h"
Expand Down Expand Up @@ -57,7 +58,7 @@ int handleGeneralCommErrors(arangodb::ClusterCommResult const* res);
/// @brief creates a copy of all HTTP headers to forward
////////////////////////////////////////////////////////////////////////////////

std::unordered_map<std::string, std::string> getForwardableRequestHeaders(GeneralRequest*);
network::Headers getForwardableRequestHeaders(GeneralRequest*);

////////////////////////////////////////////////////////////////////////////////
/// @brief check if a list of attributes have the same values in two vpack
Expand Down
Loading
0