8000 Bug fix/presupp 358 by mchacki · Pull Request #14404 · arangodb/arangodb · GitHub
[go: up one dir, main page]

Skip to content

Bug fix/presupp 358 #14404

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Jun 30, 2021
Merged
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
devel
-----

* Bug-Fix: Pregel WCC algorithm could yield incorrect results if a
part of the connected component was only attached via OUTBOUND edges.
The underlying algorithm is now modified to properly retain INBOUND
edges for the runtime of the execution. This uses more RAM for the
algorithm but guarantees correctness.

* Updated ArangoDB Starter to 0.15.1-preview-1.

* Updated arangosync to 2.4.0.
Expand Down
149 changes: 95 additions & 54 deletions arangod/Pregel/Algos/WCC.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,95 +40,136 @@ using namespace arangodb::pregel::algos;

namespace {

struct WCCComputation : public VertexComputation<uint64_t, uint64_t, SenderMessage<uint64_t>> {
struct WCCComputation
: public VertexComputation<WCCValue, uint64_t, SenderMessage<uint64_t>> {
WCCComputation() {}
void compute(MessageIterator<SenderMessage<uint64_t>> const& messages) override {
uint64_t currentComponent = vertexData();
bool shouldPropagate = selectMinimumOfLocalAndInput(messages);
// We need to propagate on first step
TRI_ASSERT(globalSuperstep() != 0 || shouldPropagate);

if (globalSuperstep() > 0) {
bool halt = true;
if (shouldPropagate) {
propagate();
}
// We can always stop.
// Every vertex will be awoken on
// input messages. If there are no input
// messages for us, we have the same ID
// as our neighbors.
voteHalt();
}

for (const SenderMessage<uint64_t>* msg : messages) {
if (msg->value < currentComponent) {
currentComponent = msg->value;
// TODO: optimization update the edge value if present
// problem: there might be loads of edges, could be expensive
}
private:
/**
* @brief Scan the input, compare it pairwise with our current value.
* We store the minimum into our current value.
* And return true, whenever there was a difference between input and our
* value. This difference indicates that the sender or this vertex are in
* different components if this vertex is off, we will send the new component
* to all our neighbors, if the other vertex is off, we will send our
* component back. Will always return true in the very first step, as this
* kicks of the algorithm and does not yet have input.
*/
bool selectMinimumOfLocalAndInput(MessageIterator<SenderMessage<uint64_t>> const& messages) {
// On first iteration, we need to propagate.
// Otherwise the default is to stay silent, unless some message
// sends a different component then us.
// Either the sender has a wrong component or we have.
bool shouldPropagate = globalSuperstep() == 0;

auto& myData = *mutableVertexData();
for (const SenderMessage<uint64_t>* msg : messages) {
if (globalSuperstep() == 1) {
// In the first step, we need to retain all inbound connections
// for propagation
myData.inboundNeighbors.emplace(msg->senderId);
}

SenderMessage<uint64_t> message(pregelId(), currentComponent);
for (const SenderMessage<uint64_t>* msg : messages) {
if (msg->value > currentComponent) {
TRI_ASSERT(msg->senderId != pregelId());
sendMessage(msg->senderId, message);
halt = false;
if (msg->value != myData.component) {
// we have a difference. Send updates
shouldPropagate = true;
if (msg->value < myData.component) {
// The other component is lower.
// We join this component
myData.component = msg->value;
}
}

if (currentComponent != vertexData()) {
*mutableVertexData() = currentComponent;
halt = false;
}

if (halt) {
voteHalt();
} else {
voteActive();
}
}

if (this->getEdgeCount() > 0) {
SenderMessage<uint64_t> message(pregelId(), currentComponent);
RangeIterator<Edge<uint64_t>> edges = this->getEdges();
for (; edges.hasMore(); ++edges) {
Edge<uint64_t>* edge = *edges;
if (edge->toKey() == this->key()) {
continue; // no need to send message to self
}

// remember the value we send
edge->data() = currentComponent;
return shouldPropagate;
}

sendMessage(edge, message);
/**
* @brief
* Send the current vertex data to all our neighbors, inbound
* and outbound.
* Store the component value in the outbound edges
*/
void propagate() {
auto const& myData = vertexData();
SenderMessage<uint64_t> message(pregelId(), myData.component);
// Send to OUTBOUND neighbors
RangeIterator<Edge<uint64_t>> edges = this->getEdges();
for (; edges.hasMore(); ++edges) {
Edge<uint64_t>* edge = *edges;
if (edge->toKey() == this->key()) {
continue; // no need to send message to self
}

// remember the value we send
// NOTE: I have done refactroing of the algorithm
// the original variant saved this, i do not know
// if it is actually relevant for anything.
edge->data() = myData.component;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you can remove it. It is never accessed and Pregel can not write data into edges, I don't see what this should be good for.


sendMessage(edge, message);
}
// Also send to INBOUND neighbors
for (auto const& target : myData.inboundNeighbors) {
sendMessage(target, message);
}
}

private:
};

struct WCCGraphFormat final : public GraphFormat<uint64_t, uint64_t> {
struct WCCGraphFormat final : public GraphFormat<WCCValue, uint64_t> {
explicit WCCGraphFormat(application_features::ApplicationServer& server,
std::string const& result)
: GraphFormat<uint64_t, uint64_t>(server), _resultField(result) {}
std::string const& result)
: GraphFormat<WCCValue, uint64_t>(server), _resultField(result) {}

std::string const _resultField;

size_t estimatedVertexSize() const override { return sizeof(uint64_t); }

size_t estimatedVertexSize() const override {
// This is a very rough and guessed estimate.
// We need some space for the inbound connections,
// but we have not a single clue how many we will have
return sizeof(uint64_t) + 8 * sizeof(PregelID);
}
size_t estimatedEdgeSize() const override { return sizeof(uint64_t); }

void copyVertexData(arangodb::velocypack::Options const&, std::string const& /*documentId*/,
arangodb::velocypack::Slice /*document*/,
uint64_t& targetPtr, uint64_t& vertexIdRange) override {
targetPtr = vertexIdRange++;
WCCValue& targetPtr, uint64_t& vertexIdRange) override {
targetPtr.component = vertexIdRange++;
}

void copyEdgeData(arangodb::velocypack::Options const&,
arangodb::velocypack::Slice /*document*/, uint64_t& targetPtr) override {
targetPtr = std::numeric_limits<uint64_t>::max();
}

bool buildVertexDocument(arangodb::velocypack::Builder& b, uint64_t const* ptr) const override {
b.add(_resultField, arangodb::velocypack::Value(*ptr));
bool buildVertexDocument(arangodb::velocypack::Builder& b, WCCValue const* ptr) const override {
b.add(_resultField, arangodb::velocypack::Value(ptr->component));
return true;
}
};
}
} // namespace

VertexComputation<uint64_t, uint64_t, SenderMessage<uint64_t>>* WCC::createComputation(
VertexComputation<WCCValue, uint64_t, SenderMessage<uint64_t>>* WCC::createComputation(
WorkerConfig const* config) const {
return new ::WCCComputation();
}

GraphFormat<uint64_t, uint64_t>* WCC::inputFormat() const {
GraphFormat<WCCValue, uint64_t>* WCC::inputFormat() const {
return new ::WCCGraphFormat(_server, _resultField);
}

8 changes: 4 additions & 4 deletions arangod/Pregel/Algos/WCC.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,24 +34,24 @@ namespace algos {
/// vertex id along the edges to all vertices of a connected component. The
/// number of supersteps necessary is equal to the length of the maximum
/// diameter of all components + 1
/// doesn't necessarily leads to a correct result on unidirected graphs
struct WCC : public SimpleAlgorithm<uint64_t, uint64_t, SenderMessage<uint64_t>> {
struct WCC : public SimpleAlgorithm<WCCValue, uint64_t, SenderMessage<uint64_t>> {
public:
explicit WCC(application_features::ApplicationServer& server, VPackSlice userParams)
: SimpleAlgorithm(server, "WCC", userParams) {}

bool supportsAsyncMode() const override { return false; }
bool supportsCompensation() const override { return false; }

GraphFormat<uint64_t, uint64_t>* inputFormat() const override;
GraphFormat<WCCValue, uint64_t>* inputFormat() const override;

MessageFormat<SenderMessage<uint64_t>>* messageFormat() const override {
return new SenderMessageFormat<uint64_t>();
}
MessageCombiner<SenderMessage<uint64_t>>* messageCombiner() const override {
return nullptr;
}
VertexComputation<uint64_t, uint64_t, SenderMessage<uint64_t>>* createComputation(WorkerConfig const*) const override;
VertexComputation<WCCValue, uint64_t, SenderMessage<uint64_t>>* createComputation(
WorkerConfig const*) const override;
};
} // namespace algos
} // namespace pregel
Expand Down
5 changes: 5 additions & 0 deletions arangod/Pregel/CommonFormats.h
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,11 @@ struct SCCValue {
uint64_t color;
};

struct WCCValue {
uint64_t component;
std::unordered_set<PregelID> inboundNeighbors;
};

template <typename T>
struct SenderMessage {
SenderMessage() = default;
Expand Down
1 change: 1 addition & 0 deletions arangod/Pregel/GraphStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -712,6 +712,7 @@ template class arangodb::pregel::GraphStore<double, float>;
template class arangodb::pregel::GraphStore<double, double>;

// specific algo combos
template class arangodb::pregel::GraphStore<WCCValue, uint64_t>;
template class arangodb::pregel::GraphStore<SCCValue, int8_t>;
template class arangodb::pregel::GraphStore<ECValue, int8_t>;
template class arangodb::pregel::GraphStore<HITSValue, int8_t>;
Expand Down
1 change: 1 addition & 0 deletions arangod/Pregel/Worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -832,6 +832,7 @@ template class arangodb::pregel::Worker<double, float, double>;

// custom algorithm types
template class arangodb::pregel::Worker<uint64_t, uint64_t, SenderMessage<uint64_t>>;
template class arangodb::pregel::Worker<WCCValue, uint64_t, SenderMessage<uint64_t>>;
template class arangodb::pregel::Worker<SCCValue, int8_t, SenderMessage<uint64_t>>;
template class arangodb::pregel::Worker<HITSValue, int8_t, SenderMessage<double>>;
template class arangodb::pregel::Worker<ECValue, int8_t, HLLCounter>;
Expand Down
Loading
0