8000 [PREGEL] Split Graph.h, make PregelShard its own struct (#17963) · olegrok/arangodb@2abee2f · GitHub
[go: up one dir, main page]

Skip to content

Commit 2abee2f

Browse files
authored
[PREGEL] Split Graph.h, make PregelShard its own struct (arangodb#17963)
* Provide test binary for Pregel * move Graph.h to its own directory * Split Graph.h into Vertex.h/VertexID.h/Edge.h * Split out PregelShard, too * Change header * Rename Graph/ to GraphStore/ to address ambiguities * Rename Pregel/Graph/ to Pregel/GraphFormat to avoid ambiguities * Rename PregelID to VertexID * Introduce PregelShard as its own struct * Fix some complaints of the MSVC compiler * Fix some complaints of the MSVC compiler * clang-format
1 parent dde57d7 commit 2abee2f

26 files changed

+392
-152
lines changed

arangod/Pregel/Algos/DMID/DMID.cpp

Lines changed: 19 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,7 @@ struct DMIDComputation
178178
void superstep1(MessageIterator<DMIDMessage> const& messages) {
179179
float weightedInDegree = 0.0;
180180
/** vertices that need a reply containing this vertexs weighted indegree */
181-
std::unordered_set<PregelID> predecessors;
181+
std::unordered_set<VertexID> predecessors;
182182

183183
for (DMIDMessage const* message : messages) {
184184
/**
@@ -195,7 +195,7 @@ struct DMIDComputation
195195

196196
// send weighted degree to all predecessors
197197
DMIDMessage message(pregelId(), weightedInDegree);
198-
for (PregelID const& pid : predecessors) {
198+
for (VertexID const& pid : predecessors) {
199199
sendMessage(pid, message);
200200
}
201201
}
@@ -266,7 +266,7 @@ struct DMIDComputation
266266
*/
267267
/** (corresponds to vector matrix multiplication R^1xN * R^NxN) */
268268
double newEntryDA = 0.0;
269-
curDA->forEach([&](PregelID const& _id, double entry) {
269+
curDA->forEach([&](VertexID const& _id, double entry) {
270270
auto const& it = vertexState->disCol.find(_id);
271271
if (it != vertexState->disCol.end()) { // sparse vector in the original
272272
newEntryDA += entry * it->second;
@@ -318,7 +318,7 @@ struct DMIDComputation
318318
VertexSumAggregator const* vecLS =
319319
(VertexSumAggregator*)getReadAggregator(LS_AGG);
320320
for (DMIDMessage const* message : messages) {
321-
PregelID senderID = message->senderId;
321+
VertexID senderID = message->senderId;
322322
/** Weight= weightedInDegree */
323323

324324
float senderWeight = message->weight;
@@ -373,7 +373,7 @@ struct DMIDComputation
373373
float maxInfValue = 0;
374374

375375
/** Set of possible local leader for this vertex. Contains VertexID's */
376-
std::set<PregelID> leaderSet;
376+
std::set<VertexID> leaderSet;
377377

378378
/** Find possible local leader */
379379
for (DMIDMessage const* message : messages) {
@@ -394,7 +394,7 @@ struct DMIDComputation
394394
double leaderInit = 1.0 / leaderSet.size();
395395
VertexSumAggregator* vecFD =
396396
(VertexSumAggregator*)getWriteAggregator(FD_AGG);
397-
for (PregelID const& _id : leaderSet) {
397+
for (VertexID const& _id : leaderSet) {
398398
vecFD->aggregate(_id.shard, _id.key, leaderInit);
399399
}
400400
}
@@ -473,7 +473,7 @@ struct DMIDComputation
473473
* specific communities
474474
*/
475475
for (DMIDMessage const* message : messages) {
476-
PregelID const leaderID = message->leaderId;
476+
VertexID const leaderID = message->leaderId;
477477
/**
478478
* send a message back with the same double entry if this vertex is
479479
* part of this specific community
@@ -505,7 +505,7 @@ struct DMIDComputation
505505
if (it == vertexState->membershipDegree.end()) { // no
506506
//! vertex.getValue().getMembershipDegree().containsKey(vertexID)
507507
/** counts per communities the number of successors which are member */
508-
std::map<PregelID, float> membershipCounter;
508+
std::map<VertexID, float> membershipCounter;
509509
// double previousCount = 0.0;
510510

511511
for (DMIDMessage const* message : messages) {
@@ -514,7 +514,7 @@ struct DMIDComputation
514514
* member of
515515
*/
516516
// Long leaderID = ((long) msg.getValue());
517-
PregelID const& leaderID = message->leaderId;
517+
VertexID const& leaderID = message->leaderId;
518518
// .containsKey(leaderID)
519519
if (membershipCounter.find(leaderID) != membershipCounter.end()) {
520520
/** increase count by 1 */
@@ -566,9 +566,9 @@ struct DMIDComputation
566566
VertexSumAggregator const* vecGL =
567567
(VertexSumAggregator*)getReadAggregator(GL_AGG);
568568
// DoubleSparseVector vecGL = getAggregatedValue(GL_AGG);
569-
// std::map<PregelID, float> newMemDeg;
569+
// std::map<VertexID, float> newMemDeg;
570570

571-
vecGL->forEach([&](PregelID const& _id, double entry) {
571+
vecGL->forEach([&](VertexID const& _id, double entry) {
572572
if (entry != 0.0) {
573573
/** is entry _id a global leader?*/
574574
if (_id == this->pregelId()) {
@@ -620,13 +620,13 @@ struct DMIDGraphFormat : public GraphFormat<DMIDValue, float> {
620620
bool buildVertexDocument(arangodb::velocypack::Builder& b,
621621
DMIDValue const* ptr) const override {
622622
if (ptr->membershipDegree.size() > 0) {
623-
std::vector<std::pair<PregelID, float>> communities;
624-
for (std::pair<PregelID, float> pair : ptr->membershipDegree) {
623+
std::vector<std::pair<VertexID, float>> communities;
624+
for (std::pair<VertexID, float> pair : ptr->membershipDegree) {
625625
communities.push_back(pair);
626626
}
627627
std::sort(
628628
communities.begin(), communities.end(),
629-
[ptr](std::pair<PregelID, float> a, std::pair<PregelID, float> b) {
629+
[ptr](std::pair<VertexID, float> a, std::pair<VertexID, float> b) {
630630
return ptr->membershipDegree.at(a.first) >
631631
ptr->membershipDegree.at(b.first);
632632
});
@@ -650,7 +650,7 @@ struct DMIDGraphFormat : public GraphFormat<DMIDValue, float> {
650650
b.close();
651651
/*unsigned i = _maxCommunities;
652652
b.add(_resultField, VPackValue(VPackValueType::Object));
653-
for (std::pair<PregelID, float> const& pair : ptr->membershipDegree) {
653+
for (std::pair<VertexID, float> const& pair : ptr->membershipDegree) {
654654
b.add(pair.first.key, VPackValue(pair.second));
655655
if (--i == 0) {
656656
break;
@@ -735,14 +735,14 @@ struct DMIDMasterContext : public MasterContext {
735735

736736
LOG_TOPIC("db510", INFO, Logger::PREGEL)
737737
<< "Aggregator DA at step: " << globalSuperstep();
738-
convergedDA->forEach([&](PregelID const& _id, double entry) {
738+
convergedDA->forEach([&](VertexID const& _id, double entry) {
739739
LOG_TOPIC("df98d", INFO, Logger::PREGEL) << _id.key;
740740
});
741741
}
742742
if (globalSuperstep() == RW_ITERATIONBOUND + 6) {
743743
VertexSumAggregator* leadershipVector =
744744
getAggregator<VertexSumAggregator>(LS_AGG);
745-
leadershipVector->forEach([&](PregelID const& _id, double entry) {
745+
leadershipVector->forEach([&](VertexID const& _id, double entry) {
746746
LOG_TOPIC("c82d2", INFO, Logger::PREGEL)
747747
<< "Aggregator LS:" << _id.key;
748748
});
@@ -762,7 +762,7 @@ struct DMIDMasterContext : public MasterContext {
762762
double averageFD = 0.0;
763763
int numLocalLeader = 0;
764764
/** get averageFollower degree */
765-
vecFD->forEach([&](PregelID const& _id, double entry) {
765+
vecFD->forEach([&](VertexID const& _id, double entry) {
766766
averageFD += entry;
767767
if (entry != 0) {
768768
numLocalLeader++;
@@ -773,7 +773,7 @@ struct DMIDMasterContext : public MasterContext {
773773
averageFD = (double)averageFD / numLocalLeader;
774774
}
775775
/** set flag for globalLeader */
776-
vecFD->forEach([&](PregelID const& _id, double entry) {
776+
vecFD->forEach([&](VertexID const& _id, double entry) {
777777
if (entry > averageFD) {
778778
initGL->aggregate(_id.shard, _id.key, 1.0);
779779
LOG_TOPIC("a3665", INFO, Logger::PREGEL) << "Global Leader " << _id.key;

arangod/Pregel/Algos/DMID/DMIDMessageFormat.h

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
#pragma once
2929

3030
#include "Pregel/CommonFormats.h"
31-
#include "Pregel/Graph.h"
31+
#include "Pregel/GraphStore/Graph.h"
3232
#include "Pregel/GraphFormat.h"
3333
#include "Pregel/MessageFormat.h"
3434

@@ -39,9 +39,11 @@ struct DMIDMessageFormat : public MessageFormat<DMIDMessage> {
3939
DMIDMessageFormat() {}
4040
void unwrapValue(VPackSlice s, DMIDMessage& message) const override {
4141
VPackArrayIterator array(s);
42-
message.senderId.shard = (PregelShard)((*array).getUInt());
42+
message.senderId.shard =
43+
PregelShard(static_cast<PregelShard::value_type>((*array).getUInt()));
4344
message.senderId.key = (*(++array)).copyString();
44-
message.leaderId.shard = (PregelShard)(*array).getUInt();
45+
message.leaderId.shard =
46+
PregelShard(static_cast<PregelShard::value_type>((*array).getUInt()));
4547
message.leaderId.key = (*(++array)).copyString();
4648
message.weight = (*(++array)).getNumber<float>();
4749
}

arangod/Pregel/Algos/DMID/VertexSumAggregator.h

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
#include <velocypack/Iterator.h>
2828
#include <velocypack/Slice.h> 1241
2929

30-
#include "Pregel/Graph.h"
30+
#include "Pregel/GraphStore/Graph.h"
3131

3232
#include <map>
3333
#include <string>
@@ -57,7 +57,8 @@ struct VertexSumAggregator : public IAggregator {
5757

5858
void parseAggregate(VPackSlice const& slice) override {
5959
for (auto const& pair : VPackObjectIterator(slice)) {
60-
PregelShard shard = std::stoi(pair.key.copyString());
60+
auto shardId = std::stoi(pair.key.copyString());
61+
auto shard = PregelShard(static_cast<PregelShard::value_type>(shardId));
6162
std::string key;
6263
VPackValueLength i = 0;
6364
for (VPackSlice const& val : VPackArrayIterator(pair.value)) {
@@ -75,7 +76,8 @@ struct VertexSumAggregator : public IAggregator {
7576

7677
void setAggregatedValue(VPackSlice const& slice) override {
7778
for (auto const& pair : VPackObjectIterator(slice)) {
78-
PregelShard shard = std::stoi(pair.key.copyString());
79+
auto shardId = std::stoi(pair.key.copyString());
80+
auto shard = PregelShard(static_cast<PregelShard::value_type>(shardId));
7981
std::string key;
8082
VPackValueLength i = 0;
8183
for (VPackSlice const& val : VPackArrayIterator(pair.value)) {
@@ -92,7 +94,7 @@ struct VertexSumAggregator : public IAggregator {
9294
void serialize(std::string const& key, VPackBuilder& builder) const override {
9395
builder.add(key, VPackValue(VPackValueType::Object));
9496
for (auto const& pair1 : _entries) {
95-
builder.add(std::to_string(pair1.first),
97+
builder.add(std::to_string(pair1.first.value),
9698
VPackValue(VPackValueType::Array));
9799
for (auto const& pair2 : pair1.second) {
98100
builder.add(VPackValuePair(pair2.first.data(), pair2.first.size(),
@@ -132,12 +134,12 @@ struct VertexSumAggregator : public IAggregator {
132134
void aggregateDefaultValue(double empty) { _default += empty; }
133135

134136
void forEach(
135-
std::function<void(PregelID const& _id, double value)> func) const {
137+
std::function<void(VertexID const& _id, double value)> func) const {
136138
for (auto const& pair : _entries) {
137139
PregelShard shard = pair.first;
138140
std::unordered_map<std::string, double> const& vertexMap = pair.second;
139141
for (auto const& vertexMessage : vertexMap) {
140-
func(PregelID(shard, vertexMessage.first), vertexMessage.second);
142+
func(VertexID(shard, vertexMessage.first), vertexMessage.second);
141143
}
142144
}
143145
}

arangod/Pregel/Algos/EffectiveCloseness/HLLCounter.cpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
#include <cmath>
2525
#include "Basics/fasthash.h"
2626
#include "Pregel/CommonFormats.h"
27-
#include "Pregel/Graph.h"
27+
#include "Pregel/GraphStore/Graph.h"
2828

2929
using namespace arangodb::pregel;
3030

@@ -54,15 +54,15 @@ inline uint8_t _get_leading_zero_count(uint32_t x, uint8_t b) {
5454
#define _GET_CLZ(x, b) _get_leading_zero_count(x, b)
5555
#endif /* defined(__GNUC__) */
5656

57-
static uint32_t hashPregelId(PregelID const& pregelId) {
57+
static uint32_t hashPregelId(VertexID const& pregelId) {
5858
uint32_t h1 =
5959
fasthash32(pregelId.key.data(), pregelId.key.length(), 0xf007ba11UL);
60-
uint64_t h2 = fasthash64_uint64(pregelId.shard, 0xdefec7edUL);
60+
uint64_t h2 = fasthash64_uint64(pregelId.shard.value, 0xdefec7edUL);
6161
uint32_t h3 = (uint32_t)(h2 - (h2 >> 32));
6262
return h1 ^ (h3 << 1);
6363
}
6464

65-
void HLLCounter::addNode(PregelID const& pregelId) {
65+
void HLLCounter::addNode(VertexID const& pregelId) {
6666
uint32_t hashid = hashPregelId(pregelId);
6767
// last 6 bits as bucket index
6868
uint32_t index = hashid >> (32 - 6);

arangod/Pregel/Algos/EffectiveCloseness/HLLCounterFormat.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
#pragma once
2525

2626
#include "Pregel/CommonFormats.h"
27-
#include "Pregel/Graph.h"
27+
#include "Pregel/GraphStore/Graph.h"
2828
#include "Pregel/GraphFormat.h"
2929
#include "Pregel/MessageCombiner.h"
3030
#include "Pregel/MessageFormat.h"

arangod/Pregel/Algos/HITS.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ struct HITSComputation
8686
aggregate<double>(kHubNorm, auth * auth);
8787

8888
// no sender required, the senders have an outgoing edge to us
89-
SenderMessage<double> authData(PregelID(), auth);
89+
SenderMessage<double> authData(VertexID(), auth);
9090
for (SenderMessage<double> const* message : messages) {
9191
if (message->senderId.isValid()) { // send to incoming Neighbors
9292
sendMessage(message->senderId, authData);

arangod/Pregel/Algos/SCC.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ struct SCCComputation
119119
if (vertexState->vertexID == vertexState->color) {
120120
SenderMessage<uint64_t> message(pregelId(), vertexState->color);
121121
// sendMessageToAllParents
122-
for (PregelID const& pid : vertexState->parents) {
122+
for (VertexID const& pid : vertexState->parents) {
123123
sendMessage(pid, message); // todo: if the parent was deactivated
124124
// this reactivates it in the
125125
// refactored Pregel. Change this.
@@ -132,7 +132,7 @@ struct SCCComputation
132132
case SCCPhase::BACKWARD_TRAVERSAL_REST: {
133133
for (SenderMessage<uint64_t> const* msg : messages) {
134134
if (vertexState->color == msg->value) {
135-
for (PregelID const& pid : vertexState->parents) {
135+
for (VertexID const& pid : vertexState->parents) {
136136
sendMessage(pid, *msg);
137137
}
138138
aggregate(kConverged, true);

arangod/Pregel/Algos/ShortestPath.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,9 @@ using namespace arangodb::pregel::algos;
3636
static std::string const spUpperPathBound = "bound";
3737

3838
struct SPComputation : public VertexComputation<int64_t, int64_t, int64_t> {
39-
PregelID _target;
39+
VertexID _target;
4040

41-
explicit SPComputation(PregelID const& target) : _target(target) {}
41+
explicit SPComputation(VertexID const& target) : _target(target) {}
4242
void compute(MessageIterator<int64_t> const& messages) override {
4343
int64_t current = vertexData();
4444
for (const int64_t* msg : messages) {
@@ -119,7 +119,7 @@ GraphFormat<int64_t, int64_t>* ShortestPathAlgorithm::inputFormat() const {
119119

120120
VertexComputation<int64_t, int64_t, int64_t>*
121121
ShortestPathAlgorithm::createComputation(WorkerConfig const* _config) const {
122-
PregelID target = _config->documentIdToPregel(_target);
122+
VertexID target = _config->documentIdToPregel(_target);
123123
return new SPComputation(target);
124124
}
125125

arangod/Pregel/Algos/WCC.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,7 @@ struct WCCGraphFormat final : public GraphFormat<WCCValue, uint64_t> {
144144
// This is a very rough and guessed estimate.
145145
// We need some space for the inbound connections,
146146
// but we have not a single clue how many we will have
147-
return sizeof(uint64_t) + 8 * sizeof(PregelID);
147+
return sizeof(uint64_t) + 8 * sizeof(VertexID);
148148
}
149149
size_t estimatedEdgeSize() const override { return sizeof(uint64_t); }
150150

0 commit comments

Comments
 (0)
0