@@ -40,95 +40,136 @@ using namespace arangodb::pregel::algos;
40
40
41
41
namespace {
42
42
43
- struct WCCComputation : public VertexComputation <uint64_t , uint64_t , SenderMessage<uint64_t >> {
43
+ struct WCCComputation
44
+ : public VertexComputation<WCCValue, uint64_t , SenderMessage<uint64_t >> {
44
45
WCCComputation () {}
45
46
void compute (MessageIterator<SenderMessage<uint64_t >> const & messages) override {
46
- uint64_t currentComponent = vertexData ();
47
+ bool shouldPropagate = selectMinimumOfLocalAndInput (messages);
48
+ // We need to propagate on first step
49
+ TRI_ASSERT (globalSuperstep () != 0 || shouldPropagate);
47
50
48
- if (globalSuperstep () > 0 ) {
49
- bool halt = true ;
51
+ if (shouldPropagate) {
52
+ propagate ();
53
+ }
54
+ // We can always stop.
55
+ // Every vertex will be awoken on
56
+ // input messages. If there are no input
57
+ // messages for us, we have the same ID
58
+ // as our neighbors.
59
+ voteHalt ();
60
+ }
50
61
51
- for (const SenderMessage<uint64_t >* msg : messages) {
52
- if (msg->value < currentComponent) {
53
- currentComponent = msg->value ;
54
- // TODO: optimization update the edge value if present
55
- // problem: there might be loads of edges, could be expensive
56
- }
62
+ private:
63
+ /* *
64
+ * @brief Scan the input, compare it pairwise with our current value.
65
+ * We store the minimum into our current value.
66
+ * And return true, whenever there was a difference between input and our
67
+ * value. This difference indicates that the sender or this vertex are in
68
+ * different components if this vertex is off, we will send the new component
69
+ * to all our neighbors, if the other vertex is off, we will send our
70
+ * component back. Will always return true in the very first step, as this
71
+ * kicks of the algorithm and does not yet have input.
72
+ */
73
+ bool selectMinimumOfLocalAndInput (MessageIterator<SenderMessage<uint64_t >> const & messages) {
74
+ // On first iteration, we need to propagate.
75
+ // Otherwise the default is to stay silent, unless some message
76
+ // sends a different component then us.
77
+ // Either the sender has a wrong component or we have.
78
+ bool shouldPropagate = globalSuperstep () == 0 ;
79
+
80
+ auto & myData = *mutableVertexData ();
81
+ for (const SenderMessage<uint64_t >* msg : messages) {
82
+ if (globalSuperstep () == 1 ) {
83
+ // In the first step, we need to retain all inbound connections
84
+ // for propagation
85
+ myData.inboundNeighbors .emplace (msg->senderId );
57
86
}
58
-
59
- SenderMessage< uint64_t > message ( pregelId (), currentComponent);
60
- for ( const SenderMessage< uint64_t >* msg : messages) {
61
- if (msg->value > currentComponent ) {
62
- TRI_ASSERT (msg-> senderId != pregelId ());
63
- sendMessage (msg-> senderId , message);
64
- halt = false ;
87
+ if (msg-> value != myData. component ) {
88
+ // we have a difference. Send updates
89
+ shouldPropagate = true ;
90
+ if (msg->value < myData. component ) {
91
+ // The other component is lower.
92
+ // We join this component
93
+ myData. component = msg-> value ;
65
94
}
66
95
}
67
-
68
- if (currentComponent != vertexData ()) {
69
- *mutableVertexData () = currentComponent;
70
- halt = false ;
71
- }
72
-
73
- if (halt) {
74
- voteHalt ();
75
- } else {
76
- voteActive ();
77
- }
78
96
}
79
-
80
- if (this ->getEdgeCount () > 0 ) {
81
- SenderMessage<uint64_t > message (pregelId (), currentComponent);
82
- RangeIterator<Edge<uint64_t >> edges = this ->getEdges ();
83
- for (; edges.hasMore (); ++edges) {
84
- Edge<uint64_t >* edge = *edges;
85
- if (edge->toKey () == this ->key ()) {
86
- continue ; // no need to send message to self
87
- }
88
-
89
- // remember the value we send
90
- edge->data () = currentComponent;
97
+ return shouldPropagate;
98
+ }
91
99
92
- sendMessage (edge, message);
100
+ /* *
101
+ * @brief
102
+ * Send the current vertex data to all our neighbors, inbound
103
+ * and outbound.
104
+ * Store the component value in the outbound edges
105
+ */
106
+ void propagate () {
107
+ auto const & myData = vertexData ();
108
+ SenderMessage<uint64_t > message (pregelId (), myData.component );
109
+ // Send to OUTBOUND neighbors
110
+ RangeIterator<Edge<uint64_t >> edges = this ->getEdges ();
111
+ for (; edges.hasMore (); ++edges) {
112
+ Edge<uint64_t >* edge = *edges;
113
+ if (edge->toKey () == this ->key ()) {
114
+ continue ; // no need to send message to self
93
115
}
116
+
117
+ // remember the value we send
118
+ // NOTE: I have done refactroing of the algorithm
119
+ // the original variant saved this, i do not know
120
+ // if it is actually relevant for anything.
121
+ edge->data () = myData.component ;
122
+
123
+ sendMessage (edge, message);
124
+ }
125
+ // Also send to INBOUND neighbors
126
+ for (auto const & target : myData.inboundNeighbors ) {
127
+ sendMessage (target, message);
94
128
}
95
129
}
130
+
131
+ private:
96
132
};
97
133
98
- struct WCCGraphFormat final : public GraphFormat<uint64_t , uint64_t > {
134
+ struct WCCGraphFormat final : public GraphFormat<WCCValue , uint64_t > {
99
135
explicit WCCGraphFormat (application_features::ApplicationServer& server,
100
- std::string const & result)
101
- : GraphFormat<uint64_t , uint64_t>(server), _resultField(result) {}
102
-
136
+ std::string const & result)
137
+ : GraphFormat<WCCValue , uint64_t>(server), _resultField(result) {}
138
+
103
139
std::string const _resultField;
104
-
105
- size_t estimatedVertexSize () const override { return sizeof (uint64_t ); }
140
+
141
+ size_t estimatedVertexSize () const override {
142
+ // This is a very rough and guessed estimate.
143
+ // We need some space for the inbound connections,
144
+ // but we have not a single clue how many we will have
145
+ return sizeof (uint64_t ) + 8 * sizeof (PregelID);
146
+ }
106
147
size_t estimatedEdgeSize () const override { return sizeof (uint64_t ); }
107
148
108
149
void copyVertexData (arangodb::velocypack::Options const &, std::string const & /* documentId*/ ,
109
150
arangodb::velocypack::Slice /* document*/ ,
110
- uint64_t & targetPtr, uint64_t & vertexIdRange) override {
111
- targetPtr = vertexIdRange++;
151
+ WCCValue & targetPtr, uint64_t & vertexIdRange) override {
152
+ targetPtr. component = vertexIdRange++;
112
153
}
113
154
114
155
void copyEdgeData (arangodb::velocypack::Options const &,
115
156
arangodb::velocypack::Slice /* document*/ , uint64_t & targetPtr) override {
116
157
targetPtr = std::numeric_limits<uint64_t >::max ();
117
158
}
118
159
119
- bool buildVertexDocument (arangodb::velocypack::Builder& b, uint64_t const * ptr) const override {
120
- b.add (_resultField, arangodb::velocypack::Value (* ptr));
160
+ bool buildVertexDocument (arangodb::velocypack::Builder& b, WCCValue const * ptr) const override {
161
+ b.add (_resultField, arangodb::velocypack::Value (ptr-> component ));
121
162
return true ;
122
163
}
123
164
};
124
- }
165
+ } // namespace
125
166
126
- VertexComputation<uint64_t , uint64_t , SenderMessage<uint64_t >>* WCC::createComputation (
167
+ VertexComputation<WCCValue , uint64_t , SenderMessage<uint64_t >>* WCC::createComputation (
127
168
WorkerConfig const * config) const {
128
169
return new ::WCCComputation ();
129
170
}
130
171
131
- GraphFormat<uint64_t , uint64_t >* WCC::inputFormat () const {
172
+ GraphFormat<WCCValue , uint64_t >* WCC::inputFormat () const {
132
173
return new ::WCCGraphFormat (_server, _resultField);
133
174
}
134
175
0 commit comments