10000 Actually use the given format to execute a query by mchacki · Pull Request #10484 · arangodb/arangodb · GitHub
[go: up one dir, main page]

Skip to content

Actually use the given format to execute a query #10484

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Nov 21, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 11 additions & 10 deletions arangod/Aql/EngineInfoContainerDBServerServerBased.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

#include "EngineInfoContainerDBServerServerBased.h"

#include "Aql/AqlItemBlockSerializationFormat.h"
#include "Aql/Ast.h"
#include "Aql/Collection.h"
#include "Aql/ExecutionNode.h"
Expand Down Expand Up @@ -78,8 +79,7 @@ EngineInfoContainerDBServerServerBased::TraverserEngineShardLists::TraverserEngi
: _node(node), _hasShard(false) {
auto const& edges = _node->edgeColls();
TRI_ASSERT(!edges.empty());
std::unordered_set<std::string> const& restrictToShards =
query.queryOptions().shardIds;
std::unordered_set<std::string> const& restrictToShards = query.queryOptions().shardIds;
// Extract the local shards for edge collections.
for (auto const& col : edges) {
_edgeCollections.emplace_back(
Expand Down Expand Up @@ -197,7 +197,7 @@ void EngineInfoContainerDBServerServerBased::injectVertexColletions(GraphNode* g
auto const& vCols = graphNode->vertexColls();
if (vCols.empty()) {
std::map<std::string, Collection*> const* allCollections =
_query.collections()->collections();
_query.collections()->collections();
auto& resolver = _query.resolver();
for (auto const& it : *allCollections) {
// If resolver cannot resolve this collection
Expand Down Expand Up @@ -292,11 +292,11 @@ Result EngineInfoContainerDBServerServerBased::buildEngines(
// Build Lookup Infos
VPackBuilder infoBuilder;
transaction::Methods* trx = _query.trx();

network::RequestOptions options;
options.database = _query.vocbase().name();
options.timeout = network::Timeout(SETUP_TIMEOUT);
options.skipScheduler = true; // hack to speed up future.get()
options.skipScheduler = true; // hack to speed up future.get()
options.param("ttl", std::to_string(_query.queryOptions().ttl));

for (auto const& server : dbServers) {
Expand All @@ -323,6 +323,8 @@ Result EngineInfoContainerDBServerServerBased::buildEngines(
TRI_ASSERT(didCreateEngine.size() == _graphNodes.size());
TRI_ASSERT(infoBuilder.isOpenObject());

infoBuilder.add(StaticStrings::SerializationFormat,
VPackValue(static_cast<int>(aql::SerializationFormat::SHADOWROWS)));
infoBuilder.close(); // Base object
TRI_ASSERT(infoBuilder.isClosed());

Expand Down Expand Up @@ -468,8 +470,8 @@ void EngineInfoContainerDBServerServerBased::cleanupEngines(
network::RequestOptions options;
options.database = dbname;
options.timeout = network::Timeout(10.0); // Picked arbitrarily
options.skipScheduler = true; // hack to speed up future.get()
options.skipScheduler = true; // hack to speed up future.get()

// Shutdown query snippets
std::string url("/_api/aql/shutdown/");
VPackBuffer<uint8_t> body;
Expand All @@ -485,7 +487,7 @@ void EngineInfoContainerDBServerServerBased::cleanupEngines(
for (auto const& shardId : serToSnippets.second) {
// fire and forget
network::sendRequest(pool, server, fuerte::RestVerb::Put, url + shardId,
/*copy*/body, options);
/*copy*/ body, options);
}
_query.incHttpRequests(serToSnippets.second.size());
}
Expand All @@ -500,8 +502,7 @@ void EngineInfoContainerDBServerServerBased::cleanupEngines(
for (auto const& engine : *allEngines) {
// fire and forget
network::sendRequest(pool, engine.first, fuerte::RestVerb::Delete,
url + basics::StringUtils::itoa(engine.second),
noBody, options);
url + basics::StringUtils::itoa(engine.second), noBody, options);
}
_query.incHttpRequests(allEngines->size());
}
Expand Down
38 changes: 21 additions & 17 deletions arangod/Aql/ExecutionEngine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@
#include "Basics/ScopeGuard.h"
#include "Cluster/ServerState.h"
#include "Futures/Utilities.h"
#include "Logger/Logger.h"
#include "Logger/LogMacros.h"
#include "Logger/Logger.h"
#include "Network/Methods.h"
#include "Network/NetworkFeature.h"
#include "Network/Utils.h"
Expand Down Expand Up @@ -478,7 +478,7 @@ struct DistributedQueryInstanciator final : public WalkerWorker<ExecutionNode> {
_dbserverParts.cleanupEngines(pool, TRI_ERROR_INTERNAL,
_query.vocbase().name(), queryIds);
});

std::unordered_map<size_t, size_t> nodeAliases;
ExecutionEngineResult res = _dbserverParts.buildEngines(queryIds, nodeAliases);
if (res.fail()) {
Expand All @@ -489,8 +489,8 @@ struct DistributedQueryInstanciator final : public WalkerWorker<ExecutionNode> {
// however every engine gets injected the list of locked shards.
std::vector<uint64_t> coordinatorQueryIds{};
res = _coordinatorParts.buildEngines(_query, registry, _query.vocbase().name(),
_query.queryOptions().shardIds, queryIds,
coordinatorQueryIds);
_query.queryOptions().shardIds,
queryIds, coordinatorQueryIds);

if (res.ok()) {
TRI_ASSERT(_query.engine() != nullptr);
Expand All @@ -506,14 +506,16 @@ struct DistributedQueryInstanciator final : public WalkerWorker<ExecutionNode> {

void ExecutionEngine::kill() {
// kill coordinator parts
// TODO: this doesn't seem to be necessary and sometimes even show adverse effects
// so leaving this deactivated for now
// auto queryRegistry = QueryRegistryFeature::registry();
// if (queryRegistry != nullptr) {
// for (auto const& id : _coordinatorQueryIds) {
// queryRegistry->kill(&(_query.vocbase()), id);
// }
// }
// TODO: this doesn't seem to be necessary and sometimes even show adverse
// effects so leaving this deactivated for now
/*
auto queryRegistry = QueryRegistryFeature::registry();
if (queryRegistry != nullptr) {
for (auto const& id : _coordinatorQueryIds) {
queryRegistry->kill(&(_query.vocbase()), id);
}
}
*/

// kill DB server parts
// RemoteNodeId -> DBServerId -> [snippetId]
Expand All @@ -525,7 +527,7 @@ void ExecutionEngine::kill() {

VPackBuffer<uint8_t> body;
std::vector<network::FutureRes> futures;

for (auto const& it : _dbServerMapping) {
for (auto const& it2 : it.second) {
for (auto const& snippetId : it2.second) {
Expand All @@ -536,7 +538,7 @@ void ExecutionEngine::kill() {
}
}
}

if (!futures.empty()) {
// killing is best-effort
// we are ignoring all errors intentionally here
Expand Down Expand Up @@ -654,7 +656,8 @@ std::pair<ExecutionState, Result> ExecutionEngine::shutdown(int errorCode) {
/// @brief create an execution engine from a plan
ExecutionEngine* ExecutionEngine::instantiateFromPlan(QueryRegistry& queryRegistry,
Query& query, ExecutionPlan& plan,
bool planRegisters) {
bool planRegisters,
SerializationFormat format) {
auto role = arangodb::ServerState::instance()->getRole();

plan.findVarUsage();
Expand Down Expand Up @@ -688,7 +691,7 @@ ExecutionEngine* ExecutionEngine::instantiateFromPlan(QueryRegistry& queryRegist
TRI_ASSERT(root != nullptr);
} else {
// instantiate the engine on a local server
engine.reset(new ExecutionEngine(query, SerializationFormat::SHADOWROWS));
engine.reset(new ExecutionEngine(query, format));

SingleServerQueryInstanciator inst(*engine);
plan.root()->walk(inst);
Expand All @@ -709,7 +712,8 @@ ExecutionEngine* ExecutionEngine::instantiateFromPlan(QueryRegistry& queryRegist

bool const returnInheritedResults = !arangodb::ServerState::isDBServer(role);
if (returnInheritedResults) {
auto returnNode = dynamic_cast<ExecutionBlockImpl<IdExecutor<BlockPassthrough::Enable, void>>*>(root);
auto returnNode =
dynamic_cast<ExecutionBlockImpl<IdExecutor<BlockPassthrough::Enable, void>>*>(root);
TRI_ASSERT(returnNode != nullptr);
engine->resultRegister(returnNode->getOutputRegisterId());
} else {
Expand Down
18 changes: 10 additions & 8 deletions arangod/Aql/ExecutionEngine.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,9 @@ class ExecutionEngine {

public:
// @brief create an execution engine from a plan
static ExecutionEngine* instantiateFromPlan(QueryRegistry&, Query&, ExecutionPlan&, bool);
static ExecutionEngine* instantiateFromPlan(QueryRegistry& queryRegistry, Query& query,
ExecutionPlan& plan, bool planRegisters,
SerializationFormat format);

TEST_VIRTUAL Result createBlocks(std::vector<ExecutionNode*> const& nodes,
std::unordered_set<std::string> const& restrictToShards,
Expand All @@ -72,14 +74,14 @@ class ExecutionEngine {

/// @brief get the query
TEST_VIRTUAL Query* getQuery() const;

/// @brief server to snippet mapping
void snippetMapping(MapRemoteToSnippet&& dbServerMapping,
std::vector<uint64_t>&& coordinatorQueryIds) {
_dbServerMapping = std::move(dbServerMapping);
_coordinatorQueryIds = std::move(coordinatorQueryIds);
std::vector<uint64_t>&& coordinatorQueryIds) {
_dbServerMapping = std::move(dbServerMapping);
_coordinatorQueryIds = std::move(coordinatorQueryIds);
}

/// @brief kill the query
void kill();

Expand Down Expand Up @@ -142,10 +144,10 @@ class ExecutionEngine {

/// @brief whether or not shutdown() was executed
bool _wasShutdown;

/// @brief server to snippet mapping
MapRemoteToSnippet _dbServerMapping;

/// @brief ids of all coordinator query snippets
std::vector<uint64_t> _coordinatorQueryIds;
};
Expand Down
14 changes: 7 additions & 7 deletions arangod/Aql/Query.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -276,8 +276,8 @@ Query* Query::clone(QueryPart part, bool withPlan) {
}

bool Query::killed() const {
if(_queryOptions.timeout > std::numeric_limits<double>::epsilon()) {
if(TRI_microtime() > (_startTime + _queryOptions.timeout)) {
if (_queryOptions.timeout > std::numeric_limits<double>::epsilon()) {
if (TRI_microtime() > (_startTime + _queryOptions.timeout)) {
return true;
}
}
Expand Down Expand Up @@ -451,7 +451,7 @@ void Query::prepare(QueryRegistry* registry, SerializationFormat format) {
// this is confusing and should be fixed!
std::unique_ptr<ExecutionEngine> engine(
ExecutionEngine::instantiateFromPlan(*registry, *this, *plan,
!_queryString.empty()));
!_queryString.empty(), format));

if (_engine == nullptr) {
_engine = std::move(engine);
Expand Down Expand Up @@ -655,7 +655,7 @@ ExecutionState Query::execute(QueryRegistry* registry, QueryResult& queryResult)
_resultBuilder->openArray();
_executionPhase = ExecutionPhase::EXECUTE;
}
[[fallthrough]];
[[fallthrough]];
case ExecutionPhase::EXECUTE: {
TRI_ASSERT(_resultBuilder != nullptr);
TRI_ASSERT(_resultBuilder->isOpenArray());
Expand Down Expand Up @@ -734,7 +734,7 @@ ExecutionState Query::execute(QueryRegistry* registry, QueryResult& queryResult)
_executionPhase = ExecutionPhase::FINALIZE;
}

[[fallthrough]];
[[fallthrough]];
case ExecutionPhase::FINALIZE: {
// will set warnings, stats, profile and cleanup plan and engine
return finalize(queryResult);
Expand Down Expand Up @@ -784,7 +784,7 @@ ExecutionState Query::execute(QueryRegistry* registry, QueryResult& 10000 amp; queryResult)
QueryResult Query::executeSync(QueryRegistry* registry) {
std::shared_ptr<SharedQueryState> ss = sharedState();
ss->resetWakeupHandler();

QueryResult queryResult;
while (true) {
auto state = execute(registry, queryResult);
Expand Down Expand Up @@ -872,7 +872,7 @@ QueryResultV8 Query::executeV8(v8::Isolate* isolate, QueryRegistry* registry) {
options.buildUnindexedArrays = true;
options.buildUnindexedObjects = true;
auto builder = std::make_shared<VPackBuilder>(&options);

try {
ss->resetWakeupHandler();

Expand Down
33 changes: 14 additions & 19 deletions arangod/Aql/RestAqlHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -189,9 +189,8 @@ void RestAqlHandler::setupClusterQuery() {
// If we have a new format then it has to be included here.
// If not default to classic (old coordinator will not send it)
SerializationFormat format = static_cast<SerializationFormat>(
VelocyPackHelper::getNumericValue<int>(querySlice, "serializationFormat",
VelocyPackHelper::getNumericValue<int>(querySlice, StaticStrings::SerializationFormat,
static_cast<int>(SerializationFormat::CLASSIC)));

// Now we need to create shared_ptr<VPackBuilder>
// That contains the old-style cluster snippet in order
// to prepare create a Query object.
Expand Down Expand Up @@ -315,7 +314,6 @@ bool RestAqlHandler::registerSnippets(
}

try {

if (needToLock) {
// Directly try to lock only the first snippet is required to be locked.
// For all others locking is pointless
Expand All @@ -340,7 +338,7 @@ bool RestAqlHandler::registerSnippets(
// No need to cleanup...
}

QueryId qId = query->id(); // not true in general
QueryId qId = query->id(); // not true in general
TRI_ASSERT(qId > 0);
_queryRegistry->insert(qId, query.get(), ttl, true, false);
query.release();
Expand Down Expand Up @@ -443,17 +441,16 @@ RestStatus RestAqlHandler::useQuery(std::string const& operation, std::string co
return RestStatus::DONE;
}

if (!_query) { // the PUT verb
if (!_query) { // the PUT verb
TRI_ASSERT(this->state() == RestHandler::HandlerState::EXECUTE);

_query = findQuery(idString);
if (!_query) {
return RestStatus::DONE;
}
std::shared_ptr<SharedQueryState> ss = _query->sharedState();
ss->setWakeupHandler([self = shared_from_this()] {
return self->wakeupHandler();
});
ss->setWakeupHandler(
[self = shared_from_this()] { return self->wakeupHandler(); });
}

TRI_ASSERT(_qId > 0);
Expand Down Expand Up @@ -545,7 +542,7 @@ RestStatus RestAqlHandler::execute() {
}
break;
}

default: {
generateError(rest::ResponseCode::METHOD_NOT_ALLOWED,
TRI_ERROR_NOT_IMPLEMENTED, "illegal method for /_api/aql");
Expand Down Expand Up @@ -628,7 +625,6 @@ Query* RestAqlHandler::findQuery(std::string const& idString) {
// handle for useQuery
RestStatus RestAqlHandler::handleUseQuery(std::string const& operation,
VPackSlice const querySlice) {

std::string const& shardId = _request->header("shard-id");

// upon first usage, the "initializeCursor" method must be called
Expand Down Expand Up @@ -656,7 +652,7 @@ RestStatus RestAqlHandler::handleUseQuery(std::string const& operation,

VPackBuffer<uint8_t> answerBuffer;
VPackBuilder answerBuilder(answerBuffer);
answerBuilder.openObject(/*unindexed*/true);
answerBuilder.openObject(/*unindexed*/ true);

if (operation == "getSome") {
TRI_IF_FAILURE("RestAqlHandler::getSome") {
Expand Down Expand Up @@ -744,9 +740,9 @@ RestStatus RestAqlHandler::handleUseQuery(std::string const& operation,
answerBuilder.add(StaticStrings::Error, VPackValue(res.fail()));
answerBuilder.add(StaticStrings::Code, VPackValue(res.errorNumber()));
} else if (operation == "shutdown") {
int errorCode =
VelocyPackHelper::getNumericValue<int>(querySlice, StaticStrings::Code, TRI_ERROR_INTERNAL);
int errorCode = VelocyPackHelper::getNumericValue<int>(querySlice, StaticStrings::Code,
TRI_ERROR_INTERNAL);

ExecutionState state;
Result res;
std::tie(state, res) = _query->engine()->shutdown(errorCode);
Expand Down Expand Up @@ -775,10 +771,9 @@ RestStatus RestAqlHandler::handleUseQuery(std::string const& operation,
generateError(rest::ResponseCode::NOT_FOUND, TRI_ERROR_HTTP_NOT_FOUND);
return RestStatus::DONE;
}

answerBuilder.close();
generateResult(rest::ResponseCode::OK, std::move(answerBuffer),
transactionContext);

generateResult(rest::ResponseCode::OK, std::move(answerBuffer), transactionContext);

return RestStatus::DONE;
}
Loading
0