4141using namespace arangodb ;
4242using namespace arangodb ::aql;
4343
44- DistributeExecutorInfos::DistributeExecutorInfos (
45- std::vector<std::string> clientIds, Collection const * collection,
46- RegisterId regId, ScatterNode::ScatterType type)
44+ DistributeExecutorInfos::DistributeExecutorInfos (std::vector<std::string> clientIds,
45+ Collection const * collection, RegisterId regId,
46+ ScatterNode::ScatterType type,
47+ std::vector<aql::Collection*> satellites)
4748 : ClientsExecutorInfos(std::move(clientIds)),
4849 _regId(regId),
4950 _collection(collection),
5051 _logCol(collection->getCollection ()),
51- _type(type) {}
52+ _type(type),
53+ _satellites(std::move(satellites)) {}
5254
5355auto DistributeExecutorInfos::registerId () const noexcept -> RegisterId {
5456 TRI_ASSERT (_regId.isValid ());
@@ -77,6 +79,35 @@ auto DistributeExecutorInfos::getResponsibleClient(arangodb::velocypack::Slice v
7779
8020
return shardId;
7880}
7981
82+ auto DistributeExecutorInfos::shouldDistributeToAll (arangodb::velocypack::Slice value) const
83+ -> bool {
84+ if (_satellites.empty ()) {
85+ // We can only distribute to all on Satellite Collections
86+ return false ;
87+ }
88+ auto id = value.get (StaticStrings::IdString);
89+ if (!id.isString ()) {
90+ // We can only distribute to all if we can detect the collection name
91+ return false ;
92+ }
93+
94+ // NOTE: Copy Paste code, shall be unified
95+ VPackStringRef vid (id);
96+ size_t pos = vid.find (' /' );
97+ if (pos == std::string::npos) {
98+ // Invalid input. Let the sharding take care of it, one server shall complain
99+ return false ;
100+ }
101+ vid = vid.substr (0 , pos);
102+ for (auto const & it : _satellites) {
103+ if (vid.equals (it->name ())) {
104+ // This vertex is from a satellite collection start everywhere!
105+ return true ;
106+ }
107+ }
108+ return false ;
109+ }
110+
80111DistributeExecutor::DistributeExecutor (DistributeExecutorInfos const & infos)
81112 : _infos(infos) {}
82113
@@ -94,11 +125,25 @@ auto DistributeExecutor::distributeBlock(SharedAqlItemBlockPtr const& block, Ski
C7E5
94125 choosenMap[key].emplace_back (i);
95126 }
96127 } else {
97- auto client = getClient (block, i);
98- if (!client.empty ()) {
99- // We can only have clients we are prepared for
100- TRI_ASSERT (blockMap.find (client) != blockMap.end ());
101- choosenMap[client].emplace_back (i);
128+ auto input = extractInput (block, i);
129+ if (!input.isNone ()) {
130+ // NONE is ignored.
131+ // Object is processd
132+ // All others throw.
133+ TRI_ASSERT (input.isObject ());
134+ if (_infos.shouldDistributeToAll (input)) {
135+ // This input should be added to all clients
136+ for (auto const & [key, value] : blockMap) {
137+ choosenMap[key].emplace_back (i);
138+ }
139+ } else {
140+ auto client = getClient (input);
141+ if (!client.empty ()) {
142+ // We can only have clients we are prepared for
143+ TRI_ASSERT (blockMap.find (client) != blockMap.end ());
144+ choosenMap[client].emplace_back (i);
145+ }
146+ }
102147 }
103148 }
104149 }
@@ -123,19 +168,24 @@ auto DistributeExecutor::distributeBlock(SharedAqlItemBlockPtr const& block, Ski
123168 }
124169}
125170
126- auto DistributeExecutor::getClient (SharedAqlItemBlockPtr const & block, size_t rowIndex) const
127- -> std::string {
171+ auto DistributeExecutor::extractInput (SharedAqlItemBlockPtr const & block,
172+ size_t rowIndex) const -> VPackSlice {
128173 // check first input register
129174 AqlValue val = InputAqlItemRow{block, rowIndex}.getValue (_infos.registerId ());
130175
131176 VPackSlice input = val.slice (); // will throw when wrong type
132177 if (input.isNone ()) {
133- return {} ;
178+ return input ;
134179 }
135-
180+
136181 if (!input.isObject ()) {
137- THROW_ARANGO_EXCEPTION_MESSAGE (TRI_ERROR_INTERNAL, " DistributeExecutor requires an object as input" );
182+ THROW_ARANGO_EXCEPTION_MESSAGE (
183+ TRI_ERROR_INTERNAL, " DistributeExecutor requires an object as input" );
138184 }
185+ return input;
186+ }
187+
188+ auto DistributeExecutor::getClient (VPackSlice input) const -> std::string {
139189 auto res = _infos.getResponsibleClient (input);
140190 if (res.fail ()) {
141191 THROW_ARANGO_EXCEPTION (std::move (res).result ());
@@ -147,4 +197,5 @@ ExecutionBlockImpl<DistributeExecutor>::ExecutionBlockImpl(ExecutionEngine* engi
147197 DistributeNode const * node,
148198 RegisterInfos registerInfos,
149199 DistributeExecutorInfos&& executorInfos)
150- : BlocksWithClientsImpl(engine, node, std::move(registerInfos), std::move(executorInfos)) {}
200+ : BlocksWithClientsImpl(engine, node, std::move(registerInfos),
201+ std::move(executorInfos)) {}
0 commit comments