@@ -65,7 +65,7 @@ void QuerySnippet::addNode(ExecutionNode* node) {
65
65
case ExecutionNode::K_SHORTEST_PATHS: {
66
66
auto * graphNode = ExecutionNode::castTo<GraphNode*>(node);
67
67
auto const isSatellite = graphNode->isUsedAsSatellite ();
68
- _expansions.emplace_back (node, false , isSatellite);
68
+ _expansions.emplace_back (node, !isSatellite , isSatellite);
69
69
break ;
70
70
}
71
71
case ExecutionNode::ENUMERATE_IRESEARCH_VIEW: {
@@ -77,7 +77,7 @@ void QuerySnippet::addNode(ExecutionNode* node) {
77
77
_expansions.emplace_back (node, false , false );
78
78
break ;
79
79
}
80
- case ExecutionNode::MATERIALIZE: {
80
+ case ExecutionNode::MATERIALIZE: {
81
81
auto collectionAccessingNode = dynamic_cast <CollectionAccessingNode*>(node);
82
82
// Materialize index node - true
83
83
// Materialize view node - false
@@ -334,20 +334,21 @@ ResultT<std::unordered_map<ExecutionNode*, std::set<ShardID>>> QuerySnippet::pre
334
334
}
335
335
}
336
336
}
337
- continue ;
338
- } else if ( exp.node ->getType () == ExecutionNode::TRAVERSAL
339
- || exp.node ->getType () == ExecutionNode::SHORTEST_PATH
340
- || exp.node ->getType () == ExecutionNode::K_SHORTEST_PATHS) {
341
-
337
+ } else if (exp.node ->getType () == ExecutionNode::TRAVERSAL ||
338
+ exp.node ->getType () == ExecutionNode::SHORTEST_PATH ||
339
+ exp.node ->getType () == ExecutionNode::K_SHORTEST_PATHS) {
342
340
// the same translation is copied to all servers
343
341
// there are no local expansions
344
342
345
343
auto * graphNode = ExecutionNode::castTo<GraphNode*>(exp.node );
346
- graphNode->setCollectionToShard ({}); // clear previous information
344
+ graphNode->setCollectionToShard ({}); // clear previous information
347
345
348
346
TRI_ASSERT (graphNode->isUsedAsSatellite () == exp.isSatellite );
349
347
350
348
if (!exp.isSatellite ) {
349
+ // This is either one shard or a single satellite graph which is not used
350
+ // as satellite graph.
351
+ uint64_t numShards = 0 ;
351
352
for (auto * aqlCollection : graphNode->collections ()) {
352
353
auto const & shards = aqlCollection->shardIds ();
353
354
TRI_ASSERT (!shards->empty ());
@@ -358,89 +359,102 @@ ResultT<std::unordered_map<ExecutionNode*, std::set<ShardID>>> QuerySnippet::pre
358
359
// to be used in toVelocyPack methods of classes derived
359
360
// from GraphNode
360
361
graphNode->addCollectionToShard (aqlCollection->name (), shard);
362
+
363
+ numShards++;
361
364
}
362
365
}
363
366
}
367
+
368
+ if (numShards == 0 ) {
369
+ return {TRI_ERROR_CLUSTER_NOT_LEADER};
370
+ }
371
+
372
+ bool foundEnoughShards = numShards == graphNode->collections ().size ();
373
+ TRI_ASSERT (foundEnoughShards);
374
+ if (!foundEnoughShards) {
375
+ THROW_ARANGO_EXCEPTION (TRI_ERROR_INTERNAL_AQL);
376
+ }
364
377
} else {
365
378
TRI_ASSERT (graphNode->isUsedAsSatellite ());
366
379
#ifndef USE_ENTERPRISE
367
380
TRI_ASSERT (false );
368
381
#endif
382
+
369
383
for (auto * aqlCollection : graphNode->collections ()) {
370
384
auto const & shards = shardLocking.shardsForSnippet (id (), aqlCollection);
385
+ TRI_ASSERT (shards.size () == 1 );
371
386
for (auto const & shard : shards) {
372
387
// If we find a shard here that is not in this mapping,
373
388
// we have 1) a problem with locking before that should have thrown
374
389
// 2) a problem with shardMapping lookup that should have thrown before
375
390
TRI_ASSERT (shardMapping.find (shard) != shardMapping.end ());
376
- // Could we replace the outer if/else on isSatellite with this branch
377
- // here, and remove the upper part?
391
+ // Could we replace the outer if/else on isSatellite with this
392
+ // branch here, and remove the upper part?
378
393
// if (check->second == server || exp.isSatellite) {...}
379
394
380
395
graphNode->addCollectionToShard (aqlCollection->name (), shard);
381
-
382
396
// This case currently does not exist and is not handled here.
383
397
TRI_ASSERT (!exp.doExpand );
384
398
}
385
399
}
386
400
}
387
401
388
- continue ; // skip rest - there are no local expansions
389
- }
390
- // exp.node is now either an enumerate collection, index, or modification.
391
-
392
- // It is of utmost importance that this is an ordered set of Shards.
393
- // We can only join identical indexes of shards for each collection
394
- // locally.
395
- std::set<ShardID> myExp;
396
-
397
- auto modNode = dynamic_cast <CollectionAccessingNode const *>(exp. node );
398
- // Only accessing nodes can end up here.
399
- TRI_ASSERT (modNode != nullptr );
400
- auto col = modNode-> collection ();
401
- // Should be hit earlier, a modification node here is required to have a collection
402
- TRI_ASSERT (col != nullptr );
403
- auto const & shards = shardLocking. shardsForSnippet ( id (), col);
404
- for ( auto const & s : shards) {
405
- auto check = shardMapping. find (s);
406
- // If we find a shard here that is not in this mapping,
407
- // we have 1 ) a problem with locking before that should have thrown
408
- // 2) a problem with shardMapping lookup that should have thrown before
409
- TRI_ASSERT (check != shardMapping. end ());
410
- if (check-> second == server || exp. isSatellite ) {
411
- // add all shards on satellites.
412
- // and all shards where this server is the leader
413
- myExp. emplace (s);
402
+ } else {
403
+ // exp.node is now either an enumerate collection, index, or modification.
404
+
405
+ // It is of utmost importance that this is an ordered set of Shards.
406
+ // We can only join identical indexes of shards for each collection
407
+ // locally.
408
+ std::set<ShardID> myExp;
409
+
410
+ auto modNode = dynamic_cast <CollectionAccessingNode const *>(exp. node );
411
+ // Only accessing nodes can end up here.
412
+ TRI_ASSERT (modNode != nullptr );
413
+ auto col = modNode-> collection ( );
414
+ // Should be hit earlier, a modification node here is required to have a collection
415
+ TRI_ASSERT (col != nullptr );
416
+ auto const & shards = shardLocking. shardsForSnippet ( id (), col );
417
+ for ( auto const & s : shards) {
418
+ auto check = shardMapping. find (s);
419
+ // If we find a shard here that is not in this mapping,
420
+ // we have 1) a problem with locking before that should have thrown
421
+ // 2 ) a problem with shardMapping lookup that should have thrown before
422
+ TRI_ASSERT (check != shardMapping. end ());
423
+ if (check-> second == server || exp. isSatellite ) {
424
+ // add all shards on satellites.
425
+ // and all shards where this server is the leader
426
+ myExp. emplace (s);
427
+ }
414
428
}
415
- }
416
- if (myExp.empty ()) {
417
- return {TRI_ERROR_CLUSTER_NOT_LEADER};
418
- }
419
- // For all other Nodes we can inject a single shard at a time.
420
- // Always use the list of nodes we maintain to hold the first
421
- // of all shards.
422
- // We later use a cone mechanism to inject other shards of permutation
423
- auto collectionAccessingNode = dynamic_cast <CollectionAccessingNode*>(exp.node );
424
- TRI_ASSERT (collectionAccessingNode != nullptr );
425
- collectionAccessingNode->setUsedShard (*myExp.begin ());
426
- if (exp.doExpand ) {
427
- TRI_ASSERT (!collectionAccessingNode->isUsedAsSatellite ());
428
- // All parts need to have exact same size, they need to be permutated pairwise!
429
- TRI_ASSERT (numberOfShardsToPermutate == 0 || myExp.size () == numberOfShardsToPermutate);
430
- // set the max loop index (note this will essentially be done only once)
431
- numberOfShardsToPermutate = myExp.size ();
432
- if (numberOfShardsToPermutate > 1 ) {
433
- // Only in this case we really need to do an expansion
434
- // Otherwise we get away with only using the main stream for
435
- // this server
436
- // NOTE: This might differ between servers.
437
- // One server might require an expansion (many shards) while another does not (only one shard).
438
- localExpansions.emplace (exp.node , std::move (myExp));
429
+ if (myExp.empty ()) {
430
+ return {TRI_ERROR_CLUSTER_NOT_LEADER};
431
+ }
432
+ // For all other Nodes we can inject a single shard at a time.
433
+ // Always use the list of nodes we maintain to hold the first
434
+ // of all shards.
435
+ // We later use a cone mechanism to inject other shards of permutation
436
+ auto collectionAccessingNode = dynamic_cast <CollectionAccessingNode*>(exp.node );
437
+ TRI_ASSERT (collectionAccessingNode != nullptr );
438
+ collectionAccessingNode->setUsedShard (*myExp.begin ());
439
+ if (exp.doExpand ) {
440
+ TRI_ASSERT (!collectionAccessingNode->isUsedAsSatellite ());
441
+ // All parts need to have exact same size, they need to be permutated pairwise!
442
+ TRI_ASSERT (numberOfShardsToPermutate == 0 || myExp.size () == numberOfShardsToPermutate);
443
+ // set the max loop index (note this will essentially be done only once)
444
+ numberOfShardsToPermutate = myExp.size ();
445
+ if (numberOfShardsToPermutate > 1 ) {
446
+ // Only in this case we really need to do an expansion
447
+ // Otherwise we get away with only using the main stream for
448
+ // this server
449
+ // NOTE: This might differ between servers.
450
+ // One server might require an expansion (many shards) while another does not (only one shard).
451
+ localExpansions.emplace (exp.node , std::move (myExp));
452
+ }
453
+ } else {
454
+ TRI_ASSERT (myExp.size () == 1 );
439
455
}
440
- } else {
441
- TRI_ASSERT (myExp.size () == 1 );
442
456
}
443
- } // for _expansions - end;
457
+ } // for _expansions - end;
444
458
return {localExpansions};
445
459
}
446
460
0 commit comments