8000 working - not done yet · arangodb/arangodb@aa275b4 · GitHub
[go: up one dir, main page]

Skip to content

Commit aa275b4

Browse files
committed
working - not done yet
1 parent 4d019a1 commit aa275b4

File tree

3 files changed

+220
-10
lines changed

3 files changed

+220
-10
lines changed

arangod/Aql/TraversalExecutor.cpp

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,10 @@
2121
////////////////////////////////////////////////////////////////////////////////
2222

2323
#include "TraversalExecutor.h"
24+
#include <Logger/LogMacros.h>
2425

26+
#include "Aql/AqlCall.h"
27+
#include "Aql/AqlItemBlockInputRange.h"
2528
#include "Aql/ExecutionNode.h"
2629
#include "Aql/OutputAqlItemRow.h"
2730
#include "Aql/PruneExpressionEvaluator.h"
@@ -248,6 +251,57 @@ std::pair<ExecutionState, TraversalStats> TraversalExecutor::produceRows(OutputA
248251
return {ExecutionState::DONE, s};
249252
}
250253

254+
std::tuple<ExecutorState, TraversalStats, AqlCall> TraversalExecutor::produceRows(
255+
size_t limit, AqlItemBlockInputRange& inputRange, OutputAqlItemRow& output) {
256+
TraversalStats s;
257+
258+
while (inputRange.hasMore() && limit > 0) {
259+
auto const& [state, input] = inputRange.next();
260+
LOG_DEVEL << "ExecutorState: " << state << " - remove me after review";
261+
262+
if (!resetTraverser(input)) {
263+
// Could not start here, (invalid)
264+
// Go to next
265+
continue;
266+
}
267+
268+
if (!_traverser.hasMore() || !_traverser.next()) {
269+
// Nothing more to read, reset input to refetch
270+
continue;
271+
} else {
272+
// traverser now has next v, e, p values
273+
if (_infos.useVertexOutput()) {
274+
AqlValue vertex = _traverser.lastVertexToAqlValue();
275+
AqlValueGuard guard{vertex, true};
276+
output.moveValueInto(_infos.vertexRegister(), input, guard);
277+
}
278+
if (_infos.useEdgeOutput()) {
279+
AqlValue edge = _traverser.lastEdgeToAqlValue();
280+
AqlValueGuard guard{edge, true};
281+
output.moveValueInto(_infos.edgeRegister(), input, guard);
282+
}
283+
if (_infos.usePathOutput()) {
284+
transaction::BuilderLeaser tmp(_traverser.trx());
285+
tmp->clear();
286+
AqlValue path = _traverser.pathToAqlValue(*tmp.builder());
287+
AqlValueGuard guard{path, true};
288+
output.moveValueInto(_infos.pathRegister(), input, guard);
289+
}
290+
output.advanceRow();
291+
limit--;
292+
}
293+
}
294+
295+
// we are done
296+
s.addFiltered(_traverser.getAndResetFilteredPaths());
297+
s.addScannedIndex(_traverser.getAndResetReadDocuments());
298+
s.addHttpRequests(_traverser.getAndResetHttpRequests());
299+
300+
AqlCall upstreamCall{};
301+
upstreamCall.softLimit = limit;
302+
return {inputRange.peek().first, s, upstreamCall};
303+
}
304+
251305
ExecutionState TraversalExecutor::computeState() const {
252306
if (_rowState == ExecutionState::DONE && !_traverser.hasMore()) {
253307
return ExecutionState::DONE;
@@ -310,3 +364,59 @@ bool TraversalExecutor::resetTraverser() {
310364
}
311365
}
312366
}
367+
368+
bool TraversalExecutor::resetTraverser(InputAqlItemRow const& input) {
369+
_traverser.traverserCache()->clear();
370+
371+
// Initialize the Expressions within the options.
372+
// We need to find the variable and read its value here. Everything is
373+
// computed right now.
374+
auto opts = _traverser.options();
375+
opts->clearVariableValues();
376+
for (auto const& pair : _infos.filterConditionVariables()) {
377+
opts->setVariableValue(pair.first, input.getValue(pair.second));
378+
}
379+
if (opts->usesPrune()) {
380+
auto* evaluator = opts->getPruneEvaluator();
381+
// Replace by inputRow
382+
evaluator->prepareContext(input);
383+
}
384+
// Now reset the traverser
385+
if (_infos.usesFixedSource()) {
386+
auto pos = _infos.getFixedSource().find('/');
387+
if (pos == std::string::npos) {
388+
_traverser.options()->query()->registerWarning(
389+
TRI_ERROR_BAD_PARAMETER,
390+
"Invalid input for traversal: "
391+
"Only id strings or objects with "
392+
"_id are allowed");
393+
return false;
394+
} else {
395+
// Use constant value
396+
_traverser.setStartVertex(_infos.getFixedSource());
397+
return true;
398+
}
399+
} else {
400+
AqlValue const& in = input.getValue(_infos.getInputRegister());
401+
if (in.isObject()) {
402+
try {
403+
_traverser.setStartVertex(_traverser.options()->trx()->extractIdString(in.slice()));
404+
return true;
405+
} catch (...) {
406+
// on purpose ignore this error.
407+
return false;
408+
}
409+
// _id or _key not present we cannot start here, register warning take next
410+
} else if (in.isString()) {
411+
_traverser.setStartVertex(in.slice().copyString());
412+
return true;
413+
} else {
414+
_traverser.options()->query()->registerWarning(
415+
TRI_ERROR_BAD_PARAMETER,
416+
"Invalid input for traversal: Only "
417+
"id strings or objects with _id are "
418+
"allowed");
419+
return false;
420+
}
421+
}
422+
}

