|
32 | 32 | #include "Cluster/ServerState.h"
|
33 | 33 | #include "Cluster/ClusterFeature.h"
|
34 | 34 | #include "Cluster/ClusterInfo.h"
|
| 35 | +#include "Futures/Future.h" |
35 | 36 | #include "Logger/LogMacros.h"
|
36 | 37 | #include "Pregel/GraphFormat.h"
|
37 | 38 | #include "Pregel/Algos/ColorPropagation/ColorPropagationValue.h"
|
@@ -65,27 +66,33 @@ constexpr auto shardError = std::string_view{
|
65 | 66 |
|
66 | 67 | namespace arangodb::pregel {
|
67 | 68 | template<typename V, typename E>
|
68 |
| -auto GraphLoader<V, E>::requestVertexIds(uint64_t numVertices) -> void { |
| 69 | +auto GraphLoader<V, E>::requestVertexIds(uint64_t numVertices) |
| 70 | + -> VertexIdRange { |
69 | 71 | if (arangodb::ServerState::instance()->isRunningInCluster()) {
|
70 | 72 | if (config->vocbase()->server().template hasFeature<ClusterFeature>()) {
|
71 | 73 | arangodb::ClusterInfo& ci = this->config->vocbase()
|
72 | 74 | ->server()
|
73 | 75 | .template getFeature<ClusterFeature>()
|
74 | 76 | .clusterInfo();
|
75 |
| - currentVertexId = ci.uniqid(numVertices); |
76 |
| - currentVertexIdMax = currentVertexId + numVertices; |
| 77 | + auto n = ci.uniqid(numVertices); |
| 78 | + return VertexIdRange{.current = n, .maxId = n + numVertices}; |
77 | 79 | } else {
|
78 | 80 | ADB_PROD_ASSERT(false) << "ClusterFeature not present in server";
|
79 | 81 | }
|
80 | 82 | } else {
|
81 |
| - // Just bump the max |
82 |
| - currentVertexIdMax += numVertices; |
| 83 | + uint64_t base = currentIdBase.load(); |
| 84 | + while (!currentIdBase.compare_exchange_strong(base, base + numVertices)) { |
| 85 | + }; |
| 86 | + return VertexIdRange{.current = base, .maxId = base + numVertices}; |
83 | 87 | }
|
| 88 | + ADB_PROD_ASSERT(false); |
| 89 | + return VertexIdRange{}; |
84 | 90 | }
|
85 | 91 |
|
86 | 92 | template<typename V, typename E>
|
87 |
| -auto GraphLoader<V, E>::load() -> Magazine<V, E> { |
88 |
| - auto result = Magazine<V, E>{}; |
| 93 | +auto GraphLoader<V, E>::load() -> futures::Future<Magazine<V, E>> { |
| 94 | + auto futures = std::vector<futures::Future<std::shared_ptr<Quiver<V, E>>>>{}; |
| 95 | + |
89 | 96 | // Contains the shards located on this db server in the right order
|
90 | 97 | // assuming edges are sharded after _from, vertices after _key
|
91 | 98 | // then every ith vertex shard has the corresponding edges in
|
@@ -128,34 +135,50 @@ auto GraphLoader<V, E>::load() -> Magazine<V, E> {
|
128 | 135 | }
|
129 | 136 | }
|
130 | 137 |
|
131 |
| - try { |
132 |
| - result.emplace(loadVertices(vertexShard, edges)); |
133 |
| - } catch (basics::Exception const& ex) { |
134 |
| - LOG_PREGEL("8682a", WARN) |
135 |
| - << "caught exception while loading pregel graph: " << ex.what(); |
136 |
| - } catch (std::exception const& ex) { |
137 |
| - LOG_PREGEL("c87c9", WARN) |
138 |
| - << "caught exception while loading pregel graph: " << ex.what(); |
139 |
| - } catch (...) { |
140 |
| - LOG_PREGEL("c7240", WARN) |
141 |
| - << "caught unknown exception while loading pregel graph"; |
142 |
| - } |
| 138 | + auto self = this->shared_from_this(); |
| 139 | + futures.emplace_back(SchedulerFeature::SCHEDULER->queueWithFuture( |
| 140 | + RequestLane::INTERNAL_LOW, |
| 141 | + [this, self, vertexShard, edges]() -> std::shared_ptr<Quiver<V, E>> { |
| 142 | + try { |
| 143 | + return loadVertices(vertexShard, edges); |
| 144 | + } catch (basics::Exception const& ex) { |
| 145 | + LOG_PREGEL("8682a", WARN) |
| 146 | + << "caught exception while loading pregel graph: " |
| 147 | + << ex.what(); |
| 148 | + return nullptr; |
| 149 | + } catch (std::exception const& ex) { |
| 150 | + LOG_PREGEL("c87c9", WARN) |
| 151 | + << "caught exception while loading pregel graph: " |
| 152 | + << ex.what(); |
| 153 | + return nullptr; |
| 154 | + } catch (...) { |
| 155 | + LOG_PREGEL("c7240", WARN) |
| 156 | + << "caught unknown exception while loading pregel graph"; |
| 157 | + return nullptr; |
| 158 | + } |
| 159 | + })); |
143 | 160 | }
|
144 | 161 | }
|
145 |
| - |
146 |
| - std::visit(overload{[&](ActorLoadingUpdate const& update) { |
147 |
| - update.fn(message::GraphLoadingUpdate{ |
148 |
| - .verticesLoaded = result.numberOfVertices(), |
149 |
| - .edgesLoaded = result.numberOfEdges(), |
150 |
| - .memoryBytesUsed = 0 // TODO |
151 |
| - }); |
152 |
| - }, |
153 |
| - [](OldLoadingUpdate const& update) { |
154 |
| - SchedulerFeature::SCHEDULER->queue( |
155 |
| - RequestLane::INTERNAL_LOW, update.fn); |
156 |
| - }}, |
157 |
| - updateCallback); |
158 |
| - return result; |
| 162 | + auto self = this->shared_from_this(); |
| 163 | + return collectAll(futures).thenValue([this, self](auto&& results) { |
| 164 | + auto result = Magazine<V, E>{}; |
| 165 | + for (auto&& r : results) { |
| 166 | + // TODO: maybe handle exceptions here? |
| 167 | + result.emplace(std::move(r.get())); |
| 168 | + } |
| 169 | + std::visit(overload{[&](ActorLoadingUpdate const& update) { |
| 170 | + update.fn(message::GraphLoadingUpdate{ |
| 171 | + .verticesLoaded = result.numberOfVertices(), |
| 172 | + .edgesLoaded = result.numberOfEdges(), |
| 173 | + .memoryBytesUsed = 0}); |
| 174 | + }, |
| 175 | + [](OldLoadingUpdate const& update) { |
| 176 | + SchedulerFeature::SCHEDULER->queue( |
| 177 | + RequestLane::INTERNAL_LOW, update.fn); |
| 178 | + }}, |
| 179 | + updateCallback); |
| 180 | + return result; |
| 181 | + }); |
159 | 182 | }
|
160 | 183 |
|
161 | 184 | template<typename V, typename E>
|
@@ -184,11 +207,10 @@ auto GraphLoader<V, E>::loadVertices(ShardID const& vertexShard,
|
184 | 207 | LogicalCollection* coll = cursor->collection();
|
185 | 208 | uint64_t numVertices = coll->getPhysical()->numberDocuments(&trx);
|
186 | 209 |
|
187 |
| - requestVertexIds(numVertices); |
| 210 | + auto vertexIdRange = requestVertexIds(numVertices); |
188 | 211 | LOG_PREGEL("7c31f", DEBUG)
|
189 | 212 | << "Shard '" << vertexShard << "' has " << numVertices
|
190 |
| - << " vertices. id range: [" << currentVertexId << ", " |
191 |
| - << currentVertexIdMax << ")"; |
| 213 | + << " vertices. id range: " << inspection::json(vertexIdRange); |
192 | 214 |
|
193 | 215 | std::vector<std::unique_ptr<traverser::EdgeCollectionInfo>>
|
194 | 216 | edgeCollectionInfos;
|
@@ -216,12 +238,12 @@ auto GraphLoader<V, E>::loadVertices(ShardID const& vertexShard,
|
216 | 238 | // note: ventry->_data and vertexIdRange may be modified by
|
217 | 239 | // copyVertexData!
|
218 | 240 |
|
219 |
| - TRI_ASSERT(currentVertexId < currentVertexIdMax) |
220 |
| - << fmt::format("vertexId exceeded maximum: {} < {}", currentVertexId, |
221 |
| - currentVertexIdMax); |
| 241 | + TRI_ASSERT(vertexIdRange.current < vertexIdRange.maxId) |
| 242 | + << fmt::format("vertexId exceeded maximum: {} < {}", |
| 243 | + vertexIdRange.current, vertexIdRange.maxId); |
222 | 244 | graphFormat->copyVertexData(*ctx->getVPackOptions(), documentId, slice,
|
223 |
| - ventry.data(), currentVertexId); |
224 |
| - currentVertexId += 1; |
| 245 | + ventry.data(), vertexIdRange.current); |
| 246 | + vertexIdRange.current += 1; |
225 | 247 | }
|
226 | 248 |
|
227 | 249 | // load edges
|
|
0 commit comments