8000 [PREG-226] Use structs for conductor messages (#17952) · cloudhub-js/arangodb@c64ff03 · GitHub
[go: up one dir, main page]

Skip to content

Commit c64ff03

Browse files
authored
[PREG-226] Use structs for conductor messages (arangodb#17952)
* Use struct for CreateWorker message * Use struct for prepare gss message * Use struct to start gss * Delete unused path (remnant of deleted recovery) * Use struct for finalize execution message * Use struct for collect pregel results (via aql) message * Delete commented out code * Add missing include
1 parent 72814ae commit c64ff03

12 files changed

+334
-279
lines changed

arangod/Pregel/AlgoRegistry.cpp

Lines changed: 36 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
////////////////////////////////////////////////////////////////////////////////
2323

2424
#include "ApplicationFeatures/ApplicationServer.h"
25+
#include "Pregel/Conductor/Messages.h"
2526
#include "VocBase/vocbase.h"
2627
#include "Pregel/AlgoRegistry.h"
2728
#include "Pregel/Algos/ConnectedComponents.h"
@@ -94,83 +95,77 @@ IAlgorithm* AlgoRegistry::createAlgorithm(
9495
}
9596

9697
template<typename V, typename E, typename M>
97-
/*static*/ std::shared_ptr<IWorker> AlgoRegistry::createWorker(
98-
TRI_vocbase_t& vocbase, Algorithm<V, E, M>* algo, VPackSlice body,
99-
PregelFeature& feature) {
100-
return std::make_shared<Worker<V, E, M>>(vocbase, algo, body, feature);
98+
std::shared_ptr<IWorker> AlgoRegistry::createWorker(
99+
TRI_vocbase_t& vocbase, Algorithm<V, E, M>* algo,
100+
CreateWorker const& parameters, PregelFeature& feature) {
101+
return std::make_shared<Worker<V, E, M>>(vocbase, algo, parameters, feature);
101102
}
102103

103-
/*static*/ std::shared_ptr<IWorker> AlgoRegistry::createWorker(
104-
TRI_vocbase_t& vocbase, VPackSlice body, PregelFeature& feature) {
105-
VPackSlice algoSlice = body.get(Utils::algorithmKey);
106-
107-
if (!algoSlice.isString()) {
108-
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_BAD_PARAMETER,
109-
"Supplied bad parameters to worker");
110-
}
111-
112-
VPackSlice userParams = body.get(Utils::userParametersKey);
113-
std::string algorithm = algoSlice.copyString();
104+
std::shared_ptr<IWorker> AlgoRegistry::createWorker(
105+
TRI_vocbase_t& vocbase, CreateWorker const& parameters,
106+
PregelFeature& feature) {
107+
VPackSlice userParams = parameters.userParameters.slice();
108+
std::string algorithm = parameters.algorithm;
114109
std::transform(algorithm.begin(), algorithm.end(), algorithm.begin(),
115110
::tolower);
116111

117112
auto& server = vocbase.server();
118113
if (algorithm == "sssp") {
119114
return createWorker(vocbase, new algos::SSSPAlgorithm(server, userParams),
120-
body, feature);
115+
parameters, feature);
121116
} else if (algorithm == "pagerank") {
122-
return createWorker(vocbase, new algos::PageRank(server, userParams), body,
123-
feature);
117+
return createWorker(vocbase, new algos::PageRank(server, userParams),
118+
parameters, feature);
124119
} else if (algorithm == "recoveringpagerank") {
125120
return createWorker(vocbase,
126-
new algos::RecoveringPageRank(server, userParams), body,
127-
feature);
121+
new algos::RecoveringPageRank(server, userParams),
122+
parameters, feature);
128123
} else if (algorithm == "shortestpath") {
129124
return createWorker(vocbase,
130125
new algos::ShortestPathAlgorithm(server, userParams),
131-
body, feature);
126+
parameters, feature);
132127
} else if (algorithm == "linerank") {
133-
return createWorker(vocbase, new algos::LineRank(server, userParams), body,
134-
feature);
128+
return createWorker(vocbase, new algos::LineRank(server, userParams),
129+
parameters, feature);
135130
} else if (algorithm == "effectivecloseness") {
136131
return createWorker(vocbase,
137-
new algos::EffectiveCloseness(server, userParams), body,
138-
feature);
132+
new algos::EffectiveCloseness(server, userParams),
133+
parameters, feature);
139134
} else if (algorithm == "connectedcomponents") {
140135
return createWorker(vocbase,
141136
new algos::ConnectedComponents(server, userParams),
142-
body, feature);
137+
parameters, feature);
143138
} else if (algorithm == "scc") {
144-
return createWorker(vocbase, new algos::SCC(server, userParams), body,
139+
return createWorker(vocbase, new algos::SCC(server, userParams), parameters,
145140
feature);
146141
} else if (algorithm == "hits") {
147-
return createWorker(vocbase, new algos::HITS(server, userParams), body,
148-
feature);
142+
return createWorker(vocbase, new algos::HITS(server, userParams),
143+
parameters, feature);
149144
} else if (algorithm == "hitskleinberg") {
150145
return createWorker(vocbase, new algos::HITSKleinberg(server, userParams),
151-
body, feature);
146+
parameters, feature);
152147
} else if (algorithm == "labelpropagation") {
153148
return createWorker(vocbase,
154-
new algos::LabelPropagation(server, userParams), body,
155-
feature);
149+
new algos::LabelPropagation(server, userParams),
150+
parameters, feature);
156151
} else if (algorithm == "slpa") {
157-
return createWorker(vocbase, new algos::SLPA(server, userParams), body,
158-
feature);
152+
return createWorker(vocbase, new algos::SLPA(server, userParams),
153+
parameters, feature);
159154
} else if (algorithm == "dmid") {
160-
return createWorker(vocbase, new algos::DMID(server, userParams), body,
161-
feature);
155+
return createWorker(vocbase, new algos::DMID(server, userParams),
156+
parameters, feature);
162157
} else if (algorithm == "wcc") {
163-
return createWorker(vocbase, new algos::WCC(server, userParams), body,
158+
return createWorker(vocbase, new algos::WCC(server, userParams), parameters,
164159
feature);
165160
} else if (algorithm == "colorpropagation") {
166161
return createWorker(vocbase,
167-
new algos::ColorPropagation(server, userParams), body,
168-
feature);
162+
new algos::ColorPropagation(server, userParams),
163+
parameters, feature);
169164
}
170165
#if defined(ARANGODB_ENABLE_MAINTAINER_MODE)
171166
else if (algorithm == "readwrite") {
172-
return createWorker(vocbase, new algos::ReadWrite(server, userParams), body,
173-
feature);
167+
return createWorker(vocbase, new algos::ReadWrite(server, userParams),
168+
parameters, feature);
174169
}
175170
#endif
176171
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_BAD_PARAMETER,

arangod/Pregel/AlgoRegistry.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525

2626
#include <string>
2727
#include "Pregel/Algorithm.h"
28+
#include "Pregel/Conductor/Messages.h"
2829
#include "Pregel/Worker/Worker.h"
2930

3031
struct TRI_vocbase_t;
@@ -39,14 +40,14 @@ struct AlgoRegistry {
3940
application_features::ApplicationServer& server,
4041
std::string const& algorithm, VPackSlice userParams);
4142
static std::shared_ptr<IWorker> createWorker(TRI_vocbase_t& vocbase,
42-
VPackSlice body,
43+
CreateWorker const& parameters,
4344
PregelFeature& feature);
4445

4546
private:
4647
template<typename V, typename E, typename M>
4748
static std::shared_ptr<IWorker> createWorker(TRI_vocbase_t& vocbase,
4849
Algorithm<V, E, M>* algo,
49-
VPackSlice body,
50+
CreateWorker const& parameters,
5051
PregelFeature& feature);
5152
};
5253

0 commit comments

Comments
 (0)
0