8000 [Join] SkipRowsRange (#20003) · SwathyPraveen/arangodb@922a6ed · GitHub 8000
[go: up one dir, main page]

Skip to content

Commit 922a6ed

Browse files
Lars Maier and hkernbach authored
[Join] SkipRowsRange (arangodb#20003)
* Quick and dirty implementation of skip rows range. * Fixing tests in cluster by pimping the optimizer rule. * Attempt to reduce the amount of generated testCases per testSuite by separating some code into helper functions and creating four testSuite files out of one (reducing the results produced by the internally used generator). This is required to avoid reaching the CI's timeout limits. This commit might get reverted in case it does not solve our problem. In case it doesn't solve our problem, we need to extend the functionality of testing.js and test-utils.js to be able to not only break down testSuites into separate buckets, but also add bucket support for testSuites themselves (if required). Currently the first and easier approach is taken, as it seems that a lot of adjustments throughout the code base are required to implement bucket testSuite support - as currently the overall mechanism is based on fileNames/testSuites themselves and not on a list of testCases. * Added missing semicolons --------- Co-authored-by: Heiko Kernbach <heiko@arangodb.com>
1 parent babdc01 commit 922a6ed

9 files changed

+610
-265
lines changed

arangod/Aql/JoinExecutor.cpp

Lines changed: 107 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,6 @@ auto JoinExecutor::produceRows(AqlItemBlockInputRange& inputRange,
148148

149149
// first do all the filtering and only if all indexes produced a
150150
// value write it into the aql output block
151-
// TODO make a member
152151

153152
std::size_t projectionsOffset = 0;
154153

@@ -323,9 +322,113 @@ auto JoinExecutor::skipRowsRange(AqlItemBlockInputRange& inputRange,
323322

324323
hasMore = _strategy->next([&](std::span<LocalDocumentId> docIds,
325324
std::span<VPackSlice> projections) -> bool {
326-
// TODO post filtering based on projections
325+
auto lookupDocument = [&](std::size_t index, LocalDocumentId id,
326+
auto cb) {
327+
auto result =
328+
_infos.indexes[index]
329+
.collection->getCollection()
330+
->getPhysical()
331+
->lookup(&_trx, id,
332+
{DocumentCallbackOverload{
333+
[&](LocalDocumentId token, auto docPtr) {
334+
cb.template operator()<decltype(docPtr)>(docPtr);
335+
return true;
336+
}}},
337+
{});
338+
if (result.fail()) {
339+
THROW_ARANGO_EXCEPTION_MESSAGE(
340+
result.errorNumber(),
341+
basics::StringUtils::concatT(
342+
"failed to lookup indexed document ", id.id(),
343+
" for collection ", _infos.indexes[index].collection->name(),
344+
": ", result.errorMessage()));
345+
}
346+
};
347+
348+
// first do all the filtering and only if all indexes produced a
349+
// value write it into the aql output block
350+
351+
std::size_t projectionsOffset = 0;
352+
327353
for (std::size_t k = 0; k < docIds.size(); k++) {
328-
// TODO post filter based on document value
354+
auto& idx = _infos.indexes[k];
355+
if (idx.projections.usesCoveringIndex(idx.index)) {
356+
projectionsOffset += idx.projections.size();
357+
}
358+
// evaluate filter conditions
359+
if (!idx.filter.has_value()) {
360+
continue;
361+
}
362+
363+
bool const useFilterProjections =
364+
idx.filter->projections.usesCoveringIndex();
365+
bool filtered = false;
366+
367+
auto filterCallback = [&](auto docPtr) {
368+
auto doc = extractSlice(docPtr);
369+
LOG_JOIN << "INDEX " << k << " read document " << doc.toJson();
370+
GenericDocumentExpressionContext ctx{_trx,
371+
*_infos.query,
372+
_functionsCache,
373+
idx.filter->filterVarsToRegs,
374+
_currentRow,
375+
idx.filter->documentVariable};
376+
ctx.setCurrentDocument(doc);
377+
bool mustDestroy;
378+
AqlValue result = idx.filter->expression->execute(&ctx, mustDestroy);
379+
AqlValueGuard guard(result, mustDestroy);
380+
filtered = !result.toBoolean();
381+
LOG_JOIN << "INDEX " << k << " filter = " << std::boolalpha
382+
<< filtered;
383+
};
384+
385+
auto filterWithProjectionsCallback =
386+
[&](std::span<VPackSlice> projections) {
387+
GenericDocumentExpressionContext ctx{
388+
_trx,
389+
*_infos.query,
390+
_functionsCache,
391+
idx.filter->filterVarsToRegs,
392+
_currentRow,
393+
idx.filter->documentVariable};
394+
ctx.setCurrentDocument(VPackSlice::noneSlice());
395+
396+
TRI_ASSERT(idx.filter->projections.size() == projections.size());
397+
for (size_t j = 0; j < projections.size(); j++) {
398+
TRI_ASSERT(projections[j].start() != nullptr);
399+
LOG_JOIN << "INDEX " << k << " set "
400+
<< idx.filter->filterProjectionVars[j]->id << " = "
401+
<< projections[j].toJson();
402+
ctx.setVariable(idx.filter->filterProjectionVars[j],
403+
projections[j]);
404+
}
405+
406+
bool mustDestroy;
407+
AqlValue result =
408+
idx.filter->expression->execute(&ctx, mustDestroy);
409+
AqlValueGuard guard(result, mustDestroy);
410+
filtered = !result.toBoolean();
411+
};
412+
413+
if (useFilterProjections) {
414+
LOG_JOIN << "projectionsOffset = " << projectionsOffset;
415+
std::span<VPackSlice> projectionRange = {
416+
projections.begin() + projectionsOffset,
417+
projections.begin() + projectionsOffset +
418+
idx.filter->projections.size()};
419+
LOG_JOIN << "INDEX " << k << " unsing filter projections";
420+
filterWithProjectionsCallback(projectionRange);
421+
projectionsOffset += idx.filter->projections.size();
422+
} else {
423+
LOG_JOIN << "INDEX " << k << " looking up document " << docIds[k];
424+
lookupDocument(k, docIds[k], filterCallback);
425+
}
426+
427+
if (filtered) {
428+
// forget about this row
429+
LOG_JOIN << "INDEX " << k << " eliminated pair";
430+
return clientCall.needSkipMore();
431+
}
329432
}
330433

331434
clientCall.didSkip(1);
@@ -334,9 +437,8 @@ auto JoinExecutor::skipRowsRange(AqlItemBlockInputRange& inputRange,
334437

335438
if (!hasMore) {
336439
_currentRow = InputAqlItemRow{CreateInvalidInputRowHint{}};
440+
inputRange.advanceDataRow();
337441
}
338-
339-
inputRange.advanceDataRow();
340442
}
341443

342444
return {inputRange.upstreamState(), Stats{}, clientCall.getSkipCount(),

arangod/Aql/OptimizerRules.cpp

Lines changed: 32 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9160,17 +9160,44 @@ void arangodb::aql::joinIndexNodesRule(Optimizer* opt,
91609160
containers::SmallVector<IndexNode*, 8> candidates;
91619161
IndexNode* indexNode = startNode;
91629162

9163+
containers::SmallVector<CalculationNode*, 8> calculations;
9164+
91639165
while (true) {
91649166
if (handled.contains(indexNode) || !nodeQualifies(*indexNode)) {
91659167
break;
91669168
}
91679169
candidates.emplace_back(indexNode);
91689170
auto* parent = indexNode->getFirstParent();
9169-
if (parent == nullptr || parent->getType() != EN::INDEX) {
9170-
break;
9171+
while (true) {
9172+
if (parent == nullptr) {
9173+
goto endOfIndexNodeSearch;
9174+
} else if (parent->getType() == EN::CALCULATION) {
9175+
// store this calculation and check later if an index depends on
9176+
// it
9177+
auto calc = ExecutionNode::castTo<CalculationNode*>(parent);
9178+
calculations.push_back(calc);
9179+
parent = parent->getFirstParent();
9180+
continue;
9181+
} else if (parent->getType() == EN::INDEX) {
9182+
// check that this index node does not depend on previous
9183+
// calculations
9184+
9185+
indexNode = ExecutionNode::castTo<IndexNode*>(parent);
9186+
VarSet usedVariables;
9187+
indexNode->getVariablesUsedHere(usedVariables);
9188+
for (auto* calc : calculations) {
9189+
if (calc->setsVariable(usedVariables)) {
9190+
// can not join past this calculation
9191+
goto endOfIndexNodeSearch;
9192+
}
9193+
}
9194+
break;
9195+
} else {
9196+
goto endOfIndexNodeSearch;
9197+
}
91719198
}
9172-
indexNode = ExecutionNode::castTo<IndexNode*>(parent);
91739199
}
9200+
endOfIndexNodeSearch:
91749201

91759202
if (candidates.size() >= 2) {
91769203
bool eligible = true;
@@ -9304,6 +9331,8 @@ void arangodb::aql::joinIndexNodesRule(Optimizer* opt,
93049331
JoinNode* jn = plan->createNode<JoinNode>(
93059332
plan.get(), plan->nextId(), std::move(indexInfos),
93069333
IndexIteratorOptions{});
9334+
// Nodes we jumped over (like calculations) are left in place
9335+
// and are now below the Join Node
93079336
plan->replaceNode(candidates[0], jn);
93089337
for (size_t i = 1; i < candidates.size(); ++i) {
93099338
plan->unlinkNode(candidates[i]);

0 commit comments

Comments
 (0)
0