8000 Bug fix/presupp 358 (#14404) · arangodb/arangodb@c055bcc · GitHub
[go: up one dir, main page]

Skip to content

Commit c055bcc

Browse files
authored
Bug fix/presupp 358 (#14404)
1 parent 98a9744 commit c055bcc

File tree

8 files changed

+258
-60
lines changed

8 files changed

+258
-60
lines changed

CHANGELOG

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,12 @@
11
devel
22
-----
33

4+
* Bug-Fix: Pregel WCC algorithm could yield incorrect results if a
5+
part of the connected component was only attached via OUTBOUND edges.
6+
The underlying algorithm is now modified to properly retain INBOUND
7+
edges for the runtime of the execution. This uses more RAM for the
8+
algorithm but guarantees correctness.
9+
410
* Updated ArangoDB Starter to 0.15.1-preview-1.
511

612
* Updated arangosync to 2.4.0.

arangod/Pregel/Algos/WCC.cpp

Lines changed: 95 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -40,95 +40,136 @@ using namespace arangodb::pregel::algos;
4040

4141
namespace {
4242

43-
struct WCCComputation : public VertexComputation<uint64_t, uint64_t, SenderMessage<uint64_t>> {
43+
struct WCCComputation
44+
: public VertexComputation<WCCValue, uint64_t, SenderMessage<uint64_t>> {
4445
WCCComputation() {}
4546
void compute(MessageIterator<SenderMessage<uint64_t>> const& messages) override {
46-
uint64_t currentComponent = vertexData();
47+
bool shouldPropagate = selectMinimumOfLocalAndInput(messages);
48+
// We need to propagate on first step
49+
TRI_ASSERT(globalSuperstep() != 0 || shouldPropagate);
4750

48-
if (globalSuperstep() > 0) {
49-
bool halt = true;
51+
if (shouldPropagate) {
52+
propagate();
53+
}
54+
// We can always stop.
55+
// Every vertex will be awoken on
56+
// input messages. If there are no input
57+
// messages for us, we have the same ID
58+
// as our neighbors.
59+
voteHalt();
60+
}
5061

51-
for (const SenderMessage<uint64_t>* msg : messages) {
52-
if (msg->value < currentComponent) {
53-
currentComponent = msg->value;
54-
// TODO: optimization update the edge value if present
55-
// problem: there might be loads of edges, could be expensive
56-
}
62+
private:
63+
/**
64+
* @brief Scan the input, compare it pairwise with our current value.
65+
* We store the minimum into our current value.
66+
* And return true, whenever there was a difference between input and our
67+
* value. This difference indicates that the sender or this vertex are in
68+
different components. If this vertex is off, we will send the new component
69+
to all our neighbors; if the other vertex is off, we will send our
70+
* component back. Will always return true in the very first step, as this
71+
kicks off the algorithm and does not yet have input.
72+
*/
73+
bool selectMinimumOfLocalAndInput(MessageIterator<SenderMessage<uint64_t>> const& messages) {
74+
// On first iteration, we need to propagate.
75+
// Otherwise the default is to stay silent, unless some message
76+
// sends a different component than us.
77+
// Either the sender has a wrong component or we have.
78+
bool shouldPropagate = globalSuperstep() == 0;
79+
80+
auto& myData = *mutableVertexData();
81+
for (const SenderMessage<uint64_t>* msg : messages) {
82+
if (globalSuperstep() == 1) {
83+
// In the first step, we need to retain all inbound connections
84+
// for propagation
85+
myData.inboundNeighbors.emplace(msg->senderId);
5786
}
58-
59-
SenderMessage<uint64_t> message(pregelId(), currentComponent);
60-
for (const SenderMessage<uint64_t>* msg : messages) {
61-
if (msg->value > currentComponent) {
62-
TRI_ASSERT(msg->senderId != pregelId());
63-
sendMessage(msg->senderId, message);
64-
halt = false;
87+
if (msg->value != myData.component) {
88+
// we have a difference. Send updates
89+
shouldPropagate = true;
90+
if (msg->value < myData.component) {
91+
// The other component is lower.
92+
// We join this component
93+
myData.component = msg->value;
6594
}
6695
}
67-
68-
if (currentComponent != vertexData()) {
69-
*mutableVertexData() = currentComponent;
70-
halt = false;
71-
}
72-
73-
if (halt) {
74-
voteHalt();
75-
} else {
76-
voteActive();
77-
}
7896
}
79-
80-
if (this->getEdgeCount() > 0) {
81-
SenderMessage<uint64_t> message(pregelId(), currentComponent);
82-
RangeIterator<Edge<uint64_t>> edges = this->getEdges();
83-
for (; edges.hasMore(); ++edges) {
84-
Edge<uint64_t>* edge = *edges;
85-
if (edge->toKey() == this->key()) {
86-
continue; // no need to send message to self
87-
}
88-
89-
// remember the value we send
90-
edge->data() = currentComponent;
97+
return shouldPropagate;
98+
}
9199

92-
sendMessage(edge, message);
100+
/**
101+
* @brief
102+
* Send the current vertex data to all our neighbors, inbound
103+
* and outbound.
104+
* Store the component value in the outbound edges
105+
*/
106+
void propagate() {
107+
auto const& myData = vertexData();
108+
SenderMessage<uint64_t> message(pregelId(), myData.component);
109+
// Send to OUTBOUND neighbors
110+
RangeIterator<Edge<uint64_t>> edges = this->getEdges();
111+
for (; edges.hasMore(); ++edges) {
112+
Edge<uint64_t>* edge = *edges;
113+
if (edge->toKey() == this->key()) {
114+
continue; // no need to send message to self
93115
}
116+
117+
// remember the value we send
118+
// NOTE: I have done refactoring of the algorithm
119+
// the original variant saved this, I do not know
120+
// if it is actually relevant for anything.
121+
edge->data() = myData.component;
122+
123+
sendMessage(edge, message);
124+
}
125+
// Also send to INBOUND neighbors
126+
for (auto const& target : myData.inboundNeighbors) {
127+
sendMessage(target, message);
94128
}
95129
}
130+
131+
private:
96132
};
97133

98-
struct WCCGraphFormat final : public GraphFormat<uint64_t, uint64_t> {
134+
struct WCCGraphFormat final : public GraphFormat<WCCValue, uint64_t> {
99135
explicit WCCGraphFormat(application_features::ApplicationServer& server,
100-
std::string const& result)
101-
: GraphFormat<uint64_t, uint64_t>(server), _resultField(result) {}
102-
136+
std::string const& result)
137+
: GraphFormat<WCCValue, uint64_t>(server), _resultField(result) {}
138+
103139
std::string const _resultField;
104-
105-
size_t estimatedVertexSize() const override { return sizeof(uint64_t); }
140+
141+
size_t estimatedVertexSize() const override {
142+
// This is a very rough and guessed estimate.
143+
// We need some space for the inbound connections,
144+
// but we have not a single clue how many we will have
145+
return sizeof(uint64_t) + 8 * sizeof(PregelID);
146+
}
106147
size_t estimatedEdgeSize() const override { return sizeof(uint64_t); }
107148

108149
void copyVertexData(arangodb::velocypack::Options const&, std::string const& /*documentId*/,
109150
arangodb::velocypack::Slice /*document*/,
110-
uint64_t& targetPtr, uint64_t& vertexIdRange) override {
111-
targetPtr = vertexIdRange++;
151+
WCCValue& targetPtr, uint64_t& vertexIdRange) override {
152+
targetPtr.component = vertexIdRange++;
112153
}
113154

114155
void copyEdgeData(arangodb::velocypack::Options const&,
115156
arangodb::velocypack::Slice /*document*/, uint64_t& targetPtr) override {
116157
targetPtr = std::numeric_limits<uint64_t>::max();
117158
}
118159

119-
bool buildVertexDocument(arangodb::velocypack::Builder& b, uint64_t const* ptr) const override {
120-
b.add(_resultField, arangodb::velocypack::Value(*ptr));
160+
bool buildVertexDocument(arangodb::velocypack::Builder& b, WCCValue const* ptr) const override {
161+
b.add(_resultField, arangodb::velocypack::Value(ptr->component));
121162
return true;
122163
}
123164
};
124-
}
165+
} // namespace
125166

126-
VertexComputation<uint64_t, uint64_t, SenderMessage<uint64_t>>* WCC::createComputation(
167+
VertexComputation<WCCValue, uint64_t, SenderMessage<uint64_t>>* WCC::createComputation(
127168
WorkerConfig const* config) const {
128169
return new ::WCCComputation();
129170
}
130171

131-
GraphFormat<uint64_t, uint64_t>* WCC::inputFormat() const {
172+
GraphFormat<WCCValue, uint64_t>* WCC::inputFormat() const {
132173
return new ::WCCGraphFormat(_server, _resultField);
133174
}
134175

arangod/Pregel/Algos/WCC.h

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,24 +34,24 @@ namespace algos {
3434
/// vertex id along the edges to all vertices of a connected component. The
3535
/// number of supersteps necessary is equal to the length of the maximum
3636
/// diameter of all components + 1
37-
/// doesn't necessarily leads to a correct result on unidirected graphs
38-
struct WCC : public SimpleAlgorithm<uint64_t, uint64_t, SenderMessage<uint64_t>> {
37+
struct WCC : public SimpleAlgorithm<WCCValue, uint64_t, SenderMessage<uint64_t>> {
3938
public:
4039
explicit WCC(application_features::ApplicationServer& server, VPackSlice userParams)
4140
: SimpleAlgorithm(server, "WCC", userParams) {}
4241

4342
bool supportsAsyncMode() const override { return false; }
4443
bool supportsCompensation() const override { return false; }
4544

46-
GraphFormat<uint64_t, uint64_t>* inputFormat() const override;
45+
GraphFormat<WCCValue, uint64_t>* inputFormat() const override;
4746

4847
MessageFormat<SenderMessage<uint64_t>>* messageFormat() const override {
4948
return new SenderMessageFormat<uint64_t>();
5049
}
5150
MessageCombiner<SenderMessage<uint64_t>>* messageCombiner() const override {
5251
return nullptr;
5352
}
54-
VertexComputation<uint64_t, uint64_t, SenderMessage<uint64_t>>* createComputation(WorkerConfig const*) const override;
53+
VertexComputation<WCCValue, uint64_t, SenderMessage<uint64_t>>* createComputation(
54+
WorkerConfig const*) const override;
5555
};
5656
} // namespace algos
5757
} // namespace pregel

arangod/Pregel/CommonFormats.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,11 @@ struct SCCValue {
112112
uint64_t color;
113113
};
114114

115+
struct WCCValue {
116+
uint64_t component;
117+
std::unordered_set<PregelID> inboundNeighbors;
118+
};
119+
115120
template <typename T>
116121
struct SenderMessage {
117122
SenderMessage() = default;

arangod/Pregel/GraphStore.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -712,6 +712,7 @@ template class arangodb::pregel::GraphStore<double, float>;
712712
template class arangodb::pregel::GraphStore<double, double>;
713713

714714
// specific algo combos
715+
template class arangodb::pregel::GraphStore<WCCValue, uint64_t>;
715716
template class arangodb::pregel::GraphStore<SCCValue, int8_t>;
716717
template class arangodb::pregel::GraphStore<ECValue, int8_t>;
717718
template class arangodb::pregel::GraphStore<HITSValue, int8_t>;

arangod/Pregel/Worker.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -832,6 +832,7 @@ template class arangodb::pregel::Worker<double, float, double>;
832832

833833
// custom algorithm types
834834
template class arangodb::pregel::Worker<uint64_t, uint64_t, SenderMessage<uint64_t>>;
835+
template class arangodb::pregel::Worker<WCCValue, uint64_t, SenderMessage<uint64_t>>;
835836
template class arangodb::pregel::Worker<SCCValue, int8_t, SenderMessage<uint64_t>>;
836837
template class arangodb::pregel::Worker<HITSValue, int8_t, SenderMessage<double>>;
837838
template class arangodb::pregel::Worker<ECValue, int8_t, HLLCounter>;

0 commit comments

Comments
 (0)
0