8000 Fix Pregel Graph Loading Logic · arangodb/arangodb@f938c3e · GitHub
[go: up one dir, main page]

Skip to content

Commit f938c3e

Browse files
committed
Fix Pregel Graph Loading Logic
1 parent 61da339 commit f938c3e

File tree

4 files changed

+56
-24
lines changed

4 files changed

+56
-24
lines changed

arangod/Pregel/GraphStore.cpp

Lines changed: 48 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,9 @@ GraphStore<V, E>::~GraphStore() {
8484
delete _edges;
8585
}
8686

87+
static const char* shardError = "Collections need to have the same number of shards"
88+
" use distributeShardsLike";
89+
8790
template <typename V, typename E>
8891
std::map<CollectionID, std::vector<VertexShardInfo>>
8992
GraphStore<V, E>::_allocateSpace() {
@@ -98,22 +101,32 @@ std::map<CollectionID, std::vector<VertexShardInfo>>
98101
LOG_TOPIC(DEBUG, Logger::PREGEL) << "Allocating memory";
99102
uint64_t totalMemory = TRI_totalSystemMemory();
100103

104+
// Contains the shards located on this db server in the right order
105+
// assuming edges are sharded after _from, vertices after _key
106+
// then every ith vertex shard has the corresponding edges in
107+
// the ith edge shard
101108
std::map<CollectionID, std::vector<ShardID>> const& vertexCollMap =
102109
_config->vertexCollectionShards();
103110
std::map<CollectionID, std::vector<ShardID>> const& edgeCollMap =
104111
_config->edgeCollectionShards();
112+
size_t numShards = SIZE_T_MAX;
105113

106114
// Allocating some memory
107115
uint64_t vCount = 0;
108116
uint64_t eCount = 0;
109117
for (auto const& pair : vertexCollMap) {
110118
std::vector<ShardID> const& vertexShards = pair.second;
119+
if (numShards == SIZE_T_MAX) {
120+
numShards = vertexShards.size();
121+
} else if (numShards != vertexShards.size()) {
122+
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_BAD_PARAMETER, shardError);
123+
}
124+
111125
for (size_t i = 0; i < vertexShards.size(); i++) {
112126

113127
VertexShardInfo info;
114128
info.vertexShard = vertexShards[i];
115129
info.trx = _createTransaction();
116-
info.edgeDataOffset = eCount;
117130

118131
TRI_voc_cid_t cid = info.trx->addCollectionAtRuntime(info.vertexShard);
119132
info.trx->pinData(cid); // will throw when it fails
@@ -135,8 +148,7 @@ std::map<CollectionID, std::vector<VertexShardInfo>>
135148
for (auto const& pair2 : edgeCollMap) {
136149
std::vector<ShardID> const& edgeShards = pair2.second;
137150
if (vertexShards.size() != edgeShards.size()) {
138-
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_BAD_PARAMETER,
139-
"Collections need to have the same number of shards");
151+
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_BAD_PARAMETER, shardError);
140152
}
141153

142154
ShardID const& eShard = edgeShards[i];
@@ -149,8 +161,9 @@ std::map<CollectionID, std::vector<VertexShardInfo>>
149161
if (opResult.fail() || _destroyed) {
150162
THROW_ARANGO_EXCEPTION(TRI_ERROR_BAD_PARAMETER);
151163
}
152-
eCount += opResult.slice().getUInt();
164+
info.numEdges += opResult.slice().getUInt();;
153165
}
166+
eCount += info.numEdges;
154167

