8000 Refactor/cluster provider fetch (#16012) · OOSYOO/arangodb@2e87504 · GitHub
[go: up one dir, main page]

Skip to content

Commit 2e87504

Browse files
Refactor/cluster provider fetch (arangodb#16012)
1 parent 29fd3cb commit 2e87504

19 files changed

+326
-62
lines changed

arangod/Graph/Enumerators/OneSidedEnumerator.cpp

Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -102,15 +102,16 @@ auto OneSidedEnumerator<Configuration>::computeNeighbourhoodOfNextVertex()
102102
// Pull next element from Queue
103103
// Do 1 step search
104104
TRI_ASSERT(!_queue.isEmpty());
105-
if (!_queue.hasProcessableElement()) {
106-
std::vector<Step*> looseEnds = _queue.getLooseEnds();
107-
futures::Future<std::vector<Step*>> futureEnds = _provider.fetch(looseEnds);
105+
if (!_queue.firstIsVertexFetched()) {
106+
std::vector<Step*> looseEnds = _queue.getStepsWithoutFetchedVertex();
107+
futures::Future<std::vector<Step*>> futureEnds =
108+
_provider.fetchVertices(looseEnds);
108109

109110
// Will throw all network errors here
110-
auto&& preparedEnds = futureEnds.get();
111+
std::vector<Step*> preparedEnds = std::move(futureEnds.get());
111112

112113
TRI_ASSERT(preparedEnds.size() != 0);
113-
TRI_ASSERT(_queue.hasProcessableElement());
114+
TRI_ASSERT(_queue.firstIsVertexFetched());
114115
}
115116

116117
auto tmp = _queue.pop();
@@ -143,6 +144,16 @@ auto OneSidedEnumerator<Configuration>::computeNeighbourhoodOfNextVertex()
143144
}
144145

145146
if (step.getDepth() < _options.getMaxDepth() && !res.isPruned()) {
147+
if (!step.edgeFetched()) {
148+
// NOTE: The step we have should be the first, s.t. we are guaranteed
149+
// to work on it, as the ordering here gives the priority to the Provider
150+
// in how important it is to get responses for a particular step.
151+
std::vector<Step*> stepsToFetch{&step};
152+
_queue.getStepsWithoutFetchedEdges(stepsToFetch);
153+
TRI_ASSERT(!stepsToFetch.empty());
154+
_provider.fetchEdges(stepsToFetch);
155+
TRI_ASSERT(step.edgeFetched());
156+
}
146157
_provider.expand(step, posPrevious,
147158
[&](Step n) -> void { _queue.append(n); });
148159
}
@@ -248,16 +259,16 @@ auto OneSidedEnumerator<Configuration>::fetchResults() -> void {
248259
if (!_resultsFetched && !_results.empty()) {
249260
std::vector<Step*> looseEnds{};
250261

251-
for (auto& vertex : _results) {
252-
if (!vertex.isProcessable()) {
253-
looseEnds.emplace_back(&vertex);
262+
for (auto& step : _results) {
263+
if (!step.vertexFetched()) {
264+
looseEnds.emplace_back(&step);
254265
}
255266
}
256267

257268
if (!looseEnds.empty()) {
258269
// Will throw all network errors here
259270
futures::Future<std::vector<Step*>> futureEnds =
260-
_provider.fetch(looseEnds);
271+
_provider.fetchVertices(looseEnds);
261272
futureEnds.get();
262273
// Notes for the future:
263274
// Vertices are now fetched. Think about other less-blocking and

arangod/Graph/PathManagement/PathValidator.cpp

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -330,7 +330,6 @@ auto PathValidator<ProviderType, PathStore, vertexUniqueness, edgeUniqueness>::
330330
}
331331
}
332332
}
333-
334333
return res;
335334
}
336335

arangod/Graph/Providers/ClusterProvider.cpp

Lines changed: 52 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,24 @@ VertexType getEdgeDestination(arangodb::velocypack::Slice edge,
7070
}
7171
return VertexType{from};
7272
}
73+
74+
ClusterProviderStep::FetchedType getFetchedType(bool vertexFetched,
75+
bool edgesFetched) {
76+
if (vertexFetched) {
77+
if (edgesFetched) {
78+
return ClusterProviderStep::FetchedType::VERTEX_AND_EDGES_FETCHED;
79+
} else {
80+
return ClusterProviderStep::FetchedType::VERTEX_FETCHED;
81+
}
82+
} else {
83+
if (edgesFetched) {
84+
return ClusterProviderStep::FetchedType::EDGES_FETCHED;
85+
} else {
86+
return ClusterProviderStep::FetchedType::UNFETCHED;
87+
}
88+
}
89+
}
90+
7391
} // namespace
7492

7593
template<class StepImpl>
@@ -118,7 +136,6 @@ void ClusterProvider<StepImpl>::fetchVerticesFromEngines(
118136
leased->openObject();
119137
leased->add("keys", VPackValue(VPackValueType::Array));
120138
for (auto const& looseEnd : looseEnds) {
121-
TRI_ASSERT(looseEnd->isLooseEnd());
122139
auto const& vertexId = looseEnd->getVertex().getID();
123140
if (!_opts.getCache()->isVertexCached(vertexId)) {
124141
leased->add(VPackValuePair(vertexId.data(), vertexId.length(),
@@ -221,6 +238,7 @@ void ClusterProvider<StepImpl>::fetchVerticesFromEngines(
221238
VPackSlice::nullSlice());
222239
}
223240
result.emplace_back(lE);
241+
lE->setVertexFetched();
224242
}
225243
}
226244

@@ -396,9 +414,8 @@ Result ClusterProvider<StepImpl>::fetchEdgesFromEngines(Step* step) {
396414
}
397415

398416
template<class StepImpl>
399-
auto ClusterProvider<StepImpl>::fetch(std::vector<Step*> const& looseEnds)
400-
-> futures::Future<std::vector<Step*>> {
401-
LOG_TOPIC("03c1b", TRACE, Logger::GRAPHS) << "<ClusterProvider> Fetching...";
417+
auto ClusterProvider<StepImpl>::fetchVertices(
418+
std::vector<Step*> const& looseEnds) -> std::vector<Step*> {
402419
std::vector<Step*> result{};
403420

404421
if (!looseEnds.empty()) {
@@ -413,29 +430,42 @@ auto ClusterProvider<StepImpl>::fetch(std::vector<Step*> const& looseEnds)
413430
VPackSlice::nullSlice());
414431
}
415432
result.emplace_back(lE);
433+
lE->setVertexFetched();
416434
}
417435
} else {
418436
fetchVerticesFromEngines(looseEnds, result);
419437
_stats.incrHttpRequests(_opts.engines()->size() * looseEnds.size());
420438
}
439+
}
440+
return result;
441+
}
421442

