8000 [PREG-207] Use struct for pregel options by jvolmer · Pull Request #17925 · arangodb/arangodb · GitHub
[go: up one dir, main page]

Skip to content

[PREG-207] Use struct for pregel options #17925

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Use struct for pregel options
  • Loading branch information
jvolmer committed Jan 12, 2023
commit 6d73f4766aafa7f9b1363a95f717644f1e15dcbe
79 changes: 65 additions & 14 deletions arangod/Pregel/PregelFeature.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,15 @@
#include "Cluster/ClusterInfo.h"
#include "Cluster/ServerState.h"
#include "GeneralServer/AuthenticationFeature.h"
#include "Graph/GraphManager.h"
#include "Metrics/CounterBuilder.h"
#include "Metrics/GaugeBuilder.h"
#include "Network/Methods.h"
#include "Network/NetworkFeature.h"
#include "Pregel/AlgoRegistry.h"
#include "Pregel/Conductor.h"
#include "Pregel/ExecutionNumber.h"
#include "Pregel/PregelOptions.h"
#include "Pregel/Utils.h"
#include "Pregel/Worker.h"
#include "RestServer/DatabasePathFeature.h"
Expand Down Expand Up @@ -96,25 +98,71 @@ network::Headers buildHeaders() {

} // namespace

