8000 Do not instantiate snipped if not collection is found on the server. … · arangodb/arangodb@98efcbc · GitHub
[go: up one dir, main page]

Skip to content

Commit 98efcbc

Browse files
author
Lars Maier
authored
Do not instantiate snipped if not collection is found on the server. (#11281)
1 parent 38eaa62 commit 98efcbc

File tree

1 file changed

+78
-64
lines changed

1 file changed

+78
-64
lines changed

arangod/Aql/QuerySnippet.cpp

Lines changed: 78 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ void QuerySnippet::addNode(ExecutionNode* node) {
6565
case ExecutionNode::K_SHORTEST_PATHS: {
6666
auto* graphNode = ExecutionNode::castTo<GraphNode*>(node);
6767
auto const isSatellite = graphNode->isUsedAsSatellite();
68-
_expansions.emplace_back(node, false, isSatellite);
68+
_expansions.emplace_back(node, !isSatellite, isSatellite);
6969
break;
7070
}
7171
case ExecutionNode::ENUMERATE_IRESEARCH_VIEW: {
@@ -77,7 +77,7 @@ void QuerySnippet::addNode(ExecutionNode* node) {
7777
_expansions.emplace_back(node, false, false);
7878
break;
7979
}
80-
case ExecutionNode::MATERIALIZE: {
80+
case ExecutionNode::MATERIALIZE: {
8181
auto collectionAccessingNode = dynamic_cast<CollectionAccessingNode*>(node);
8282
// Materialize index node - true
8383
// Materialize view node - false
@@ -334,20 +334,21 @@ ResultT<std::unordered_map<ExecutionNode*, std::set<ShardID>>> QuerySnippet::pre
334334
}
335335
}
336336
}
337-
continue;
338-
} else if ( exp.node->getType() == ExecutionNode::TRAVERSAL
339-
|| exp.node->getType() == ExecutionNode::SHORTEST_PATH
340-
|| exp.node->getType() == ExecutionNode::K_SHORTEST_PATHS) {
341-
337+
} else if (exp.node->getType() == ExecutionNode::TRAVERSAL ||
338+
exp.node->getType() == ExecutionNode::SHORTEST_PATH ||
339+
exp.node->getType() == ExecutionNode::K_SHORTEST_PATHS) {
342340
// the same translation is copied to all servers
343341
// there are no local expansions
344342

345343
auto* graphNode = ExecutionNode::castTo<GraphNode*>(exp.node);
346-
graphNode->setCollectionToShard({}); //clear previous information
344+
graphNode->setCollectionToShard({}); // clear previous information
347345

348346
TRI_ASSERT(graphNode->isUsedAsSatellite() == exp.isSatellite);
349347

350348
if (!exp.isSatellite) {
349+
// This is either one shard or a single satellite graph which is not used
350+
// as satellite graph.
351+
uint64_t numShards = 0;
351352
for (auto* aqlCollection : graphNode->collections()) {
352353
auto const& shards = aqlCollection->shardIds();
353354
TRI_ASSERT(!shards->empty());
@@ -358,89 +359,102 @@ ResultT<std::unordered_map<ExecutionNode*, std::set<ShardID>>> QuerySnippet::pre
358359
// to be used in toVelocyPack methods of classes derived
359360
// from GraphNode
360361
graphNode->addCollectionToShard(aqlCollection->name(), shard);
362+
363+
numShards++;
361364
}
362365
}
363366
}
367+
368+
if (numShards == 0) {
369+
return {TRI_ERROR_CLUSTER_NOT_LEADER};
370+
}
371+
372+
bool foundEnoughShards = numShards == graphNode->collections().size();
373+
TRI_ASSERT(foundEnoughShards);
374+
if (!foundEnoughShards) {
375+
THROW_ARANGO_EXCEPTION(TRI_ERROR_INTERNAL_AQL);
376+
}
364377
} else {
365378
TRI_ASSERT(graphNode->isUsedAsSatellite());
366379
#ifndef USE_ENTERPRISE
367380
TRI_ASSERT(false);
368381
#endif
382+
369383
for (auto* aqlCollection : graphNode->collections()) {
370384
auto const& shards = shardLocking.shardsForSnippet(id(), aqlCollection);
385+
TRI_ASSERT(shards.size() == 1);
371386
for (auto const& shard : shards) {
372387
// If we find a shard here that is not in this mapping,
373388
// we have 1) a problem with locking before that should have thrown
374389
// 2) a problem with shardMapping lookup that should have thrown before
375390
TRI_ASSERT(shardMapping.find(shard) != shardMapping.end());
376-
// Could we replace the outer if/else on isSatellite with this branch
377-
// here, and remove the upper part?
391+
// Could we replace the outer if/else on isSatellite with this
392+
// branch here, and remove the upper part?
378393
// if (check->second == server || exp.isSatellite) {...}
379394

380395
graphNode->addCollectionToShard(aqlCollection->name(), shard);
381-
382396
// This case currently does not exist and is not handled here.
383397
TRI_ASSERT(!exp.doExpand);
384398
}
385399
}
386400
}
387401

