8000 added first implementation of count collect datarange produceRows fun… · arangodb/arangodb@2368252 · GitHub
[go: up one dir, main page]

Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 2368252

Browse files
committed
added first implementation of count collect datarange produceRows function + test
1 parent dccbbcf commit 2368252

File tree

3 files changed

+76
-4
lines changed

3 files changed

+76
-4
lines changed

arangod/Aql/CountCollectExecutor.cpp

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525

2626
#include "CountCollectExecutor.h"
2727

28+
#include "Aql/AqlCall.h"
29+
#include "Aql/AqlItemBlockInputRange.h"
2830
#include "Aql/AqlValue.h"
2931
#include "Aql/ExecutorInfos.h"
3032
#include "Aql/InputAqlItemRow.h"
@@ -93,6 +95,33 @@ std::pair<ExecutionState, NoStats> CountCollectExecutor::produceRows(OutputAqlIt
9395
return {_state, NoStats{}};
9496
}
9597

98+
std::tuple<ExecutorState, NoStats, AqlCall> CountCollectExecutor::produceRows(
99+
size_t limit, AqlItemBlockInputRange& inputRange, OutputAqlItemRow& output) {
100+
TRI_IF_FAILURE("CountCollectExecutor::produceRows") {
101+
THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG);
102+
}
103+
InputAqlItemRow input{CreateInvalidInputRowHint{}};
104+
105+
while (inputRange.hasMore() && limit > 0) {
106+
std::tie(_executorState, input) = inputRange.next();
107+
108+
limit--;
109+
_count++;
110+
}
111+
112+
// In general, we do not have an input row. In fact, we never fetch one.
113+
output.setAllowSourceRowUninitialized();
114+
115+
// We must produce exactly one output row.
116+
output.cloneValueInto(_infos.getOutputRegisterId(),
117+
InputAqlItemRow{CreateInvalidInputRowHint{}},
118+
AqlValue(AqlValueHintUInt(getCount())));
119+
120+
AqlCall upstreamCall{};
121+
upstreamCall.softLimit = limit;
122+
return {_executorState, NoStats{}, upstreamCall};
123+
}
124+
96125
void CountCollectExecutor::incrCountBy(size_t incr) noexcept { _count += incr; }
97126

98127
uint64_t CountCollectExecutor::getCount() noexcept { return _count; }

