diff --git a/arangod/Aql/DistributeExecutor.cpp b/arangod/Aql/DistributeExecutor.cpp index 75f04295d8fc..c2e1681884fd 100644 --- a/arangod/Aql/DistributeExecutor.cpp +++ b/arangod/Aql/DistributeExecutor.cpp @@ -125,24 +125,37 @@ auto DistributeExecutor::distributeBlock(SharedAqlItemBlockPtr const& block, Ski choosenMap[key].emplace_back(i); } } else { - auto input = extractInput(block, i); - if (!input.isNone()) { - // NONE is ignored. - // Object is processd - // All others throw. - TRI_ASSERT(input.isObject()); - if (_infos.shouldDistributeToAll(input)) { - // This input should be added to all clients - for (auto const& [key, value] : blockMap) { - choosenMap[key].emplace_back(i); - } - } else { - auto client = getClient(input); - if (!client.empty()) { - // We can only have clients we are prepared for - TRI_ASSERT(blockMap.find(client) != blockMap.end()); - choosenMap[client].emplace_back(i); + // check first input register + AqlValue val = InputAqlItemRow{block, i}.getValue(_infos.registerId()); + + VPackSlice input = val.slice(); // will throw when wrong type + if (input.isNone()) { + continue; + } + + if (!input.isObject()) { + THROW_ARANGO_EXCEPTION_MESSAGE( + TRI_ERROR_INTERNAL, "DistributeExecutor requires an object as input"); + } + // NONE is ignored. + // Object is processd + // All others throw. + TRI_ASSERT(input.isObject()); + if (_infos.shouldDistributeToAll(input)) { + // This input should be added to all clients + for (auto const& [key, value] : blockMap) { + choosenMap[key].emplace_back(i); + } + } else { + auto client = getClient(input); + if (!client.empty()) { + // We can only have clients we are prepared for + TRI_ASSERT(blockMap.find(client) != blockMap.end()); + if (ADB_UNLIKELY(blockMap.find(client) == blockMap.end())) { + THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, + std::string("unexpected client id '") + client + "' found in blockMap"); } + choosenMap[client].emplace_back(i); } } } @@ -168,23 +181,6 @@ auto DistributeExecutor::distributeBlock(SharedAqlItemBlockPtr const& block, Ski } } -auto DistributeExecutor::extractInput(SharedAqlItemBlockPtr const& block, - size_t rowIndex) const -> VPackSlice { - // check first input register - AqlValue val = InputAqlItemRow{block, rowIndex}.getValue(_infos.registerId()); - - VPackSlice input = val.slice(); // will throw when wrong type - if (input.isNone()) { - return input; - } - - if (!input.isObject()) { - THROW_ARANGO_EXCEPTION_MESSAGE( - TRI_ERROR_INTERNAL, "DistributeExecutor requires an object as input"); - } - return input; -} - auto DistributeExecutor::getClient(VPackSlice input) const -> std::string { auto res = _infos.getResponsibleClient(input); if (res.fail()) { diff --git a/arangod/Aql/DistributeExecutor.h b/arangod/Aql/DistributeExecutor.h index bfbb91c20947..8abf1e6d98cf 100644 --- a/arangod/Aql/DistributeExecutor.h +++ b/arangod/Aql/DistributeExecutor.h @@ -97,17 +97,8 @@ class DistributeExecutor { std::unordered_map& blockMap) -> void; private: - /** - * @brief Compute which client needs to get this row - * @param block The input block - * @param rowIndex - * @return std::string Identifier used by the client - */ - auto extractInput(SharedAqlItemBlockPtr const& block, size_t rowIndex) const - -> velocypack::Slice; auto getClient(velocypack::Slice input) const -> std::string; - private: DistributeExecutorInfos const& _infos; // a reusable Builder object for building _key values