@@ -84,6 +84,9 @@ GraphStore<V, E>::~GraphStore() {
   delete _edges;
 }
 
+static const char* shardError = "Collections need to have the same number of shards"
+                                " use distributeShardsLike";
+
 template <typename V, typename E>
 std::map<CollectionID, std::vector<VertexShardInfo>>
 GraphStore<V, E>::_allocateSpace() {
@@ -98,22 +101,32 @@ std::map<CollectionID, std::vector<VertexShardInfo>>
   LOG_TOPIC(DEBUG, Logger::PREGEL) << "Allocating memory";
   uint64_t totalMemory = TRI_totalSystemMemory();
 
+  // Contains the shards located on this db server in the right order
+  // assuming edges are sharded after _from, vertices after _key
+  // then every ith vertex shard has the corresponding edges in
+  // the ith edge shard
   std::map<CollectionID, std::vector<ShardID>> const& vertexCollMap =
       _config->vertexCollectionShards();
   std::map<CollectionID, std::vector<ShardID>> const& edgeCollMap =
       _config->edgeCollectionShards();
+  size_t numShards = SIZE_T_MAX;
 
   // Allocating some memory
   uint64_t vCount = 0;
   uint64_t eCount = 0;
   for (auto const& pair : vertexCollMap) {
     std::vector<ShardID> const& vertexShards = pair.second;
+    if (numShards == SIZE_T_MAX) {
+      numShards = vertexShards.size();
+    } else if (numShards != vertexShards.size()) {
+      THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_BAD_PARAMETER, shardError);
+    }
+
     for (size_t i = 0; i < vertexShards.size(); i++) {
 
       VertexShardInfo info;
       info.vertexShard = vertexShards[i];
       info.trx = _createTransaction();
-      info.edgeDataOffset = eCount;
 
       TRI_voc_cid_t cid = info.trx->addCollectionAtRuntime(info.vertexShard);
       info.trx->pinData(cid);  // will throw when it fails
@@ -135,8 +148,7 @@ std::map<CollectionID, std::vector<VertexShardInfo>>
       for (auto const& pair2 : edgeCollMap) {
         std::vector<ShardID> const& edgeShards = pair2.second;
         if (vertexShards.size() != edgeShards.size()) {
-          THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_BAD_PARAMETER,
-                                         "Collections need to have the same number of shards");
+          THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_BAD_PARAMETER, shardError);
         }
 
         ShardID const& eShard = edgeShards[i];
@@ -149,8 +161,9 @@ std::map<CollectionID, std::vector<VertexShardInfo>>
         if (opResult.fail() || _destroyed) {
           THROW_ARANGO_EXCEPTION(TRI_ERROR_BAD_PARAMETER);
         }
-        eCount += opResult.slice().getUInt();
+        info.numEdges += opResult.slice().getUInt();
       }
+      eCount += info.numEdges;
 
       result[pair.first].push_back(std::move(info));
     }
@@ -187,15 +200,31 @@ void GraphStore<V, E>::loadShards(WorkerConfig* config,
   TRI_ASSERT(SchedulerFeature::SCHEDULER != nullptr);
   rest::Scheduler* scheduler = SchedulerFeature::SCHEDULER;
   scheduler->post([this, scheduler, callback] {
-    uint64_t vertexOffset = 0;
-    // Contains the shards located on this db server in the right order
-    // assuming edges are sharded after _from, vertices after _key
-    // then every ith vertex shard has the corresponding edges in
-    // the ith edge shard
 
-    auto shards = _allocateSpace();
-    for (auto& pair : shards) {
-      for (VertexShardInfo& info : pair.second) {
+    // hold the current position where the ith vertex shard can
+    // start to write its data. At the end the offset should equal the
+    // sum of the counts of all ith edge shards
+    auto collectionShards = _allocateSpace();
+
+
+    uint64_t vertexOff = 0;
+    std::vector<size_t> edgeDataOffsets;  // will contain # of edges in ith shard
+    for (auto& collection : collectionShards) {
+      if (edgeDataOffsets.size() == 0) {
+        edgeDataOffsets.resize(collection.second.size() + 1);
+        std::fill(edgeDataOffsets.begin(), edgeDataOffsets.end(), 0);
+      }
+      TRI_ASSERT(collection.second.size() < edgeDataOffsets.size());
+      size_t shardIdx = 0;
+      for (VertexShardInfo& info : collection.second) {
+        edgeDataOffsets[++shardIdx] += info.numEdges;
+      }
+    }
+
+    for (auto& collection : collectionShards) {
+
+      size_t shardIdx = 0;
+      for (VertexShardInfo& info : collection.second) {
 
         try {
           // we might have already loaded these shards
@@ -205,20 +234,22 @@ void GraphStore<V, E>::loadShards(WorkerConfig* config,
           _loadedShards.insert(info.vertexShard);
           _runningThreads++;
           TRI_ASSERT(info.numVertices > 0);
-          TRI_ASSERT(vertexOffset < _index.size());
-          TRI_ASSERT(info.edgeDataOffset < _edges->size());
-          scheduler->post([this, &info, vertexOffset] {
+          TRI_ASSERT(vertexOff < _index.size());
+          TRI_ASSERT(info.numEdges == 0 || edgeDataOffsets[shardIdx] < _edges->size());
+
+          scheduler->post([this, &info, &edgeDataOffsets, vertexOff, shardIdx] {
             TRI_DEFER(_runningThreads--);  // exception safe
             _loadVertices(*info.trx, info.vertexShard, info.edgeShards,
-                          vertexOffset, info.edgeDataOffset);
+                          vertexOff, edgeDataOffsets[shardIdx]);
           }, false);
           // update to next offset
-          vertexOffset += info.numVertices;
+          vertexOff += info.numVertices;
         } catch (...) {
           LOG_TOPIC(WARN, Logger::PREGEL) << "unhandled exception while "
                                           << "loading pregel graph";
         }
 
+        shardIdx++;
       }
 
       // we can only load one vertex collection at a time
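
The offset bookkeeping this commit introduces can be read in isolation. The sketch below is not code from the patch; ShardInfo and computeEdgeDataOffsets are hypothetical names standing in for VertexShardInfo and the logic inside loadShards(). It only illustrates how slot i + 1 of edgeDataOffsets accumulates the edge counts reported for the ith shard slot across all collections, which loadShards() then passes to _loadVertices as that shard's edge write position.

#include <cstddef>
#include <map>
#include <string>
#include <vector>

// Simplified stand-in for VertexShardInfo; only the field the sketch needs.
struct ShardInfo {
  std::size_t numEdges = 0;
};

// Mirrors the loop added in loadShards(): the vector has one slot per shard
// index plus a leading slot 0, and edgeDataOffsets[i + 1] sums the edge
// counts of the ith shard slot over all collections on this db server.
std::vector<std::size_t> computeEdgeDataOffsets(
    std::map<std::string, std::vector<ShardInfo>> const& collectionShards) {
  std::vector<std::size_t> edgeDataOffsets;
  for (auto const& collection : collectionShards) {
    if (edgeDataOffsets.empty()) {
      // numShards + 1 slots, all zero; requires every collection to have
      // the same shard count, which the patch enforces via shardError.
      edgeDataOffsets.resize(collection.second.size() + 1, 0);
    }
    std::size_t shardIdx = 0;
    for (ShardInfo const& info : collection.second) {
      // as in the patch: the count of the ith shard lands in slot i + 1
      edgeDataOffsets[++shardIdx] += info.numEdges;
    }
  }
  return edgeDataOffsets;
}

Because slot 0 is never written, the first shard slot starts writing at offset 0, matching the assertion in the patch that edgeDataOffsets[shardIdx] stays below _edges->size() whenever the shard has edges.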