arangod/Aql/CountCollectExecutor.h

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@
3636
namespace arangodb {
3737
namespace aql {
3838

39+
struct AqlCall;
40+
class AqlItemBlockInputRange;
3941
class InputAqlItemRow;
4042
class NoStats;
4143
class ExecutorInfos;
@@ -91,6 +93,15 @@ class CountCollectExecutor {
9193

9294
std::pair<ExecutionState, NoStats> produceRows(OutputAqlItemRow& output);
9395

96+
/**
97+
* @brief produce the next Row of Aql Values.
98+
*
99+
* @return ExecutorState, the stats, and a new Call that needs to be send to upstream
100+
*/
101+
std::tuple<ExecutorState, Stats, AqlCall> produceRows(size_t atMost,
102+
AqlItemBlockInputRange& input,
103+
OutputAqlItemRow& output);
104+
94105
void incrCountBy(size_t incr) noexcept;
95106

96107
uint64_t getCount() noexcept;;
@@ -104,6 +115,7 @@ class CountCollectExecutor {
104115
Infos const& _infos;
105116
Fetcher& _fetcher;
106117
ExecutionState _state;
118+
ExecutorState _executorState;
107119
uint64_t _count;
108120
};
109121

tests/Aql/CountCollectExecutorTest.cpp

Lines changed: 35 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,11 @@
2020
/// @author Heiko Kernbach
2121
////////////////////////////////////////////////////////////////////////////////
2222

23+
#include "AqlItemBlockHelper.h"
2324
#include "RowFetcherHelper.h"
2425
#include "gtest/gtest.h"
2526

27+
#include "Aql/AqlCall.h"
2628
#include "Aql/AqlItemBlock.h"
2729
#include "Aql/CountCollectExecutor.h"
2830
#include "Aql/InputAqlItemRow.h"
@@ -60,7 +62,8 @@ class CountCollectExecutorTest : public ::testing::Test {
6062
TEST_F(CountCollectExecutorTest, there_are_no_rows_upstream_the_producer_doesnt_wait) {
6163
CountCollectExecutorInfos infos(1 /* outputRegId */, 1 /* nrIn */, nrOutputReg, {}, {});
6264
VPackBuilder input;
63-
SingleRowFetcherHelper<::arangodb::aql::BlockPassthrough::Disable> fetcher(itemBlockManager, input.steal(), false);
65+
SingleRowFetcherHelper<::arangodb::aql::BlockPassthrough::Disable> fetcher(
66+
itemBlockManager, input.steal(), false);
6467
CountCollectExecutor testee(fetcher, infos);
6568
NoStats stats{};
6669

@@ -81,7 +84,8 @@ TEST_F(CountCollectExecutorTest, there_are_no_rows_upstream_the_producer_doesnt_
8184
TEST_F(CountCollectExecutorTest, there_are_now_rows_upstream_the_producer_waits) {
8285
CountCollectExecutorInfos infos(1 /* outputRegId */, 1 /* nrIn */, nrOutputReg, {}, {});
8386
VPackBuilder input;
84-
SingleRowFetcherHelper<::arangodb::aql::BlockPassthrough::Disable> fetcher(itemBlockManager, input.steal(), true);
87+
SingleRowFetcherHelper<::arangodb::aql::BlockPassthrough::Disable> fetcher(
88+
itemBlockManager, input.steal(), true);
8589
CountCollectExecutor testee(fetcher, infos);
8690
NoStats stats{};
8791

@@ -106,7 +110,8 @@ TEST_F(CountCollectExecutorTest, there_are_now_rows_upstream_the_producer_waits)
106110
TEST_F(CountCollectExecutorTest, there_are_rows_in_the_upstream_the_producer_doesnt_wait) {
107111
CountCollectExecutorInfos infos(1 /* outputRegId */, 1 /* nrIn */, nrOutputReg, {}, {});
108112
auto input = VPackParser::fromJson("[ [1], [2], [3] ]");
109-
SingleRowFetcherHelper<::arangodb::aql::BlockPassthrough::Disable> fetcher(itemBlockManager, input->steal(), false);
113+
SingleRowFetcherHelper<::arangodb::aql::BlockPassthrough::Disable> fetcher(
114+
itemBlockManager, input->steal(), false);
110115
CountCollectExecutor testee(fetcher, infos);
111116
NoStats stats{};
112117

@@ -127,7 +132,8 @@ TEST_F(CountCollectExecutorTest, there_are_rows_in_the_upstream_the_producer_doe
127132
TEST_F(CountCollectExecutorTest, there_are_rows_in_the_upstream_the_producer_waits) {
128133
CountCollectExecutorInfos infos(1 /* outputRegId */, 1 /* nrIn */, nrOutputReg, {}, {});
129134
auto input = VPackParser::fromJson("[ [1], [2], [3] ]");
130-
SingleRowFetcherHelper<::arangodb::aql::BlockPassthrough::Disable> fetcher(itemBlockManager, input->steal(), true);
135+
SingleRowFetcherHelper<::arangodb::aql::BlockPassthrough::Disable> fetcher(
136+
itemBlockManager, input->steal(), true);
131137
CountCollectExecutor testee(fetcher, infos);
132138
NoStats stats{};
133139
OutputAqlItemRow result{std::move(block), outputRegisters,
@@ -157,6 +163,31 @@ TEST_F(CountCollectExecutorTest, there_are_rows_in_the_upstream_the_producer_wai
157163
ASSERT_EQ(3, fetcher.totalSkipped());
158164
}
159165

166+
TEST_F(CountCollectExecutorTest, test_produce_datarange) {
167+
CountCollectExecutorInfos infos(1 /* outputRegId */, 1 /* nrIn */, nrOutputReg, {}, {});
168+
auto fakeUnusedBlock = VPackParser::fromJson("[ ]");
169+
SingleRowFetcherHelper<::arangodb::aql::BlockPassthrough::Disable> fetcher(
170+
itemBlockManager, fakeUnusedBlock->steal(), false);
171+
CountCollectExecutor testee(fetcher, infos);
172+
173+
SharedAqlItemBlockPtr inBlock = buildBlock<1>(itemBlockManager, {{}});
174+
AqlItemBlockInputRange input{ExecutorState::DONE, inBlock, 0, inBlock->size()};
175+
176+
OutputAqlItemRow output(std::move(block), outputRegisters,
177+
infos.registersToKeep(), infos.registersToClear());
178+
EXPECT_EQ(output.numRowsWritten(), 0);
179+
auto const [state, stats, call] = testee.produceRows(1000, input, output);
180+
ASSERT_EQ(state, ExecutorState::DONE);
181+
ASSERT_TRUE(output.produced());
182+
183+
auto block = output.stealBlock();
184+
AqlValue x = block->getValue(0, 1);
185+
ASSERT_TRUE(x.isNumber());
186+
ASSERT_EQ(x.toInt64(), 0);
187+
188+
ASSERT_EQ(0, fetcher.totalSkipped());
189+
}
190+
160191
} // namespace aql
161192
} // namespace tests
162193
} // namespace arangodb

0 commit comments

Comments
 (0)
0