8000 Actually use the given format to execute a query by mchacki · Pull Request #10484 · arangodb/arangodb · GitHub
[go: up one dir, main page]

Skip to content

Actually use the given format to execute a query #10484

New issue 8000

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Nov 21, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Prev Previous commit
Send the serialization format of AQL from Coordinators.
  • Loading branch information
mchacki committed Nov 20, 2019
commit 2b4b2d1f4039a0bfefdc2f6ae32951a97cbf46d9
21 changes: 11 additions & 10 deletions arangod/Aql/EngineInfoContainerDBServerServerBased.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

#include "EngineInfoContainerDBServerServerBased.h"

#include "Aql/AqlItemBlockSerializationFormat.h"
#include "Aql/Ast.h"
#include "Aql/Collection.h"
#include "Aql/ExecutionNode.h"
Expand Down Expand Up @@ -78,8 +79,7 @@ EngineInfoContainerDBServerServerBased::TraverserEngineShardLists::TraverserEngi
: _node(node), _hasShard(false) {
auto const& edges = _node->edgeColls();
TRI_ASSERT(!edges.empty());
std::unordered_set<std::string> const& restrictToShards =
query.queryOptions().shardIds;
std::unordered_set<std::string> const& restrictToShards = query.queryOptions().shardIds;
// Extract the local shards for edge collections.
for (auto const& col : edges) {
_edgeCollections.emplace_back(
Expand Down Expand Up @@ -197,7 +197,7 @@ void EngineInfoContainerDBServerServerBased::injectVertexColletions(GraphNode* g
auto const& vCols = graphNode->vertexColls();
if (vCols.empty()) {
std::map<std::string, Collection*> const* allCollections =
_query.collections()->collections();
_query.collections()->collections();
auto& resolver = _query.resolver();
for (auto const& it : *allCollections) {
// If resolver cannot resolve this collection
Expand Down Expand Up @@ -292,11 +292,11 @@ Result EngineInfoContainerDBServerServerBased::buildEngines(
// Build Lookup Infos
VPackBuilder infoBuilder;
transaction::Methods* trx = _query.trx();

network::RequestOptions options;
options.database = _query.vocbase().name();
options.timeout = network::Timeout(SETUP_TIMEOUT);
options.skipScheduler = true; // hack to speed up future.get()
options.skipScheduler = true; // hack to speed up future.get()
options.param("ttl", std::to_string(_query.queryOptions().ttl));

for (auto const& server : dbServers) {
Expand All @@ -323,6 +323,8 @@ Result EngineInfoContainerDBServerServerBased::buildEngines(
TRI_ASSERT(didCreateEngine.size() == _graphNodes.size());
TRI_ASSERT(infoBuilder.isOpenObject());

infoBuilder.add(StaticStrings::SerializationFormat,
VPackValue(static_cast<int>(aql::SerializationFormat::SHADOWROWS)));
infoBuilder.close(); // Base object
TRI_ASSERT(infoBuilder.isClosed());

Expand Down Expand Up @@ -468,8 +470,8 @@ void EngineInfoContainerDBServerServerBased::cleanupEngines(
network::RequestOptions options;
options.database = dbname;
options.timeout = network::Timeout(10.0); // Picked arbitrarily
options.skipScheduler = true; // hack to speed up future.get()
options.skipScheduler = true; // hack to speed up future.get()

// Shutdown query snippets
std::string url("/_api/aql/shutdown/");
VPackBuffer<uint8_t> body;
Expand All @@ -485,7 +487,7 @@ void EngineInfoContainerDBServerServerBased::cleanupEngines(
for (auto const& shardId : serToSnippets.second) {
// fire and forget
network::sendRequest(pool, server, fuerte::RestVerb::Put, url + shardId,
/*copy*/body, options);
/*copy*/ body, options);
}
_query.incHttpRequests(serToSnippets.second.size());
}
Expand All @@ -500,8 +502,7 @@ void EngineInfoContainerDBServerServerBased::cleanupEngines(
for (auto const& engine : *allEngines) {
// fire and forget
network::sendRequest(pool, engine.first, fuerte::RestVerb::Delete,
url + basics::StringUtils::itoa(engine.second),
noBody, options);
url + basics::StringUtils::itoa(engine.second), noBody, options);
}
_query.incHttpRequests(allEngines->size());
}
Expand Down
33 changes: 14 additions & 19 deletions arangod/Aql/RestAqlHandler.cpp
8000
Original file line number Diff line number Diff line change
Expand Up @@ -189,9 +189,8 @@ void RestAqlHandler::setupClusterQuery() {
// If we have a new format then it has to be included here.
// If not default to classic (old coordinator will not send it)
SerializationFormat format = static_cast<SerializationFormat>(
VelocyPackHelper::getNumericValue<int>(querySlice, "serializationFormat",
VelocyPackHelper::getNumericValue<int>(querySlice, StaticStrings::SerializationFormat,
static_cast<int>(SerializationFormat::CLASSIC)));

// Now we need to create shared_ptr<VPackBuilder>
// That contains the old-style cluster snippet in order
// to prepare create a Query object.
Expand Down Expand Up @@ -315,7 +314,6 @@ bool RestAqlHandler::registerSnippets(
}

try {

if (needToLock) {
// Directly try to lock only the first snippet is required to be locked.
// For all others locking is pointless
Expand All @@ -340,7 +338,7 @@ bool RestAqlHandler::registerSnippets(
// No need to cleanup...
}

QueryId qId = query->id(); // not true in general
QueryId qId = query->id(); // not true in general
TRI_ASSERT(qId > 0);
_queryRegistry->insert(qId, query.get(), ttl, true, false);
query.release();
Expand Down Expand Up @@ -443,17 +441,16 @@ RestStatus RestAqlHandler::useQuery(std::string const& operation, std::string co
return RestStatus::DONE;
}

if (!_query) { // the PUT verb
if (!_query) { // the PUT verb
TRI_ASSERT(this->state() == RestHandler::HandlerState::EXECUTE);

_query = findQuery(idString);
if (!_query) {
return RestStatus::DONE;
}
std::shared_ptr<SharedQueryState> ss = _query->sharedState();
ss->setWakeupHandler([self = shared_from_this()] {
return self->wakeupHandler();
});
ss->setWakeupHandler(
[self = shared_from_this()] { return self->wakeupHandler(); });
}

TRI_ASSERT(_qId > 0);
Expand Down Expand Up @@ -545,7 +542,7 @@ RestStatus RestAqlHandler::execute() {
}
break;
}

default: {
generateError(rest::ResponseCode::METHOD_NOT_ALLOWED,
TRI_ERROR_NOT_IMPLEMENTED, "illegal method for /_api/aql");
Expand Down Expand Up @@ -628,7 +625,6 @@ Query* RestAqlHandler::findQuery(std::string const& idString) {
// handle for useQuery
RestStatus RestAqlHandler::handleUseQuery(std::string const& operation,
VPackSlice const querySlice) {

std::string const& shardId = _request->header("shard-id");

// upon first usage, the "initializeCursor" method must be called
Expand Down Expand Up @@ -656,7 +652,7 @@ RestStatus RestAqlHandler::handleUseQuery(std::string const& operation,

VPackBuffer<uint8_t> answerBuffer;
VPackBuilder answerBuilder(answerBuffer);
answerBuilder.openObject(/*unindexed*/true);
answerBuilder.openObject(/*unindexed*/ true);

if (operation == "getSome") {
TRI_IF_FAILURE("RestAqlHandler::getSome") {
Expand Down Expand Up @@ -744,9 +740,9 @@ RestStatus RestAqlHandler::handleUseQuery(std::string const& operation,
answerBuilder.add(StaticStrings::Error, VPackValue(res.fail()));
answerBuilder.add(StaticStrings::Code, VPackValue(res.errorNumber()));
} else if (operation == "shutdown") {
int errorCode =
VelocyPackHelper::getNumericValue<int>(querySlice, StaticStrings::Code, TRI_ERROR_INTERNAL);
int errorCode = VelocyPackHelper::getNumericValue<int>(querySlice, StaticStrings::Code,
TRI_ERROR_INTERNAL);

ExecutionState state;
Result res;
std::tie(state, res) = _query->engine()->shutdown(errorCode);
Expand Down Expand Up @@ -775,10 +771,9 @@ RestStatus RestAqlHandler::handleUseQuery(std::string const& operation,
generateError(rest::ResponseCode::NOT_FOUND, TRI_ERROR_HTTP_NOT_FOUND);
return RestStatus::DONE;
}

answerBuilder.close();
generateResult(rest::ResponseCode::OK, std::move(answerBuffer),
transactionContext);

generateResult(rest::ResponseCode::OK, std::move(answerBuffer), transactionContext);

return RestStatus::DONE;
}
7 changes: 4 additions & 3 deletions lib/Basics/StaticStrings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -217,8 +217,7 @@ std::string const StaticStrings::MimeTypeDump(
std::string const StaticStrings::MimeTypeHtml("text/html; charset=utf-8");
std::string const StaticStrings::MimeTypeJson(
"application/json; charset=utf-8");
std::string const StaticStrings::MimeTypeJsonNoEncoding(
"application/json");
std::string const StaticStrings::MimeTypeJsonNoEncoding("application/json");
std::string const StaticStrings::MimeTypeText("text/plain; charset=utf-8");
std::string const StaticStrings::MimeTypeVPack("application/x-velocypack");
std::string const StaticStrings::MultiPartContentType("multipart/form-data");
Expand Down Expand Up @@ -278,4 +277,6 @@ std::string const StaticStrings::Old("old");
std::string const StaticStrings::UpgradeEnvName(
"ARANGODB_UPGRADE_DURING_RESTORE");
std::string const StaticStrings::BackupToDeleteName("DIRECTORY_TO_DELETE");
std::string const StaticStrings::BackupSearchToDeleteName("DIRECTORY_TO_DELETE_SEARCH");
std::string const StaticStrings::BackupSearchToDeleteName(
"DIRECTORY_TO_DELETE_SEARCH");
std::string const StaticStrings::SerializationFormat("serializationFormat");
1 change: 1 addition & 0 deletions lib/Basics/StaticStrings.h
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,7 @@ class StaticStrings {
static std::string const UpgradeEnvName;
static std::string const BackupToDeleteName;
static std::string const BackupSearchToDeleteName;
static std::string const SerializationFormat;
};
} // namespace arangodb

Expand Down
0