155168
result[pair.first].push_back(std::move(info));
156169
}
@@ -187,15 +200,31 @@ void GraphStore<V, E>::loadShards(WorkerConfig* config,
187200
TRI_ASSERT(SchedulerFeature::SCHEDULER != nullptr);
188201
rest::Scheduler* scheduler = SchedulerFeature::SCHEDULER;
189202
scheduler->post([this, scheduler, callback] {
190-
uint64_t vertexOffset = 0;
191-
// Contains the shards located on this db server in the right order
192-
// assuming edges are sharded after _from, vertices after _key
193-
// then every ith vertex shard has the corresponding edges in
194-
// the ith edge shard
195203

196-
auto shards = _allocateSpace();
197-
for (auto& pair : shards) {
198-
for (VertexShardInfo& info : pair.second) {
204+
// hold the current position where the ith vertex shard can
205+
// start to write its data. At the end the offset should equal the
206+
// sum of the counts of all ith edge shards
207+
auto collectionShards = _allocateSpace();
208+
209+
210+
uint64_t vertexOff = 0;
211+
std::vector<size_t> edgeDataOffsets; // will contain # off edges in ith shard
212+
for (auto& collection : collectionShards) {
213+
if (edgeDataOffsets.size() == 0) {
214+
edgeDataOffsets.resize(collection.second.size() + 1);
215+
std::fill(edgeDataOffsets.begin(), edgeDataOffsets.end(), 0);
216+
}
217+
TRI_ASSERT(collection.second.size() < edgeDataOffsets.size());
218+
size_t shardIdx = 0;
219+
for (VertexShardInfo& info : collection.second) {
220+
edgeDataOffsets[++shardIdx] += info.numEdges;
221+
}
222+
}
223+
224+
for (auto& collection : collectionShards) {
225+
226+
size_t shardIdx = 0;
227+
for (VertexShardInfo& info : collection.second) {
199228

200229
try {
201230
// we might have already loaded these shards
@@ -205,20 +234,22 @@ void GraphStore<V, E>::loadShards(WorkerConfig* config,
205234
_loadedShards.insert(info.vertexShard);
206235
_runningThreads++;
207236
TRI_ASSERT(info.numVertices > 0);
208-
TRI_ASSERT(vertexOffset < _index.size());
209-
TRI_ASSERT(info.edgeDataOffset < _edges->size());
210-
scheduler->post([this, &info, vertexOffset] {
237+
TRI_ASSERT(vertexOff < _index.size());
238+
TRI_ASSERT(info.numEdges == 0 || edgeDataOffsets[shardIdx] < _edges->size());
239+
240+
scheduler->post([this, &info, &edgeDataOffsets, vertexOff, shardIdx] {
211241
TRI_DEFER(_runningThreads--);// exception safe
212242
_loadVertices(*info.trx, info.vertexShard, info.edgeShards,
213-
vertexOffset, info.edgeDataOffset);
243+
vertexOff, edgeDataOffsets[shardIdx]);
214244
}, false);
215245
// update to next offset
216-
vertexOffset += info.numVertices;
246+
vertexOff += info.numVertices;
217247
} catch(...) {
218248
LOG_TOPIC(WARN, Logger::PREGEL) << "unhandled exception while "
219249
<<"loading pregel graph";
220250
}
221251

252+
shardIdx++;
222253
}
223254

224255
// we can only load one vertex collection at a time

arangod/Pregel/GraphStore.h

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -56,12 +56,9 @@ struct VertexShardInfo {
5656
ShardID vertexShard;
5757
std::vector<ShardID> edgeShards;
5858
std::unique_ptr<transaction::Methods> trx;
59-
/// number of vertices
59+
/// number of vertices / edges
6060
size_t numVertices = 0;
61-
// hold the current position where the ith vertex shard can
62-
// start to write its data. At the end the offset should equal the
63-
// sum of the counts of all ith edge shards
64-
size_t edgeDataOffset = 0;
61+
size_t numEdges = 0;
6562
};
6663

6764
////////////////////////////////////////////////////////////////////////////////

arangod/Pregel/PregelFeature.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ std::pair<Result, uint64_t> PregelFeature::startExecution(
8585
// check the access rights to collections
8686
ExecContext const* exec = ExecContext::CURRENT;
8787
if (exec != nullptr) {
88+
TRI_ASSERT(params.isObject());
8889
VPackSlice storeSlice = params.get("store");
8990
bool storeResults = !storeSlice.isBool() || storeSlice.getBool();
9091
for (std::string const& vc : vertexCollections) {

arangod/RestHandler/RestControlPregelHandler.cpp

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,10 @@ void RestControlPregelHandler::startExecution() {
111111
}
112112

113113
// extract the parameters
114-
auto parameters = std::make_shared<VPackBuilder>(body.get("params"));
114+
auto parameters = body.get("params");
115+
if (!parameters.isObject()) {
116+
parameters = VPackSlice::emptyObjectSlice();
117+
}
115118

116119
// extract the collections
117120
std::vector<std::string> vertexCollections;
@@ -154,7 +157,7 @@ void RestControlPregelHandler::startExecution() {
154157

155158
auto res = pregel::PregelFeature::startExecution(
156159
_vocbase, algorithm, vertexCollections, edgeCollections,
157-
parameters->slice());
160+
parameters);
158161
if (res.first.fail()) {
159162
generateError(res.first);
160163
return;

0 commit comments

Comments
 (0)
0