8000 [GORDO-1519] Re-parallelise graph loading (#18580) · xuyanshi/arangodb@85ea697 · GitHub
[go: up one dir, main page]

Skip to content

Commit 85ea697

Browse files
authored
[GORDO-1519] Re-parallelise graph loading (arangodb#18580)
* Re-parallelise graph loading * Fix single server vertex ID registry * Remove useless assertion * Send an update once everything is loaded
1 parent ccce3f8 commit 85ea697

File tree

5 files changed

+94
-78
lines changed

5 files changed

+94
-78
lines changed

arangod/Pregel/GraphStore/GraphLoader.cpp

Lines changed: 63 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
#include "Cluster/ServerState.h"
3333
#include "Cluster/ClusterFeature.h"
3434
#include "Cluster/ClusterInfo.h"
35+
#include "Futures/Future.h"
3536
#include "Logger/LogMacros.h"
3637
#include "Pregel/GraphFormat.h"
3738
#include "Pregel/Algos/ColorPropagation/ColorPropagationValue.h"
@@ -65,27 +66,33 @@ constexpr auto shardError = std::string_view{
6566

6667
namespace arangodb::pregel {
6768
template<typename V, typename E>
68-
auto GraphLoader<V, E>::requestVertexIds(uint64_t numVertices) -> void {
69+
auto GraphLoader<V, E>::requestVertexIds(uint64_t numVertices)
70+
-> VertexIdRange {
6971
if (arangodb::ServerState::instance()->isRunningInCluster()) {
7072
if (config->vocbase()->server().template hasFeature<ClusterFeature>()) {
7173
arangodb::ClusterInfo& ci = this->config->vocbase()
7274
->server()
7375
.template getFeature<ClusterFeature>()
7476
.clusterInfo();
75-
currentVertexId = ci.uniqid(numVertices);
76-
currentVertexIdMax = currentVertexId + numVertices;
77+
auto n = ci.uniqid(numVertices);
78+
return VertexIdRange{.current = n, .maxId = n + numVertices};
7779
} else {
7880
ADB_PROD_ASSERT(false) << "ClusterFeature not present in server";
7981
}
8082
} else {
81-
// Just bump the max
82-
currentVertexIdMax += numVertices;
83+
uint64_t base = currentIdBase.load();
84+
while (!currentIdBase.compare_exchange_strong(base, base + numVertices)) {
85+
};
86+
return VertexIdRange{.current = base, .maxId = base + numVertices};
8387
}
88+
ADB_PROD_ASSERT(false);
89+
return VertexIdRange{};
8490
}
8591

8692
template<typename V, typename E>
87-
auto GraphLoader<V, E>::load() -> Magazine<V, E> {
88-
auto result = Magazine<V, E>{};
93+
auto GraphLoader<V, E>::load() -> futures::Future<Magazine<V, E>> {
94+
auto futures = std::vector<futures::Future<std::shared_ptr<Quiver<V, E>>>>{};
95+
8996
// Contains the shards located on this db server in the right order
9097
// assuming edges are sharded after _from, vertices after _key
9198
// then every ith vertex shard has the corresponding edges in
@@ -128,34 +135,50 @@ auto GraphLoader<V, E>::load() -> Magazine<V, E> {
128135
}
129136
}
130137

131-
try {
132-
result.emplace(loadVertices(vertexShard, edges));
133-
} catch (basics::Exception const& ex) {
134-
LOG_PREGEL("8682a", WARN)
135-
<< "caught exception while loading pregel graph: " << ex.what();
136-
} catch (std::exception const& ex) {
137-
LOG_PREGEL("c87c9", WARN)
138-
<< "caught exception while loading pregel graph: " << ex.what();
139-
} catch (...) {
140-
LOG_PREGEL("c7240", WARN)
141-
<< "caught unknown exception while loading pregel graph";
142-
}
138+
auto self = this->shared_from_this();
139+
futures.emplace_back(SchedulerFeature::SCHEDULER->queueWithFuture(
140+
RequestLane::INTERNAL_LOW,
141+
[this, self, vertexShard, edges]() -> std::shared_ptr<Quiver<V, E>> {
142+
try {
143+
return loadVertices(vertexShard, edges);
144+
} catch (basics::Exception const& ex) {
145+
LOG_PREGEL("8682a", WARN)
146+
<< "caught exception while loading pregel graph: "
147+
<< ex.what();
148+
return nullptr;
149+
} catch (std::exception const& ex) {
150+
LOG_PREGEL("c87c9", WARN)
151+
<< "caught exception while loading pregel graph: "
152+
<< ex.what();
153+
return nullptr;
154+
} catch (...) {
155+
LOG_PREGEL("c7240", WARN)
156+
<< "caught unknown exception while loading pregel graph";
157+
return nullptr;
158+
}
159+
}));
143160
}
144161
}
145-
146-
std::visit(overload{[&](ActorLoadingUpdate const& update) {
147-
update.fn(message::GraphLoadingUpdate{
148-
.verticesLoaded = result.numberOfVertices(),
149-
.edgesLoaded = result.numberOfEdges(),
150-
.memoryBytesUsed = 0 // TODO
151-
});
152-
},
153-
[](OldLoadingUpdate const& update) {
154-
SchedulerFeature::SCHEDULER->queue(
155-
RequestLane::INTERNAL_LOW, update.fn);
156-
}},
157-
updateCallback);
158-
return result;
162+
auto self = this->shared_from_this();
163+
return collectAll(futures).thenValue([this, self](auto&& results) {
164+
auto result = Magazine<V, E>{};
165+
for (auto&& r : results) {
166+
// TODO: maybe handle exceptions here?
167+
result.emplace(std::move(r.get()));
168+
}
169+
std::visit(overload{[&](ActorLoadingUpdate const& update) {
170+
update.fn(message::GraphLoadingUpdate{
171+
.verticesLoaded = result.numberOfVertices(),
172+
.edgesLoaded = result.numberOfEdges(),
173+
.memoryBytesUsed = 0});
174+
},
175+
[](OldLoadingUpdate const& update) {
176+
SchedulerFeature::SCHEDULER->queue(
177+
RequestLane::INTERNAL_LOW, update.fn);
178+
}},
179+
updateCallback);
180+
return result;
181+
});
159182
}
160183

161184
template<typename V, typename E>
@@ -184,11 +207,10 @@ auto GraphLoader<V, E>::loadVertices(ShardID const& vertexShard,
184207
LogicalCollection* coll = cursor->collection();
185208
uint64_t numVertices = coll->getPhysical()->numberDocuments(&trx);
186209

187-
requestVertexIds(numVertices);
210+
auto vertexIdRange = requestVertexIds(numVertices);
188211
LOG_PREGEL("7c31f", DEBUG)
189212
<< "Shard '" << vertexShard << "' has " << numVertices
190-
<< " vertices. id range: [" << currentVertexId << ", "
191-
<< currentVertexIdMax << ")";
213+
<< " vertices. id range: " << inspection::json(vertexIdRange);
192214

193215
std::vector<std::unique_ptr<traverser::EdgeCollectionInfo>>
194216
edgeCollectionInfos;
@@ -216,12 +238,12 @@ auto GraphLoader<V, E>::loadVertices(ShardID const& vertexShard,
216238
// note: ventry->_data and vertexIdRange may be modified by
217239
// copyVertexData!
218240

219-
TRI_ASSERT(currentVertexId < currentVertexIdMax)
220-
<< fmt::format("vertexId exceeded maximum: {} < {}", currentVertexId,
221-
currentVertexIdMax);
241+
TRI_ASSERT(vertexIdRange.current < vertexIdRange.maxId)
242+
<< fmt::format("vertexId exceeded maximum: {} < {}",
243+
vertexIdRange.current, vertexIdRange.maxId);
222244
graphFormat->copyVertexData(*ctx->getVPackOptions(), documentId, slice,
223-
ventry.data(), currentVertexId);
224-
currentVertexId += 1;
245+
ventry.data(), vertexIdRange.current);
246+
vertexIdRange.current += 1;
225247
}
226248

227249
// load edges

arangod/Pregel/GraphStore/GraphLoader.h

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
#include <functional>
2828

2929
#include "Basics/GlobalResourceMonitor.h"
30+
#include "Basics/Guarded.h"
3031
#include "Basics/ResourceUsage.h"
3132
#include "Cluster/ClusterTypes.h"
3233
#include "Pregel/GraphStore/GraphLoaderBase.h"
@@ -50,6 +51,16 @@ struct ActorLoadingUpdate {
5051
using LoadingUpdateCallback =
5152
std::variant<OldLoadingUpdate, ActorLoadingUpdate>;
5253

54+
struct VertexIdRange {
55+
uint64_t current = 0;
56+
uint64_t maxId = 0;
57+
};
58+
template<typename Inspector>
59+
auto inspect(Inspector& f, VertexIdRange& r) {
60+
return f.object(r).fields(f.field("current", r.current),
61+
f.field("maxId", r.maxId));
62+
}
63+
5364
template<typename V, typename E>
5465
struct GraphLoader : GraphLoaderBase<V, E> {
5566
explicit GraphLoader(std::shared_ptr<WorkerConfig const> config,
@@ -59,7 +70,7 @@ struct GraphLoader : GraphLoaderBase<V, E> {
5970
resourceMonitor(GlobalResourceMonitor::instance()),
6071
config(config),
6172
updateCallback(updateCallback) {}
62-
auto load() -> Magazine<V, E> override;
73+
auto load() -> futures::Future<Magazine<V, E>> override;
6374

6475
auto loadVertices(ShardID const& vertexShard,
6576
std::vector<ShardID> const& edgeShards)
@@ -68,15 +79,14 @@ struct GraphLoader : GraphLoaderBase<V, E> {
6879
std::string_view documentID,
6980
traverser::EdgeCollectionInfo& info) -> void;
7081

71-
auto requestVertexIds(uint64_t numVertices) -> void;
82+
auto requestVertexIds(uint64_t numVertices) -> VertexIdRange;
7283

7384
std::shared_ptr<GraphFormat<V, E> const> graphFormat;
7485
ResourceMonitor resourceMonitor;
7586
std::shared_ptr<WorkerConfig const> config;
7687
LoadingUpdateCallback updateCallback;
7788

78-
uint64_t currentVertexId = 0;
79-
uint64_t currentVertexIdMax = 0;
89+
std::atomic<uint64_t> currentIdBase;
8090

8191
uint64_t const batchSize = 10000;
8292
};

arangod/Pregel/GraphStore/GraphLoaderBase.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
#include <memory>
2727
#include <functional>
2828

29+
#include <Futures/Future.h>
2930
#include <Pregel/GraphStore/Magazine.h>
3031

3132
namespace arangodb::pregel {
@@ -36,8 +37,8 @@ template<typename V, typename E>
3637
struct Quiver;
3738

3839
template<typename V, typename E>
39-
struct GraphLoaderBase {
40-
virtual auto load() -> Magazine<V, E> = 0;
40+
struct GraphLoaderBase : std::enable_shared_from_this<GraphLoaderBase<V, E>> {
41+
virtual auto load() -> futures::Future<Magazine<V, E>> = 0;
4142
virtual ~GraphLoaderBase() = default;
4243
};
4344

arangod/Pregel/Worker/Handler.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ struct WorkerHandler : actor::HandlerBase<Runtime, WorkerState<V, E, M>> {
8585
this->template dispatch< 8810 pregel::message::StatusMessages>(
8686
this->state->statusActor, update);
8787
}});
88-
this->state->magazine = loader.load();
88+
this->state->magazine = loader.load().get();
8989

9090
LOG_TOPIC("5206c", WARN, Logger::PREGEL)
9191
<< fmt::format("Worker {} has finished loading.", this->self);

arangod/Pregel/Worker/Worker.cpp

Lines changed: 13 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,18 @@ void Worker<V, E, M>::_initializeMessageCaches() {
171171
// @brief load the initial worker data, call conductor eventually
172172
template<typename V, typename E, typename M>
173173
void Worker<V, E, M>::setupWorker() {
174-
std::function<void()> finishedCallback = [self = shared_from_this(), this] {
174+
LOG_PREGEL("52070", WARN) << fmt::format(
175+
"Worker for execution number {} is loading", _config->executionNumber());
176+
_feature.metrics()->pregelWorkersLoadingNumber->fetch_add(1);
177+
178+
auto loader = std::make_shared<GraphLoader<V, E>>(
179+
_config, _algorithm->inputFormat(),
180+
OldLoadingUpdate{.fn = _makeStatusCallback()});
181+
182+
auto self = shared_from_this();
183+
loader->load().thenFinal([self, this](auto&& r) {
184+
_magazine = r.get();
185+
175186
LOG_PREGEL("52062", WARN)
176187
<< fmt::format("Worker for execution number {} has finished loading.",
177188
_config->executionNumber());
@@ -189,35 +200,7 @@ void Worker<V, E, M>::setupWorker() {
189200
_callConductor(Utils::finishedStartupPath,
190201
VPackBuilder(serialized.get().slice()));
191202
_feature.metrics()->pregelWorkersLoadingNumber->fetch_sub(1);
192-
};
193-
194-
// initialization of the graphstore might take an undefined amount
195-
// of time. Therefore this is performed asynchronously
196-
TRI_ASSERT(SchedulerFeature::SCHEDULER != nullptr);
197-
LOG_PREGEL("52070", WARN) << fmt::format(
198-
"Worker for execution number {} is loading", _config->executionNumber());
199-
_feature.metrics()->pregelWorkersLoadingNumber->fetch_add(1);
200-
Scheduler* scheduler = SchedulerFeature::SCHEDULER;
201-
scheduler->queue(RequestLane::INTERNAL_LOW,
202-
[this, self = shared_from_this(),
203-
statusUpdateCallback = std::move(_makeStatusCallback()),
204-
finishedCallback = std::move(finishedCallback)] {
205-
try {
206-
auto loader = GraphLoader<V, E>(
207-
_config, _algorithm->inputFormat(),
208-
OldLoadingUpdate{.fn = statusUpdateCallback});
209-
_magazine = std::move(loader.load());
210-
} catch (std::exception const& ex) {
211-
LOG_PREGEL("a47c4", WARN)
212-
<< "caught exception in loadShards: " << ex.what();
213-
throw;
214-
} catch (...) {
215-
LOG_PREGEL("e932d", WARN)
216-
<< "caught unknown exception in loadShards";
217-
throw;
218-
}
219-
finishedCallback();
220-
});
203+
});
221204
}
222205

223206
template<typename V, typename E, typename M>

0 commit comments

Comments
 (0)
0