8000 Moved logic to get update neighbor on disk to separate function · postgrespro/pgvector@ff6da4f · GitHub
[go: up one dir, main page]

Skip to content

Commit ff6da4f

Browse files
committed
Moved logic to get update neighbor on disk to separate function
1 parent a8b4b66 commit ff6da4f

File tree

1 file changed

+74
-64
lines changed

1 file changed

+74
-64
lines changed

src/hnswinsert.c

Lines changed: 74 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -449,6 +449,78 @@ ConnectionExists(HnswElement e, HnswNeighborTuple ntup, int startIdx, int lm)
449449
return false;
450450
}
451451

452+
/*
453+
* Update neighbor
454+
*/
455+
static void
456+
UpdateNeighborOnDisk(HnswElement element, HnswElement newElement, int idx, int m, int lm, int lc, Relation index, bool checkExisting, bool building)
457+
{
458+
Buffer buf;
459+
Page page;
460+
GenericXLogState *state;
461+
HnswNeighborTuple ntup;
462+
int startIdx;
463+
OffsetNumber offno = element->neighborOffno;
464+
465+
/* Register page */
466+
buf = ReadBuffer(index, element->neighborPage);
467+
LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
468+
if (building)
469+
{
470+
state = NULL;
471+
page = BufferGetPage(buf);
472+
}
473+
else
474+
{
475+
state = GenericXLogStart(index);
476+
page = GenericXLogRegisterBuffer(state, buf, 0);
477+
}
478+
479+
/* Get tuple */
480+
ntup = (HnswNeighborTuple) PageGetItem(page, PageGetItemId(page, offno));
481+
482+
/* Calculate index for update */
483+
startIdx = (element->level - lc) * m;
484+
485+
/* Check for existing connection */
486+
if (checkExisting && ConnectionExists(newElement, ntup, startIdx, lm))
487+
idx = -1;
488+
else if (idx == -2)
489+
{
490+
/* Find free offset if still exists */
491+
/* TODO Retry updating connections if not */
492+
for (int j = 0; j < lm; j++)
493+
{
494+
if (!ItemPointerIsValid(&ntup->indextids[startIdx + j]))
495+
{
496+
idx = startIdx + j;
497+
break;
498+
}
499+
}
500+
}
501+
else
502+
idx += startIdx;
503+
504+
/* Make robust to issues */
505+
if (idx >= 0 && idx < ntup->count)
506+
{
507+
ItemPointer indextid = &ntup->indextids[idx];
508+
509+
/* Update neighbor on the buffer */
510+
ItemPointerSet(indextid, newElement->blkno, newElement->offno);
511+
512+
/* Commit */
513+
if (building)
514+
MarkBufferDirty(buf);
515+
else
516+
GenericXLogFinish(state);
517+
}
518+
else if (!building)
519+
GenericXLogAbort(state);
520+
521+
UnlockReleaseBuffer(buf);
522+
}
523+
452524
/*
453525
* Update neighbors
454526
*/
@@ -465,78 +537,16 @@ HnswUpdateNeighborsOnDisk(Relation index, FmgrInfo *procinfo, Oid collation, Hns
465537
for (int i = 0; i < neighbors->length; i++)
466538
{
467539
HnswCandidate *hc = &neighbors->items[i];
468-
Buffer buf;
469-
Page page;
470-
GenericXLogState *state;
471-
HnswNeighborTuple ntup;
472-
int idx;
473-
int startIdx;
474540
HnswElement neighborElement = HnswPtrAccess(base, hc->element);
475-
OffsetNumber offno = neighborElement->neighborOffno;
541+
int idx;
476542

477543
idx = GetUpdateIndex(neighborElement, e, hc->distance, m, lm, lc, index, procinfo, collation);
478544

479545
/* New element was not selected as a neighbor */
480546
if (idx == -1)
481547
continue;
482548

483-
/* Register page */
484-
buf = ReadBuffer(index, neighborElement->neighborPage);
485-
LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
486-
if (building)
487-
{
488-
state = NULL;
489-
page = BufferGetPage(buf);
490-
}
491-
else
492-
{
493-
state = GenericXLogStart(index);
494-
page = GenericXLogRegisterBuffer(state, buf, 0);
495-
}
496-
497-
/* Get tuple */
498-
ntup = (HnswNeighborTuple) PageGetItem(page, PageGetItemId(page, offno));
499-
500-
/* Calculate index for update */
501-
startIdx = (neighborElement->level - lc) * m;
502-
503-
/* Check for existing connection */
504-
if (checkExisting && ConnectionExists(e, ntup, startIdx, lm))
505-
idx = -1;
506-
else if (idx == -2)
507-
{
508-
/* Find free offset if still exists */
509-
/* TODO Retry updating connections if not */
510-
for (int j = 0; j < lm; j++)
511-
{
512-
if (!ItemPointerIsValid(&ntup->indextids[startIdx + j]))
513-
{
514-
idx = startIdx + j;
515-
break;
516-
}
517-
}
518-
}
519-
else
520-
idx += startIdx;
521-
522-
/* Make robust to issues */
523-
if (idx >= 0 && idx < ntup->count)
524-
{
525-
ItemPointer indextid = &ntup->indextids[idx];
526-
527-
/* Update neighbor on the buffer */
528-
ItemPointerSet(indextid, e->blkno, e->offno);
529-
530-
/* Commit */
531-
if (building)
532-
MarkBufferDirty(buf);
533-
else
534-
GenericXLogFinish(state);
535-
}
536-
else if (!building)
537-
GenericXLogAbort(state);
538-
539-
UnlockReleaseBuffer(buf);
549+
UpdateNeighborOnDisk(neighborElement, e, idx, m, lm, lc, index, checkExisting, building);
540550
}
541551
}
542552
}

0 commit comments

Comments
 (0)
0