diff --git a/arangod/Aql/EngineInfoContainerDBServerServerBased.cpp b/arangod/Aql/EngineInfoContainerDBServerServerBased.cpp index a64039a74f41..33158ceec7c9 100644 --- a/arangod/Aql/EngineInfoContainerDBServerServerBased.cpp +++ b/arangod/Aql/EngineInfoContainerDBServerServerBased.cpp @@ -36,8 +36,8 @@ #include "StorageEngine/TransactionState.h" #include "Utils/CollectionNameResolver.h" -#include #include +#include using namespace arangodb; using namespace arangodb::aql; @@ -80,10 +80,13 @@ EngineInfoContainerDBServerServerBased::TraverserEngineShardLists::TraverserEngi auto const& edges = _node->edgeColls(); TRI_ASSERT(!edges.empty()); auto const& restrictToShards = query.queryOptions().restrictToShards; +#ifdef USE_ENTERPRISE + transaction::Methods trx{query.newTrxContext()}; +#endif // Extract the local shards for edge collections. for (auto const& col : edges) { #ifdef USE_ENTERPRISE - if (query.trxForOptimization().isInaccessibleCollection(col->id())) { + if (trx.isInaccessibleCollection(col->id())) { _inaccessible.insert(col->name()); _inaccessible.insert(std::to_string(col->id().id())); } @@ -100,7 +103,7 @@ EngineInfoContainerDBServerServerBased::TraverserEngineShardLists::TraverserEngi // Or if we guarantee to never read vertex data. for (auto const& col : vertices) { #ifdef USE_ENTERPRISE - if (query.trxForOptimization().isInaccessibleCollection(col->id())) { + if (trx.isInaccessibleCollection(col->id())) { _inaccessible.insert(col->name()); _inaccessible.insert(std::to_string(col->id().id())); } @@ -112,8 +115,8 @@ EngineInfoContainerDBServerServerBased::TraverserEngineShardLists::TraverserEngi } std::vector EngineInfoContainerDBServerServerBased::TraverserEngineShardLists::getAllLocalShards( - std::unordered_map const& shardMapping, - ServerID const& server, std::shared_ptr> shardIds, bool colIsSatellite) { + std::unordered_map const& shardMapping, ServerID const& server, + std::shared_ptr> shardIds, bool colIsSatellite) { std::vector localShards; for (auto const& shard : *shardIds) { auto const& it = shardMapping.find(shard); @@ -255,8 +258,7 @@ void EngineInfoContainerDBServerServerBased::closeSnippet(QueryId inputSnippet) } std::vector EngineInfoContainerDBServerServerBased::buildEngineInfo( - QueryId clusterQueryId, - VPackBuilder& infoBuilder, ServerID const& server, + QueryId clusterQueryId, VPackBuilder& infoBuilder, ServerID const& server, std::unordered_map const& nodesById, std::map& nodeAliases) { LOG_TOPIC("4bbe6", DEBUG, arangodb::Logger::AQL) @@ -322,11 +324,11 @@ arangodb::futures::Future EngineInfoContainerDBServerServerBased::buildS auto buildCallback = [this, server, serverDest, didCreateEngine = std::move(didCreateEngine), - &serverToQueryId, &serverToQueryIdLock, &snippetIds, globalId]( - arangodb::futures::Try const& response) -> Result { + &serverToQueryId, &serverToQueryIdLock, &snippetIds, + globalId](arangodb::futures::Try const& response) -> Result { auto const& resolvedResponse = response.get(); auto queryId = globalId; - + std::unique_lock guard{serverToQueryIdLock}; if (resolvedResponse.fail()) { @@ -334,7 +336,7 @@ arangodb::futures::Future EngineInfoContainerDBServerServerBased::buildS LOG_TOPIC("f9a77", DEBUG, Logger::AQL) << server << " responded with " << res.errorNumber() << ": " << res.errorMessage(); - + serverToQueryId.emplace_back(serverDest, globalId); return res; } @@ -351,8 +353,8 @@ arangodb::futures::Future EngineInfoContainerDBServerServerBased::buildS }; return network::sendRequestRetry(pool, serverDest, fuerte::RestVerb::Post, - "/_api/aql/setup", std::move(buffer), options, - std::move(headers)) + "/_api/aql/setup", std::move(buffer), + options, std::move(headers)) .then([buildCallback = std::move(buildCallback)](futures::Try&& resp) mutable { return buildCallback(resp); }); @@ -373,7 +375,8 @@ bool EngineInfoContainerDBServerServerBased::isNotSatelliteLeader(VPackSlice inf return true; } - TRI_ASSERT((infoSlice.get("snippets").isObject() && !infoSlice.get("snippets").isEmptyObject()) || + TRI_ASSERT((infoSlice.get("snippets").isObject() && + !infoSlice.get("snippets").isEmptyObject()) || infoSlice.hasKey("traverserEngines")); return false; @@ -406,7 +409,7 @@ Result EngineInfoContainerDBServerServerBased::buildEngines( TRI_ASSERT(!_closedSnippets.empty() || !_graphNodes.empty()); ErrorCode cleanupReason = TRI_ERROR_CLUSTER_TIMEOUT; - + auto cleanupGuard = scopeGuard([this, &serverToQueryId, &cleanupReason]() { // Fire and forget std::ignore = cleanupEngines(cleanupReason, _query.vocbase().name(), serverToQueryId); @@ -427,19 +430,21 @@ Result EngineInfoContainerDBServerServerBased::buildEngines( options.timeout = network::Timeout(SETUP_TIMEOUT); options.skipScheduler = true; // hack to speed up future.get() options.param("ttl", std::to_string(_query.queryOptions().ttl)); - + TRI_IF_FAILURE("Query::setupTimeout") { - options.timeout = network::Timeout(0.01 + (double) RandomGenerator::interval(uint32_t(10))); + options.timeout = + network::Timeout(0.01 + (double)RandomGenerator::interval(uint32_t(10))); } - + TRI_IF_FAILURE("Query::setupTimeoutFailSequence") { options.timeout = network::Timeout(0.5); } - + /// cluster global query id, under which the query will be registered /// on DB servers from 3.8 onwards. - QueryId clusterQueryId = _query.vocbase().server().getFeature().clusterInfo().uniqid(); - + QueryId clusterQueryId = + _query.vocbase().server().getFeature().clusterInfo().uniqid(); + // decreases lock timeout manually for fast path auto oldLockTimeout = _query.getLockTimeout(); _query.setLockTimeout(FAST_PATH_LOCK_TIMEOUT); @@ -449,7 +454,8 @@ Result EngineInfoContainerDBServerServerBased::buildEngines( for (ServerID const& server : dbServers) { // Build Lookup Infos VPackBuilder infoBuilder; - auto didCreateEngine = buildEngineInfo(clusterQueryId, infoBuilder, server, nodesById, nodeAliases); + auto didCreateEngine = + buildEngineInfo(clusterQueryId, infoBuilder, server, nodesById, nodeAliases); VPackSlice infoSlice = infoBuilder.slice(); if (isNotSatelliteLeader(infoSlice)) { @@ -468,7 +474,7 @@ Result EngineInfoContainerDBServerServerBased::buildEngines( .thenValue([](std::vector>&& responses) -> Result { // We can directly report a non TRI_ERROR_LOCK_TIMEOUT // error as we need to abort after. - // Otherwise we need to report + // Otherwise we need to report Result res{TRI_ERROR_NO_ERROR}; for (auto const& tryRes : responses) { auto response = tryRes.get(); @@ -498,16 +504,18 @@ Result EngineInfoContainerDBServerServerBased::buildEngines( { // in case of fast path failure, we need to cleanup engines - auto requests = cleanupEngines(fastPathResult.get().errorNumber(), _query.vocbase().name(), serverToQueryId); + auto requests = cleanupEngines(fastPathResult.get().errorNumber(), + _query.vocbase().name(), serverToQueryId); // Wait for all requests to complete. // So we know that all Transactions are aborted. // We do NOT care for the actual result. futures::collectAll(requests).wait(); snippetIds.clear(); } - + // we must generate a new query id, because the fast path setup has failed - clusterQueryId = _query.vocbase().server().getFeature().clusterInfo().uniqid(); + clusterQueryId = + _query.vocbase().server().getFeature().clusterInfo().uniqid(); // set back to default lock timeout for slow path fallback _query.setLockTimeout(oldLockTimeout); @@ -521,7 +529,8 @@ Result EngineInfoContainerDBServerServerBased::buildEngines( std::sort(engineInformation.begin(), engineInformation.end(), [](auto const& lhs, auto const& rhs) { // Entry <0> is the Server - return TransactionState::ServerIdLessThan(std::get<0>(lhs), std::get<0>(rhs)); + return TransactionState::ServerIdLessThan(std::get<0>(lhs), + std::get<0>(rhs)); }); #ifdef ARANGODB_ENABLE_MAINTAINER_MODE // Make sure we always maintain the correct ordering of servers @@ -547,7 +556,9 @@ Result EngineInfoContainerDBServerServerBased::buildEngines( overwrittenOptions.add("clusterQueryId", VPackValue(clusterQueryId)); addOptionsPart(overwrittenOptions, server); overwrittenOptions.close(); - auto newRequest = arangodb::velocypack::Collection::merge(infoSlice, overwrittenOptions.slice(), false); + auto newRequest = + arangodb::velocypack::Collection::merge(infoSlice, + overwrittenOptions.slice(), false); auto request = buildSetupRequest(trx, std::move(server), newRequest.slice(), std::move(didCreateEngine), snippetIds, serverToQueryId, @@ -680,7 +691,7 @@ std::vector EngineInfoContainerDBServerServerBased options.database = dbname; options.timeout = network::Timeout(10.0); // Picked arbitrarily options.skipScheduler = true; // hack to speed up future.get() - + // Shutdown query snippets std::string url("/_api/aql/finish/"); VPackBuffer body; @@ -691,8 +702,8 @@ std::vector EngineInfoContainerDBServerServerBased requests.reserve(serverQueryIds.size()); for (auto const& [server, queryId] : serverQueryIds) { requests.emplace_back(network::sendRequestRetry(pool, server, fuerte::RestVerb::Delete, - url + std::to_string(queryId), - /*copy*/ body, options)); + url + std::to_string(queryId), + /*copy*/ body, options)); } _query.incHttpRequests(static_cast(serverQueryIds.size())); @@ -703,8 +714,9 @@ std::vector EngineInfoContainerDBServerServerBased for (auto& gn : _graphNodes) { auto allEngines = gn->engines(); for (auto const& engine : *allEngines) { - requests.emplace_back(network::sendRequestRetry(pool, "server:" + engine.first, fuerte::RestVerb::Delete, - url + basics::StringUtils::itoa(engine.second), noBody, options)); + requests.emplace_back(network::sendRequestRetry( + pool, "server:" + engine.first, fuerte::RestVerb::Delete, + url + basics::StringUtils::itoa(engine.second), noBody, options)); } _query.incHttpRequests(static_cast(allEngines->size())); gn->clearEngines(); diff --git a/arangod/Aql/EngineInfoContainerDBServerServerBased.h b/arangod/Aql/EngineInfoContainerDBServerServerBased.h index 93cc23a4ca2f..83739592330f 100644 --- a/arangod/Aql/EngineInfoContainerDBServerServerBased.h +++ b/arangod/Aql/EngineInfoContainerDBServerServerBased.h @@ -40,7 +40,7 @@ namespace network { class ConnectionPool; struct RequestOptions; struct Response; -} +} // namespace network namespace velocypack { class Builder; @@ -56,6 +56,9 @@ class QuerySnippet; class EngineInfoContainerDBServerServerBased { private: + // TODO Temporary. Do not check in!! + // We need to access the TraverserEngineShardList nothing else. + public: // @brief Local struct to create the // information required to build traverser engines // on DB servers. @@ -123,7 +126,6 @@ class EngineInfoContainerDBServerServerBased { // the given queryid of the coordinator as data provider. void closeSnippet(QueryId inputSnippet); - // Build the Engines for the DBServer // * Creates one Query-Entry for each Snippet per Shard (multiple on the // same DB) @@ -140,28 +142,28 @@ class EngineInfoContainerDBServerServerBased { MapRemoteToSnippet& snippetIds, aql::ServerQueryIdList& serverQueryIds, std::map& nodeAliases); - // Insert a GraphNode that needs to generate TraverserEngines on // the DBServers. The GraphNode itself will retain on the coordinator. void addGraphNode(GraphNode* node, bool pushToSingleServer); private: /** - * @brief Helper method to generate the Request to be send to a specific database server. - * this request contains all the necessary information to create a transaction with correct shard - * locking, as well as QuerySnippets and GraphEngines on the receiver side. - * - * @param clusterQueryId cluster-wide query id (used from 3.8 onwards) - * @param infoBuilder (mutable) the request body will be written into this builder. - * @param server The DatabaseServer we suppose to send the request to, used to identify the shards on this server - * @param nodesById A vector to get Nodes by their id. - * @param nodeAliases (mutable) A map of node-aliases, if a server is responsible for more then one shard we need to duplicate some nodes in the query (e.g. an IndexNode can only access one shard at a time) this list can map cloned node -> original node ids. - * - * @return A vector with one entry per GraphNode in the query (in order) it indicates if this Server has created a GraphEngine for this Node and needs to participate in the GraphOperation or not. - */ - std::vector buildEngineInfo(QueryId clusterQueryId, VPackBuilder& infoBuilder, ServerID const& server, - std::unordered_map const& nodesById, - std::map& nodeAliases); + * @brief Helper method to generate the Request to be send to a specific database server. + * this request contains all the necessary information to create a transaction with correct shard + * locking, as well as QuerySnippets and GraphEngines on the receiver side. + * + * @param clusterQueryId cluster-wide query id (used from 3.8 onwards) + * @param infoBuilder (mutable) the request body will be written into this builder. + * @param server The DatabaseServer we suppose to send the request to, used to identify the shards on this server + * @param nodesById A vector to get Nodes by their id. + * @param nodeAliases (mutable) A map of node-aliases, if a server is responsible for more then one shard we need to duplicate some nodes in the query (e.g. an IndexNode can only access one shard at a time) this list can map cloned node -> original node ids. + * + * @return A vector with one entry per GraphNode in the query (in order) it indicates if this Server has created a GraphEngine for this Node and needs to participate in the GraphOperation or not. + */ + std::vector buildEngineInfo( + QueryId clusterQueryId, VPackBuilder& infoBuilder, ServerID const& server, + std::unordered_map const& nodesById, + std::map& nodeAliases); arangodb::futures::Future buildSetupRequest( transaction::Methods& trx, ServerID const& server, VPackSlice infoSlice, @@ -171,7 +173,6 @@ class EngineInfoContainerDBServerServerBased { [[nodiscard]] bool isNotSatelliteLeader(VPackSlice infoSlice) const; - /** * @brief Will send a shutdown to all engines registered in the list of * queryIds. @@ -186,8 +187,8 @@ class EngineInfoContainerDBServerServerBased { * -> queryid. */ std::vector> cleanupEngines( - ErrorCode errorCode, std::string const& dbname, aql::ServerQueryIdList& serverQueryIds) const; - + ErrorCode errorCode, std::string const& dbname, + aql::ServerQueryIdList& serverQueryIds) const; // Insert the Locking information into the message to be send to DBServers void addLockingPart(arangodb::velocypack::Builder& builder, ServerID const& server) const; diff --git a/arangod/ClusterEngine/ClusterIndexFactory.cpp b/arangod/ClusterEngine/ClusterIndexFactory.cpp index cdf75062ccc6..bab36516d24c 100644 --- a/arangod/ClusterEngine/ClusterIndexFactory.cpp +++ b/arangod/ClusterEngine/ClusterIndexFactory.cpp @@ -52,8 +52,7 @@ struct DefaultIndexFactory : public arangodb::IndexTypeFactory { : IndexTypeFactory(server), _type(type) {} bool equal(arangodb::velocypack::Slice const& lhs, - arangodb::velocypack::Slice const& rhs, - std::string const& dbname) const override { + arangodb::velocypack::Slice const& rhs, std::string const& dbname) const override { auto& clusterEngine = _server.getFeature().engine(); auto* engine = clusterEngine.actualEngine(); @@ -76,15 +75,14 @@ struct DefaultIndexFactory : public arangodb::IndexTypeFactory { auto ct = clusterEngine.engineType(); return std::make_shared(id, collection, ct, - arangodb::Index::type(_type), - definition); + arangodb::Index::type(_type), definition); } - virtual arangodb::Result normalize( // normalize definition - arangodb::velocypack::Builder& normalized, // normalized definition (out-param) - arangodb::velocypack::Slice definition, // source definition - bool isCreation, // definition for index creation - TRI_vocbase_t const& vocbase // index vocbase + virtual arangodb::Result normalize( // normalize definition + arangodb::velocypack::Builder& normalized, // normalized definition (out-param) + arangodb::velocypack::Slice definition, // source definition + bool isCreation, // definition for index creation + TRI_vocbase_t const& vocbase // index vocbase ) const override { auto& clusterEngine = _server.getFeature().engine(); @@ -96,8 +94,8 @@ struct DefaultIndexFactory : public arangodb::IndexTypeFactory { "cannot find storage engine while normalizing index"); } - return engine->indexFactory().factory(_type).normalize( // normalize definition - normalized, definition, isCreation, vocbase // args + return engine->indexFactory().factory(_type).normalize( // normalize definition + normalized, definition, isCreation, vocbase // args ); } }; @@ -113,7 +111,8 @@ struct EdgeIndexFactory : public DefaultIndexFactory { bool isClusterConstructor) const override { if (!isClusterConstructor) { // this index type cannot be created directly - THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, "cannot create edge index"); + THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, + "cannot create edge index"); } auto& clusterEngine = @@ -137,7 +136,8 @@ struct PrimaryIndexFactory : public DefaultIndexFactory { bool isClusterConstructor) const override { if (!isClusterConstructor) { // this index type cannot be created directly - THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, "cannot create primary index"); + THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, + "cannot create primary index"); } auto& clusterEngine = @@ -154,8 +154,8 @@ struct PrimaryIndexFactory : public DefaultIndexFactory { namespace arangodb { -ClusterIndexFactory::ClusterIndexFactory(application_features::ApplicationServer& server) - : IndexFactory(server) { +void ClusterIndexFactory::LinkIndexFactories(application_features::ApplicationServer& server, + IndexFactory& factory) { static const EdgeIndexFactory edgeIndexFactory(server, "edge"); static const DefaultIndexFactory fulltextIndexFactory(server, "fulltext"); static const DefaultIndexFactory geoIndexFactory(server, "geo"); @@ -167,16 +167,21 @@ ClusterIndexFactory::ClusterIndexFactory(application_features::ApplicationServer static const DefaultIndexFactory skiplistIndexFactory(server, "skiplist"); static const DefaultIndexFactory ttlIndexFactory(server, "ttl"); - emplace(edgeIndexFactory._type, edgeIndexFactory); - emplace(fulltextIndexFactory._type, fulltextIndexFactory); - emplace(geoIndexFactory._type, geoIndexFactory); - emplace(geo1IndexFactory._type, geo1IndexFactory); - emplace(geo2IndexFactory._type, geo2IndexFactory); - emplace(hashIndexFactory._type, hashIndexFactory); - emplace(persistentIndexFactory._type, persistentIndexFactory); - emplace(primaryIndexFactory._type, primaryIndexFactory); - emplace(skiplistIndexFactory._type, skiplistIndexFactory); - emplace(ttlIndexFactory._type, ttlIndexFactory); + factory.emplace(edgeIndexFactory._type, edgeIndexFactory); + factory.emplace(fulltextIndexFactory._type, fulltextIndexFactory); + factory.emplace(geoIndexFactory._type, geoIndexFactory); + factory.emplace(geo1IndexFactory._type, geo1IndexFactory); + factory.emplace(geo2IndexFactory._type, geo2IndexFactory); + factory.emplace(hashIndexFactory._type, hashIndexFactory); + factory.emplace(persistentIndexFactory._type, persistentIndexFactory); + factory.emplace(primaryIndexFactory._type, primaryIndexFactory); + factory.emplace(skiplistIndexFactory._type, skiplistIndexFactory); + factory.emplace(ttlIndexFactory._type, ttlIndexFactory); +} + +ClusterIndexFactory::ClusterIndexFactory(application_features::ApplicationServer& server) + : IndexFactory(server) { + LinkIndexFactories(server, *this); } /// @brief index name aliases (e.g. "persistent" => "hash", "skiplist" => @@ -191,11 +196,11 @@ std::unordered_map ClusterIndexFactory::indexAliases() return ae->indexFactory().indexAliases(); } -Result ClusterIndexFactory::enhanceIndexDefinition( // normalize definition - velocypack::Slice const definition, // source definition - velocypack::Builder& normalized, // normalized definition (out-param) - bool isCreation, // definition for index creation - TRI_vocbase_t const& vocbase // index vocbase +Result ClusterIndexFactory::enhanceIndexDefinition( // normalize definition + velocypack::Slice const definition, // source definition + velocypack::Builder& normalized, // normalized definition (out-param) + bool isCreation, // definition for index creation + TRI_vocbase_t const& vocbase // index vocbase ) const { auto& ce = _server.getFeature().engine(); @@ -205,8 +210,8 @@ Result ClusterIndexFactory::enhanceIndexDefinition( // normalize definition return TRI_ERROR_INTERNAL; } - return ae->indexFactory().enhanceIndexDefinition( // normalize definition - definition, normalized, isCreation, vocbase // args + return ae->indexFactory().enhanceIndexDefinition( // normalize definition + definition, normalized, isCreation, vocbase // args ); } @@ -299,8 +304,8 @@ void ClusterIndexFactory::prepareIndexes( indexes.emplace_back(std::move(idx)); } catch (std::exception const& ex) { LOG_TOPIC("7ed52", ERR, arangodb::Logger::ENGINES) - << "error creating index from definition '" << v.toString() << "': " << ex.what(); - + << "error creating index from definition '" << v.toString() + << "': " << ex.what(); } } } diff --git a/arangod/ClusterEngine/ClusterIndexFactory.h b/arangod/ClusterEngine/ClusterIndexFactory.h index 780cf1d7ee94..ee4f683c762f 100644 --- a/arangod/ClusterEngine/ClusterIndexFactory.h +++ b/arangod/ClusterEngine/ClusterIndexFactory.h @@ -29,18 +29,20 @@ namespace arangodb { class ClusterIndexFactory final : public IndexFactory { public: + static void LinkIndexFactories(application_features::ApplicationServer& server, + IndexFactory& factory); explicit ClusterIndexFactory(application_features::ApplicationServer&); ~ClusterIndexFactory() = default; - Result enhanceIndexDefinition( // normalize definition - velocypack::Slice const definition, // source definition - velocypack::Builder& normalized, // normalized definition (out-param) - bool isCreation, // definition for index creation - TRI_vocbase_t const& vocbase // index vocbase + Result enhanceIndexDefinition( // normalize definition + velocypack::Slice const definition, // source definition + velocypack::Builder& normalized, // normalized definition (out-param) + bool isCreation, // definition for index creation + TRI_vocbase_t const& vocbase // index vocbase ) const override; - /// @brief index name aliases (e.g. "persistent" => "hash", "skiplist" => "hash") - /// used to display storage engine capabilities + /// @brief index name aliases (e.g. "persistent" => "hash", "skiplist" => + /// "hash") used to display storage engine capabilities std::unordered_map indexAliases() const override; void fillSystemIndexes(arangodb::LogicalCollection& col, diff --git a/arangod/Graph/Enumerators/OneSidedEnumerator.cpp b/arangod/Graph/Enumerators/OneSidedEnumerator.cpp index 1cbc21ce5b5d..2b573453746f 100644 --- a/arangod/Graph/Enumerators/OneSidedEnumerator.cpp +++ b/arangod/Graph/Enumerators/OneSidedEnumerator.cpp @@ -269,3 +269,4 @@ template class ::arangodb::graph::OneSidedEnumerator>; template class ::arangodb::graph::OneSidedEnumerator>; template class ::arangodb::graph::OneSidedEnumerator>; + diff --git a/arangod/InternalRestHandler/InternalRestTraverserHandler.cpp b/arangod/InternalRestHandler/InternalRestTraverserHandler.cpp index 753cc29d9ea2..981f7b80fae3 100644 --- a/arangod/InternalRestHandler/InternalRestTraverserHandler.cpp +++ b/arangod/InternalRestHandler/InternalRestTraverserHandler.cpp @@ -32,6 +32,7 @@ #include "Rest/GeneralResponse.h" #include "Transaction/StandaloneContext.h" +#include #include #include @@ -85,8 +86,9 @@ RestStatus InternalRestTraverserHandler::execute() { } void InternalRestTraverserHandler::createEngine() { - THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_NOT_IMPLEMENTED, - "API traversal engine creation no longer supported"); + THROW_ARANGO_EXCEPTION_MESSAGE( + TRI_ERROR_NOT_IMPLEMENTED, + "API traversal engine creation no longer supported"); } void InternalRestTraverserHandler::queryEngine() { @@ -117,7 +119,8 @@ void InternalRestTraverserHandler::queryEngine() { return; } - std::chrono::time_point start = std::chrono::steady_clock::now(); + std::chrono::time_point start = + std::chrono::steady_clock::now(); traverser::BaseEngine* engine = nullptr; while (true) { @@ -127,7 +130,8 @@ void InternalRestTraverserHandler::queryEngine() { break; } generateError(ResponseCode::BAD, TRI_ERROR_HTTP_BAD_PARAMETER, - "invalid TraverserEngine id - potentially the AQL query was already aborted or timed out"); + "invalid TraverserEngine id - potentially the AQL query " + "was already aborted or timed out"); return; } catch (basics::Exception const& ex) { // it is possible that the engine is already in use @@ -147,18 +151,18 @@ void InternalRestTraverserHandler::queryEngine() { generateError(ResponseCode::SERVER_ERROR, TRI_ERROR_LOCK_TIMEOUT); return; } - } + } TRI_ASSERT(engine != nullptr); auto& registry = _registry; // For the guard - auto cleanup = scopeGuard([registry, &engineId]() { - registry->closeEngine(engineId); - }); + auto cleanup = + scopeGuard([registry, &engineId]() { registry->closeEngine(engineId); }); if (option == "lock") { - THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_NOT_IMPLEMENTED, - "API for traversal engine locking no longer supported"); + THROW_ARANGO_EXCEPTION_MESSAGE( + TRI_ERROR_NOT_IMPLEMENTED, + "API for traversal engine locking no longer supported"); } VPackBuilder result; @@ -182,7 +186,7 @@ void InternalRestTraverserHandler::queryEngine() { // Safe cast BaseTraverserEngines are all of type TRAVERSER auto eng = static_cast(engine); TRI_ASSERT(eng != nullptr); - + VPackSlice variables = body.get("variables"); eng->injectVariables(variables); diff --git a/arangod/StorageEngine/EngineSelectorFeature.cpp b/arangod/StorageEngine/EngineSelectorFeature.cpp index ea0f4df8b923..77825a441b7a 100644 --- a/arangod/StorageEngine/EngineSelectorFeature.cpp +++ b/arangod/StorageEngine/EngineSelectorFeature.cpp @@ -55,10 +55,10 @@ std::unordered_map createEngineMap() { std::unordered_map map; // rocksdb is not deprecated and the engine of choice map.try_emplace(arangodb::RocksDBEngine::EngineName, - EngineInfo{ std::type_index(typeid(arangodb::RocksDBEngine)), false, true }); + EngineInfo{std::type_index(typeid(arangodb::RocksDBEngine)), false, true}); return map; } -} +} // namespace namespace arangodb { @@ -134,7 +134,8 @@ void EngineSelectorFeature::prepare() { if (selected == engines.end()) { if (_engineName == "mmfiles") { LOG_TOPIC("10eb6", FATAL, Logger::STARTUP) - << "the mmfiles storage engine is unavailable from version v3.7.0 onwards"; + << "the mmfiles storage engine is unavailable from version v3.7.0 " + "onwards"; } else { // should not happen LOG_TOPIC("3e975", FATAL, Logger::STARTUP) @@ -151,8 +152,7 @@ void EngineSelectorFeature::prepare() { "engine."; if (!ServerState::instance()->isCoordinator() && - !basics::FileUtils::isRegularFile(_engineFilePath) && - !_allowDeprecatedDeployments) { + !basics::FileUtils::isRegularFile(_engineFilePath) && !_allowDeprecatedDeployments) { LOG_TOPIC("ca0a7", FATAL, Logger::STARTUP) << "The " << _engineName << " storage engine cannot be used for new deployments."; @@ -173,10 +173,12 @@ void EngineSelectorFeature::prepare() { for (auto& engine : engines) { StorageEngine& e = server().getFeature(engine.second.type); // turn off all other storage engines - LOG_TOPIC("001b6", TRACE, Logger::STARTUP) << "disabling storage engine " << engine.first; + LOG_TOPIC("001b6", TRACE, Logger::STARTUP) + << "disabling storage engine " << engine.first; e.disable(); if (engine.first == _engineName) { - LOG_TOPIC("4a3fc", INFO, Logger::FIXME) << "using storage engine " << engine.first; + LOG_TOPIC("4a3fc", INFO, Logger::FIXME) + << "using storage engine " << engine.first; ce.setActualEngine(&e); } } @@ -239,8 +241,14 @@ void EngineSelectorFeature::unprepare() { _engine = nullptr; if (ServerState::instance()->isCoordinator()) { - ClusterEngine& ce = server().getFeature(); - ce.setActualEngine(nullptr); +#ifdef ARANGODB_USE_GOOGLE_TESTS + if (!arangodb::ClusterEngine::Mocking) { +#endif + ClusterEngine& ce = server().getFeature(); + ce.setActualEngine(nullptr); +#ifdef ARANGODB_USE_GOOGLE_TESTS + } +#endif } } diff --git a/tests/Mocks/Servers.cpp b/tests/Mocks/Servers.cpp index 4fc552aeb292..49e8165c3c06 100644 --- a/tests/Mocks/Servers.cpp +++ b/tests/Mocks/Servers.cpp @@ -181,11 +181,16 @@ static void SetupAqlPhase(MockServer& server) { #endif } -MockServer::MockServer() +MockServer::MockServer(arangodb::ServerState::RoleEnum myRole, bool injectClusterIndexes) : _server(std::make_shared("", "", "", nullptr), nullptr), - _engine(_server), + _engine(_server, injectClusterIndexes), _oldRebootId(0), _started(false) { + _oldRole = arangodb::ServerState::instance()->getRole(); + arangodb::ServerState::instance()->setRole(myRole); + if (arangodb::ServerState::instance()->isCoordinator()) { + arangodb::ClusterEngine::Mocking = true; + } init(); } @@ -193,6 +198,10 @@ MockServer::~MockServer() { stopFeatures(); _server.setStateUnsafe(_oldApplicationServerState); + if (arangodb::ServerState::instance()->isCoordinator()) { + arangodb::ClusterEngine::Mocking = false; + } + arangodb::ServerState::instance()->setRole(_oldRole); arangodb::ServerState::instance()->setRebootId(_oldRebootId); } @@ -454,10 +463,10 @@ std::pair, consensus::index_t> AgencyCache:: consensus::Store& AgencyCache::store() { return _readDB; } -MockClusterServer::MockClusterServer(bool useAgencyMockPool) - : MockServer(), _useAgencyMockPool(useAgencyMockPool) { - _oldRole = arangodb::ServerState::instance()->getRole(); - +MockClusterServer::MockClusterServer(bool useAgencyMockPool, + arangodb::ServerState::RoleEnum newRole, + bool injectClusterIndexes) + : MockServer(newRole, injectClusterIndexes), _useAgencyMockPool(useAgencyMockPool) { // Add features SetupAqlPhase(*this); @@ -478,7 +487,6 @@ MockClusterServer::~MockClusterServer() { ci.shutdownSyncers(); ci.waitForSyncersToStop(); _server.getFeature().shutdownAgencyCache(); - arangodb::ServerState::instance()->setRole(_oldRole); } void MockClusterServer::startFeatures() { @@ -518,6 +526,32 @@ void MockClusterServer::startFeatures() { _server.getFeature().clusterInfo().startSyncers(); } +std::unique_ptr MockClusterServer::createFakeQuery( + bool activateTracing, std::string queryString, + std::function callback) const { + auto bindParams = std::make_shared(); + bindParams->openObject(); + bindParams->close(); + VPackBuilder queryOptions; + queryOptions.openObject(); + if (activateTracing) { + queryOptions.add("profile", VPackValue(int(aql::ProfileLevel::TraceTwo))); + } + queryOptions.close(); + if (queryString.empty()) { + queryString = "RETURN 1"; + } + + aql::QueryString fakeQueryString(queryString); + auto query = std::make_unique( + arangodb::transaction::StandaloneContext::Create(getSystemDatabase()), + fakeQueryString, bindParams, queryOptions.slice()); + callback(*query); + query->prepareQuery(aql::SerializationFormat::SHADOWROWS); + + return query; +} + consensus::index_t MockClusterServer::agencyTrx(std::string const& key, std::string const& value) { // Build an agency transaction: @@ -710,8 +744,7 @@ std::shared_ptr MockClusterServer::createCollection( } MockDBServer::MockDBServer(bool start, bool useAgencyMock) - : MockClusterServer(useAgencyMock) { - arangodb::ServerState::instance()->setRole(arangodb::ServerState::RoleEnum::ROLE_DBSERVER); + : MockClusterServer(useAgencyMock, arangodb::ServerState::RoleEnum::ROLE_DBSERVER) { addFeature(false); // do not start the thread addFeature(false); // do not start the thread if (start) { @@ -824,9 +857,9 @@ void MockDBServer::createShard(std::string const& dbName, std::string shardName, } } -MockCoordinator::MockCoordinator(bool start, bool useAgencyMock) - : MockClusterServer(useAgencyMock) { - arangodb::ServerState::instance()->setRole(arangodb::ServerState::RoleEnum::ROLE_COORDINATOR); +MockCoordinator::MockCoordinator(bool start, bool useAgencyMock, bool injectClusterIndexes) + : MockClusterServer(useAgencyMock, arangodb::ServerState::RoleEnum::ROLE_COORDINATOR, + injectClusterIndexes) { if (start) { MockCoordinator::startFeatures(); MockCoordinator::createDatabase("_system"); diff --git a/tests/Mocks/Servers.h b/tests/Mocks/Servers.h index 2a8c2c94465b..97471b89627b 100644 --- a/tests/Mocks/Servers.h +++ b/tests/Mocks/Servers.h @@ -58,7 +58,13 @@ namespace mocks { /// @brief mock application server with no features added class MockServer { public: - MockServer(); + // Note, setting injectClusterIndexes causes the "create" methods to fail. + // this is all hardly worked around the Cluster engine and needs a proper + // clean up. It is highly recommended to not set injectClusterIndexes unless + // you want to specificly test something that selects an index, but cannot use + // it. Use with care for now. + MockServer(arangodb::ServerState::RoleEnum = arangodb::ServerState::RoleEnum::ROLE_SINGLE, + bool injectClusterIndexes = false); virtual ~MockServer(); application_features::ApplicationServer& server(); @@ -121,6 +127,7 @@ class MockServer { private: bool _started; + arangodb::ServerState::RoleEnum _oldRole; }; /// @brief a server with almost no features added (Metrics are available @@ -213,9 +220,14 @@ class MockClusterServer : public MockServer, DataSourceId const& planId, std::vector> shardNameToServerNamePairs); + std::unique_ptr createFakeQuery( + bool activateTracing = false, std::string queryString = "", + std::function runBeforePrepare = + [](arangodb::aql::Query&) {}) const; // You can only create specialized types protected: - MockClusterServer(bool useAgencyMockConnection); + MockClusterServer(bool useAgencyMockConnection, arangodb::ServerState::RoleEnum role, + bool injectClusterIndexes = false); ~MockClusterServer(); protected: @@ -232,8 +244,9 @@ class MockClusterServer : public MockServer, protected: std::unique_ptr _pool; + + private: bool _useAgencyMockPool; - arangodb::ServerState::RoleEnum _oldRole; int _dummy; }; @@ -251,7 +264,8 @@ class MockDBServer : public MockClusterServer { class MockCoordinator : public MockClusterServer { public: - MockCoordinator(bool startFeatures = true, bool useAgencyMockConnection = true); + MockCoordinator(bool startFeatures = true, bool useAgencyMockConnection = true, + bool injectClusterIndexes = false); ~MockCoordinator(); TRI_vocbase_t* createDatabase(std::string const& name) override; diff --git a/tests/Mocks/StorageEngineMock.cpp b/tests/Mocks/StorageEngineMock.cpp index e9f303be60b1..92b96fbcabd7 100644 --- a/tests/Mocks/StorageEngineMock.cpp +++ b/tests/Mocks/StorageEngineMock.cpp @@ -33,6 +33,7 @@ #include "Cluster/ClusterFeature.h" #include "Cluster/ClusterInfo.h" #include "ClusterEngine/ClusterEngine.h" +#include "ClusterEngine/ClusterIndexFactory.h" #include "IResearch/IResearchCommon.h" #include "IResearch/IResearchFeature.h" #include "IResearch/IResearchLinkCoordinator.h" @@ -479,8 +480,13 @@ class AllIteratorMock final : public arangodb::IndexIterator { }; // AllIteratorMock struct IndexFactoryMock : arangodb::IndexFactory { - IndexFactoryMock(arangodb::application_features::ApplicationServer& server) - : IndexFactory(server) {} + IndexFactoryMock(arangodb::application_features::ApplicationServer& server, bool injectClusterIndexes) + : IndexFactory(server) { + if (injectClusterIndexes) { + arangodb::ClusterIndexFactory::LinkIndexFactories(server, *this); + } + } + virtual void fillSystemIndexes(arangodb::LogicalCollection& col, std::vector>& systemIndexes) const override { // NOOP @@ -1301,6 +1307,24 @@ void PhysicalCollectionMock::prepareIndexes(arangodb::velocypack::Slice indexesS } } +arangodb::IndexEstMap PhysicalCollectionMock::clusterIndexEstimates(bool allowUpdating, + arangodb::TransactionId tid) { + TRI_ASSERT(arangodb::ServerState::instance()->isCoordinator()); + arangodb::IndexEstMap estimates; + for (auto const& it : _indexes) { + std::string id = std::to_string(it->id().id()); + if (it->hasSelectivityEstimate()) { + // Note: This may actually be bad, as this instance cannot + // have documents => The estimate is off. + estimates.emplace(std::move(id), it->selectivityEstimate()); + } else { + // Random hardcoded estimate. We do not actually know anything + estimates.emplace(std::move(id), 0.25); + } + } + return estimates; +} + arangodb::Result PhysicalCollectionMock::read( arangodb::transaction::Methods*, arangodb::velocypack::StringRef const& key, arangodb::IndexIterator::DocumentCallback const& cb) const { @@ -1484,9 +1508,11 @@ std::function StorageEngineMock::recoveryTickCallback = []() -> void {}; /*static*/ std::string StorageEngineMock::versionFilenameResult; -StorageEngineMock::StorageEngineMock(arangodb::application_features::ApplicationServer& server) +StorageEngineMock::StorageEngineMock(arangodb::application_features::ApplicationServer& server, + bool injectClusterIndexes) : StorageEngine(server, "Mock", "", - std::unique_ptr(new IndexFactoryMock(server))), + std::unique_ptr( + new IndexFactoryMock(server, injectClusterIndexes))), vocbaseCount(1), _releasedTick(0) {} diff --git a/tests/Mocks/StorageEngineMock.h b/tests/Mocks/StorageEngineMock.h index eb8f8ed5608e..52f2971e2155 100644 --- a/tests/Mocks/StorageEngineMock.h +++ b/tests/Mocks/StorageEngineMock.h @@ -28,13 +28,13 @@ #include #include "Basics/Result.h" +#include "IResearchLinkMock.h" #include "Indexes/IndexIterator.h" #include "StorageEngine/HealthData.h" #include "StorageEngine/PhysicalCollection.h" #include "StorageEngine/StorageEngine.h" #include "StorageEngine/TransactionCollection.h" #include "StorageEngine/TransactionState.h" -#include "IResearchLinkMock.h" #include "VocBase/Identifiers/IndexId.h" #include "VocBase/Identifiers/LocalDocumentId.h" @@ -96,12 +96,16 @@ class PhysicalCollectionMock : public arangodb::PhysicalCollection { virtual uint64_t numberDocuments(arangodb::transaction::Methods* trx) const override; virtual std::string const& path() const override; virtual void prepareIndexes(arangodb::velocypack::Slice indexesSlice) override; + + arangodb::IndexEstMap clusterIndexEstimates(bool allowUpdating, + arangodb::TransactionId tid) override; + virtual arangodb::Result read(arangodb::transaction::Methods*, arangodb::velocypack::StringRef const& key, arangodb::IndexIterator::DocumentCallback const& cb) const override; virtual arangodb::Result read(arangodb::transaction::Methods* trx, - arangodb::LocalDocumentId const& token, - arangodb::IndexIterator::DocumentCallback const& cb) const override; + arangodb::LocalDocumentId const& token, + arangodb::IndexIterator::DocumentCallback const& cb) const override; virtual bool readDocument(arangodb::transaction::Methods* trx, arangodb::LocalDocumentId const& token, arangodb::ManagedDocumentResult& result) const override; @@ -184,7 +188,8 @@ class StorageEngineMock : public arangodb::StorageEngine { std::map, arangodb::velocypack::Builder> views; std::atomic vocbaseCount; - explicit StorageEngineMock(arangodb::application_features::ApplicationServer& server); + explicit StorageEngineMock(arangodb::application_features::ApplicationServer& server, + bool injectClusterIndexes = false); arangodb::HealthData healthCheck() override; virtual void addOptimizerRules(arangodb::aql::OptimizerRulesFeature& feature) override; virtual void addRestHandlers(arangodb::rest::RestHandlerFactory& handlerFactory) override; @@ -194,7 +199,8 @@ class StorageEngineMock : public arangodb::StorageEngine { bool doSync) override; virtual arangodb::Result changeView(TRI_vocbase_t& vocbase, arangodb::LogicalView const& view, bool doSync) override; - virtual void createCollection(TRI_vocbase_t& vocbase, arangodb::LogicalCollection const& collection) override; + virtual void createCollection(TRI_vocbase_t& vocbase, + arangodb::LogicalCollection const& collection) override; virtual std::unique_ptr createDatabase(arangodb::CreateDatabaseInfo&&, ErrorCode& status) override; virtual arangodb::Result createLoggerState(TRI_vocbase_t*, VPackBuilder&) override; @@ -227,13 +233,15 @@ class StorageEngineMock : public arangodb::StorageEngine { arangodb::velocypack::Builder& result, bool includeIndexes, TRI_voc_tick_t maxTick) override; virtual ErrorCode getCollectionsAndIndexes(TRI_vocbase_t& vocbase, - arangodb::velocypack::Builder& result, - bool wasCleanShutdown, bool isUpgrade) override; + arangodb::velocypack::Builder& result, + bool wasCleanShutdown, bool isUpgrade) override; virtual void getDatabases(arangodb::velocypack::Builder& result) override; virtual void cleanupReplicationContexts() override; - virtual arangodb::velocypack::Builder getReplicationApplierConfiguration(TRI_vocbase_t& vocbase, ErrorCode& result) override; + virtual arangodb::velocypack::Builder getReplicationApplierConfiguration( + TRI_vocbase_t& vocbase, ErrorCode& result) override; virtual arangodb::velocypack::Builder getReplicationApplierConfiguration(ErrorCode& result) override; - virtual ErrorCode getViews(TRI_vocbase_t& vocbase, arangodb::velocypack::Builder& result) override; + virtual ErrorCode getViews(TRI_vocbase_t& vocbase, + arangodb::velocypack::Builder& result) override; virtual arangodb::Result handleSyncKeys(arangodb::DatabaseInitialSyncer& syncer, arangodb::LogicalCollection& col, std::string const& keysId) override; @@ -256,15 +264,16 @@ class StorageEngineMock : public arangodb::StorageEngine { arangodb::LogicalCollection const& collection, std::string const& oldName) override; virtual ErrorCode saveReplicationApplierConfiguration(TRI_vocbase_t& vocbase, - arangodb::velocypack::Slice slice, - bool doSync) override; - virtual ErrorCode saveReplicationApplierConfiguration(arangodb::velocypack::Slice, bool) override; + arangodb::velocypack::Slice slice, + bool doSync) override; + virtual ErrorCode saveReplicationApplierConfiguration(arangodb::velocypack::Slice, + bool) override; virtual std::string versionFilename(TRI_voc_tick_t) const override; virtual void waitForEstimatorSync(std::chrono::milliseconds maxWaitTime) override; virtual arangodb::WalAccess const* walAccess() const override; static std::shared_ptr buildLinkMock( - arangodb::IndexId id, arangodb::LogicalCollection& collection, VPackSlice const& info); + arangodb::IndexId id, arangodb::LogicalCollection& collection, VPackSlice const& info); private: TRI_voc_tick_t _releasedTick;