diff --git a/CHANGELOG b/CHANGELOG index 0139964cbaf1..d13dfedf5a45 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,6 +1,12 @@ devel ----- +* Bug-Fix: Pregel WCC algorithm could yield incorrect results if a + part of the connected component was only attached via OUTBOUND edges. + The underlying algorithm is now modified to properly retain INBOUND + edges for the runtime of the execution. This uses more RAM for the + algorithm but guarantees correctness. + * Updated ArangoDB Starter to 0.15.1-preview-1. * Updated arangosync to 2.4.0. diff --git a/arangod/Pregel/Algos/WCC.cpp b/arangod/Pregel/Algos/WCC.cpp index 593f30febf48..886f9bdbd890 100644 --- a/arangod/Pregel/Algos/WCC.cpp +++ b/arangod/Pregel/Algos/WCC.cpp @@ -40,75 +40,116 @@ using namespace arangodb::pregel::algos; namespace { -struct WCCComputation : public VertexComputation> { +struct WCCComputation + : public VertexComputation> { WCCComputation() {} void compute(MessageIterator> const& messages) override { - uint64_t currentComponent = vertexData(); + bool shouldPropagate = selectMinimumOfLocalAndInput(messages); + // We need to propagate on first step + TRI_ASSERT(globalSuperstep() != 0 || shouldPropagate); - if (globalSuperstep() > 0) { - bool halt = true; + if (shouldPropagate) { + propagate(); + } + // We can always stop. + // Every vertex will be awoken on + // input messages. If there are no input + // messages for us, we have the same ID + // as our neighbors. + voteHalt(); + } - for (const SenderMessage* msg : messages) { - if (msg->value < currentComponent) { - currentComponent = msg->value; - // TODO: optimization update the edge value if present - // problem: there might be loads of edges, could be expensive - } + private: + /** + * @brief Scan the input, compare it pairwise with our current value. + * We store the minimum into our current value. + * And return true, whenever there was a difference between input and our + * value. 
This difference indicates that the sender or this vertex are in + * different components. If this vertex is off, we will send the new component + * to all our neighbors; if the other vertex is off, we will send our + * component back. Will always return true in the very first step, as this + * kicks off the algorithm and does not yet have input. + */ + bool selectMinimumOfLocalAndInput(MessageIterator> const& messages) { + // On first iteration, we need to propagate. + // Otherwise the default is to stay silent, unless some message + // sends a different component than us. + // Either the sender has a wrong component or we have. + bool shouldPropagate = globalSuperstep() == 0; + + auto& myData = *mutableVertexData(); + for (const SenderMessage* msg : messages) { + if (globalSuperstep() == 1) { + // In the first step, we need to retain all inbound connections + // for propagation + myData.inboundNeighbors.emplace(msg->senderId); } - - SenderMessage message(pregelId(), currentComponent); - for (const SenderMessage* msg : messages) { - if (msg->value > currentComponent) { - TRI_ASSERT(msg->senderId != pregelId()); - sendMessage(msg->senderId, message); - halt = false; + if (msg->value != myData.component) { + // we have a difference. Send updates + shouldPropagate = true; + if (msg->value < myData.component) { + // The other component is lower. 
+ // We join this component + myData.component = msg->value; } } - - if (currentComponent != vertexData()) { - *mutableVertexData() = currentComponent; - halt = false; - } - - if (halt) { - voteHalt(); - } else { - voteActive(); - } } - - if (this->getEdgeCount() > 0) { - SenderMessage message(pregelId(), currentComponent); - RangeIterator> edges = this->getEdges(); - for (; edges.hasMore(); ++edges) { - Edge* edge = *edges; - if (edge->toKey() == this->key()) { - continue; // no need to send message to self - } - - // remember the value we send - edge->data() = currentComponent; + return shouldPropagate; + } - sendMessage(edge, message); + /** + * @brief + * Send the current vertex data to all our neighbors, inbound + * and outbound. + * Store the component value in the outbound edges + */ + void propagate() { + auto const& myData = vertexData(); + SenderMessage message(pregelId(), myData.component); + // Send to OUTBOUND neighbors + RangeIterator> edges = this->getEdges(); + for (; edges.hasMore(); ++edges) { + Edge* edge = *edges; + if (edge->toKey() == this->key()) { + continue; // no need to send message to self } + + // remember the value we send + // NOTE: I have done refactoring of the algorithm + // the original variant saved this, I do not know + // if it is actually relevant for anything. 
+ edge->data() = myData.component; + + sendMessage(edge, message); + } + // Also send to INBOUND neighbors + for (auto const& target : myData.inboundNeighbors) { + sendMessage(target, message); } } + + private: }; -struct WCCGraphFormat final : public GraphFormat { +struct WCCGraphFormat final : public GraphFormat { explicit WCCGraphFormat(application_features::ApplicationServer& server, - std::string const& result) - : GraphFormat(server), _resultField(result) {} - + std::string const& result) + : GraphFormat(server), _resultField(result) {} + std::string const _resultField; - - size_t estimatedVertexSize() const override { return sizeof(uint64_t); } + + size_t estimatedVertexSize() const override { + // This is a very rough and guessed estimate. + // We need some space for the inbound connections, + // but we have not a single clue how many we will have + return sizeof(uint64_t) + 8 * sizeof(PregelID); + } size_t estimatedEdgeSize() const override { return sizeof(uint64_t); } void copyVertexData(arangodb::velocypack::Options const&, std::string const& /*documentId*/, arangodb::velocypack::Slice /*document*/, - uint64_t& targetPtr, uint64_t& vertexIdRange) override { - targetPtr = vertexIdRange++; + WCCValue& targetPtr, uint64_t& vertexIdRange) override { + targetPtr.component = vertexIdRange++; } void copyEdgeData(arangodb::velocypack::Options const&, @@ -116,19 +157,19 @@ struct WCCGraphFormat final : public GraphFormat { targetPtr = std::numeric_limits::max(); } - bool buildVertexDocument(arangodb::velocypack::Builder& b, uint64_t const* ptr) const override { - b.add(_resultField, arangodb::velocypack::Value(*ptr)); + bool buildVertexDocument(arangodb::velocypack::Builder& b, WCCValue const* ptr) const override { + b.add(_resultField, arangodb::velocypack::Value(ptr->component)); return true; } }; -} +} // namespace -VertexComputation>* WCC::createComputation( +VertexComputation>* WCC::createComputation( WorkerConfig const* config) const { return new 
::WCCComputation(); } -GraphFormat* WCC::inputFormat() const { +GraphFormat* WCC::inputFormat() const { return new ::WCCGraphFormat(_server, _resultField); } diff --git a/arangod/Pregel/Algos/WCC.h b/arangod/Pregel/Algos/WCC.h index bfc0b2cc2b3a..837ef4d3008c 100644 --- a/arangod/Pregel/Algos/WCC.h +++ b/arangod/Pregel/Algos/WCC.h @@ -34,8 +34,7 @@ namespace algos { /// vertex id along the edges to all vertices of a connected component. The /// number of supersteps necessary is equal to the length of the maximum /// diameter of all components + 1 -/// doesn't necessarily leads to a correct result on unidirected graphs -struct WCC : public SimpleAlgorithm> { +struct WCC : public SimpleAlgorithm> { public: explicit WCC(application_features::ApplicationServer& server, VPackSlice userParams) : SimpleAlgorithm(server, "WCC", userParams) {} @@ -43,7 +42,7 @@ struct WCC : public SimpleAlgorithm> bool supportsAsyncMode() const override { return false; } bool supportsCompensation() const override { return false; } - GraphFormat* inputFormat() const override; + GraphFormat* inputFormat() const override; MessageFormat>* messageFormat() const override { return new SenderMessageFormat(); @@ -51,7 +50,8 @@ struct WCC : public SimpleAlgorithm> MessageCombiner>* messageCombiner() const override { return nullptr; } - VertexComputation>* createComputation(WorkerConfig const*) const override; + VertexComputation>* createComputation( + WorkerConfig const*) const override; }; } // namespace algos } // namespace pregel diff --git a/arangod/Pregel/CommonFormats.h b/arangod/Pregel/CommonFormats.h index 94812a39e2e8..2ec4a756cf25 100644 --- a/arangod/Pregel/CommonFormats.h +++ b/arangod/Pregel/CommonFormats.h @@ -112,6 +112,11 @@ struct SCCValue { uint64_t color; }; +struct WCCValue { + uint64_t component; + std::unordered_set inboundNeighbors; +}; + template struct SenderMessage { SenderMessage() = default; diff --git a/arangod/Pregel/GraphStore.cpp b/arangod/Pregel/GraphStore.cpp index 
5dfc6909816a..2abd59ed84d3 100644 --- a/arangod/Pregel/GraphStore.cpp +++ b/arangod/Pregel/GraphStore.cpp @@ -712,6 +712,7 @@ template class arangodb::pregel::GraphStore; template class arangodb::pregel::GraphStore; // specific algo combos +template class arangodb::pregel::GraphStore; template class arangodb::pregel::GraphStore; template class arangodb::pregel::GraphStore; template class arangodb::pregel::GraphStore; diff --git a/arangod/Pregel/Worker.cpp b/arangod/Pregel/Worker.cpp index a9838bdad263..7411a493d1be 100644 --- a/arangod/Pregel/Worker.cpp +++ b/arangod/Pregel/Worker.cpp @@ -832,6 +832,7 @@ template class arangodb::pregel::Worker; // custom algorithm types template class arangodb::pregel::Worker>; +template class arangodb::pregel::Worker>; template class arangodb::pregel::Worker>; template class arangodb::pregel::Worker>; template class arangodb::pregel::Worker; diff --git a/tests/js/common/shell/shell-pregel-components.js b/tests/js/common/shell/shell-pregel-components.js index dbcfcd372c6d..d310fed597ca 100644 --- a/tests/js/common/shell/shell-pregel-components.js +++ b/tests/js/common/shell/shell-pregel-components.js @@ -269,5 +269,149 @@ function componentsTestSuite() { }; } +function wccRegressionTestSuite() { + 'use strict'; + + const makeEdge = (from, to) => { + return {_from: `${vColl}/${from}`, _to: `${vColl}/${to}`, vertex: `${from}`}; + }; + return { + + setUp: function() { + db._create(vColl, { numberOfShards: 4 }); + db._createEdgeCollection(eColl, { + numberOfShards: 4, + replicationFactor: 1, + shardKeys: ["vertex"], + distributeShardsLike: vColl + }); + + graph_module._create(graphName, [graph_module._relation(eColl, vColl, vColl)], []); + }, + + tearDown: function() { + graph_module._drop(graphName, true); + }, + + testWCCLineComponent: function() { + const vertices = []; + const edges = []; + + // We create one forward path 100 -> 120 (100 will be component ID, and needs to be propagated outbound) + for (let i = 100; i < 120; ++i) { 
+ vertices.push({_key: `${i}`}); + if (i > 100) { + edges.push(makeEdge(i-1, i)); + } + } + + // We create one backward path 200 <- 220 (200 will be component ID, and needs to be propagated inbound) + for (let i = 200; i < 220; ++i) { + vertices.push({_key: `${i}`}); + if (i > 200) { + edges.push(makeEdge(i, i - 1)); + } + } + db[vColl].save(vertices); + db[eColl].save(edges); + + const pid = pregel.start("wcc", graphName, { resultField: "result", store: true }); + const maxWaitTimeSecs = 120; + const sleepIntervalSecs = 0.2; + let wakeupsLeft = maxWaitTimeSecs / sleepIntervalSecs; + while (pregel.status(pid).state !== "done" && wakeupsLeft > 0) { + wakeupsLeft--; + internal.sleep(0.2); + } + const status = pregel.status(pid); + assertEqual(status.state, "done", "Pregel Job did never succeed."); + + // Now test the result. + // We expect two components + const query = ` + FOR v IN ${vColl} + COLLECT component = v.result WITH COUNT INTO size + RETURN {component, size} + `; + const computedComponents = db._query(query).toArray(); + assertEqual(computedComponents.length, 2, `We expected 2 components instead got ${JSON.stringify(computedComponents)}`); + // Both have 20 elements + assertEqual(computedComponents[0].size, 20); + assertEqual(computedComponents[1].size, 20); + }, + + testWCCLostBackwardsConnection: function() { + const vertices = []; + const edges = []; + + /* + * This test's background is a bit tricky and tests a deep technical + * detail. + * The idea in this test is the following: + * We have two Subgroups A and B where there are only OUTBOUND + * Edges from A to B. + * In total and ignoring those edges A has a higher ComponentID than B. + * Now comes the Tricky Part: + * 1) The Smallest ID in B, needs to have a long Distance to all contact points to A. 
+ * 2) The ContactPoints A -> B need to have the second Smallest IDs + * This now creates the following Effect: + * The contact points only communicate in the beginning, as they will not see smaller IDs + * until the SmallestID arrives. + * In the implementation showing this Bug, the INBOUND connection was not retained, + * so as soon as the Smallest ID arrived at B it was not communicated back into the A + * Cluster, which resulted in two components instead of a single one. + */ + + // We create 20 vertices + for (let i = 1; i <= 20; ++i) { + vertices.push({_key: `${i}`}); + } + // By convention the lowest keys will have the Lowest ids. + // We need to pick two second lowest Vertices as the ContactPoints of our two clusters: + edges.push(makeEdge(2, 3)); + // Generate the A side + edges.push(makeEdge(4,2)); + edges.push(makeEdge(4,5)); + edges.push(makeEdge(5,2)); + // Generate the B side with a Long path to 3 + // A path 6->7->...->20 (the 6 is already low, so not many updates happening) + for (let i = 7; i <= 20; ++i) { + edges.push(makeEdge(i - 1, i)); + } + // Now connect 3 -> 6 and 20 -> 1 + edges.push(makeEdge(3, 6)); + edges.push(makeEdge(20, 1)); + + db[vColl].save(vertices); + db[eColl].save(edges); + + const pid = pregel.start("wcc", graphName, { resultField: "result", store: true }); + const maxWaitTimeSecs = 120; + const sleepIntervalSecs = 0.2; + let wakeupsLeft = maxWaitTimeSecs / sleepIntervalSecs; + while (pregel.status(pid).state !== "done" && wakeupsLeft > 0) { + wakeupsLeft--; + internal.sleep(0.2); + } + const status = pregel.status(pid); + assertEqual(status.state, "done", "Pregel Job did never succeed."); + + // Now test the result. 
+ // We expect a single component + const query = ` + FOR v IN ${vColl} + COLLECT component = v.result WITH COUNT INTO size + RETURN {component, size} + `; + const computedComponents = db._query(query).toArray(); + assertEqual(computedComponents.length, 1, `We expected 1 component instead got ${JSON.stringify(computedComponents)}`); + // we have all 20 elements + assertEqual(computedComponents[0].size, 20); + }, + + }; +} + jsunity.run(componentsTestSuite); +jsunity.run(wccRegressionTestSuite); return jsunity.done(); diff --git a/tests/js/common/shell/shell-pregel-mulicollection-edgecol.js b/tests/js/common/shell/shell-pregel-mulicollection-edgecol.js index 3c189bf508b3..fc46e95c701e 100644 --- a/tests/js/common/shell/shell-pregel-mulicollection-edgecol.js +++ b/tests/js/common/shell/shell-pregel-mulicollection-edgecol.js @@ -228,7 +228,7 @@ function multiCollectionTestSuite() { internal.sleep(0.2); let stats = pregel.status(pid); if (stats.state !== "running" && stats.state !== "storing") { - assertEqual(500, stats.gss); + assertTrue( stats.gss <= 25); assertEqual(stats.vertexCount, numComponents * n, stats); assertEqual(stats.edgeCount, numComponents * (m + n), stats); assertEqual(parallelism, stats.parallelism); @@ -262,7 +262,7 @@ function multiCollectionTestSuite() { internal.sleep(0.2); let stats = pregel.status(pid); if (stats.state !== "running" && stats.state !== "storing") { - assertEqual(500, stats.gss); + assertTrue(stats.gss <= 25); assertEqual(stats.vertexCount, numComponents * n, stats); assertEqual(stats.edgeCount, numComponents * (m + n), stats); assertEqual(parallelism, stats.parallelism);