|
22 | 22 | ////////////////////////////////////////////////////////////////////////////////
|
23 | 23 |
|
24 | 24 | #include "ApplicationFeatures/ApplicationServer.h"
|
| 25 | +#include "Pregel/Conductor/Messages.h" |
25 | 26 | #include "VocBase/vocbase.h"
|
26 | 27 | #include "Pregel/AlgoRegistry.h"
|
27 | 28 | #include "Pregel/Algos/ConnectedComponents.h"
|
@@ -94,83 +95,77 @@ IAlgorithm* AlgoRegistry::createAlgorithm(
|
94 | 95 | }
|
95 | 96 |
|
96 | 97 | template<typename V, typename E, typename M>
|
97 |
| -/*static*/ std::shared_ptr<IWorker> AlgoRegistry::createWorker( |
98 |
| - TRI_vocbase_t& vocbase, Algorithm<V, E, M>* algo, VPackSlice body, |
99 |
| - PregelFeature& feature) { |
100 |
| - return std::make_shared<Worker<V, E, M>>(vocbase, algo, body, feature); |
| 98 | +std::shared_ptr<IWorker> AlgoRegistry::createWorker( |
| 99 | + TRI_vocbase_t& vocbase, Algorithm<V, E, M>* algo, |
| 100 | + CreateWorker const& parameters, PregelFeature& feature) { |
| 101 | + return std::make_shared<Worker<V, E, M>>(vocbase, algo, parameters, feature); |
101 | 102 | }
|
102 | 103 |
|
103 |
| -/*static*/ std::shared_ptr<IWorker> AlgoRegistry::createWorker( |
104 |
| - TRI_vocbase_t& vocbase, VPackSlice body, PregelFeature& feature) { |
105 |
| - VPackSlice algoSlice = body.get(Utils::algorithmKey); |
106 |
| - |
107 |
| - if (!algoSlice.isString()) { |
108 |
| - THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_BAD_PARAMETER, |
109 |
| - "Supplied bad parameters to worker"); |
110 |
| - } |
111 |
| - |
112 |
| - VPackSlice userParams = body.get(Utils::userParametersKey); |
113 |
| - std::string algorithm = algoSlice.copyString(); |
| 104 | +std::shared_ptr<IWorker> AlgoRegistry::createWorker( |
| 105 | + TRI_vocbase_t& vocbase, CreateWorker const& parameters, |
| 106 | + PregelFeature& feature) { |
| 107 | + VPackSlice userParams = parameters.userParameters.slice(); |
| 108 | + std::string algorithm = parameters.algorithm; |
114 | 109 | std::transform(algorithm.begin(), algorithm.end(), algorithm.begin(),
|
115 | 110 | ::tolower);
|
116 | 111 |
|
117 | 112 | auto& server = vocbase.server();
|
118 | 113 | if (algorithm == "sssp") {
|
119 | 114 | return createWorker(vocbase, new algos::SSSPAlgorithm(server, userParams),
|
120 |
| - body, feature); |
| 115 | + parameters, feature); |
121 | 116 | } else if (algorithm == "pagerank") {
|
122 |
| - return createWorker(vocbase, new algos::PageRank(server, userParams), body, |
123 |
| - feature); |
| 117 | + return createWorker(vocbase, new algos::PageRank(server, userParams), |
| 118 | + parameters, feature); |
124 | 119 | } else if (algorithm == "recoveringpagerank") {
|
125 | 120 | return createWorker(vocbase,
|
126 |
| - new algos::RecoveringPageRank(server, userParams), body, |
127 |
| - feature); |
| 121 | + new algos::RecoveringPageRank(server, userParams), |
| 122 | + parameters, feature); |
128 | 123 | } else if (algorithm == "shortestpath") {
|
129 | 124 | return createWorker(vocbase,
|
130 | 125 | new algos::ShortestPathAlgorithm(server, userParams),
|
131 |
| - body, feature); |
| 126 | + parameters, feature); |
132 | 127 | } else if (algorithm == "linerank") {
|
133 |
| - return createWorker(vocbase, new algos::LineRank(server, userParams), body, |
134 |
| - feature); |
| 128 | + return createWorker(vocbase, new algos::LineRank(server, userParams), |
| 129 | + parameters, feature); |
135 | 130 | } else if (algorithm == "effectivecloseness") {
|
136 | 131 | return createWorker(vocbase,
|
137 |
| - new algos::EffectiveCloseness(server, userParams), body, |
138 |
| - feature); |
| 132 | + new algos::EffectiveCloseness(server, userParams), |
| 133 | + parameters, feature); |
139 | 134 | } else if (algorithm == "connectedcomponents") {
|
140 | 135 | return createWorker(vocbase,
|
141 | 136 | new algos::ConnectedComponents(server, userParams),
|
142 |
| - body, feature); |
| 137 | + parameters, feature); |
143 | 138 | } else if (algorithm == "scc") {
|
144 |
| - return createWorker(vocbase, new algos::SCC(server, userParams), body, |
| 139 | + return createWorker(vocbase, new algos::SCC(server, userParams), parameters, |
145 | 140 | feature);
|
146 | 141 | } else if (algorithm == "hits") {
|
147 |
| - return createWorker(vocbase, new algos::HITS(server, userParams), body, |
148 |
| - feature); |
| 142 | + return createWorker(vocbase, new algos::HITS(server, userParams), |
| 143 | + parameters, feature); |
149 | 144 | } else if (algorithm == "hitskleinberg") {
|
150 | 145 | return createWorker(vocbase, new algos::HITSKleinberg(server, userParams),
|
151 |
| - body, feature); |
| 146 | + parameters, feature); |
152 | 147 | } else if (algorithm == "labelpropagation") {
|
153 | 148 | return createWorker(vocbase,
|
154 |
| - new algos::LabelPropagation(server, userParams), body, |
155 |
| - feature); |
| 149 | + new algos::LabelPropagation(server, userParams), |
| 150 | + parameters, feature); |
156 | 151 | } else if (algorithm == "slpa") {
|
157 |
| - return createWorker(vocbase, new algos::SLPA(server, userParams), body, |
158 |
| - feature); |
| 152 | + return createWorker(vocbase, new algos::SLPA(server, userParams), |
| 153 | + parameters, feature); |
159 | 154 | } else if (algorithm == "dmid") {
|
160 |
| - return createWorker(vocbase, new algos::DMID(server, userParams), body, |
161 |
| - feature); |
| 155 | + return createWorker(vocbase, new algos::DMID(server, userParams), |
| 156 | + parameters, feature); |
162 | 157 | } else if (algorithm == "wcc") {
|
163 |
| - return createWorker(vocbase, new algos::WCC(server, userParams), body, |
| 158 | + return createWorker(vocbase, new algos::WCC(server, userParams), parameters, |
164 | 159 | feature);
|
165 | 160 | } else if (algorithm == "colorpropagation") {
|
166 | 161 | return createWorker(vocbase,
|
167 |
| - new algos::ColorPropagation(server, userParams), body, |
168 |
| - feature); |
| 162 | + new algos::ColorPropagation(server, userParams), |
| 163 | + parameters, feature); |
169 | 164 | }
|
170 | 165 | #if defined(ARANGODB_ENABLE_MAINTAINER_MODE)
|
171 | 166 | else if (algorithm == "readwrite") {
|
172 |
| - return createWorker(vocbase, new algos::ReadWrite(server, userParams), body, |
173 |
| - feature); |
| 167 | + return createWorker(vocbase, new algos::ReadWrite(server, userParams), |
| 168 | + parameters, feature); |
174 | 169 | }
|
175 | 170 | #endif
|
176 | 171 | THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_BAD_PARAMETER,
|
|
0 commit comments