422-
for (auto const& step : result) {
423-
if (!_vertexConnectedEdges.contains(step->getVertex().getID())) {
424-
auto res = fetchEdgesFromEngines(step);
425-
_stats.incrHttpRequests(_opts.engines()->size());
426-
427-
if (res.fail()) {
428-
THROW_ARANGO_EXCEPTION(res);
429-
}
443+
template<class StepImpl>
444+
auto ClusterProvider<StepImpl>::fetchEdges(
445+
std::vector<Step*> const& fetchedVertices) -> Result {
446+
for (auto const& step : fetchedVertices) {
447+
if (!_vertexConnectedEdges.contains(step-> F438 getVertex().getID())) {
448+
auto res = fetchEdgesFromEngines(step);
449+
_stats.incrHttpRequests(_opts.engines()->size());
450+
451+
if (res.fail()) {
452+
THROW_ARANGO_EXCEPTION(res);
430453
}
431-
// else: We already fetched this vertex.
432-
433-
// mark a looseEnd as fetched as vertex fetch + edges fetch was a success
434-
step->setFetched();
435454
}
455+
// else: We already fetched this vertex.
456+
457+
// mark a looseEnd as fetched as vertex fetch + edges fetch was a success
458+
step->setEdgesFetched();
436459
}
460+
return TRI_ERROR_NO_ERROR;
461+
}
437462

438-
// Note: Discuss if we want to keep it that way in the future.
463+
template<class StepImpl>
464+
auto ClusterProvider<StepImpl>::fetch(std::vector<Step*> const& looseEnds)
465+
-> futures::Future<std::vector<Step*>> {
466+
LOG_TOPIC("03c1b", TRACE, Logger::GRAPHS) << "<ClusterProvider> Fetching...";
467+
std::vector<Step*> result = fetchVertices(looseEnds);
468+
fetchEdges(result);
439469
return futures::makeFuture(std::move(result));
440470
}
441471

@@ -452,13 +482,15 @@ auto ClusterProvider<StepImpl>::expand(
452482

453483
if (ADB_LIKELY(relations != _vertexConnectedEdges.end())) {
454484
for (auto const& relation : relations->second) {
455-
bool const fetchedTargetVertex =
456-
_vertexConnectedEdges.contains(relation.second);
485+
bool vertexCached = _opts.getCache()->isVertexCached(relation.second);
486+
bool edgesCached = _vertexConnectedEdges.contains(relation.second);
487+
typename Step::FetchedType fetchedType =
488+
::getFetchedType(vertexCached, edgesCached);
457489
// [GraphRefactor] TODO: KShortestPaths does not require Depth/Weight. We
458490
// need a mechanism here as well to distinguish between (non)required
459491
// parameters.
460492
callback(
461-
Step{relation.second, relation.first, previous, fetchedTargetVertex,
493+
Step{relation.second, relation.first, previous, fetchedType,
462494
step.getDepth() + 1,
463495
_opts.weightEdge(step.getWeight(), readEdge(relation.first))});
464496
}

arangod/Graph/Providers/ClusterProvider.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,8 @@ class ClusterProvider {
8383
double weight = 0.0) -> Step;
8484
auto fetch(std::vector<Step*> const& looseEnds)
8585
-> futures::Future<std::vector<Step*>>;
86+
auto fetchVertices(std::vector<Step*> const& looseEnds) -> std::vector<Step*>;
87+
auto fetchEdges(const std::vector<Step*>& fetchedVertices) -> Result;
8688
auto expand(Step const& from, size_t previous,
8789
std::function<void(Step)> const& callback) -> void;
8890

arangod/Graph/Providers/ProviderTracer.cpp

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,27 @@ typename ProviderImpl::Step ProviderTracer<ProviderImpl>::startVertex(
6262
return _impl.startVertex(vertex, depth, weight);
6363
}
6464

65+
template<class ProviderImpl>
66+
futures::Future<std::vector<typename ProviderImpl::Step*>>
67+
ProviderTracer<ProviderImpl>::fetchVertices(
68+
std::vector<typename ProviderImpl::Step*> const& looseEnds) {
69+
double start = TRI_microtime();
70+
auto sg = arangodb::scopeGuard([&]() noexcept {
71+
_stats["fetchVertices"].addTiming(TRI_microtime() - start);
72+
});
73+
return _impl.fetchVertices(std::move(looseEnds));
74+
}
75+
76+
template<class ProviderImpl>
77+
Result ProviderTracer<ProviderImpl>::fetchEdges(
78+
std::vector<typename ProviderImpl::Step*> const& fetchedVertices) {
79+
double start = TRI_microtime();
80+
auto sg = arangodb::scopeGuard([&]() noexcept {
81+
_stats["fetchEdges"].addTiming(TRI_microtime() - start);
82+
});
83+
return _impl.fetchEdges(fetchedVertices);
84+
}
85+
6586
template<class ProviderImpl>
6687
futures::Future<std::vector<typename ProviderImpl::Step*>>
6788
ProviderTracer<ProviderImpl>::fetch(

arangod/Graph/Providers/ProviderTracer.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,9 @@ class ProviderTracer {
6262

6363
auto startVertex(VertexType vertex, size_t depth = 0, double weight = 0.0)
6464
-> Step;
65+
auto fetchVertices(std::vector<Step*> const& looseEnds)
66+
-> futures::Future<std::vector<Step*>>;
67+
auto fetchEdges(const std::vector<Step*>& fetchedVertices) -> Result;
6568
auto fetch(std::vector<Step*> const& looseEnds)
6669
-> futures::Future<std::vector<Step*>>;
6770
auto expand(Step const& from, size_t previous,

arangod/Graph/Providers/SingleServerProvider.cpp

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -234,6 +234,23 @@ arangodb::aql::TraversalStats SingleServerProvider<Step>::stealStats() {
234234
return t;
235235
}
236236

237+
template<class StepType>
238+
auto SingleServerProvider<StepType>::fetchVertices(
239+
const std::vector<Step*>& looseEnds)
240+
-> futures::Future<std::vector<Step*>> {
241+
// We will never need to fetch anything
242+
TRI_ASSERT(false);
243+
return std::move(fetch(looseEnds));
244+
}
245+
246+
template<class StepType>
247+
auto SingleServerProvider<StepType>::fetchEdges(
248+
const std::vector<Step*>& fetchedVertices) -> Result {
249+
// We will never need to fetch anything
250+
TRI_ASSERT(false);
251+
return TRI_ERROR_NO_ERROR;
252+
}
253+
237254
template class arangodb::graph::SingleServerProvider<SingleServerProviderStep>;
238255

239256
#ifdef USE_ENTERPRISE

arangod/Graph/Providers/SingleServerProvider.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,10 @@ class SingleServerProvider {
7373

7474
auto startVertex(VertexType vertex, size_t depth = 0, double weight = 0.0)
7575
-> Step;
76+
auto fetchVertices(std::vector<Step*> const& looseEnds)
77+
-> futures::Future<std::vector<Step*>>;
78+
// dummy function, needed for OneSidedEnumerator::Provider
79+
auto fetchEdges(const std::vector<Step*>& fetchedVertices) -> Result;
7680
auto fetch(std::vector<Step*> const& looseEnds)
7781
-> futures::Future<std::vector<Step*>>; // rocks
7882
auto expand(Step const& from, size_t previous,

arangod/Graph/Queues/FifoQueue.h

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,15 @@ class FifoQueue {
6060
guard.steal(); // now we are responsible for tracking the memory
6161
}
6262

63+
bool firstIsVertexFetched() const {
64+
if (not isEmpty()) {
65+
auto const& first = _queue.front();
66+
return first.vertexFetched();
67+
}
68+
return false;
69+
}
70+
71+
// todo: rename to firstElementIsProcessable
6372
bool hasProcessableElement() const {
6473
if (!isEmpty()) {
6574
auto const& first = _queue.front();
@@ -86,6 +95,24 @@ class FifoQueue {
8695
return steps;
8796
}
8897

98+
std::vector<Step*> getStepsWithoutFetchedVertex() {
99+
std::vector<Step*> steps;
100+
for (auto& step : _queue) {
101+
if (not step.vertexFetched()) {
102+
steps.emplace_back(&step);
103+
}
104+
}
105+
return steps;
106+
}
107+
108+
void getStepsWithoutFetchedEdges(std::vector<Step*>& steps) {
109+
for (auto& step : _queue) {
110+
if (!step.edgeFetched()) {
111+
steps.emplace_back(&step);
112+
}
113+
}
114+
}
115+
89116
Step pop() {
90117
TRI_ASSERT(!isEmpty());
91118
Step first = std::move(_queue.front());

arangod/Graph/Queues/LifoQueue.h

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,14 @@ class LifoQueue {
6060
guard.steal(); // now we are responsible for tracking the memory
6161
}
6262

63+
bool firstIsVertexFetched() const {
64+
if (not isEmpty()) {
65+
auto const& first = _queue.front();
66+
return first.vertexFetched();
67+
}
68+
return false;
69+
}
70+
6371
bool hasProcessableElement() const {
6472
if (!isEmpty()) {
6573
auto const& first = _queue.front();
@@ -96,6 +104,24 @@ class LifoQueue {
96104
return first;
97105
}
98106

107+
std::vector<Step*> getStepsWithoutFetchedVertex() {
108+
std::vector<Step*> steps{};
109+
for (auto& step : _queue) {
110+
if (!step.vertexFetched()) {
111+
steps.emplace_back(&step);
112+
}
113+
}
114+
return steps;
115+
}
116+
117+
void getStepsWithoutFetchedEdges(std::vector<Step*>& steps) {
118+
for (auto& step : _queue) {
119+
if (!step.edgeFetched()) {
120+
steps.emplace_back(&step);
121+
}
122+
}
123+
}
124+
99125
private:
100126
/// @brief queue datastore
101127
std::deque<Step> _queue;

0 commit comments

Comments
 (0)
0