@@ -125,24 +125,37 @@ auto DistributeExecutor::distributeBlock(SharedAqlItemBlockPtr const& block, Ski
125
125
choosenMap[key].emplace_back (i);
126
126
}
127
127
} else {
128
- auto input = extractInput (block, i);
129
- if (!input.isNone ()) {
130
- // NONE is ignored.
131
- // Object is processd
132
- // All others throw.
133
- TRI_ASSERT (input.isObject ());
134
- if (_infos.shouldDistributeToAll (input)) {
135
- // This input should be added to all clients
136
- for (auto const & [key, value] : blockMap) {
137
- choosenMap[key].emplace_back (i);
138
- }
139
- } else {
140
- auto client = getClient (input);
141
- if (!client.empty ()) {
142
- // We can only have clients we are prepared for
143
- TRI_ASSERT (blockMap.find (client) != blockMap.end ());
144
- choosenMap[client].emplace_back (i);
128
+ // check first input register
129
+ AqlValue val = InputAqlItemRow{block, i}.getValue (_infos.registerId ());
130
+
131
+ VPackSlice input = val.slice (); // will throw when wrong type
132
+ if (input.isNone ()) {
133
+ continue ;
134
+ }
135
+
136
+ if (!input.isObject ()) {
137
+ THROW_ARANGO_EXCEPTION_MESSAGE (
138
+ TRI_ERROR_INTERNAL, " DistributeExecutor requires an object as input" );
139
+ }
140
+ // NONE is ignored.
141
+ // Object is processd
142
+ // All others throw.
143
+ TRI_ASSERT (input.isObject ());
144
+ if (_infos.shouldDistributeToAll (input)) {
145
+ // This input should be added to all clients
146
+ for (auto const & [key, value] : blockMap) {
147
+ choosenMap[key].emplace_back (i);
148
+ }
149
+ } else {
150
+ auto client = getClient (input);
151
+ if (!client.empty ()) {
152
+ // We can only have clients we are prepared for
153
+ TRI_ASSERT (blockMap.find (client) != blockMap.end ());
154
+ if (ADB_UNLIKELY (blockMap.find (client) == blockMap.end ())) {
155
+ THROW_ARANGO_EXCEPTION_MESSAGE (TRI_ERROR_INTERNAL,
156
+ std::string (" unexpected client id '" ) + client + " ' found in blockMap" );
145
157
}
158
+ choosenMap[client].emplace_back (i);
146
159
}
147
160
}
148
161
}
@@ -168,23 +181,6 @@ auto DistributeExecutor::distributeBlock(SharedAqlItemBlockPtr const& block, Ski
168
181
}
169
182
}
170
183
171
- auto DistributeExecutor::extractInput (SharedAqlItemBlockPtr const & block,
172
- size_t rowIndex) const -> VPackSlice {
173
- // check first input register
174
- AqlValue val = InputAqlItemRow{block, rowIndex}.getValue (_infos.registerId ());
175
-
176
- VPackSlice input = val.slice (); // will throw when wrong type
177
- if (input.isNone ()) {
178
- return input;
179
- }
180
-
181
- if (!input.isObject ()) {
182
- THROW_ARANGO_EXCEPTION_MESSAGE (
183
- TRI_ERROR_INTERNAL, " DistributeExecutor requires an object as input" );
184
- }
185
- return input;
186
- }
187
-
188
184
auto DistributeExecutor::getClient (VPackSlice input) const -> std::string {
189
185
auto res = _infos.getResponsibleClient (input);
190
186
if (res.fail ()) {
0 commit comments