8000 Actually use the given format to execute a query (#10484) · arangodb/arangodb@3090e49 · GitHub
[go: up one dir, main page]

Skip to content

Commit 3090e49

Browse files
authored
Actually use the given format to execute a query (#10484)
* Actually use the given format to execute a query * Send the serialization format of AQL from Coordinators.
1 parent 688a680 commit 3090e49

7 files changed

+68
-64
lines changed

arangod/Aql/EngineInfoContainerDBServerServerBased.cpp

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323

2424
#include "EngineInfoContainerDBServerServerBased.h"
2525

26+
#include "Aql/AqlItemBlockSerializationFormat.h"
2627
#include "Aql/Ast.h"
2728
#include "Aql/Collection.h"
2829
#include "Aql/ExecutionNode.h"
@@ -78,8 +79,7 @@ EngineInfoContainerDBServerServerBased::TraverserEngineShardLists::TraverserEngi
7879
: _node(node), _hasShard(false) {
7980
auto const& edges = _node->edgeColls();
8081
TRI_ASSERT(!edges.empty());
81-
std::unordered_set<std::string> const& restrictToShards =
82-
query.queryOptions().shardIds;
82+
std::unordered_set<std::string> const& restrictToShards = query.queryOptions().shardIds;
8383
// Extract the local shards for edge collections.
8484
for (auto const& col : edges) {
8585
_edgeCollections.emplace_back(
@@ -197,7 +197,7 @@ void EngineInfoContainerDBServerServerBased::injectVertexColletions(GraphNode* g
197197
auto const& vCols = graphNode->vertexColls();
198198
if (vCols.empty()) {
199199
std::map<std::string, Collection*> const* allCollections =
200-
_query.collections()->collections();
200+
_query.collections()->collections();
201201
auto& resolver = _query.resolver();
202202
for (auto const& it : *allCollections) {
203203
// If resolver cannot resolve this collection
@@ -292,11 +292,11 @@ Result EngineInfoContainerDBServerServerBased::buildEngines(
292292
// Build Lookup Infos
293293
VPackBuilder infoBuilder;
294294
transaction::Methods* trx = _query.trx();
295-
295+
296296
network::RequestOptions options;
297297
options.database = _query.vocbase().name();
298298
options.timeout = network::Timeout(SETUP_TIMEOUT);
299-
options.skipScheduler = true; // hack to speed up future.get()
299+
options.skipScheduler = true; // hack to speed up future.get()
300300
options.param("ttl", std::to_string(_query.queryOptions().ttl));
301301

302302
for (auto const& server : dbServers) {
@@ -323,6 +323,8 @@ Result EngineInfoContainerDBServerServerBased::buildEngines(
323323
TRI_ASSERT(didCreateEngine.size() == _graphNodes.size());
324324
TRI_ASSERT(infoBuilder.isOpenObject());
325325

326+
infoBuilder.add(StaticStrings::SerializationFormat,
327+
VPackValue(static_c 6D4E ast<int>(aql::SerializationFormat::SHADOWROWS)));
326328
infoBuilder.close(); // Base object
327329
TRI_ASSERT(infoBuilder.isClosed());
328330

@@ -468,8 +470,8 @@ void EngineInfoContainerDBServerServerBased::cleanupEngines(
468470
network::RequestOptions options;
469471
options.database = dbname;
470472
options.timeout = network::Timeout(10.0); // Picked arbitrarily
471-
options.skipScheduler = true; // hack to speed up future.get()
472-
473+
options.skipScheduler = true; // hack to speed up future.get()
474+
473475
// Shutdown query snippets
474476
std::string url("/_api/aql/shutdown/");
475477
VPackBuffer<uint8_t> body;
@@ -485,7 +487,7 @@ void EngineInfoContainerDBServerServerBased::cleanupEngines(
485487
for (auto const& shardId : serToSnippets.second) {
486488
// fire and forget
487489
network::sendRequest(pool, server, fuerte::RestVerb::Put, url + shardId,
488-
/*copy< 9E7A span class="pl-c">*/body, options);
490+
/*copy*/ body, options);
489491
}
490492
_query.incHttpRequests(serToSnippets.second.size());
491493
}
@@ -500,8 +502,7 @@ void EngineInfoContainerDBServerServerBased::cleanupEngines(
500502
for (auto const& engine : *allEngines) {
501503
// fire and forget
502504
network::sendRequest(pool, engine.first, fuerte::RestVerb::Delete,
503-
url + basics::StringUtils::itoa(engine.second),
504-
noBody, options);
505+
url + basics::StringUtils::itoa(engine.second), noBody, options);
505506
}
506507
_query.incHttpRequests(allEngines->size());
507508
}

arangod/Aql/ExecutionEngine.cpp

Lines changed: 21 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,8 @@
4141
#include "Basics/ScopeGuard.h"
4242
#include "Cluster/ServerState.h"
4343
#include "Futures/Utilities.h"
44-
#include "Logger/Logger.h"
4544
#include "Logger/LogMacros.h"
45+
#include "Logger/Logger.h"
4646
#include "Network/Methods.h"
4747
#include "Network/NetworkFeature.h"
484 F438 8
#include "Network/Utils.h"
@@ -478,7 +478,7 @@ struct DistributedQueryInstanciator final : public WalkerWorker<ExecutionNode> {
478478
_dbserverParts.cleanupEngines(pool, TRI_ERROR_INTERNAL,
479479
_query.vocbase().name(), queryIds);
480480
});
481-
481+
482482
std::unordered_map<size_t, size_t> nodeAliases;
483483
ExecutionEngineResult res = _dbserverParts.buildEngines(queryIds, nodeAliases);
484484
if (res.fail()) {
@@ -489,8 +489,8 @@ struct DistributedQueryInstanciator final : public WalkerWorker<ExecutionNode> {
489489
// however every engine gets injected the list of locked shards.
490490
std::vector<uint64_t> coordinatorQueryIds{};
491491
res = _coordinatorParts.buildEngines(_query, registry, _query.vocbase().name(),
492-
_query.queryOptions().shardIds, queryIds,
493-
coordinatorQueryIds);
492+
_query.queryOptions().shardIds,
493+
queryIds, coordinatorQueryIds);
494494

495495
if (res.ok()) {
496496
TRI_ASSERT(_query.engine() != nullptr);
@@ -506,14 +506,16 @@ struct DistributedQueryInstanciator final : public WalkerWorker<ExecutionNode> {
506506

507507
void ExecutionEngine::kill() {
508508
// kill coordinator parts
509-
// TODO: this doesn't seem to be necessary and sometimes even show adverse effects
510-
// so leaving this deactivated for now
511-
// auto queryRegistry = QueryRegistryFeature::registry();
512-
// if (queryRegistry != nullptr) {
513-
// for (auto const& id : _coordinatorQueryIds) {
514-
// queryRegistry->kill(&(_query.vocbase()), id);
515-
// }
516-
// }
509+
// TODO: this doesn't seem to be necessary and sometimes even show adverse
510+
// effects so leaving this deactivated for now
511+
/*
512+
auto queryRegistry = QueryRegistryFeature::registry();
513+
if (queryRegistry != nullptr) {
514+
for (auto const& id : _coordinatorQueryIds) {
515+
queryRegistry->kill(&(_query.vocbase()), id);
516+
}
517+
}
518+
*/
517519

518520
// kill DB server parts
519521
// RemoteNodeId -> DBServerId -> [snippetId]
@@ -525,7 +527,7 @@ void ExecutionEngine::kill() {
525527

526528
VPackBuffer<uint8_t> body;
527529
std::vector<network::FutureRes> futures;
528-
530+
529531
for (auto const& it : _dbServerMapping) {
530532
for (auto const& it2 : it.second) {
531533
for (auto const& snippetId : it2.second) {
@@ -536,7 +538,7 @@ void ExecutionEngine::kill() {
536538
}
537539
}
538540
}
539-
541+
540542
if (!futures.empty()) {
541543
// killing is best-effort
542544
// we are ignoring all errors intentionally here
@@ -654,7 +656,8 @@ std::pair<ExecutionState, Result> ExecutionEngine::shutdown(int errorCode) {
654656
/// @brief create an execution engine from a plan
655657
ExecutionEngine* ExecutionEngine::instantiateFromPlan(QueryRegistry& queryRegistry,
656658
Query& query, ExecutionPlan& plan,
657-
bool planRegisters) {
659+
bool planRegisters,
660+
SerializationFormat format) {
658661
auto role = arangodb::ServerState::instance()->getRole();
659662

660663
plan.findVarUsage();
@@ -688,7 +691,7 @@ ExecutionEngine* ExecutionEngine::instantiateFromPlan(QueryRegistry& queryRegist
688691
TRI_ASSERT(root != nullptr);
689692
} else {
690693
// instantiate the engine on a local server
691-
engine.reset(new ExecutionEngine(query, SerializationFormat::SHADOWROWS));
694+
engine.reset(new ExecutionEngine(query, format));
692695

693696
SingleServerQueryInstanciator inst(*engine);
694697
plan.root()->walk(inst);
@@ -709,7 +712,8 @@ ExecutionEngine* ExecutionEngine::instantiateFromPlan(QueryRegistry& queryRegist
709712

710713
bool const returnInheritedResults = !arangodb::ServerState::isDBServer(role);
711714
if (returnInheritedResults) {
712-
auto returnNode = dynamic_cast<ExecutionBlockImpl<IdExecutor<BlockPassthrough::Enable, void>>*>(root);
715+
auto returnNode =
716+
dynamic_cast<ExecutionBlockImpl<IdExecutor<BlockPassthrough::Enable, void>>*>(root);
713717
TRI_ASSERT(returnNode != nullptr);
714718
engine->resultRegister(returnNode->getOutputRegisterId());
715719
} else {

arangod/Aql/ExecutionEngine.h

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,9 @@ class ExecutionEngine {
5858

5959
public:
6060
// @brief create an execution engine from a plan
61-
static ExecutionEngine* instantiateFromPlan(QueryRegistry&, Query&, ExecutionPlan&, bool);
61+
static ExecutionEngine* instantiateFromPlan(QueryRegistry& queryRegistry, Query& query,
62+
ExecutionPlan& plan, bool planRegisters,
63+
SerializationFormat format);
6264

6365
TEST_VIRTUAL Result createBlocks(std::vector<ExecutionNode*> const& nodes,
6466
std::unordered_set<std::string> const& restrictToShards,
@@ -72,14 +74,14 @@ class ExecutionEngine {
7274

7375
/// @brief get the query
7476
TEST_VIRTUAL Query* getQuery() const;
75-
77+
7678
/// @brief server to snippet mapping
7779
void snippetMapping(MapRemoteToSnippet&& dbServerMapping,
78-
std::vector<uint64_t>&& coordinatorQueryIds) {
79-
_dbServerMapping = std::move(dbServerMapping);
80-
_coordinatorQueryIds = std::move(coordinatorQueryIds);
80+
std::vector<uint64_t>&& coordinatorQueryIds) {
81+
_dbServerMapping = std::move(dbServerMapping);
82+
_coordinatorQueryIds = std::move(coordinatorQueryIds);
8183
}
82-
84+
8385
/// @brief kill the query
8486
void kill();
8587

@@ -142,10 +144,10 @@ class ExecutionEngine {
142144

143145
/// @brief whether or not shutdown() was executed
144146
bool _wasShutdown;
145-
147+
146148
/// @brief server to snippet mapping
147149
MapRemoteToSnippet _dbServerMapping;
148-
150+
149151
/// @brief ids of all coordinator query snippets
150152
std::vector<uint64_t> _coordinatorQueryIds;
151153
};

arangod/Aql/Query.cpp

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -276,8 +276,8 @@ Query* Query::clone(QueryPart part, bool withPlan) {
276276
}
277277

278278
bool Query::killed() const {
279-
if(_queryOptions.timeout > std::numeric_limits<double>::epsilon()) {
280-
if(TRI_microtime() > (_startTime + _queryOptions.timeout)) {
279+
if (_queryOptions.timeout > std::numeric_limits<double>::epsilon()) {
280+
if (TRI_microtime() > (_startTime + _queryOptions.timeout)) {
281281
return true;
282282
}
283283
}
@@ -451,7 +451,7 @@ void Query::prepare(QueryRegistry* registry, SerializationFormat format) {
451451
// this is confusing and should be fixed!
452452
std::unique_ptr<ExecutionEngine> engine(
453453
ExecutionEngine::instantiateFromPlan(*registry, *this, *plan,
454-
!_queryString.empty()));
454+
!_queryString.empty(), format));
455455

456456
if (_engine == nullptr) {
457457
_engine = std::move(engine);
@@ -655,7 +655,7 @@ ExecutionState Query::execute(QueryRegistry* registry, QueryResult& queryResult)
655655
_resultBuilder->openArray();
656656
_executionPhase = ExecutionPhase::EXECUTE;
657657
}
658-
[[fallthrough]];
658+
[[fallthrough]];
659659
case ExecutionPhase::EXECUTE: {
660660
TRI_ASSERT(_resultBuilder != nullptr);
661661
TRI_ASSERT(_resultBuilder->isOpenArray());
@@ -734,7 +734,7 @@ ExecutionState Query::execute(QueryRegistry* registry, QueryResult& queryResult)
734734
_executionPhase = ExecutionPhase::FINALIZE;
735735
}
736736

737-
[[fallthrough]];
737+
[[fallthrough]];
738738
case ExecutionPhase::FINALIZE: {
739739
// will set warnings, stats, profile and cleanup plan and engine
740740
return finalize(queryResult);
@@ -784,7 +784,7 @@ ExecutionState Query::execute(QueryRegistry* registry, QueryResult& queryResult)
784784
QueryResult Query::executeSync(QueryRegistry* registry) {
785785
std::shared_ptr<SharedQueryState> ss = sharedState();
786786
ss->resetWakeupHandler();
787-
787+
788788
QueryResult queryResult;
789789
while (true) {
790790
auto state = execute(registry, queryResult);
@@ -872,7 +872,7 @@ QueryResultV8 Query::executeV8(v8::Isolate* isolate, QueryRegistry* registry) {
872872
options.buildUnindexedArrays = true;
873873
options.buildUnindexedObjects = true;
874874
auto builder = std::make_shared<VPackBuilder>(&options);
875-
875+
876876
try {
877877
ss->resetWakeupHandler();
878878

arangod/Aql/RestAqlHandler.cpp

Lines changed: 14 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -189,9 +189,8 @@ void RestAqlHandler::setupClusterQuery() {
189189
// If we have a new format then it has to be included here.
190190
// If not default to classic (old coordinator will not send it)
191191
SerializationFormat format = static_cast<SerializationFormat>(
192-
VelocyPackHelper::getNumericValue<int>(querySlice, "serializationFormat",
192+
VelocyPackHelper::getNumericValue<int>(querySlice, StaticStrings::SerializationFormat,
193193
static_cast<int>(SerializationFormat::CLASSIC)));
194-
195194
// Now we need to create shared_ptr<VPackBuilder>
196195
// That contains the old-style cluster snippet in order
197196
// to prepare create a Query object.
@@ -315,7 +314,6 @@ bool RestAqlHandler::registerSnippets(
315314
}
316315

317316
try {
318-
319317
if (needToLock) {
320318
// Directly try to lock only the first snippet is required to be locked.
321319
// For all others locking is pointless
@@ -340,7 +338,7 @@ bool RestAqlHandler::registerSnippets(
340338
// No need to cleanup...
341339
}
342340

343-
QueryId qId = query->id(); // not true in general
341+
QueryId qId = query->id(); // not true in general
344342
TRI_ASSERT(qId > 0);
345343
_queryRegistry->insert(qId, query.get(), ttl, true, false);
346344
query.release();
@@ -443,17 +441,16 @@ RestStatus RestAqlHandler::useQuery(std::string const& operation, std::string co
443441
return RestStatus::DONE;
444442
}
445443

446-
if (!_query) { // the PUT verb
444+
if (!_query) { // the PUT verb
447445
TRI_ASSERT(this->state() == RestHandler::HandlerState::EXECUTE);
448-
446+
449447
_query = findQuery(idString);
450448
if (!_query) {
451449
return RestStatus::DONE;
452450
}
453451
std::shared_ptr<SharedQueryState> ss = _query->sharedState();
454-
ss->setWakeupHandler([self = shared_from_this()] {
455-
return self->wakeupHandler();
456-
});
452+
ss->setWakeupHandler(
453+
[self = shared_from_this()] { return self->wakeupHandler(); });
457454
}
458455

459456
TRI_ASSERT(_qId > 0);
@@ -545,7 +542,7 @@ RestStatus RestAqlHandler::execute() {
545542
}
546543
break;
547544
}
548-
545+
549546
default: {
550547
generateError(rest::ResponseCode::METHOD_NOT_ALLOWED,
551548
TRI_ERROR_NOT_IMPLEMENTED, "illegal method for /_api/aql");
@@ -640,7 +637,6 @@ Query* RestAqlHandler::findQuery(std::string const& idString) {
640637
// handle for useQuery
641638
RestStatus RestAqlHandler::handleUseQuery(std::string const& operation,
642639
VPackSlice const querySlice) {
643-
644640
std::string const& shardId = _request->header("shard-id");
645641

646642
// upon first usage, the "initializeCursor" method must be called
@@ -668,7 +664,7 @@ RestStatus RestAqlHandler::handleUseQuery(std::string const& operation,
668664

669665
VPackBuffer<uint8_t> answerBuffer;
670666
VPackBuilder answerBuilder(answerBuffer);
671-
answerBuilder.openObject(/*unindexed*/true);
667+
answerBuilder.openObject(/*unindexed*/ true);
672668

673669
if (operation == "getSome") {
674670
TRI_IF_FAILURE("RestAqlHandler::getSome") {
@@ -756,9 +752,9 @@ RestStatus RestAqlHandler::handleUseQuery(std::string const& operation,
756752
answerBuilder.add(StaticStrings::Error, VPackValue(res.fail()));
757753
answerBuilder.add(StaticStrings::Code, VPackValue(res.errorNumber()));
758754
} else if (operation == "shutdown") {
759-
int errorCode =
760-
VelocyPackHelper::getNumericValue<int>(querySlice, StaticStrings::Code, TRI_ERROR_INTERNAL);
761-
755+
int errorCode = VelocyPackHelper::getNumericValue<int>(querySlice, StaticStrings::Code,
756+
TRI_ERROR_INTERNAL);
757+
762758
ExecutionState state;
763759
Result res;
764760
std::tie(state, res) = _query->engine()->shutdown(errorCode);
@@ -787,10 +783,9 @@ RestStatus RestAqlHandler::handleUseQuery(std::string const& operation,
787783
generateError(rest::ResponseCode::NOT_FOUND, TRI_ERROR_HTTP_NOT_FOUND);
788784
return RestStatus::DONE;
789785
}
790-
786+
791787
answerBuilder.close();
792-
generateResult(rest::ResponseCode::OK, std::move(answerBuffer),
793-
transactionContext);
794-
788+
generateResult(rest::ResponseCode::OK, std::move(answerBuffer), transactionContext);
789+
795790
return RestStatus::DONE;
796791
}

0 commit comments

Comments
 (0)
0