8000 Hybrid Smart Graphs: Adds support for clearing Providers (#14594) · open-bigdata/arangodb@d376ef9 · GitHub
[go: up one dir, main page]

Skip to content

Commit d376ef9

Browse files
authored
Hybrid Smart Graphs: Adds support for clearing Providers (arangodb#14594)
1 parent e1b7112 commit d376ef9

29 files changed

+158
-88
lines changed

arangod/Aql/ClusterNodes.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -333,6 +333,9 @@ void DistributeNode::doToVelocyPack(VPackBuilder& builder, unsigned flags) const
333333
{
334334
VPackArrayBuilder guard(&builder);
335335
for (auto const& v : _satellites) {
336+
// if the mapped shard for a collection is empty, it means that
337+
// we have a vertex collection that is only relevant on some of the
338+
// target servers
336339
builder.add(VPackValue(v->name()));
337340
}
338341
}

arangod/Aql/DistributeExecutor.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,7 @@ auto DistributeExecutor::distributeBlock(SharedAqlItemBlockPtr const& block, Ski
152152
// We can only have clients we are prepared for
153153
TRI_ASSERT(blockMap.find(client) != blockMap.end());
154154
if (ADB_UNLIKELY(blockMap.find(client) == blockMap.end())) {
155-
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL,
155+
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL,
156156
std::string("unexpected client id '") + client + "' found in blockMap");
157157
}
158158
choosenMap[client].emplace_back(i);

arangod/Aql/EngineInfoContainerDBServerServerBased.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -300,7 +300,7 @@ Result EngineInfoContainerDBServerServerBased::buildEngines(
300300
TRI_ASSERT(!_closedSnippets.empty() || !_graphNodes.empty());
301301

302302
ErrorCode cleanupReason = TRI_ERROR_CLUSTER_TIMEOUT;
303-
303+
304304
auto cleanupGuard = scopeGuard([this, &serverToQueryId, &cleanupReason]() noexcept {
305305
try {
306306
transaction::Methods& trx = _query.trxForOptimization();

arangod/Aql/ExecutionBlockImpl.cpp

Lines changed: 19 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -159,15 +159,15 @@ class TestLambdaSkipExecutor;
159159
160160
template <typename Executor>
161161
constexpr bool executorHasSideEffects =
162-
is_one_of_v<Executor,
162+
is_one_of_v<Executor,
163163
ModificationExecutor<SingleRowFetcher<BlockPassthrough::Disable>, InsertModifier>,
164164
ModificationExecutor<SingleRowFetcher<BlockPassthrough::Disable>, RemoveModifier>,
165165
ModificationExecutor<SingleRowFetcher<BlockPassthrough::Disable>, UpdateReplaceModifier>,
166166
ModificationExecutor<SingleRowFetcher<BlockPassthrough::Disable>, UpsertModifier>>;
167167
168168
template <typename Executor>
169169
constexpr bool executorCanReturnWaiting =
170-
is_one_of_v<Executor,
170+
is_one_of_v<Executor,
171171
ModificationExecutor<SingleRowFetcher<BlockPassthrough::Disable>, InsertModifier>,
172172
ModificationExecutor<SingleRowFetcher<BlockPassthrough::Disable>, RemoveModifier>,
173173
ModificationExecutor<SingleRowFetcher<BlockPassthrough::Disable>, UpdateReplaceModifier>,
@@ -617,9 +617,9 @@ static SkipRowsRangeVariant constexpr skipRowsType() {
617617
KShortestPathsExecutor<graph::KShortestPathsFinder>, KShortestPathsExecutor<KPathRefactored>,
618618
KShortestPathsExecutor<KPathRefactoredTracer>, KShortestPathsExecutor<KPathRefactoredCluster>,
619619
KShortestPathsExecutor<KPathRefactoredClusterTracer>, ParallelUnsortedGatherExecutor,
620-
IdExecutor<SingleRowFetcher<BlockPassthrough::Enable>>, IdExecutor<ConstFetcher>, HashedCollectExecutor,
621-
AccuWindowExecutor, WindowExecutor, IndexExecutor, EnumerateCollectionExecutor, DistinctCollectExecutor,
622-
ConstrainedSortExecutor, CountCollectExecutor,
620+
IdExecutor<SingleRowFetcher<BlockPassthrough::Enable>>, IdExecutor<ConstFetcher>,
621+
HashedCollectExecutor, AccuWindowExecutor, WindowExecutor, IndexExecutor, EnumerateCollectionExecutor,
622+
DistinctCollectExecutor, ConstrainedSortExecutor, CountCollectExecutor,
623623
#ifdef ARANGODB_USE_GOOGLE_TESTS
624624
TestLambdaSkipExecutor,
625625
#endif
@@ -779,20 +779,21 @@ auto ExecutionBlockImpl<Executor>::executeFetcher(ExecutionContext& ctx,
779779

780780
// we can safely ignore the result here, because we will try to
781781
// claim the task ourselves anyway.
782+
782783
SchedulerFeature::SCHEDULER->queue(RequestLane::INTERNAL_LOW,
783-
[block = this, task = _prefetchTask,
784+
[block = this, task = _prefetchTask,
784785
stack = ctx.stack]() mutable {
785-
if (!task->tryClaim()) {
786-
return;
787-
}
788-
// task is a copy of the PrefetchTask shared_ptr, and we will only
789-
// attempt to execute the task if we successfully claimed the task.
790-
// i.e., it does not matter if this task lingers around in the
791-
// scheduler queue even after the execution block has been destroyed,
792-
// because in this case we will not be able to claim the task and
793-
// simply return early without accessing the block.
794-
task->execute(*block, stack);
795-
});
786+
if (!task->tryClaim()) {
787+
return;
788+
}
789+
// task is a copy of the PrefetchTask shared_ptr, and we will only
790+
// attempt to execute the task if we successfully claimed the task.
791+
// i.e., it does not matter if this task lingers around in the
792+
// scheduler queue even after the execution block has been destroyed,
793+
// because in this case we will not be able to claim the task and
794+
// simply return early without accessing the block.
795+
task->execute(*block, stack);
796+
});
796797
}
797798

798799
if constexpr (!std::is_same_v<Executor, SubqueryStartExecutor>) {
@@ -1444,8 +1445,7 @@ ExecutionBlockImpl<Executor>::executeWithoutTrace(AqlCallStack const& callStack)
14441445
} else {
14451446
// Execute skipSome
14461447
std::tie(state, stats, skippedLocal, call) =
1447-
executeSkipRowsRange(_lastRange, ctx.clientCall);
1448-
}
1448+
executeSkipRowsRange(_lastRange, ctx.clientCall);}
14491449

14501450
#ifdef ARANGODB_ENABLE_MAINTAINER_MODE
14511451
// Assertion: We did skip 'skippedLocal' documents here.

arangod/Aql/GraphNode.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -214,7 +214,7 @@ class GraphNode : public ExecutionNode {
214214
_isDisjoint = target;
215215
}
216216
#endif
217-
protected:
217+
protected:
218218
void doToVelocyPack(arangodb::velocypack::Builder& nodes, unsigned flags) const override;
219219

220220
void graphCloneHelper(ExecutionPlan& plan, GraphNode& clone, bool withProperties) const;

arangod/Aql/QueryOptions.cpp

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -201,10 +201,10 @@ void QueryOptions::fromVelocyPack(VPackSlice slice) {
201201
if (value = slice.get("explainRegisters"); value.isBool()) {
202202
explainRegisters = value.getBool() ? ExplainRegisterPlan::Yes : ExplainRegisterPlan::No;
203203
}
204-
204+
205205
// note: skipAudit is intentionally not read here.
206206
// the end user cannot override this setting
207-
207+
208208
if (value = slice.get("forceOneShardAttributeValue"); value.isString()) {
209209
forceOneShardAttributeValue = value.copyString();
210210
}
@@ -272,11 +272,10 @@ void QueryOptions::toVelocyPack(VPackBuilder& builder, bool disableOptimizerRule
272272
builder.add("fullCount", VPackValue(fullCount));
273273
builder.add("count", VPackValue(count));
274274
builder.add("verboseErrors", VPackValue(verboseErrors));
275-
276-
if (!forceOneShardAttributeValue.empty()) {
275+
if (!forceOneShardAttributeValue.empty()) {
277276
builder.add("forceOneShardAttributeValue", VPackValue(forceOneShardAttributeValue));
278277
}
279-
278+
280279
// note: skipAudit is intentionally not serialized here.
281280
// the end user cannot override this setting anyway.
282281

arangod/Aql/RestAqlHandler.cpp

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -287,10 +287,9 @@ void RestAqlHandler::setupClusterQuery() {
287287

288288
double const ttl = options.ttl;
289289
// creates a StandaloneContext or a leased context
290-
auto q = ClusterQuery::create(clusterQueryId,
290+
auto q = ClusterQuery::create(clusterQueryId,
291291
createTransactionContext(access),
292292
std::move(options));
293-
294293
TRI_ASSERT(clusterQueryId == 0 || clusterQueryId == q->id());
295294

296295
VPackBufferUInt8 buffer;

arangod/Aql/ShardLocking.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,8 @@ void ShardLocking::addNode(ExecutionNode const* baseNode, size_t snippetId,
108108
}
109109
auto const graphIsUsedAsSatellite = graphNode->isUsedAsSatellite();
110110
auto const isUsedAsSatellite = [&](auto const& col) {
111-
return graphIsUsedAsSatellite || (col->isSatellite() && (pushToSingleServer || graphNode->isSmart()));
111+
return graphIsUsedAsSatellite ||
112+
(col->isSatellite() && (pushToSingleServer || graphNode->isSmart()));
112113
};
113114
// Add all Edge Collections to the Transactions, Traversals do never write
114115
for (auto const& col : graphNode->edgeColls()) {

arangod/ClusterEngine/ClusterIndexFactory.cpp

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -54,8 +54,7 @@ struct DefaultIndexFactory : public IndexTypeFactory {
5454
: IndexTypeFactory(server), _type(type) {}
5555

5656
bool equal(velocypack::Slice lhs,
57-
velocypack::Slice rhs,
58-
std::string const& dbname) const override {
57+
velocypack::Slice rhs, std::string const& dbname) const override {
5958
auto& clusterEngine =
6059
_server.getFeature<EngineSelectorFeature>().engine<ClusterEngine>();
6160
auto* engine = clusterEngine.actualEngine();
@@ -78,8 +77,7 @@ struct DefaultIndexFactory : public IndexTypeFactory {
7877
auto ct = clusterEngine.engineType();
7978

8079
return std::make_shared<ClusterIndex>(id, collection, ct,
81-
Index::type(_type),
82-
definition);
80+
Index::type(_type), definition);
8381
}
8482

8583
virtual Result normalize(
@@ -98,7 +96,7 @@ struct DefaultIndexFactory : public IndexTypeFactory {
9896
}
9997

10098
return engine->indexFactory().factory(_type).normalize(
101-
normalized, definition, isCreation, vocbase);
99+
normalized, definition, isCreation, vocbase);
102100
}
103101
};
104102

@@ -213,7 +211,7 @@ Result ClusterIndexFactory::enhanceIndexDefinition( // normalize definition
213211
}
214212

215213
return ae->indexFactory().enhanceIndexDefinition(
216-
definition, normalized, isCreation, vocbase);
214+
definition, normalized, isCreation, vocbase);
217215
}
218216

219217
void ClusterIndexFactory::fillSystemIndexes(LogicalCollection& col,
@@ -305,8 +303,8 @@ void ClusterIndexFactory::prepareIndexes(
305303
indexes.emplace_back(std::move(idx));
306304
} catch (std::exception const& ex) {
307305
LOG_TOPIC("7ed52", ERR, Logger::ENGINES)
308-
<< "error creating index from definition '" << v.toString() << "': " << ex.what();
309-
306+
<< "error creating index from definition '" << v.toString()
307+
<< "': " << ex.what();
310308
}
311309
}
312310
}

arangod/Graph/BreadthFirstEnumerator.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -346,7 +346,7 @@ bool BreadthFirstEnumerator::shouldPrune() {
346346

347347
void BreadthFirstEnumerator::growStorage() {
348348
size_t capacity = arangodb::containers::Helpers::nextCapacity(_schreier, 8);
349-
349+
350350
if (capacity > _schreier.capacity()) {
351351
arangodb::ResourceUsageScope guard(_opts->resourceMonitor(),
352352
(capacity - _schreier.capacity()) * pathStepSize());

arangod/Graph/Cache/RefactoredClusterTraverserCache.cpp

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -59,10 +59,12 @@ void RefactoredClusterTraverserCache::clear() {
5959

6060
auto RefactoredClusterTraverserCache::cacheVertex(VertexType const& vertexId,
6161
velocypack::Slice vertexSlice) -> void {
62+
ResourceUsageScope guard(_resourceMonitor, ::costPerVertexOrEdgeStringRefSlice);
6263
auto [it, inserted] = _vertexData.try_emplace(vertexId, vertexSlice);
64+
6365
if (inserted) {
6466
// If we have added something into the cache, we need to account for it.
65-
_resourceMonitor.increaseMemoryUsage(costPerVertexOrEdgeStringRefSlice);
67+
guard.steal();
6668
}
6769
}
6870

@@ -97,8 +99,9 @@ auto RefactoredClusterTraverserCache::persistString(arangodb::velocypack::Hashed
9799
}
98100
auto res = _stringHeap.registerString(idString);
99101
ResourceUsageScope guard(_resourceMonitor, ::costPerPersistedString);
100-
101-
_persistedStrings.emplace(res);
102+
103+
auto [itx, inserted] = _persistedStrings.emplace(res);
104+
TRI_ASSERT(inserted);
102105

103106
// now make the TraverserCache responsible for memory tracking
104107
guard.steal();

arangod/Graph/Cursors/RefactoredSingleServerEdgeCursor.cpp

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -277,20 +277,20 @@ void RefactoredSingleServerEdgeCursor<Step>::readAll(SingleServerProvider<Step>&
277277
} else {
278278
cursor.all([&](LocalDocumentId const& token) {
279279
return collection->getPhysical()
280-
->read(
281-
_trx, token,
282-
[&](LocalDocumentId const&, VPackSlice edgeDoc) {
283-
stats.addScannedIndex(1);
280+
->read(_trx, token,
281+
[&](LocalDocumentId const&, VPackSlice edgeDoc) {
282+
stats.addScannedIndex(1);
284283
#ifdef USE_ENTERPRISE
285-
if (_trx->skipInaccessible()) {
286-
// TODO: we only need to check one of these
287-
VPackSlice from =
288-
transaction::helpers::extractFromFromDocument(edgeDoc);
289-
VPackSlice to = transaction::helpers::extractToFromDocument(edgeDoc);
290-
if (CheckInaccessible(_trx, from) || CheckInaccessible(_trx, to)) {
291-
return false;
292-
}
293-
}
284+
if (_trx->skipInaccessible()) {
285+
// TODO: we only need to check one of these
286+
VPackSlice from =
287+
transaction::helpers::extractFromFromDocument(edgeDoc);
288+
VPackSlice to =
289+
transaction::helpers::extractToFromDocument(edgeDoc);
290+
if (CheckInaccessible(_trx, from) || CheckInaccessible(_trx, to)) {
291+
return false;
292+
}
293+
}
294294
#endif
295295
// eval depth-based expression first if available
296296
EdgeDocumentToken edgeToken(cid, token);

arangod/Graph/Enumerators/OneSidedEnumerator.cpp

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,30 @@ void OneSidedEnumerator<Configuration>::clear(bool keepPathStore) {
7373
}
7474
_queue.clear();
7575
_results.clear();
76+
77+
if (!keepPathStore) {
78+
_interior.reset();
79+
clearProvider(); // TODO: Check usage of keepPathStore (if necessary)
80+
}
81+
}
82+
83+
tem 10000 plate <class Configuration>
84+
void OneSidedEnumerator<Configuration>::clearProvider() {
85+
// Guarantee that the used Queue is empty and we do not hold any reference to PathStore.
86+
// Info: Steps do contain VertexRefs which are hold in PathStore.
87+
TRI_ASSERT(_queue.isEmpty());
88+
89+
// Guarantee that _results is empty. Steps are contained in _results and do contain
90+
// Steps which do contain VertexRefs which are hold in PathStore.
91+
TRI_ASSERT(_results.empty());
92+
93+
// Guarantee that the used PathStore is cleared, before we clear the Provider.
94+
// The Provider does hold the StringHeap cache.
95+
TRI_ASSERT(_interior.size() == 0);
96+
97+
// ProviderStore must be cleared as last (!), as we do have multiple places holding
98+
// references to contained VertexRefs there.
99+
_provider.clear(); // PathStore
76100
}
77101

78102
template <class Configuration>

arangod/Graph/Enumerators/OneSidedEnumerator.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,7 @@ class OneSidedEnumerator : public TraversalEnumerator {
146146
// Ensure that we have more valid paths in the _result stock.
147147
// May be a noop if _result is not empty.
148148
auto searchMoreResults() -> void;
149+
void clearProvider();
149150

150151
private:
151152
GraphOptions _options;

arangod/Graph/Enumerators/TwoSidedEnumerator.cpp

Lines changed: 29 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -77,10 +77,31 @@ void TwoSidedEnumerator<QueueType, PathStoreType, ProviderType, PathValidator>::
7777

7878
template <class QueueType, class PathStoreType, class ProviderType, class PathValidator>
7979
void TwoSidedEnumerator<QueueType, PathStoreType, ProviderType, PathValidator>::Ball::clear() {
80-
_shell.clear();
81-
_interior.reset();
82-
_queue.clear();
8380
_depth = 0;
81+
_queue.clear();
82+
_shell.clear();
83+
_interior.reset(); // PathStore
84+
85+
// Provider - Must be last one to be cleared(!)
86+
clearProvider();
87+
}
88+
89+
template <class QueueType, class PathStoreType, class ProviderType, class PathValidator>
90+
void TwoSidedEnumerator<QueueType, PathStoreType, ProviderType, PathValidator>::Ball::clearProvider() {
91+
// We need to make sure, no one holds references to _provider.
92+
// Guarantee that the used Queue is empty and we do not hold any reference to PathStore.
93+
// Info: Steps do contain VertexRefs which are hold in PathStore.
94+
TRI_ASSERT(_queue.isEmpty());
95+
96+
// Guarantee that _shell is empty. Steps are contained in _shell and those
97+
// do contain VertexRefs which are hold in PathStore.
98+
TRI_ASSERT(_shell.empty());
99+
100+
// Guarantee that the used PathStore is cleared, before we clear the Provider.
101+
// The Provider does hold the StringHeap cache.
102+
TRI_ASSERT(_interior.size() == 0);
103+
104+
_provider.clear();
84105
}
85106

86107
template <class QueueType, class PathStoreType, class ProviderType, class PathValidator>
@@ -270,9 +291,13 @@ auto TwoSidedEnumerator<QueueType, PathStoreType, ProviderType, PathValidator>::
270291

271292
template <class QueueType, class PathStoreType, class ProviderType, class PathValidator>
272293
void TwoSidedEnumerator<QueueType, PathStoreType, ProviderType, PathValidator>::clear() {
294+
// Order is important here, please do not change.
295+
// 1.) Remove current results
296+
_results.clear();
297+
298+
// 2.) Remove both Balls (order here is not important)
273299
_left.clear();
274300
_right.clear();
275-
_results.clear();
276301
}
277302

278303
/**

arangod/Graph/Enumerators/TwoSidedEnumerator.h

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,9 @@ class TwoSidedEnumerator {
9595

9696
auto provider() -> ProviderType&;
9797

98+
private:
99+
auto clearProvider() -> void;
100+
98101
private:
99102
// Fast path, to test if we find a connecting vertex between left and right.
100103
Shell _shell{};
@@ -129,7 +132,7 @@ class TwoSidedEnumerator {
129132

130133
~TwoSidedEnumerator();
131134

132-
void clear();
135+
auto clear() -> void;
133136

134137
/**
135138
* @brief Quick test if the finder can prove there is no more data available.
@@ -208,4 +211,3 @@ class TwoSidedEnumerator {
208211
};
209212
} // namespace graph
210213
} // namespace arangodb
211-

0 commit comments

Comments
 (0)
0