8000 Improve query memory usage tracking for graph operations by jsteemann · Pull Request #14535 · arangodb/arangodb · GitHub
[go: up one dir, main page]

Skip to content

Improve query memory usage tracking for graph operations 8000 #14535

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Jul 23, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Improve query memory usage tracking for graph operations
* Include K_SHORTEST_PATHS and SHORTEST_PATH execution nodes in AQL query
  memory usage accounting. The memory used by these execution node types was
  previously not tracked against the configured query memory limit.
  • Loading branch information
jsteemann committed Jul 22, 2021
commit 26fe8cc425fb829f41dc5021fe1d1a4096544075
4 changes: 4 additions & 0 deletions CHANGELOG
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
devel
-----

* Include K_SHORTEST_PATHS and SHORTEST_PATH execution nodes in AQL query
memory usage accounting. The memory used by these execution node types was
previously not tracked against the configured query memory limit.

* Make `--javascript.copy-installation` also copy the `node_modules` sub
directory. This is required so we have a full copy of the JavaScript
dependencies and not one that excludes some infrequently changed modules.
Expand Down
78 changes: 57 additions & 21 deletions arangod/Graph/AttributeWeightShortestPathFinder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "AttributeWeightShortestPathFinder.h"

#include "Basics/Exceptions.h"
#include "Basics/ResourceUsage.h"
#include "Basics/tryEmplaceHelper.h"
#include "Graph/EdgeCursor.h"
#include "Graph/EdgeDocumentToken.h"
Expand Down Expand Up @@ -125,7 +126,7 @@ bool AttributeWeightShortestPathFinder::Searcher::oneStep() {
bool b = _myInfo._pq.popMinimal(v, s);

if (_pathFinder->_bingo || !b) {
// We can leave this functino only under 2 conditions:
// We can leave this function only under 2 conditions:
// 1) already bingo==true => bingo = true no effect
// 2) This queue is empty => if there would be a
// path we would have found it here
Expand All @@ -136,12 +137,15 @@ bool AttributeWeightShortestPathFinder::Searcher::oneStep() {

TRI_ASSERT(s != nullptr);

std::vector<std::unique_ptr<Step>> neighbors;
_pathFinder->expandVertex(_backward, v, neighbors);
for (std::unique_ptr<Step>& neighbor : neighbors) {
_neighbors.clear();
// populates _neighbors
_pathFinder->expandVertex(_backward, v, _neighbors);

for (auto& neighbor : _neighbors) {
insertNeighbor(std::move(neighbor), s->weight() + neighbor->weight());
}
// All neighbours are moved out.
_neighbors.clear();
// All neighbors are moved out.
lookupPeer(v, s->weight());

Step* s2 = _myInfo._pq.find(v);
Expand All @@ -154,6 +158,7 @@ bool AttributeWeightShortestPathFinder::Searcher::oneStep() {

AttributeWeightShortestPathFinder::AttributeWeightShortestPathFinder(ShortestPathOptions& options)
: ShortestPathFinder(options),
_resourceMonitor(options.resourceMonitor()),
_highscoreSet(false),
_highscore(0),
_bingo(false),
Expand All @@ -165,7 +170,15 @@ AttributeWeightShortestPathFinder::AttributeWeightShortestPathFinder(ShortestPat
_backwardCursor = _options.buildCursor(true);
}

AttributeWeightShortestPathFinder::~AttributeWeightShortestPathFinder() = default;
AttributeWeightShortestPathFinder::~AttributeWeightShortestPathFinder() {
// required for memory usage tracking
clearCandidates();
}

void AttributeWeightShortestPathFinder::clearCandidates() noexcept {
_resourceMonitor.decreaseMemoryUsage(_candidates.size() * candidateMemoryUsage());
_candidates.clear();
}

void AttributeWeightShortestPathFinder::clear() {
options().cache()->clear();
Expand All @@ -174,6 +187,7 @@ void AttributeWeightShortestPathFinder::clear() {
_bingo = false;
_intermediateSet = false;
_intermediate = arangodb::velocypack::StringRef{};
clearCandidates();
}

bool AttributeWeightShortestPathFinder::shortestPath(arangodb::velocypack::Slice const& st,
Expand Down Expand Up @@ -206,7 +220,7 @@ bool AttributeWeightShortestPathFinder::shortestPath(arangodb::velocypack::Slice
Searcher forwardSearcher(this, forward, backward, start, false);
std::unique_ptr<Searcher> backwardSearcher;
if (_options.bidirectional) {
backwardSearcher.reset(new Searcher(this, backward, forward, target, true));
backwardSearcher = std::make_unique<Searcher>(this, backward, forward, target, true);
}

TRI_IF_FAILURE("TraversalOOMInitialize") {
Expand Down Expand Up @@ -235,6 +249,9 @@ bool AttributeWeightShortestPathFinder::shortestPath(arangodb::velocypack::Slice
}

Step* s = forward._pq.find(_intermediate);

// track memory usage for result buildup.
ResourceUsageScope guard(_resourceMonitor);

result._vertices.emplace_back(_intermediate);

Expand All @@ -251,6 +268,8 @@ bool AttributeWeightShortestPathFinder::shortestPath(arangodb::velocypack::Slice
break;
}

guard.increase(arangodb::graph::ShortestPathResult::resultItemMemoryUsage());

result._edges.push_front(std::move(s->_edge));
result._vertices.push_front(arangodb::velocypack::StringRef(s->_predecessor));
s = forward._pq.find(s->_predecessor);
Expand All @@ -270,6 +289,8 @@ bool AttributeWeightShortestPathFinder::shortestPath(arangodb::velocypack::Slice
if (s->_predecessor.empty()) {
break;
}

guard.increase(arangodb::graph::ShortestPathResult::resultItemMemoryUsage());

result._edges.emplace_back(std::move(s->_edge));
result._vertices.emplace_back(arangodb::velocypack::StringRef(s->_predecessor));
Expand All @@ -281,23 +302,28 @@ bool AttributeWeightShortestPathFinder::shortestPath(arangodb::velocypack::Slice
}

_options.fetchVerticesCoordinator(result._vertices);
// we intentionally don't commit the memory usage to the _resourceMonitor here.
return true;
}

void AttributeWeightShortestPathFinder::inserter(
std::unordered_map<arangodb::velocypack::StringRef, size_t>& candidates,
std::vector<std::unique_ptr<Step>>& result,
arangodb::velocypack::StringRef const& s, arangodb::velocypack::StringRef const& t,
double currentWeight, EdgeDocumentToken&& edge) {

ResourceUsageScope guard(_resourceMonitor, candidateMemoryUsage());

auto [cand, emplaced] =
candidates.try_emplace(t, arangodb::lazyConstruct([&] {
result.emplace_back(
std::make_unique<Step>(t, s, currentWeight,
std::move(edge)));
return result.size() - 1;
}));

if (!emplaced) {
_candidates.try_emplace(t, arangodb::lazyConstruct([&] {
result.emplace_back(
std::make_unique<Step>(t, s, currentWeight,
std::move(edge)));
return result.size() - 1;
}));
if (emplaced) {
// new candidate created. now candiates are responsible for memory usage tracking
guard.steal();
} else {
// Compare weight
auto& old = result[cand->second];
auto oldWeight = old->weight();
Expand All @@ -312,20 +338,22 @@ void AttributeWeightShortestPathFinder::inserter(
void AttributeWeightShortestPathFinder::expandVertex(
bool backward, arangodb::velocypack::StringRef const& vertex,
std::vector<std::unique_ptr<Step>>& result) {
TRI_ASSERT(result.empty());

EdgeCursor* cursor = backward ? _backwardCursor.get() : _forwardCursor.get();
cursor->rearm(vertex, 0);

std::unordered_map<arangodb::velocypack::StringRef, size_t> candidates;
clearCandidates();
cursor->readAll([&](EdgeDocumentToken&& eid, VPackSlice edge, size_t cursorIdx) -> void {
if (edge.isString()) {
VPackSlice doc = _options.cache()->lookupToken(eid);
double currentWeight = _options.weightEdge(doc);
arangodb::velocypack::StringRef other =
_options.cache()->persistString(arangodb::velocypack::StringRef(edge));
if (other.compare(vertex) != 0) {
inserter(candidates, result, vertex, other, currentWeight, std::move(eid));
inserter(result, vertex, other, currentWeight, std::move(eid));
} else {
inserter(candidates, result, other, vertex, currentWeight, std::move(eid));
inserter(result, other, vertex, currentWeight, std::move(eid));
}
} else {
arangodb::velocypack::StringRef fromTmp(
Expand All @@ -335,12 +363,20 @@ void AttributeWeightShortestPathFinder::expandVertex(
arangodb::velocypack::StringRef to = _options.cache()->persistString(toTmp);
double currentWeight = _options.weightEdge(edge);
if (from == vertex) {
inserter(candidates, result, from, to, currentWeight, std::move(eid));
inserter(result, from, to, currentWeight, std::move(eid));
} else {
inserter(candidates, result, to, from, currentWeight, std::move(eid));
inserter(result, to, from, currentWeight, std::move(eid));
}
}
});

clearCandidates();
}

size_t AttributeWeightShortestPathFinder::candidateMemoryUsage() const noexcept {
return 16 /*arbitrary overhead*/ +
sizeof(decltype(_candidates)::key_type) +
sizeof(decltype(_candidates)::value_type);
}

/*
Expand Down
43 changes: 26 additions & 17 deletions arangod/Graph/AttributeWeightShortestPathFinder.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,16 @@

#pragma once

#include "Basics/Mutex.h"
#include "Basics/MutexLocker.h"

#include "Graph/EdgeDocumentToken.h"
#include "Graph/ShortestPathFinder.h"
#include "Graph/ShortestPathPriorityQueue.h"

#include <velocypack/StringRef.h>

#include <memory>
#include <thread>

namespace arangodb {
struct ResourceMonitor;

namespace graph {
class EdgeCursor;
Expand Down Expand Up @@ -87,7 +84,7 @@ class AttributeWeightShortestPathFinder : public ShortestPathFinder {

struct ThreadInfo {
PQueue _pq;
arangodb::Mutex _mutex;
// arangodb::Mutex _mutex;
};

//////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -182,6 +179,8 @@ class AttributeWeightShortestPathFinder : public ShortestPathFinder {
ThreadInfo& _peerInfo;
arangodb::velocypack::StringRef _start;
bool _backward;
/// @brief temp value, which is used only in Searcher::oneStep() and recycled.
std::vector<std::unique_ptr<Step>> _neighbors;
};

// -----------------------------------------------------------------------------
Expand Down Expand Up @@ -213,16 +212,7 @@ class AttributeWeightShortestPathFinder : public ShortestPathFinder {
bool shortestPath(arangodb::velocypack::Slice const& start,
arangodb::velocypack::Slice const& target,
arangodb::graph::ShortestPathResult& result) override;

void inserter(std::unordered_map<arangodb::velocypack::StringRef, size_t>& candidates,
std::vector<std::unique_ptr<Step>>& result,
arangodb::velocypack::StringRef const& s,
arangodb::velocypack::StringRef const& t, double currentWeight,
graph::EdgeDocumentToken&& edge);

void expandVertex(bool backward, arangodb::velocypack::StringRef const& source,
std::vector<std::unique_ptr<Step>>& result);


//////////////////////////////////////////////////////////////////////////////
/// @brief return the shortest path between the start and target vertex,
/// multi-threaded version using SearcherTwoThreads.
Expand All @@ -237,6 +227,21 @@ class AttributeWeightShortestPathFinder : public ShortestPathFinder {
arangodb::velocypack::Slice& target,
arangodb::graph::ShortestPathResult& result);
*/

private:

void inserter(std::vector<std::unique_ptr<Step>>& result,
arangodb::velocypack::StringRef const& s,
arangodb::velocypack::StringRef const& t, double currentWeight,
graph::EdgeDocumentToken&& edge);

void expandVertex(bool backward, arangodb::velocypack::StringRef const& source,
std::vector<std::unique_ptr<Step>>& result);

void clearCandidates() noexcept;
size_t candidateMemoryUsage() const noexcept;

arangodb::ResourceMonitor& _resourceMonitor;

//////////////////////////////////////////////////////////////////////////////
/// @brief lowest total weight for a complete path found
Expand All @@ -260,7 +265,7 @@ class AttributeWeightShortestPathFinder : public ShortestPathFinder {
/// @brief _resultMutex, this is used to protect access to the result data
//////////////////////////////////////////////////////////////////////////////

arangodb::Mutex _resultMutex;
// arangodb::Mutex _resultMutex;

//////////////////////////////////////////////////////////////////////////////
/// @brief _intermediate, one vertex on the shortest path found, flag
Expand All @@ -270,7 +275,11 @@ class AttributeWeightShortestPathFinder : public ShortestPathFinder {

bool _intermediateSet;
arangodb::velocypack::StringRef _intermediate;


/// @brief temporary value, which is going to be populate in inserter,
/// and recycled between calls
std::unordered_map<arangodb::velocypack::StringRef, size_t> _candidates;

std::unique_ptr<EdgeCursor> _forwardCursor;
std::unique_ptr<EdgeCursor> _backwardCursor;
};
Expand Down
Loading
0