8000 Feature/distribute simplification (#13509) · tylde/arangodb@846ce5f · GitHub
[go: up one dir, main page]

Skip to content

Commit 846ce5f

Browse files
mpoeterjsteemann
andauthored
Feature/distribute simplification (arangodb#13509)
Co-authored-by: jsteemann <jan@arangodb.com> Co-authored-by: Jan <jsteemann@users.noreply.github.com>
1 parent c12f8af commit 846ce5f

20 files changed

+613
-388
lines changed

CHANGELOG

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,15 @@
11
devel
22
-----
33

4+
* Simplify the DistributeExecutor and avoid implicit modification of its input
5+
variable. Previously the DistributeExecutor could update the input variable
6+
in-place, leading to unexpected results (see #13509).
7+
The modification logic has now been moved into three new _internal_ AQL
8+
functions (MAKE_DISTRIBUTE_INPUT, MAKE_DISTRIBUTE_INPUT_WITH_KEY_CREATION,
9+
and MAKE_DISTRIBUTE_GRAPH_INPUT) and an additional calculation node with an
10+
according function call will be introduced if we need to prepare the input
11+
data for the distribute node.
12+
413
* Added new REST APIs for retrieving the sharding distribution:
514

615
- GET `/_api/database/shardDistribution` will return the number of
@@ -21,7 +30,7 @@ devel
2130
This API can only be used in the `_system` database of coordinators, and
2231
requires admin user privileges.
2332

24-
* Decrease the size of serialized index esimtates, by introducing a
33+
* Decrease the size of serialized index estimates, by introducing a
2534
compressed serialization format. The compressed format uses the previous
2635
uncompressed format internally, compresses it, and stores the compressed
2736
data instead. This makes serialized index estimates a lot smaller, which

arangod/Aql/AqlFunctionFeature.cpp

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -235,7 +235,7 @@ void AqlFunctionFeature::addNumericFunctions() {
235235
add({"RADIANS", ".", flags, &Functions::Radians});
236236
add({"DEGREES", ".", flags, &Functions::Degrees});
237237
add({"PI", "", flags, &Functions::Pi});
238-
238+
239239
add({"BIT_AND", ".|.", flags, &Functions::BitAnd});
240240
add({"BIT_OR", ".|.", flags, &Functions::BitOr});
241241
add({"BIT_XOR", ".|.", flags, &Functions::BitXOr});
@@ -474,6 +474,17 @@ void AqlFunctionFeature::addMiscFunctions() {
474474
add({"WITHIN", ".h,.,.,.|.", Function::makeFlags(FF::Cacheable), &Functions::NotImplemented});
475475
add({"WITHIN_RECTANGLE", "h.,.,.,.,.", Function::makeFlags(FF::Cacheable), &Functions::NotImplemented});
476476
add({"FULLTEXT", ".h,.,.|.", Function::makeFlags(FF::Cacheable), &Functions::NotImplemented});
477+
478+
add({"MAKE_DISTRIBUTE_INPUT", ".,.",
479+
Function::makeFlags(FF::Deterministic, FF::Cacheable, FF::Internal,
480+
FF::CanRunOnDBServerCluster, FF::CanRunOnDBServerOneShard),
481+
&Functions::MakeDistributeInput});
482+
add({"MAKE_DISTRIBUTE_INPUT_WITH_KEY_CREATION", ".,.,.",
483+
Function::makeFlags(FF::Internal), &Functions::MakeDistributeInputWithKeyCreation});
484+
add({"MAKE_DISTRIBUTE_GRAPH_INPUT", ".",
485+
Function::makeFlags(FF::Deterministic, FF::Cacheable, FF::Internal,
486+
FF::CanRunOnDBServerCluster, FF::CanRunOnDBServerOneShard),
487+
&Functions::MakeDistributeGraphInput});
477488

478489
// this is an internal function that is only here for testing. it cannot
479490
// be invoked by end users, because refering to internal functions from user

arangod/Aql/ClusterNodes.cpp

Lines changed: 31 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -260,30 +260,30 @@ CostEstimate ScatterNode::estimateCost() const {
260260
estimate.estimatedCost += estimate.estimatedNrItems * _clients.size();
261261
return estimate;
262262
}
263+
264+
DistributeNode::DistributeNode(ExecutionPlan* plan, ExecutionNodeId id,
265+
ScatterNode::ScatterType type, Collection const* collection,
266+
Variable const* variable, ExecutionNodeId targetNodeId)
267+
: ScatterNode(plan, id, type),
268+
CollectionAccessingNode(collection),
269+
_variable(variable),
270+
_targetNodeId(targetNodeId) {}
263271

264272
/// @brief construct a distribute node
265273
DistributeNode::DistributeNode(ExecutionPlan* plan, arangodb::velocypack::Slice const& base)
266274
: ScatterNode(plan, base),
267275
CollectionAccessingNode(plan, base),
268-
_variable(nullptr),
269-
_alternativeVariable(nullptr),
270-
_createKeys(base.get("createKeys").getBoolean()),
271-
_allowKeyConversionToObject(base.get("allowKeyConversionToObject").getBoolean()),
272-
_allowSpecifiedKeys(false),
273-
_fixupGraphInput(false) {
274-
if (base.hasKey("variable") && base.hasKey("alternativeVariable")) {
275-
_variable = Variable::varFromVPack(plan->getAst(), base, "variable");
276-
_alternativeVariable =
277-
Variable::varFromVPack(plan->getAst(), base, "alternativeVariable");
278-
} else {
279-
_variable = plan->getAst()->variables()->getVariable(
280-
base.get("varId").getNumericValue<VariableId>());
281-
_alternativeVariable = plan->getAst()->variables()->getVariable(
282-
base.get("alternativeVarId").getNumericValue<VariableId>());
283-
}
284-
_fixupGraphInput = VelocyPackHelper::getBooleanValue(base, "fixupGraphInput", false);
285-
// if we fixupGraphInput, we are disallowed to create keys: _fixupGraphInput -> !_createKeys
286-
TRI_ASSERT(!_fixupGraphInput || !_createKeys);
276+
_variable(Variable::varFromVPack(plan->getAst(), base, "variable")) {}
277+
278+
/// @brief clone ExecutionNode recursively
279+
ExecutionNode* DistributeNode::clone(ExecutionPlan* plan, bool withDependencies,
280+
bool withProperties) const {
281+
auto c = std::make_unique<DistributeNode>(plan, _id, getScatterType(),
282+
collection(), _variable, _targetNodeId);
283+
c->copyClients(clients());
284+
CollectionAccessingNode::cloneInto(*c);
285+
286+
return cloneHelper(std::move(c), withDependencies, withProperties);
287287
}
288288

289289
/// @brief creates corresponding ExecutionBlock
@@ -292,40 +292,20 @@ std::unique_ptr<ExecutionBlock> DistributeNode::createBlock(
292292
ExecutionNode const* previousNode = getFirstDependency();
293293
TRI_ASSERT(previousNode != nullptr);
294294

295-
RegisterId regId;
296-
RegisterId alternativeRegId = RegisterPlan::MaxRegisterId;
297-
298-
{ // set regId and alternativeRegId:
299-
300-
// get the variable to inspect . . .
301-
VariableId varId = _variable->id;
295+
// get the variable to inspect . . .
296+
VariableId varId = _variable->id;
302297

303-
// get the register id of the variable to inspect . . .
304-
auto it = getRegisterPlan()->varInfo.find(varId);
305-
TRI_ASSERT(it != getRegisterPlan()->varInfo.end());
306-
regId = (*it).second.registerId;
298+
// get the register id of the variable to inspect . . .
299+
auto it = getRegisterPlan()->varInfo.find(varId);
300+
TRI_ASSERT(it != getRegisterPlan()->varInfo.end());
301+
RegisterId regId = (*it).second.registerId;
307302

308-
TRI_ASSERT(regId < RegisterPlan::MaxRegisterId);
303+
TRI_ASSERT(regId < RegisterPlan::MaxRegisterId);
309304

310-
if (_alternativeVariable != _variable) {
311-
// use second variable
312-
auto it = getRegisterPlan()->varInfo.find(_alternativeVariable->id);
313-
TRI_ASSERT(it != getRegisterPlan()->varInfo.end());
314-
alternativeRegId = (*it).second.registerId;
315-
316-
TRI_ASSERT(alternativeRegId < RegisterPlan::MaxRegisterId);
317-
} else {
318-
TRI_ASSERT(alternativeRegId == RegisterPlan::MaxRegisterId);
319-
}
320-
}
321-
auto inAndOutRegs = RegIdSet{regId};
322-
if (alternativeRegId != RegisterPlan::MaxRegisterId) {
323-
inAndOutRegs.emplace(alternativeRegId);
324-
}
325-
auto registerInfos = createRegisterInfos(inAndOutRegs, inAndOutRegs);
326-
auto infos = DistributeExecutorInfos(clients(), collection(), regId, alternativeRegId,
327-
_allowSpecifiedKeys, _allowKeyConversionToObject,
328-
_createKeys, _fixupGraphInput, getScatterType());
305+
auto inRegs = RegIdSet{regId};
306+
auto registerInfos = createRegisterInfos(inRegs, {});
307+
auto infos = DistributeExecutorInfos(clients(), collection(),
308+
regId, getScatterType());
329309

10000
330310
return std::make_unique<ExecutionBlockImpl<DistributeExecutor>>(&engine, this,
331311
std::move(registerInfos),
@@ -344,13 +324,8 @@ void DistributeNode::toVelocyPackHelper(VPackBuilder& builder, unsigned flags,
344324
// serialize clients
345325
writeClientsToVelocyPack(builder);
346326

347-
builder.add("createKeys", VPackValue(_createKeys));
348-
builder.add("allowKeyConversionToObject", VPackValue(_allowKeyConversionToObject));
349-
builder.add("fixupGraphInput", VPackValue(_fixupGraphInput));
350327
builder.add(VPackValue("variable"));
351-
_variable->toVelocyPack(builder);
352-
builder.add(VPackValue("alternativeVariable"));
353-
_alternativeVariable->toVelocyPack(builder);
328+
_variable->toVelocyPack(builder);;
354329

355330
// And close it:
356331
builder.close();
@@ -359,7 +334,6 @@ void DistributeNode::toVelocyPackHelper(VPackBuilder& builder, unsigned flags,
359334
/// @brief getVariablesUsedHere, modifying the set in-place
360335
void DistributeNode::getVariablesUsedHere(VarSet& vars) const {
361336
vars.emplace(_variable);
362-
vars.emplace(_alternativeVariable);
363337
}
364338

365339
/// @brief estimateCost

arangod/Aql/ClusterNodes.h

Lines changed: 11 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -200,19 +200,7 @@ class DistributeNode final : public ScatterNode, public CollectionAccessingNode
200200
public:
201201
DistributeNode(ExecutionPlan* plan, ExecutionNodeId id,
202202
ScatterNode::ScatterType type, Collection const* collection,
203-
Variable const* variable, Variable const* alternativeVariable,
204-
bool createKeys, bool allowKeyConversionToObject, bool fixupGraphInput)
205-
: ScatterNode(plan, id, type),
206-
CollectionAccessingNode(collection),
207-
_variable(variable),
208-
_alternativeVariable(alternativeVariable),
209-
_createKeys(createKeys),
210-
_allowKeyConversionToObject(allowKeyConversionToObject),
211-
_allowSpecifiedKeys(false),
212-
_fixupGraphInput(fixupGraphInput) {
213-
// if we fixupGraphInput, we are disallowed to create keys: _fixupGraphInput -> !_createKeys
214-
TRI_ASSERT(!_fixupGraphInput || !_createKeys);
215-
}
203+
Variable const* variable, ExecutionNodeId targetNodeId);
216204

217205
DistributeNode(ExecutionPlan*, arangodb::velocypack::Slice const& base);
218206

@@ -230,60 +218,26 @@ class DistributeNode final : public ScatterNode, public CollectionAccessingNode
230218

231219
/// @brief clone ExecutionNode recursively
232220
ExecutionNode* clone(ExecutionPlan* plan, bool withDependencies,
233-
bool withProperties) const override final {
234-
auto c = std::make_unique<DistributeNode>(plan, _id, getScatterType(),
235-
collection(), _variable,
236-
_alternativeVariable, _createKeys,
237-
_allowKeyConversionToObject,
238-
_fixupGraphInput);
239-
c->copyClients(clients());
240-
CollectionAccessingNode::cloneInto(*c);
241-
242-
return cloneHelper(std::move(c), withDependencies, withProperties);
243-
}
221+
bool withProperties) const override final;
244222

245223
/// @brief getVariablesUsedHere, modifying the set in-place
246224
void getVariablesUsedHere(VarSet& vars) const override final;
247225

248226
/// @brief estimateCost
249227
CostEstimate estimateCost() const override final;
250228

251-
void variable(Variable const* variable) { _variable = variable; }
252-
253-
void alternativeVariable(Variable const* variable) {
254-
_alternativeVariable = variable;
255-
}
256-
257-
/// @brief set createKeys
258-
void setCreateKeys(bool b) { _createKeys = b; }
259-
260-
/// @brief set allowKeyConversionToObject
261-
void setAllowKeyConversionToObject(bool b) {
262-
_allowKeyConversionToObject = b;
263-
}
264-
265-
/// @brief set _allowSpecifiedKeys
266-
void setAllowSpecifiedKeys(bool b) { _allowSpecifiedKeys = b; }
229+
Variable const* getVariable() const noexcept { return _variable; }
230+
231+
void setVariable(Variable const* var) noexcept { _variable = var; }
232+
233+
ExecutionNodeId getTargetNodeId() const noexcept { return _targetNodeId; }
267234

268235
private:
269236
/// @brief the variable we must inspect to know where to distribute
270237
Variable const* _variable;
271-
272-
/// @brief an optional second variable we must inspect to know where to
273-
/// distribute
274-
Variable const* _alternativeVariable;
275-
276-
/// @brief the node is responsible for creating document keys
277-
bool _createKeys;
278-
279-
/// @brief allow conversion of key to object
280-
bool _allowKeyConversionToObject;
281-
282-
/// @brief allow specified keys in input even in the non-default sharding case
283-
bool _allowSpecifiedKeys;
284-
285-
/// @brief required to fixup graph input
286-
bool _fixupGraphInput;
238+
239+
/// @brief the id of the target ExecutionNode this DistributeNode belongs to.
240+
ExecutionNodeId _targetNodeId;
287241
};
288242

289243
/// @brief class GatherNode
@@ -435,7 +389,7 @@ class SingleRemoteOperationNode final : public ExecutionNode, public CollectionA
435389
void getVariablesUsedHere(VarSet& vars) const override final;
436390

437391
/// @brief getVariablesSetHere
438-
virtual std::vector<Variable const*> getVariablesSetHere() const override final;
392+
std::vector<Variable const*> getVariablesSetHere() const override final;
439393

440394
/// @brief estimateCost
441395
CostEstimate estimateCost() const override final;

0 commit comments

Comments
 (0)
0