@@ -449,6 +449,78 @@ ConnectionExists(HnswElement e, HnswNeighborTuple ntup, int startIdx, int lm)
449
449
return false;
450
450
}
451
451
452
+ /*
453
+ * Update neighbor
454
+ */
455
+ static void
456
+ UpdateNeighborOnDisk (HnswElement element , HnswElement newElement , int idx , int m , int lm , int lc , Relation index , bool checkExisting , bool building )
457
+ {
458
+ Buffer buf ;
459
+ Page page ;
460
+ GenericXLogState * state ;
461
+ HnswNeighborTuple ntup ;
462
+ int startIdx ;
463
+ OffsetNumber offno = element -> neighborOffno ;
464
+
465
+ /* Register page */
466
+ buf = ReadBuffer (index , element -> neighborPage );
467
+ LockBuffer (buf , BUFFER_LOCK_EXCLUSIVE );
468
+ if (building )
469
+ {
470
+ state = NULL ;
471
+ page = BufferGetPage (buf );
472
+ }
473
+ else
474
+ {
475
+ state = GenericXLogStart (index );
476
+ page = GenericXLogRegisterBuffer (state , buf , 0 );
477
+ }
478
+
479
+ /* Get tuple */
480
+ ntup = (HnswNeighborTuple ) PageGetItem (page , PageGetItemId (page , offno ));
481
+
482
+ /* Calculate index for update */
483
+ startIdx = (element -> level - lc ) * m ;
484
+
485
+ /* Check for existing connection */
486
+ if (checkExisting && ConnectionExists (newElement , ntup , startIdx , lm ))
487
+ idx = -1 ;
488
+ else if (idx == -2 )
489
+ {
490
+ /* Find free offset if still exists */
491
+ /* TODO Retry updating connections if not */
492
+ for (int j = 0 ; j < lm ; j ++ )
493
+ {
494
+ if (!ItemPointerIsValid (& ntup -> indextids [startIdx + j ]))
495
+ {
496
+ idx = startIdx + j ;
497
+ break ;
498
+ }
499
+ }
500
+ }
501
+ else
502
+ idx += startIdx ;
503
+
504
+ /* Make robust to issues */
505
+ if (idx >= 0 && idx < ntup -> count )
506
+ {
507
+ ItemPointer indextid = & ntup -> indextids [idx ];
508
+
509
+ /* Update neighbor on the buffer */
510
+ ItemPointerSet (indextid , newElement -> blkno , newElement -> offno );
511
+
512
+ /* Commit */
513
+ if (building )
514
+ MarkBufferDirty (buf );
515
+ else
516
+ GenericXLogFinish (state );
517
+ }
518
+ else if (!building )
519
+ GenericXLogAbort (state );
520
+
521
+ UnlockReleaseBuffer (buf );
522
+ }
523
+
452
524
/*
453
525
* Update neighbors
454
526
*/
@@ -465,78 +537,16 @@ HnswUpdateNeighborsOnDisk(Relation index, FmgrInfo *procinfo, Oid collation, Hns
465
537
for (int i = 0 ; i < neighbors -> length ; i ++ )
466
538
{
467
539
HnswCandidate * hc = & neighbors -> items [i ];
468
- Buffer buf ;
469
- Page page ;
470
- GenericXLogState * state ;
471
- HnswNeighborTuple ntup ;
472
- int idx ;
473
- int startIdx ;
474
540
HnswElement neighborElement = HnswPtrAccess (base , hc -> element );
475
- OffsetNumber offno = neighborElement -> neighborOffno ;
541
+ int idx ;
476
542
477
543
idx = GetUpdateIndex (neighborElement , e , hc -> distance , m , lm , lc , index , procinfo , collation );
478
544
479
545
/* New element was not selected as a neighbor */
480
546
if (idx == -1 )
481
547
continue ;
482
548
483
- /* Register page */
484
- buf = ReadBuffer (index , neighborElement -> neighborPage );
485
- LockBuffer (buf , BUFFER_LOCK_EXCLUSIVE );
486
- if (building )
487
- {
488
- state = NULL ;
489
- page = BufferGetPage (buf );
490
- }
491
- else
492
- {
493
- state = GenericXLogStart (index );
494
- page = GenericXLogRegisterBuffer (state , buf , 0 );
495
- }
496
-
497
- /* Get tuple */
498
- ntup = (HnswNeighborTuple ) PageGetItem (page , PageGetItemId (page , offno ));
499
-
500
- /* Calculate index for update */
501
- startIdx = (neighborElement -> level - lc ) * m ;
502
-
503
- /* Check for existing connection */
504
- if (checkExisting && ConnectionExists (e , ntup , startIdx , lm ))
505
- idx = -1 ;
506
- else if (idx == -2 )
507
- {
508
- /* Find free offset if still exists */
509
- /* TODO Retry updating connections if not */
510
- for (int j = 0 ; j < lm ; j ++ )
511
- {
512
- if (!ItemPointerIsValid (& ntup -> indextids [startIdx + j ]))
513
- {
514
- idx = startIdx + j ;
515
- break ;
516
- }
517
- }
518
- }
519
- else
520
- idx += startIdx ;
521
-
522
- /* Make robust to issues */
523
- if (idx >= 0 && idx < ntup -> count )
524
- {
525
- ItemPointer indextid = & ntup -> indextids [idx ];
526
-
527
- /* Update neighbor on the buffer */
528
- ItemPointerSet (indextid , e -> blkno , e -> offno );
529
-
530
- /* Commit */
531
- if (building )
532
- MarkBufferDirty (buf );
533
- else
534
- GenericXLogFinish (state );
535
- }
536
- else if (!building )
537
- GenericXLogAbort (state );
538
-
539
- UnlockReleaseBuffer (buf );
549
+ UpdateNeighborOnDisk (neighborElement , e , idx , m , lm , lc , index , checkExisting , building );
540
550
}
541
551
}
542
552
}
0 commit comments