Bug fix/bts 2152 fix overfetch input on parallel traversals by mchacki · Pull Request #21808 · arangodb/arangodb

Merged
Applied clangformat
mchacki committed Jun 23, 2025
commit 4a73443a6adee23618f4c1f3349a93b9abe14ba5
70 changes: 40 additions & 30 deletions arangod/Aql/BlocksWithClients.cpp
@@ -200,28 +200,33 @@ auto BlocksWithClientsImpl<Executor>::executeWithoutTraceForClient(
auto call = callList.peekNextCall();

auto& dataContainer = it->second;
/*
* In this block we need to handle the following behavior:
* 1) Call without a hardLimit:
* - Pull what is already on the dataContainer,
* - if dataContainer does not have more data, fetch more from upstream.
*
* 2) Call with a hardLimit = 0, without fullCount:
* - This indicates this lane is complete and does not need more data.
* - Clear what is still active in the dataContainer. (Do not drop ShadowRows)
* - If all other lanes already reported a hardLimit, and upstream still HASMORE, then send hardLimit to upstream.
*
* 3) Call with hardLimit = 0 and fullCount:
* - This indicates this lane is ready to skip and count.
 * - I do not think that this is possible in practice. Therefore this will be asserted, but in prod handled like case 1.
 * - As case 1 is slower than necessary, but always correct.
*
* Also NOTE: We are only doing this in the main query, not in subqueries. In the main query we can simply
* return DONE with an empty block. For subqueries we would need to return a ShadowRow. However, the row is not
 * yet available if upstream is not fully consumed.
 * We can only fix this by returning WAITING here and waking up the caller as soon as the ShadowRow is available.
*/
if (stack.empty() && call.hasHardLimit() && call.getLimit() == 0 && !call.fullCount) {
/*
* In this block we need to handle the following behavior:
* 1) Call without a hardLimit:
* - Pull what is already on the dataContainer,
* - if dataContainer does not have more data, fetch more from upstream.
*
* 2) Call with a hardLimit = 0, without fullCount:
* - This indicates this lane is complete and does not need more data.
* - Clear what is still active in the dataContainer. (Do not drop ShadowRows)
* - If all other lanes already reported a hardLimit, and upstream still
* HASMORE, then send hardLimit to upstream.
*
* 3) Call with hardLimit = 0 and fullCount:
* - This indicates this lane is ready to skip and count.
 * - I do not think that this is possible in practice. Therefore this will be
 * asserted, but in prod handled like case 1.
 * - As case 1 is slower than necessary, but always correct.
*
* Also NOTE: We are only doing this in the main query, not in subqueries. In
* the main query we can simply return DONE with an empty block. For
* subqueries we would need to return a ShadowRow. However, the row is not yet
 * available if upstream is not fully consumed. We can only fix this by
 * returning WAITING here and waking up the caller as soon as the ShadowRow
* is available.
*/
if (stack.empty() && call.hasHardLimit() && call.getLimit() == 0 &&
!call.fullCount) {
dataContainer.setSeenHardLimit();
// Need to handle this case separately, as we do not want to fetch more
// data from upstream.
@@ -231,7 +236,7 @@ auto BlocksWithClientsImpl<Executor>::executeWithoutTraceForClient(
stack.pushCall(callList);
while (dataContainer.hasDataFor(call)) {
// Just clear out what is still on the queue.
std::ignore = dataContainer.execute(stack, _upstreamState);
std::ignore = dataContainer.execute(stack, _upstreamState);
}
stack.popCall();
}
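Reviewer note: the comment block above reduces to a three-way classification of the incoming call. Below is a minimal standalone sketch of that dispatch, using a simplified stand-in rather than ArangoDB's real AqlCall type — the struct and field names here are assumptions for illustration only.

#include <cstddef>
#include <limits>

// Simplified stand-in for an AQL call; not the real arangod/Aql types.
struct CallSketch {
  std::size_t limit = std::numeric_limits<std::size_t>::max();
  bool hardLimit = false;  // was a hard limit set on this call?
  bool fullCount = false;  // should skipped rows still be counted?
};

enum class LaneAction {
  PullOrFetch,  // case 1: serve local data, fetch upstream when it runs dry
  DrainLane,    // case 2: lane complete; clear local data, keep ShadowRows
  SkipAndCount  // case 3: asserted in debug, handled like case 1 in prod
};

inline LaneAction classify(CallSketch const& call) {
  if (call.hardLimit && call.limit == 0) {
    return call.fullCount ? LaneAction::SkipAndCount : LaneAction::DrainLane;
  }
  return LaneAction::PullOrFetch;
}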
@@ -291,7 +296,7 @@ auto BlocksWithClientsImpl<Executor>::hardLimitDependency(AqlCallStack stack)
// NOTE: We do not handle limits / skip here
// They can differ between different calls to this executor.
// We may need to revisit this for performance reasons.
stack.pushCall(AqlCallList{AqlCall{0,false,0, AqlCall::LimitType::HARD}});
stack.pushCall(AqlCallList{AqlCall{0, false, 0, AqlCall::LimitType::HARD}});

TRI_ASSERT(_dependencies.size() == 1);
auto [state, skipped, block] = _dependencies[0]->execute(stack);
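Reviewer note: the pushCall line above constructs a hard-limit-zero call, which from the surrounding code appears to mean offset 0, no fullCount, limit 0, limit type HARD — i.e. "produce nothing and allow upstream to stop". A standalone stand-in with assumed names (not the real Aql headers):

#include <cstddef>

// Assumed, simplified mirror of the upstream hard-limit call:
// {offset, fullCount, limit, limitType} == {0, false, 0, HARD}.
struct CallStub {
  enum class LimitType { SOFT, HARD };
  std::size_t offset;
  bool fullCount;
  std::size_t limit;
  LimitType limitType;
};

constexpr CallStub stopUpstream{0, false, 0, CallStub::LimitType::HARD};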
@@ -310,7 +315,8 @@ auto BlocksWithClientsImpl<Executor>::hardLimitDependency(AqlCallStack stack)
if (state != ExecutionState::WAITING && block != nullptr) {
// We need to report everything that is not waiting
// Here this could be a ShadowRow!
TRI_ASSERT(false) << "Right now we cannot get here, as the optimization is not yet active on subqueries.";
TRI_ASSERT(false) << "Right now we cannot get here, as the optimization is "
"not yet active on subqueries.";
_executor.distributeBlock(block, skipped, _clientBlockData);
}

@@ -352,16 +358,20 @@ auto BlocksWithClientsImpl<Executor>::fetchMore(AqlCallStack stack)
}

template<class Executor>
auto BlocksWithClientsImpl<Executor>::allLanesComplete() const noexcept -> bool {
return std::ranges::none_of(_clientBlockData,
[](const auto& entry) { return !entry.second.gotHardLimit(); });
auto BlocksWithClientsImpl<Executor>::allLanesComplete() const noexcept
-> bool {
return std::ranges::none_of(_clientBlockData, [](const auto& entry) {
return !entry.second.gotHardLimit();
});
}
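Reviewer note: none_of over a negated predicate, as used above, is logically the same as all_of over the predicate itself. A self-contained illustration of that equivalence, with a plain std::map standing in for _clientBlockData:

#include <algorithm>
#include <map>
#include <string>

int main() {
  std::map<std::string, bool> gotHardLimit{{"lane-a", true}, {"lane-b", true}};
  bool noneMissing = std::ranges::none_of(
      gotHardLimit, [](auto const& e) { return !e.second; });
  bool allDone = std::ranges::all_of(
      gotHardLimit, [](auto const& e) { return e.second; });
  return noneMissing == allDone ? 0 : 1;  // always returns 0: the forms agree
}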

#ifdef ARANGODB_USE_GOOGLE_TESTS
template<class Executor>
auto BlocksWithClientsImpl<Executor>::remainingRowsForClient(std::string const& clientId) const -> uint64_t {
auto BlocksWithClientsImpl<Executor>::remainingRowsForClient(
std::string const& clientId) const -> uint64_t {
auto it = _clientBlockData.find(clientId);
TRI_ASSERT(it != _clientBlockData.end()) << "Test setup issue, clientId " << clientId << " is not registered";
TRI_ASSERT(it != _clientBlockData.end())
<< "Test setup issue, clientId " << clientId << " is not registered";
return it->second.remainingRows();
}
#endif
18 changes: 9 additions & 9 deletions arangod/Aql/BlocksWithClients.h
@@ -128,16 +128,16 @@ class BlocksWithClientsImpl : public ExecutionBlock, public BlocksWithClients {
auto executeForClient(AqlCallStack stack, std::string const& clientId)
-> std::tuple<ExecutionState, SkipResult, SharedAqlItemBlockPtr> override;


#ifdef ARANGODB_USE_GOOGLE_TESTS

/**
* @brief Get the number of rows remaining for a client
* TEST ONLY feature. This number should never be a concern for production code.
*
* @param clientId The client ID
* @return The number of rows remaining for the client
*/
/**
* @brief Get the number of rows remaining for a client
* TEST ONLY feature. This number should never be a concern for production
* code.
*
* @param clientId The client ID
* @return The number of rows remaining for the client
*/
auto remainingRowsForClient(std::string const& clientId) const -> uint64_t;
#endif

Expand Down Expand Up @@ -168,7 +168,7 @@ class BlocksWithClientsImpl : public ExecutionBlock, public BlocksWithClients {
*/
auto fetchMore(AqlCallStack stack) -> ExecutionState;

auto allLanesComplete() const noexcept-> bool;
auto allLanesComplete() const noexcept -> bool;

protected:
/// @brief getClientId: get the number <clientId> (used internally)
15 changes: 5 additions & 10 deletions arangod/Aql/DistributeClientBlock.cpp
@@ -236,22 +236,17 @@ auto DistributeClientBlock::gotHardLimit() const -> bool {
/**
* @brief Reset the hard limit
*/
auto DistributeClientBlock::resetHardLimit() -> void {
_gotHardLimit = false;
}
auto DistributeClientBlock::resetHardLimit() -> void { _gotHardLimit = false; }

/**
* @brief Set hard limit has been seen.
*/
auto DistributeClientBlock::setSeenHardLimit() -> void {
_gotHardLimit = true;
}
auto DistributeClientBlock::setSeenHardLimit() -> void { _gotHardLimit = true; }

#ifdef ARANGODB_USE_GOOGLE_TESTS
auto DistributeClientBlock::remainingRows() const -> uint64_t {
return std::accumulate(_queue.begin(), _queue.end(), 0,
[](uint64_t sum, auto const& item) {
return sum + item.numRows();
});
return std::accumulate(
_queue.begin(), _queue.end(), 0,
[](uint64_t sum, auto const& item) { return sum + item.numRows(); });
}
#endif
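Reviewer note: one pitfall worth flagging in the accumulate above — std::accumulate deduces its accumulator type from the initial value, so an int literal 0 keeps the running sum in an int even though the lambda returns uint64_t, which could overflow for very large row counts. A standalone sketch of the 64-bit-safe spelling, with a plain vector standing in for _queue:

#include <cstdint>
#include <numeric>
#include <vector>

int main() {
  std::vector<std::uint64_t> rowsPerBlock{1000, 2000, 3000};
  // uint64_t{0} (rather than plain 0) keeps the accumulator 64-bit.
  auto total = std::accumulate(rowsPerBlock.begin(), rowsPerBlock.end(),
                               std::uint64_t{0});
  return total == 6000 ? 0 : 1;
}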
2 changes: 1 addition & 1 deletion arangod/Aql/DistributeClientBlock.h
@@ -73,7 +73,7 @@ class DistributeClientBlock {
auto execute(AqlCallStack callStack, ExecutionState upstreamState)
-> std::tuple<ExecutionState, SkipResult, SharedAqlItemBlockPtr>;

/**
/**
* @brief Check if we have received a hard limit
* @return true if we have received a hard limit
*/