arangod/Aql/TraversalExecutor.h

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@ class Traverser;
3838

3939
namespace aql {
4040

41+
struct AqlCall;
42+
class AqlItemBlockInputRange;
4143
class Query;
4244
class OutputAqlItemRow;
4345
class ExecutorInfos;
@@ -138,6 +140,15 @@ class TraversalExecutor {
138140
*/
139141
std::pair<ExecutionState, Stats> produceRows(OutputAqlItemRow& output);
140142

143+
/**
144+
* @brief produce the next Row of Aql Values.
145+
*
146+
* @return ExecutorState, the stats, and a new Call that needs to be send to upstream
147+
*/
148+
std::tuple<ExecutorState, Stats, AqlCall> produceRows(size_t limit,
149+
AqlItemBlockInputRange& inputRange,
150+
OutputAqlItemRow& output);
151+
141152
private:
142153
/**
143154
* @brief compute the return state
@@ -146,6 +157,7 @@ class TraversalExecutor {
146157
ExecutionState computeState() const;
147158

148159
bool resetTraverser();
160+
bool resetTraverser(InputAqlItemRow const& input);
149161

150162
private:
151163
Infos& _infos;

tests/Aql/TraversalExecutorTest.cpp

Lines changed: 98 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,11 @@
2020
/// @author Michael Hackstein
2121
////////////////////////////////////////////////////////////////////////////////
2222

23+
#include "AqlItemBlockHelper.h"
2324
#include "RowFetcherHelper.h"
2425
#include "gtest/gtest.h"
2526

27+
#include "Aql/AqlCall.h"
2628
#include "Aql/AqlItemBlock.h"
2729
#include "Aql/ExecutionNode.h"
2830
#include "Aql/InputAqlItemRow.h"
@@ -295,7 +297,8 @@ class TraversalExecutorTestInputStartVertex : public ::testing::Test {
295297

296298
TEST_F(TraversalExecutorTestInputStartVertex, there_are_no_rows_upstream_producer_doesnt_wait) {
297299
VPackBuilder input;
298-
SingleRowFetcherHelper<::arangodb::aql::BlockPassthrough::Disable> fetcher(itemBlockManager, input.steal(), false);
300+
SingleRowFetcherHelper<::arangodb::aql::BlockPassthrough::Disable> fetcher(
301+
itemBlockManager, input.steal(), false);
299302
TraversalExecutor testee(fetcher, infos);
300303
TraversalStats stats{};
301304

@@ -308,7 +311,8 @@ TEST_F(TraversalExecutorTestInputStartVertex, there_are_no_rows_upstream_produce
308311

309312
TEST_F(TraversalExecutorTestInputStartVertex, there_are_no_rows_upstream_producer_waits) {
310313
VPackBuilder input;
311-
SingleRowFetcherHelper<::arangodb::aql::BlockPassthrough::Disable> fetcher(itemBlockManager, input.steal(), true);
314+
SingleRowFetcherHelper<::arangodb::aql::BlockPassthrough::Disable> fetcher(
315+
itemBlockManager, input.steal(), true);
312316
TraversalExecutor testee(fetcher, infos);
313317
TraversalStats stats{};
314318

@@ -330,7 +334,8 @@ TEST_F(TraversalExecutorTestInputStartVertex, there_are_rows_upstream_producer_d
330334
myGraph.addVertex("2");
331335
myGraph.addVertex("3");
332336
auto input = VPackParser::fromJson(R"([["v/1"], ["v/2"], ["v/3"]])");
333-
SingleRowFetcherHelper<::arangodb::aql::BlockPassthrough::Disable> fetcher(itemBlockManager, input->steal(), false);
337+
SingleRowFetcherHelper<::arangodb::aql::BlockPassthrough::Disable> fetcher(
338+
itemBlockManager, input->steal(), false);
334339
TraversalExecutor testee(fetcher, infos);
335340
TraversalStats stats{};
336341

@@ -362,7 +367,8 @@ TEST_F(TraversalExecutorTestInputStartVertex,
362367
myGraph.addVertex("2");
363368
myGraph.addVertex("3");
364369
auto input = VPackParser::fromJson(R"([["v/1"], ["v/2"], ["v/3"]])");
365-
SingleRowFetcherHelper<::arangodb::aql::BlockPassthrough::Disable> fetcher(itemBlockManager, input->steal(), true);
370+
SingleRowFetcherHelper<::arangodb::aql::BlockPassthrough::Disable> fetcher(
371+
itemBlockManager, input->steal(), true);
366372
TraversalExecutor testee(fetcher, infos);
367373
TraversalStats stats{};
368374

@@ -400,7 +406,8 @@ TEST_F(TraversalExecutorTestInputStartVertex,
400406
myGraph.addVertex("2");
401407
myGraph.addVertex("3");
402408
auto input = VPackParser::fromJson(R"([["v/1"], ["v/2"], ["v/3"]])");
403-
SingleRowFetcherHelper<::arangodb::aql::BlockPassthrough::Disable> fetcher(itemBlockManager, input->steal(), true);
409+
SingleRowFetcherHelper<::arangodb::aql::BlockPassthrough::Disable> fetcher(
410+
itemBlockManager, input->steal(), true);
404411
TraversalExecutor testee(fetcher, infos);
405412
TraversalStats stats{};
406413

@@ -501,7 +508,8 @@ class TraversalExecutorTestConstantStartVertex : public ::testing::Test {
501508

502509
TEST_F(TraversalExecutorTestConstantStartVertex, no_rows_upstream_producer_doesnt_wait) {
503510
VPackBuilder input;
504-
SingleRowFetcherHelper<::arangodb::aql::BlockPassthrough::Disable> fetcher(itemBlockManager, input.steal(), false);
511+
SingleRowFetcherHelper<::arangodb::aql::BlockPassthrough::Disable> fetcher(
512+
itemBlockManager, input.steal(), false);
505513
TraversalExecutor testee(fetcher, infos);
506514
TraversalStats stats{};
507515

@@ -514,7 +522,8 @@ TEST_F(TraversalExecutorTestConstantStartVertex, no_rows_upstream_producer_doesn
514522

515523
TEST_F(TraversalExecutorTestConstantStartVertex, no_rows_upstream_producer_waits) {
516524
VPackBuilder input;
517-
SingleRowFetcherHelper<::arangodb::aql::BlockPassthrough::Disable> fetcher(itemBlockManager, input.steal(), true);
525+
SingleRowFetcherHelper<::arangodb::aql::BlockPassthrough::Disable> fetcher(
526+
itemBlockManager, input.steal(), true);
518527
TraversalExecutor testee(fetcher, infos);
519528
TraversalStats stats{};
520529

@@ -537,7 +546,8 @@ TEST_F(TraversalExecutorTestConstantStartVertex, rows_upstream_producer_doesnt_w
537546
myGraph.addVertex("3");
538547
auto input = VPackParser::fromJson(R"([ ["v/1"], ["v/2"], ["v/3"] ])");
539548

540-
SingleRowFetcherHelper<::arangodb::aql::BlockPassthrough::Disable> fetcher(itemBlockManager, input->steal(), false);
549+
SingleRowFetcherHelper<::arangodb::aql::BlockPassthrough::Disable> fetcher(
550+
itemBlockManager, input->steal(), false);
541551
TraversalExecutor testee(fetcher, infos);
542552
TraversalStats stats{};
543553

@@ -569,7 +579,8 @@ TEST_F(TraversalExecutorTestConstantStartVertex, rows_upstream_producer_waits_no
569579
myGraph.addVertex("3");
570580
auto input = VPackParser::fromJson(R"([ ["v/1"], ["v/2"], ["v/3"] ])");
571581

572-
SingleRowFetcherHelper<::arangodb::aql::BlockPassthrough::Disable> fetcher(itemBlockManager, input->steal(), true);
582+
SingleRowFetcherHelper<::arangodb::aql::BlockPassthrough::Disable> fetcher(
583+
itemBlockManager, input->steal(), true);
573584
TraversalExecutor testee(fetcher, infos);
574585
TraversalStats stats{};
575586
OutputAqlItemRow row(std::move(block), infos.getOutputRegisters(),
@@ -606,7 +617,8 @@ TEST_F(TraversalExecutorTestConstantStartVertex, rows_upstream_producer_waits_ed
606617
myGraph.addVertex("3");
607618
auto input = VPackParser::fromJson(R"([ ["v/1"], ["v/2"], ["v/3"] ])");
608619

609-
SingleRowFetcherHelper<::arangodb::aql::BlockPassthrough::Disable> fetcher(itemBlockManager, input->steal(), true);
620+
SingleRowFetcherHelper<::arangodb::aql::BlockPassthrough::Disable> fetcher(
621+
itemBlockManager, input->steal(), true);
610622
TraversalExecutor testee(fetcher, infos);
611623
TraversalStats stats{};
612624
myGraph.addEdge("1", "2", "1->2");
@@ -656,6 +668,82 @@ TEST_F(TraversalExecutorTestConstantStartVertex, rows_upstream_producer_waits_ed
656668
}
657669
}
658670

671+
TEST_F(TraversalExecutorTestInputStartVertex, test_produce_datarange_no_edges_are_connected) {
672+
myGraph.addVertex("1");
673+
myGraph.addVertex("2");
674+
myGraph.addVertex("3");
675+
676+
// This fetcher will not be called!
677+
// After Execute is done this fetcher shall be removed, the Executor does not need it anymore!
678+
auto fakeUnusedBlock = VPackParser::fromJson("[ ]");
679+
SingleRowFetcherHelper<::arangodb::aql::BlockPassthrough::Disable> fetcher(
680+
itemBlockManager, fakeUnusedBlock->steal(), false);
681+
682+
// This is the relevant part of the test
683+
TraversalExecutor testee(fetcher, infos);
684+
SharedAqlItemBlockPtr inBlock =
685+
buildBlock<1>(itemBlockManager, {{R"("v/1")"}, {R"("v/2")"}, {R"("v/3")"}});
686+
687+
AqlItemBlockInputRange input{ExecutorState::DONE, inBlock, 0, inBlock->size()};
688+
OutputAqlItemRow output(std::move(block), infos.getOutputRegisters(),
689+
infos.registersToKeep(), infos.registersToClear());
690+
EXPECT_EQ(output.numRowsWritten(), 0);
691+
auto const [state, stats, call] = testee.produceRows(1000, input, output);
692+
EXPECT_EQ(state, ExecutorState::DONE);
693+
694+
ASSERT_EQ(stats.getFiltered(), 0);
695+
ASSERT_FALSE(output.produced());
696+
697+
ASSERT_EQ(traverser->startVertexUsedAt(0), "v/1");
698+
ASSERT_EQ(traverser->startVertexUsedAt(1), "v/2");
699+
ASSERT_EQ(traverser->startVertexUsedAt(2), "v/3");
700+
}
701+
702+
TEST_F(TraversalExecutorTestConstantStartVertex, test_produce_datarange_edges_are_connected) {
703+
myGraph.addVertex("1");
704+
myGraph.addVertex("2");
705+
myGraph.addVertex("3");
706+
707+
// This fetcher will not be called!
708+
// After Execute is done this fetcher shall be removed, the Executor does not need it anymore!
709+
auto fakeUnusedBlock = VPackParser::fromJson("[ ]");
710+
SingleRowFetcherHelper<::arangodb::aql::BlockPassthrough::Disable> fetcher(
711+
itemBlockManager, fakeUnusedBlock->steal(), false);
712+
713+
// This is the relevant part of the test
714+
TraversalExecutor testee(fetcher, infos);
715+
SharedAqlItemBlockPtr inBlock =
716+
buildBlock<1>(itemBlockManager, {{R"("v/1")"}, {R"("v/2")"}, {R"("v/3")"}});
717+
718+
myGraph.addEdge("1", "2", "1->2");
719+
myGraph.addEdge("2", "3", "2->3");
720+
myGraph.addEdge("3", "1", "3->1");
721+
722+
AqlItemBlockInputRange input{ExecutorState::DONE, inBlock, 0, inBlock->size()};
723+
OutputAqlItemRow output(std::move(block), infos.getOutputRegisters(),
724+
infos.registersToKeep(), infos.registersToClear());
725+
726+
EXPECT_EQ(output.numRowsWritten(), 0);
727+
auto const [state, stats, call] = testee.produceRows(1000, input, output);
728+
EXPECT_EQ(state, ExecutorState::DONE);
729+
730+
ASSERT_EQ(traverser->startVertexUsedAt(0), "v/1");
731+
ASSERT_EQ(traverser->startVertexUsedAt(1), "v/2");
732+
ASSERT_EQ(traverser->startVertexUsedAt(2), "v/3");
733+
734+
std::vector<std::string> expectedResult{"v/2", "v/3", "v/1"};
735+
auto block = output.stealBlock();
736+
for (std::size_t index = 0; index < 3; index++) {
737+
AqlValue value = block->getValue(index, outReg);
738+
ASSERT_TRUE(value.isObject());
739+
ASSERT_TRUE(arangodb::basics::VelocyPackHelper::compare(
740+
value.slice(),
741+
myGraph.getVertexData(
742+
arangodb::velocypack::StringRef(expectedResult.at(index))),
743+
false) == 0);
744+
}
745+
}
746+
659747
} // namespace aql
660748
} // namespace tests
661749
} // namespace arangodb

0 commit comments

Comments
 (0)
11 0