ResultT<ExecutionNumber> PregelFeature::startExecution(
TRI_vocbase_t& vocbase, std::string algorithm,
std::vector<std::string> const& vertexCollections,
std::vector<std::string> const& edgeCollections,
std::unordered_map<std::string, std::vector<std::string>> const&
edgeCollectionRestrictions,
VPackSlice const& params) {
ResultT<ExecutionNumber> PregelFeature::startExecution(TRI_vocbase_t& vocbase,
PregelOptions options) {
if (isStopping() || _softShutdownOngoing.load(std::memory_order_relaxed)) {
return Result{TRI_ERROR_SHUTTING_DOWN, "pregel system not available"};
}

// // extract the collections
std::vector<std::string> vertexCollections;
std::vector<std::string> edgeCollections;
std::unordered_map<std::string, std::vector<std::string>>
edgeCollectionRestrictions;

if (std::holds_alternative<GraphCollectionNames>(
options.graphSource.graphOrCollections)) {
auto collectionNames =
std::get<GraphCollectionNames>(options.graphSource.graphOrCollections);
vertexCollections = collectionNames.vertexCollections;
edgeCollections = collectionNames.edgeCollections;
edgeCollectionRestrictions =
options.graphSource.edgeCollectionRestrictions.items;
} else {
auto graphName =
std::get<GraphName>(options.graphSource.graphOrCollections);
if (graphName.graph == "") {
return Result{TRI_ERROR_BAD_PARAMETER, "expecting graphName as string"};
}

graph::GraphManager gmngr{vocbase};
auto graphRes = gmngr.lookupGraphByName(graphName.graph);
if (graphRes.fail()) {
return std::move(graphRes).result();
}
std::unique_ptr<graph::Graph> graph = std::move(graphRes.get());

auto const& gv = graph->vertexCollections();
for (auto const& v : gv) {
vertexCollections.push_back(v);
}

auto const& ge = graph->edgeCollections();
for (auto const& e : ge) {
edgeCollections.push_back(e);
}

auto const& ed = graph->edgeDefinitions();
for (auto const& e : ed) {
auto const& from = e.second.getFrom();
// intentionally create map entry
for (auto const& f : from) {
auto& restrictions = edgeCollectionRestrictions[f];
restrictions.push_back(e.second.getName());
}
}
}

ServerState* ss = ServerState::instance();

// check the access rights to collections
ExecContext const& exec = ExecContext::current();
if (!exec.isSuperuser()) {
TRI_ASSERT(params.isObject());
VPackSlice storeSlice = params.get("store");
// TODO get rid of that when we have a pregel parameter struct
TRI_ASSERT(options.userParameters.slice().isObject());
VPackSlice storeSlice = options.userParameters.slice().get("store");
bool storeResults = !storeSlice.isBool() || storeSlice.getBool();

for (std::string const& vc : vertexCollections) {
bool canWrite = exec.canUseCollection(vc, auth::Level::RW);
bool canRead = exec.canUseCollection(vc, auth::Level::RO);
Expand Down Expand Up @@ -177,10 +225,13 @@ ResultT<ExecutionNumber> PregelFeature::startExecution(
if (!coll->isSmart()) {
std::vector<std::string> eKeys = coll->shardKeys();

std::string shardKeyAttribute = "vertex";
if (params.hasKey("shardKeyAttribute")) {
shardKeyAttribute = params.get("shardKeyAttribute").copyString();
}
// TODO get rid of that when we have a pregel parameter struct
std::string shardKeyAttribute =
options.userParameters.slice().hasKey("shardKeyAttribute")
? options.userParameters.slice()
.get("shardKeyAttribute")
.copyString()
: "vertex";

if (eKeys.size() != 1 || eKeys[0] != shardKeyAttribute) {
return Result{
Expand Down Expand Up @@ -222,7 +273,7 @@ ResultT<ExecutionNumber> PregelFeature::startExecution(
auto en = createExecutionNumber();
auto c = std::make_shared<pregel::Conductor>(
en, vocbase, vertexCollections, edgeColls, edgeCollectionRestrictions,
algorithm, params, *this);
options.algorithm, options.userParameters.slice(), *this);
addConductor(std::move(c), en);
TRI_ASSERT(conductor(en));
conductor(en)->start();
Expand Down
10 changes: 3 additions & 7 deletions arangod/Pregel/PregelFeature.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
#include "Basics/Common.h"
#include "Basics/Mutex.h"
#include "Pregel/ExecutionNumber.h"
#include "Pregel/PregelOptions.h"
#include "ProgramOptions/ProgramOptions.h"
#include "RestServer/arangod.h"
#include "Scheduler/Scheduler.h"
Expand All @@ -56,13 +57,8 @@ class PregelFeature final : public ArangodFeature {
explicit PregelFeature(Server& server);
~PregelFeature();

ResultT<ExecutionNumber> startExecution(
TRI_vocbase_t& vocbase, std::string algorithm,
std::vector<std::string> const& vertexCollections,
std::vector<std::string> const& edgeCollections,
std::unordered_map<std::string, std::vector<std::string>> const&
edgeCollectionRestrictions,
VPackSlice const& params);
ResultT<ExecutionNumber> startExecution(TRI_vocbase_t& vocbase,
PregelOptions options);

void collectOptions(std::shared_ptr<arangodb::options::ProgramOptions>
options) override final;
Expand Down
97 changes: 97 additions & 0 deletions arangod/Pregel/PregelOptions.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2014-2022 ArangoDB GmbH, Cologne, Germany
/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Julia Volmer
////////////////////////////////////////////////////////////////////////////////
#pragma once

#include <string>
#include <variant>
#include "Cluster/ClusterTypes.h"
#include "Inspection/Types.h"
#include "velocypack/Builder.h"

namespace arangodb::pregel {

using VertexCollectionID = CollectionID;
using EdgeCollectionID = CollectionID;
using VertexShardID = ShardID;
using EdgeShardID = ShardID;

struct GraphCollectionNames {
std::vector<std::string> vertexCollections;
std::vector<std::string> edgeCollections;
};
template<typename Inspector>
auto inspect(Inspector& f, GraphCollectionNames& x) {
return f.object(x).fields(f.field("vertexCollections", x.vertexCollections),
f.field("edgeCollections", x.edgeCollections));
}

struct GraphName {
std::string graph;
};
template<typename Inspector>
auto inspect(Inspector& f, GraphName& x) {
return f.object(x).fields(f.field("graph", x.graph));
}

/**
Maps from vertex collection name to a list of edge collections that this
vertex collection is restricted to.
It is only used for a collection if there is at least one entry for the
collection!
**/
struct EdgeCollectionRestrictions {
std::unordered_map<VertexCollectionID, std::vector<EdgeCollectionID>> items;
auto add(EdgeCollectionRestrictions others) const
-> EdgeCollectionRestrictions;
};
template<typename Inspector>
auto inspect(Inspector& f, EdgeCollectionRestrictions& x) {
return f.object(x).fields(f.field("items", x.items));
}

struct GraphOrCollection : std::variant<GraphCollectionNames, GraphName> {};
template<class Inspector>
auto inspect(Inspector& f, GraphOrCollection& x) {
return f.variant(x).unqualified().alternatives(
arangodb::inspection::type<GraphCollectionNames>("collectionNames"),
arangodb::inspection::type<GraphName>("graphName"));
}

struct GraphSource {
GraphOrCollection graphOrCollections;
EdgeCollectionRestrictions edgeCollectionRestrictions;
};
template<typename Inspector>
auto inspect(Inspector& f, GraphSource& x) {
return f.object(x).fields(
f.field("graphOrCollection", x.graphOrCollections),
f.field("edgeCollectionRestrictions", x.edgeCollectionRestrictions));
}

struct PregelOptions {
std::string algorithm;
VPackBuilder userParameters;
GraphSource graphSource;
};

} // namespace arangodb::pregel
3 changes: 2 additions & 1 deletion arangod/Pregel/REST/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
target_sources(arango_pregel PRIVATE
RestControlPregelHandler.cpp
RestPregelHandler.cpp)
RestPregelHandler.cpp
RestOptions.cpp)
77 changes: 9 additions & 68 deletions arangod/Pregel/REST/RestControlPregelHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,16 @@
#include "Cluster/ClusterFeature.h"
#include "Cluster/ClusterInfo.h"
#include "Cluster/ServerState.h"
#include "Graph/Graph.h"
#include "Graph/GraphManager.h"
#include "Inspection/VPackWithErrorT.h"
#include "Pregel/Conductor.h"
#include "Pregel/ExecutionNumber.h"
#include "Pregel/PregelFeature.h"
#include "Pregel/REST/RestOptions.h"
#include "Transaction/StandaloneContext.h"

#include <velocypack/Builder.h>
#include <velocypack/Iterator.h>
#include <velocypack/SharedSlice.h>

using namespace arangodb::basics;
using namespace arangodb::rest;
Expand Down Expand Up @@ -118,75 +119,15 @@ void RestControlPregelHandler::startExecution() {
return;
}

// algorithm
std::string algorithm =
VelocyPackHelper::getStringValue(body, "algorithm", StaticStrings::Empty);
if ("" == algorithm) {
auto restOptions = inspection::deserializeWithErrorT<pregel::RestOptions>(
velocypack::SharedSlice(velocypack::SharedSlice{}, body));
if (!restOptions.ok()) {
generateError(rest::ResponseCode::NOT_FOUND, TRI_ERROR_HTTP_NOT_FOUND,
"invalid algorithm");
return;
}

// extract the parameters
auto parameters = body.get("params");
if (!parameters.isObject()) {
parameters = VPackSlice::emptyObjectSlice();
}

// extract the collections
std::vector<std::string> vertexCollections;
std::vector<std::string> edgeCollections;
std::unordered_map<std::string, std::vector<std::string>>
edgeCollectionRestrictions;
auto vc = body.get("vertexCollections");
auto ec = body.get("edgeCollections");
if (vc.isArray() && ec.isArray()) {
for (auto v : VPackArrayIterator(vc)) {
vertexCollections.push_back(v.copyString());
}
for (auto e : VPackArrayIterator(ec)) {
edgeCollections.push_back(e.copyString());
}
} else {
auto gs = VelocyPackHelper::getStringValue(body, "graphName", "");
if ("" == gs) {
generateError(rest::ResponseCode::BAD, TRI_ERROR_BAD_PARAMETER,
"expecting graphName as string");
return;
}

graph::GraphManager gmngr{_vocbase};
auto graphRes = gmngr.lookupGraphByName(gs);
if (graphRes.fail()) {
generateError(std::move(graphRes).result());
return;
}
std::unique_ptr<graph::Graph> graph = std::move(graphRes.get());

auto const& gv = graph->vertexCollections();
for (auto const& v : gv) {
vertexCollections.push_back(v);
}

auto const& ge = graph->edgeCollections();
for (auto const& e : ge) {
edgeCollections.push_back(e);
}

auto const& ed = graph->edgeDefinitions();
for (auto const& e : ed) {
auto const& from = e.second.getFrom();
// intentionally create map entry
for (auto const& f : from) {
auto& restrictions = edgeCollectionRestrictions[f];
restrictions.push_back(e.second.getName());
}
}
restOptions.error().error());
}
auto options = std::move(restOptions).get().options();

auto res = _pregel.startExecution(_vocbase, algorithm, vertexCollections,
edgeCollections, edgeCollectionRestrictions,
parameters);
auto res = _pregel.startExecution(_vocbase, options);
if (res.fail()) {
generateError(res.result());
return;
Expand Down
23 changes: 23 additions & 0 deletions arangod/Pregel/REST/RestOptions.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
#include "RestOptions.h"
#include <variant>

using namespace arangodb::pregel;

auto RestOptions::options() -> PregelOptions {
if (std::holds_alternative<pregel::RestGraphSettings>(*this)) {
auto x = std::get<pregel::RestGraphSettings>(*this);
return PregelOptions{
.algorithm = x.options.algorithm,
.userParameters = x.options.userParameters,
.graphSource = {{GraphName{.graph = x.graph}},
{x.options.edgeCollectionRestrictions}}};
}
auto x = std::get<pregel::RestCollectionSettings>(*this);
return PregelOptions{
.algorithm = x.options.algorithm,
.userParameters = x.options.userParameters,
.graphSource = {
{GraphCollectionNames{.vertexCollections = x.vertexCollections,
.edgeCollections = x.edgeCollections}},
{x.options.edgeCollectionRestrictions}}};
}
Loading
0