388-
continue; // skip rest - there are no local expansions
389-
}
390-
// exp.node is now either an enumerate collection, index, or modification.
391-
392-
// It is of utmost importance that this is an ordered set of Shards.
393-
// We can only join identical indexes of shards for each collection
394-
// locally.
395-
std::set<ShardID> myExp;
396-
397-
auto modNode = dynamic_cast<CollectionAccessingNode const*>(exp.node);
398-
// Only accessing nodes can end up here.
399-
TRI_ASSERT(modNode != nullptr);
400-
auto col = modNode->collection();
401-
// Should be hit earlier, a modification node here is required to have a collection
402-
TRI_ASSERT(col != nullptr);
403-
auto const& shards = shardLocking.shardsForSnippet(id(), col);
404-
for (auto const& s : shards) {
405-
auto check = shardMapping.find(s);
406-
// If we find a shard here that is not in this mapping,
407-
// we have 1) a problem with locking before that should have thrown
408-
// 2) a problem with shardMapping lookup that should have thrown before
409-
TRI_ASSERT(check != shardMapping.end());
410-
if (check->second == server || exp.isSatellite) {
411-
// add all shards on satellites.
412-
// and all shards where this server is the leader
413-
myExp.emplace(s);
402+
} else {
403+
// exp.node is now either an enumerate collection, index, or modification.
404+
405+
// It is of utmost importance that this is an ordered set of Shards.
406+
// We can only join identical indexes of shards for each collection
407+
// locally.
408+
std::set<ShardID> myExp;
409+
410+
auto modNode = dynamic_cast<CollectionAccessingNode const*>(exp.node);
411+
// Only accessing nodes can end up here.
412+
TRI_ASSERT(modNode != nullptr);
413+
auto col = modNode->collection();
414+
// Should be hit earlier, a modification node here is required to have a collection
415+
TRI_ASSERT(col != nullptr);
416+
auto const& shards = shardLocking.shardsForSnippet(id(), col);
417+
for (auto const& s : shards) {
418+
auto check = shardMapping.find(s);
419+
// If we find a shard here that is not in this mapping,
420+
// we have 1) a problem with locking before that should have thrown
421+
// 2) a problem with shardMapping lookup that should have thrown before
422+
TRI_ASSERT(check != shardMapping.end());
423+
if (check->second == server || exp.isSatellite) {
424+
// add all shards on satellites.
425+
// and all shards where this server is the leader
426+
myExp.emplace(s);
427+
}
414428
}
415-
}
416-
if (myExp.empty()) {
417-
return {TRI_ERROR_CLUSTER_NOT_LEADER};
418-
}
419-
// For all other Nodes we can inject a single shard at a time.
420-
// Always use the list of nodes we maintain to hold the first
421-
// of all shards.
422-
// We later use a cone mechanism to inject other shards of permutation
423-
auto collectionAccessingNode = dynamic_cast<CollectionAccessingNode*>(exp.node);
424-
TRI_ASSERT(collectionAccessingNode != nullptr);
425-
collectionAccessingNode->setUsedShard(*myExp.begin());
426-
if (exp.doExpand) {
427-
TRI_ASSERT(!collectionAccessingNode->isUsedAsSatellite());
428-
// All parts need to have exact same size, they need to be permutated pairwise!
429-
TRI_ASSERT(numberOfShardsToPermutate == 0 || myExp.size() == numberOfShardsToPermutate);
430-
// set the max loop index (note this will essentially be done only once)
431-
numberOfShardsToPermutate = myExp.size();
432-
if (numberOfShardsToPermutate > 1) {
433-
// Only in this case we really need to do an expansion
434-
// Otherwise we get away with only using the main stream for
435-
// this server
436-
// NOTE: This might differ between servers.
437-
// One server might require an expansion (many shards) while another does not (only one shard).
438-
localExpansions.emplace(exp.node, std::move(myExp));
429+
if (myExp.empty()) {
430+
return {TRI_ERROR_CLUSTER_NOT_LEADER};
431+
}
432+
// For all other Nodes we can inject a single shard at a time.
433+
// Always use the list of nodes we maintain to hold the first
434+
// of all shards.
435+
// We later use a cone mechanism to inject other shards of permutation
436+
auto collectionAccessingNode = dynamic_cast<CollectionAccessingNode*>(exp.node);
437+
TRI_ASSERT(collectionAccessingNode != nullptr);
438+
collectionAccessingNode->setUsedShard(*myExp.begin());
439+
if (exp.doExpand) {
440+
TRI_ASSERT(!collectionAccessingNode->isUsedAsSatellite());
441+
// All parts need to have exact same size, they need to be permutated pairwise!
442+
TRI_ASSERT(numberOfShardsToPermutate == 0 || myExp.size() == numberOfShardsToPermutate);
443+
// set the max loop index (note this will essentially be done only once)
444+
numberOfShardsToPermutate = myExp.size();
445+
if (numberOfShardsToPermutate > 1) {
446+
// Only in this case we really need to do an expansion
447+
// Otherwise we get away with only using the main stream for
448+
// this server
449+
// NOTE: This might differ between servers.
450+
// One server might require an expansion (many shards) while another does not (only one shard).
451+
localExpansions.emplace(exp.node, std::move(myExp));
452+
}
453+
} else {
454+
TRI_ASSERT(myExp.size() == 1);
439455
}
440-
} else {
441-
TRI_ASSERT(myExp.size() == 1);
442456
}
443-
} // for _expansions - end;
457+
} // for _expansions - end;
444458
return {localExpansions};
445459
}
446460

0 commit comments

Comments
 (0)
0