10000 Bug fix/presupp 358 (#14404) by mchacki · Pull Request #14436 · arangodb/arangodb · GitHub
[go: up one dir, main page]

Skip to content

Bug fix/presupp 358 (#14404) #14436

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jul 28, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Bug fix/presupp 358 (#14404)
  • Loading branch information
mchacki committed Jul 1, 2021
commit 3e5274ac28f92b7e3b9d253da9a9c13bbb5dbb4c
6 changes: 6 additions & 0 deletions CHANGELOG
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
v3.8.0 (XXXX-XX-XX)
-------------------

* Bug-Fix: Pregel WCC algorithm could yield incorrect results if a
part of the connected component was only attached via OUTBOUND edges.
The underlying algorithm is now modified to properly retain INBOUND
edges for the runtime of the execution. This uses more RAM for the
algorithm but guarantees correctness.

* Updated arangosync to 2.4.0.

* For cluster AQL queries, let the coordinator determine the query id to be used
Expand Down
149 changes: 95 additions & 54 deletions arangod/Pregel/Algos/WCC.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,95 +40,136 @@ using namespace arangodb::pregel::algos;

namespace {

struct WCCComputation : public VertexComputation<uint64_t, uint64_t, SenderMessage<uint64_t>> {
struct WCCComputation
: public VertexComputation<WCCValue, uint64_t, SenderMessage<uint64_t>> {
WCCComputation() {}
void compute(MessageIterator<SenderMessage<uint64_t>> const& messages) override {
uint64_t currentComponent = vertexData();
bool shouldPropagate = selectMinimumOfLocalAndInput(messages);
// We need to propagate on first step
TRI_ASSERT(globalSuperstep() != 0 || shouldPropagate);

if (globalSuperstep() > 0) {
bool halt = true;
if (shouldPropagate) {
propagate();
}
// We can always stop.
// Every vertex will be awoken on
// input messages. If there are no input
// messages for us, we have the same ID
// as our neighbors.
voteHalt();
}

for (const SenderMessage<uint64_t>* msg : messages) {
if (msg->value < currentComponent) {
currentComponent = msg->value;
// TODO: optimization update the edge value if present
// problem: there might be loads of edges, could be expensive
}
private:
/**
* @brief Scan the input, compare it pairwise with our current value.
* We store the minimum into our current value.
* And return true, whenever there was a difference between input and our
* value. This difference indicates that the sender or this vertex are in
* different components if this vertex is off, we will send the new component
* to all our neighbors, if the other vertex is off, we will send our
* component back. Will always return true in the very first step, as this
* kicks of the algorithm and does not yet have input.
*/
bool selectMinimumOfLocalAndInput(MessageIterator<SenderMessage<uint64_t>> const& messages) {
// On first iteration, we need to propagate.
// Otherwise the default is to stay silent, unless some message
// sends a different component then us.
// Either the sender has a wrong component or we have.
bool shouldPropagate = globalSuperstep() == 0;

auto& myData = *mutableVertexData();
for (const SenderMessage<uint64_t>* msg : messages) {
if (globalSuperstep() == 1) {
// In the first step, we need to retain all inbound connections
// for propagation
myData.inboundNeighbors.emplace(msg->senderId);
}

SenderMessage<uint64_t> message(pregelId(), currentComponent);
for (const SenderMessage<uint64_t>* msg : messages) {
if (msg->value > currentComponent) {
TRI_ASSERT(msg->senderId != pregelId());
sendMessage(msg->senderId, message);
halt = false;
if (msg->value != myData.component) {
// we have a difference. Send updates
shouldPropagate = true;
if (msg->value < myData.component) {
// The other component is lower.
// We join this component
myData.component = msg->value;
}
}

if (currentComponent != vertexData()) {
*mutableVertexData() = currentComponent;
halt = false;
}

if (halt) {
voteHalt();
} else {
voteActive();
}
}

if (this->getEdgeCount() > 0) {
SenderMessage<uint64_t> message(pregelId(), currentComponent);
RangeIterator<Edge<uint64_t>> edges = this->getEdges();
for (; edges.hasMore(); ++edges) {
Edge<uint64_t>* edge = *edges;
if (edge->toKey() == this->key()) {
continue; // no need to send message to self
}

// remember the value we send
edge->data() = currentComponent;
return shouldPropagate;
}

sendMessage(edge, message);
/**
* @brief
* Send the current vertex data to all our neighbors, inbound
* and outbound.
* Store the component value in the outbound edges
*/
void propagate() {
auto const& myData = vertexData();
SenderMessage<uint64_t> message(pregelId(), myData.component);
// Send to OUTBOUND neighbors
RangeIterator<Edge<uint64_t>> edges = this->getEdges();
for (; edges.hasMore(); ++edges) {
Edge<uint64_t>* edge = *edges;
if (edge->toKey() == this->key()) {
continue; // no need to send message to self
}

// remember the value we send
// NOTE: I have done refactroing of the algorithm
// the original variant saved this, i do not know
// if it is actually 8000 relevant for anything.
edge->data() = myData.component;

sendMessage(edge, message);
}
// Also send to INBOUND neighbors
for (auto const& target : myData.inboundNeighbors) {
sendMessage(target, message);
}
}

private:
};

struct WCCGraphFormat final : public GraphFormat<uint64_t, uint64_t> {
struct WCCGraphFormat final : public GraphFormat<WCCValue, uint64_t> {
explicit WCCGraphFormat(application_features::ApplicationServer& server,
std::string const& result)
: GraphFormat<uint64_t, uint64_t>(server), _resultField(result) {}
std::string const& result)
: GraphFormat<WCCValue, uint64_t>(server), _resultField(result) {}

std::string const _resultField;

size_t estimatedVertexSize() const override { return sizeof(uint64_t); }

size_t estimatedVertexSize() const override {
// This is a very rough and guessed estimate.
// We need some space for the inbound connections,
// but we have not a single clue how many we will have
return sizeof(uint64_t) + 8 * sizeof(PregelID);
}
size_t estimatedEdgeSize() const override { return sizeof(uint64_t); }

void copyVertexData(arangodb::velocypack::Options const&, std::string const& /*documentId*/,
arangodb::velocypack::Slice /*document*/,
uint64_t& targetPtr, uint64_t& vertexIdRange) override {
targetPtr = vertexIdRange++;
WCCValue& targetPtr, uint64_t& vertexIdRange) override {
targetPtr.component = vertexIdRange++;
}

void copyEdgeData(arangodb::velocypack::Options const&,
arangodb::velocypack::Slice /*document*/, uint64_t& targetPtr) override {
targetPtr = std::numeric_limits<uint64_t>::max();
}

bool buildVertexDocument(arangodb::velocypack::Builder& b, uint64_t const* ptr) const override {
b.add(_resultField, arangodb::velocypack::Value(*ptr));
bool buildVertexDocument(arangodb::velocypack::Builder& b, WCCValue const* ptr) const override {
b.add(_resultField, arangodb::velocypack::Value(ptr->component));
return true;
}
};
}
} // namespace

VertexComputation<uint64_t, uint64_t, SenderMessage<uint64_t>>* WCC::createComputation(
VertexComputation<WCCValue, uint64_t, SenderMessage<uint64_t>>* WCC::createComputation(
WorkerConfig const* config) const {
return new ::WCCComputation();
}

GraphFormat<uint64_t, uint64_t>* WCC::inputFormat() const {
GraphFormat<WCCValue, uint64_t>* WCC::inputFormat() const {
return new ::WCCGraphFormat(_server, _resultField);
}

8 changes: 4 additions & 4 deletions arangod/Pregel/Algos/WCC.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,24 +35,24 @@ namespace algos {
/// vertex id along the edges to all vertices of a connected component. The
/// number of supersteps necessary is equal to the length of the maximum
/// diameter of all components + 1
/// doesn't necessarily leads to a correct result on unidirected graphs
struct WCC : public SimpleAlgorithm<uint64_t, uint64_t, SenderMessage<uint64_t>> {
struct WCC : public SimpleAlgorithm<WCCValue, uint64_t, SenderMessage<uint64_t>> {
public:
explicit WCC(application_features::ApplicationServer& server, VPackSlice userParams)
: SimpleAlgorithm(server, "WCC", userParams) {}

bool supportsAsyncMode() const override { return false; }
bool supportsCompensation() const override { return false; }

GraphFormat<uint64_t, uint64_t>* inputFormat() const override;
GraphFormat<WCCValue, uint64_t>* inputFormat() const override;

MessageFormat<SenderMessage<uint64_t>>* messageFormat() const override {
return new SenderMessageFormat<uint64_t>();
}
MessageCombiner<SenderMessage<uint64_t>>* messageCombiner() const override {
return nullptr;
}
VertexComputation<uint64_t, uint64_t, SenderMessage<uint64_t>>* createComputation(WorkerConfig const*) const override;
VertexComputation<WCCValue, uint64_t, SenderMessage<uint64_t>>* createComputation(
WorkerConfig const*) const override;
};
} // namespace algos
} // namespace pregel
Expand Down
5 changes: 5 additions & 0 deletions arangod/Pregel/CommonFormats.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,11 @@ struct SCCValue {
uint64_t color;
};

struct WCCValue {
uint64_t component;
std::unordered_set<PregelID> inboundNeighbors;
};

template <typename T>
struct SenderMessage {
SenderMessage() = default;
Expand Down
1 change: 1 addition & 0 deletions arangod/Pregel/GraphStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -712,6 +712,7 @@ template class arangodb::pregel::GraphStore<double, float>;
template class arangodb::pregel::GraphStore<double, double>;

// specific algo combos
template class arangodb::pregel::GraphStore<WCCValue, uint64_t>;
template class arangodb::pregel::GraphStore<SCCValue, int8_t>;
template class arangodb::pregel::GraphStore<ECValue, int8_t>;
template class arangodb::pregel::GraphStore<HITSValue, int8_t>;
Expand Down
3 changes: 1 addition & 2 deletions arangod/Pregel/Worker-templates-algorithms.cpp
7FC7
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,14 @@

// custom algorithm types
template class arangodb::pregel::Worker<uint64_t, uint64_t, SenderMessage<uint64_t>>;
template class arangodb::pregel::Worker<WCCValue, uint64_t, SenderMessage<uint64_t>>;
template class arangodb::pregel::Worker<SCCValue, int8_t, SenderMessage<uint64_t>>;
template class arangodb::pregel::Worker<HITSValue, int8_t, SenderMessage<double>>;
template class arangodb::pregel::Worker<ECValue, int8_t, HLLCounter>;
template class arangodb::pregel::Worker<DMIDValue, float, DMIDMessage>;
template class arangodb::pregel::Worker<LPValue, int8_t, uint64_t>;
template class arangodb::pregel::Worker<SLPAValue, int8_t, uint64_t>;



using namespace arangodb::pregel::algos::accumulators;
template class arangodb::pregel::Worker<VertexData, EdgeData, MessageData>;

Loading
0