Feature/hybrid smart graph (#14602) · arangodb/arangodb@0849e98 · GitHub
[go: up one dir, main page]

Skip to content

Commit 0849e98

Browse files
authored
Feature/hybrid smart graph (#14602)
1 parent b12ef28 commit 0849e98

File tree

132 files changed

+5167
-2698
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

132 files changed

+5167
-2698
lines changed

arangod/Aql/AqlFunctionsInternalCache.h

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,13 +25,11 @@
2525

2626
#include "Aql/AqlValue.h"
2727
#include "Basics/Common.h"
28-
#include <VocBase/Validators.h>
28+
#include "VocBase/Validators.h"
2929

3030
#include <unicode/regex.h>
3131
#include <memory>
3232

33-
34-
3533
namespace arangodb {
3634

3735
namespace transaction {
@@ -49,6 +47,8 @@ class AqlFunctionsInternalCache final {
4947
AqlFunctionsInternalCache() = default;
5048
~AqlFunctionsInternalCache();
5149

50+
AqlFunctionsInternalCache(AqlFunctionsInternalCache&&) = default;
51+
5252
void clear() noexcept;
5353

5454
icu::RegexMatcher* buildRegexMatcher(char const* ptr, size_t length, bool caseInsensitive);
@@ -97,4 +97,3 @@ class AqlFunctionsInternalCache final {
9797

9898
} // namespace aql
9999
} // namespace arangodb
100-

arangod/Aql/ClusterNodes.cpp

Lines changed: 36 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -263,15 +263,31 @@ DistributeNode::DistributeNode(ExecutionPlan* plan, ExecutionNodeId id,
263263
DistributeNode::DistributeNode(ExecutionPlan* plan, arangodb::velocypack::Slice const& base)
264264
: ScatterNode(plan, base),
265265
CollectionAccessingNode(plan, base),
266-
_variable(Variable::varFromVPack(plan->getAst(), base, "variable")) {}
267-
266+
_variable(Variable::varFromVPack(plan->getAst(), base, "variable")) {
267+
auto sats = base.get("satelliteCollections");
268+
if (sats.isArray()) {
269+
auto& queryCols = plan->getAst()->query().collections();
270+
_satellites.reserve(sats.length());
271+
272+
for (VPackSlice it : VPackArrayIterator(sats)) {
273+
std::string v = arangodb::basics::VelocyPackHelper::getStringValue(it, "");
274+
auto c = queryCols.add(v, AccessMode::Type::READ, aql::Collection::Hint::Collection);
275+
addSatellite(c);
276+
}
277+
}
278+
}
279+
268280
/// @brief clone ExecutionNode recursively
269281
ExecutionNode* DistributeNode::clone(ExecutionPlan* plan, bool withDependencies,
270282
bool withProperties) const {
271283
auto c = std::make_unique<DistributeNode>(plan, _id, getScatterType(),
272284
collection(), _variable, _targetNodeId);
273285
c->copyClients(clients());
274286
CollectionAccessingNode::cloneInto(*c);
287+
c->_satellites.reserve(_satellites.size());
288+
for (auto& it : _satellites) {
289+
c->_satellites.emplace_back(it);
290+
}
275291

276292
return cloneHelper(std::move(c), withDependencies, withProperties);
277293
}
@@ -294,8 +310,8 @@ std::unique_ptr<ExecutionBlock> DistributeNode::createBlock(
294310

295311
auto inRegs = RegIdSet{regId};
296312
auto registerInfos = createRegisterInfos(inRegs, {});
297-
auto infos = DistributeExecutorInfos(clients(), collection(),
298-
regId, getScatterType());
313+
auto infos = DistributeExecutorInfos(clients(), collection(), regId,
314+
getScatterType(), getSatellites());
299315

300316
return std::make_unique<ExecutionBlockImpl<DistributeExecutor>>(&engine, this,
301317
std::move(registerInfos),
@@ -311,6 +327,16 @@ void DistributeNode::doToVelocyPack(VPackBuilder& builder, unsigned flags) const
311327

312328
builder.add(VPackValue("variable"));
313329
_variable->toVelocyPack(builder);
330+
331+
if (!_satellites.empty()) {
332+
builder.add(VPackValue("satelliteCollections"));
333+
{
334+
VPackArrayBuilder guard(&builder);
335+
for (auto const& v : _satellites) {
336+
builder.add(VPackValue(v->name()));
337+
}
338+
}
339+
}
314340
}
315341

316342
void DistributeNode::replaceVariables(std::unordered_map<VariableId, Variable const*> const& replacements) {
@@ -329,6 +355,12 @@ CostEstimate DistributeNode::estimateCost() const {
329355
return estimate;
330356
}
331357

358+
void DistributeNode::addSatellite(aql::Collection* satellite) {
359+
// Only relevant for enterprise disjoint smart graphs
360+
TRI_ASSERT(satellite->isSatellite());
361+
_satellites.emplace_back(satellite);
362+
}
363+
332364
/*static*/ Collection const* GatherNode::findCollection(GatherNode const& root) noexcept {
333365
ExecutionNode const* node = root.getFirstDependency();
334366

arangod/Aql/ClusterNodes.h

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -222,21 +222,31 @@ class DistributeNode final : public ScatterNode, public CollectionAccessingNode
222222
CostEstimate estimateCost() const override final;
223223

224224
Variable const* getVariable() const noexcept { return _variable; }
225-
225+
226226
void setVariable(Variable const* var) noexcept { _variable = var; }
227-
227+
228228
ExecutionNodeId getTargetNodeId() const noexcept { return _targetNodeId; }
229229

230+
void addSatellite(aql::Collection*);
231+
232+
std::vector<aql::Collection*> const getSatellites() const noexcept {
233+
return _satellites;
234+
}
235+
230236
protected:
231237
/// @brief export to VelocyPack
232238
void doToVelocyPack(arangodb::velocypack::Builder&, unsigned flags) const override final;
233239

234240
private:
235241
/// @brief the variable we must inspect to know where to distribute
236242
Variable const* _variable;
237-
243+
238244
/// @brief the id of the target ExecutionNode this DistributeNode belongs to.
239245
ExecutionNodeId _targetNodeId;
246+
247+
/// @brief List of Satellite collections this node needs to distribute data to
248+
/// in a satellite manner.
249+
std::vector<aql::Collection*> _satellites;
240250
};
241251

242252
/// @brief class GatherNode

arangod/Aql/Collection.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -186,7 +186,7 @@ std::vector<std::string> Collection::shardKeys(bool normalize) const {
186186
if (normalize && coll->isSmart() && coll->type() == TRI_COL_TYPE_DOCUMENT) {
187187
// smart vertex collection always has ["_key:"] as shard keys
188188
TRI_ASSERT(originalKeys.size() == 1);
189-
TRI_ASSERT(originalKeys[0] == "_key:");
189+
TRI_ASSERT(originalKeys[0] == StaticStrings::PrefixOfKeyString);
190190
// now normalize this to _key
191191
return std::vector<std::string>{StaticStrings::KeyString};
192192
}

arangod/Aql/DistributeExecutor.cpp

Lines changed: 66 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -41,14 +41,16 @@
4141
using namespace arangodb;
4242
using namespace arangodb::aql;
4343

44-
DistributeExecutorInfos::DistributeExecutorInfos(
45-
std::vector<std::string> clientIds, Collection const* collection,
46-
RegisterId regId, ScatterNode::ScatterType type)
44+
DistributeExecutorInfos::DistributeExecutorInfos(std::vector<std::string> clientIds,
45+
Collection const* collection, RegisterId regId,
46+
ScatterNode::ScatterType type,
47+
std::vector<aql::Collection*> satellites)
4748
: ClientsExecutorInfos(std::move(clientIds)),
4849
_regId(regId),
4950
_collection(collection),
5051
_logCol(collection->getCollection()),
51-
_type(type) {}
52+
_type(type),
53+
_satellites(std::move(satellites)) {}
5254

5355
auto DistributeExecutorInfos::registerId() const noexcept -> RegisterId {
5456
TRI_ASSERT(_regId.isValid());
@@ -77,6 +79,35 @@ auto DistributeExecutorInfos::getResponsibleClient(arangodb::velocypack::Slice v
7779
return shardId;
7880
}
7981

82+
auto DistributeExecutorInfos::shouldDistributeToAll(arangodb::velocypack::Slice value) const
83+
-> bool {
84+
if (_satellites.empty()) {
85+
// We can only distribute to all on Satellite Collections
86+
return false;
87+
}
88+
auto id = value.get(StaticStrings::IdString);
89+
if (!id.isString()) {
90+
// We can only distribute to all if we can detect the collection name
91+
return false;
92+
}
93+
94+
// NOTE: Copy Paste code, shall be unified
95+
VPackStringRef vid(id);
96+
size_t pos = vid.find('/');
97+
if (pos == std::string::npos) {
98+
// Invalid input. Let the sharding take care of it, one server shall complain
99+
return false;
100+
}
101+
vid = vid.substr(0, pos);
102+
for (auto const& it : _satellites) {
103+
if (vid.equals(it->name())) {
104+
// This vertex is from a satellite collection, so start everywhere!
105+
return true;
106+
}
107+
}
108+
return false;
109+
}
110+
80111
DistributeExecutor::DistributeExecutor(DistributeExecutorInfos const& infos)
81112
: _infos(infos) {}
82113

@@ -94,11 +125,25 @@ auto DistributeExecutor::distributeBlock(SharedAqlItemBlockPtr const& block, Ski
94125
choosenMap[key].emplace_back(i);
95126
}
96127
} else {
97-
auto client = getClient(block, i);
98-
if (!client.empty()) {
99-
// We can only have clients we are prepared for
100-
TRI_ASSERT(blockMap.find(client) != blockMap.end());
101-
choosenMap[client].emplace_back(i);
128+
auto input = extractInput(block, i);
129+
if (!input.isNone()) {
130+
// NONE is ignored.
131+
// Object is processed
132+
// All others throw.
133+
TRI_ASSERT(input.isObject());
134+
if (_infos.shouldDistributeToAll(input)) {
135+
// This input should be added to all clients
136+
for (auto const& [key, value] : blockMap) {
137+
choosenMap[key].emplace_back(i);
138+
}
139+
} else {
140+
auto client = getClient(input);
141+
if (!client.empty()) {
142+
// We can only have clients we are prepared for
143+
TRI_ASSERT(blockMap.find(client) != blockMap.end());
144+
choosenMap[client].emplace_back(i);
145+
}
146+
}
102147
}
103148
}
104149
}
@@ -123,19 +168,24 @@ auto DistributeExecutor::distributeBlock(SharedAqlItemBlockPtr const& block, Ski
123168
}
124169
}
125170

126-
auto DistributeExecutor::getClient(SharedAqlItemBlockPtr const& block, size_t rowIndex) const
127-
-> std::string {
171+
auto DistributeExecutor::extractInput(SharedAqlItemBlockPtr const& block,
172+
size_t rowIndex) const -> VPackSlice {
128173
// check first input register
129174
AqlValue val = InputAqlItemRow{block, rowIndex}.getValue(_infos.registerId());
130175

131176
VPackSlice input = val.slice(); // will throw when wrong type
132177
if (input.isNone()) {
133-
return {};
178+
return input;
134179
}
135-
180+
136181
if (!input.isObject()) {
137-
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, "DistributeExecutor requires an object as input");
182+
THROW_ARANGO_EXCEPTION_MESSAGE(
183+
TRI_ERROR_INTERNAL, "DistributeExecutor requires an object as input");
138184
}
185+
return input;
186+
}
187+
188+
auto DistributeExecutor::getClient(VPackSlice input) const -> std::string {
139189
auto res = _infos.getResponsibleClient(input);
140190
if (res.fail()) {
141191
THROW_ARANGO_EXCEPTION(std::move(res).result());
@@ -147,4 +197,5 @@ ExecutionBlockImpl<DistributeExecutor>::ExecutionBlockImpl(ExecutionEngine* engi
147197
DistributeNode const* node,
148198
RegisterInfos registerInfos,
149199
DistributeExecutorInfos&& executorInfos)
150-
: BlocksWithClientsImpl(engine, node, std::move(registerInfos), std::move(executorInfos)) {}
200+
: BlocksWithClientsImpl(engine, node, std::move(registerInfos),
201+
std::move(executorInfos)) {}

arangod/Aql/DistributeExecutor.h

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -43,14 +43,17 @@ class DistributeNode;
4343
class DistributeExecutorInfos : public ClientsExecutorInfos {
4444
public:
4545
DistributeExecutorInfos(std::vector<std::string> clientIds, Collection const* collection,
46-
RegisterId regId, ScatterNode::ScatterType type);
46+
RegisterId regId, ScatterNode::ScatterType type,
47+
std::vector<aql::Collection*> satellites);
4748

4849
auto registerId() const noexcept -> RegisterId;
4950
auto scatterType() const noexcept -> ScatterNode::ScatterType;
5051

5152
auto getResponsibleClient(arangodb::velocypack::Slice value) const
5253
-> ResultT<std::string>;
5354

55+
auto shouldDistributeToAll(arangodb::velocypack::Slice value) const -> bool;
56+
5457
private:
5558
RegisterId _regId;
5659

@@ -63,6 +66,9 @@ class DistributeExecutorInfos : public ClientsExecutorInfos {
6366

6467
/// @brief type of distribution that this nodes follows.
6568
ScatterNode::ScatterType _type;
69+
70+
/// @brief list of collections that should be used
71+
std::vector<aql::Collection*> _satellites;
6672
};
6773

6874
// The DistributeBlock is actually implemented by specializing
@@ -93,16 +99,13 @@ class DistributeExecutor {
9399
private:
94100
/**
95101
* @brief Compute which client needs to get this row
96-
* NOTE: Has SideEffects
97-
* If the input value does not contain an object, it is modified inplace with
98-
* a new Object containing a key value!
99-
* Hence this method is not const ;(
100-
*
101102
* @param block The input block
102103
* @param rowIndex
103104
* @return std::string Identifier used by the client
104105
*/
105-
auto getClient(SharedAqlItemBlockPtr const& block, size_t rowIndex) const -> std::string;
106+
auto extractInput(SharedAqlItemBlockPtr const& block, size_t rowIndex) const
107+
-> velocypack::Slice;
108+
auto getClient(velocypack::Slice input) const -> std::string;
106109

107110
private:
108111
DistributeExecutorInfos const& _infos;

0 commit comments

Comments
 (0)
0