16
16
17
17
#include "access/gin_private.h"
18
18
#include "miscadmin.h"
19
+ #include "utils/memutils.h"
19
20
#include "utils/rel.h"
20
21
21
22
static void ginFindParents (GinBtree btree , GinBtreeStack * stack );
@@ -309,28 +310,46 @@ ginFindParents(GinBtree btree, GinBtreeStack *stack)
309
310
* Insert a new item to a page.
310
311
*
311
312
* Returns true if the insertion was finished. On false, the page was split and
312
- * the parent needs to be updated. (a root split returns true as it doesn't
313
- * need any further action by the caller to complete)
313
+ * the parent needs to be updated. (A root split returns true as it doesn't
314
+ * need any further action by the caller to complete.)
314
315
*
315
- * When inserting a downlink to a internal page, 'childbuf' contains the
316
+ * When inserting a downlink to an internal page, 'childbuf' contains the
316
317
* child page that was split. Its GIN_INCOMPLETE_SPLIT flag will be cleared
317
- * atomically with the insert. Also, the existing item at the given location
318
- * is updated to point to ' updateblkno' .
318
+ * atomically with the insert. Also, the existing item at offset stack->off
319
+ * in the target page is updated to point to updateblkno.
319
320
*
320
321
* stack->buffer is locked on entry, and is kept locked.
322
+ * Likewise for childbuf, if given.
321
323
*/
322
324
static bool
323
325
ginPlaceToPage (GinBtree btree , GinBtreeStack * stack ,
324
326
void * insertdata , BlockNumber updateblkno ,
325
327
Buffer childbuf , GinStatsData * buildStats )
326
328
{
327
329
Page page = BufferGetPage (stack -> buffer );
328
- XLogRecData * payloadrdata ;
330
+ bool result ;
329
331
GinPlaceToPageRC rc ;
330
332
uint16 xlflags = 0 ;
331
333
Page childpage = NULL ;
332
334
Page newlpage = NULL ,
333
335
newrpage = NULL ;
336
+ void * ptp_workspace = NULL ;
337
+ XLogRecData payloadrdata [10 ];
338
+ MemoryContext tmpCxt ;
339
+ MemoryContext oldCxt ;
340
+
341
+ /*
342
+ * We do all the work of this function and its subfunctions in a temporary
343
+ * memory context. This avoids leakages and simplifies APIs, since some
344
+ * subfunctions allocate storage that has to survive until we've finished
345
+ * the WAL insertion.
346
+ */
347
+ tmpCxt = AllocSetContextCreate (CurrentMemoryContext ,
348
+ "ginPlaceToPage temporary context" ,
349
+ ALLOCSET_DEFAULT_MINSIZE ,
350
+ ALLOCSET_DEFAULT_INITSIZE ,
351
+ ALLOCSET_DEFAULT_MAXSIZE );
352
+ oldCxt = MemoryContextSwitchTo (tmpCxt );
334
353
335
354
if (GinPageIsData (page ))
336
355
xlflags |= GIN_INSERT_ISDATA ;
@@ -348,21 +367,36 @@ ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
348
367
}
349
368
350
369
/*
351
- * Try to put the incoming tuple on the page. placeToPage will decide if
352
- * the page needs to be split.
370
+ * See if the incoming tuple will fit on the page. beginPlaceToPage will
371
+ * decide if the page needs to be split, and will compute the split
372
+ * contents if so. See comments for beginPlaceToPage and execPlaceToPage
373
+ * functions for more details of the API here.
353
374
*/
354
- rc = btree -> placeToPage (btree , stack -> buffer , stack ,
355
- insertdata , updateblkno ,
356
- & payloadrdata , & newlpage , & newrpage );
357
- if (rc == UNMODIFIED )
358
- return true;
359
- else if (rc == INSERTED )
375
+ rc = btree -> beginPlaceToPage (btree , stack -> buffer , stack ,
376
+ insertdata , updateblkno ,
377
+ & ptp_workspace ,
378
+ & newlpage , & newrpage ,
379
+ payloadrdata );
380
+
381
+ if (rc == GPTP_NO_WORK )
360
382
{
361
- /* placeToPage did START_CRIT_SECTION() */
383
+ /* Nothing to do */
384
+ result = true;
385
+ }
386
+ else if (rc == GPTP_INSERT )
387
+ {
388
+ /* It will fit, perform the insertion */
389
+ START_CRIT_SECTION ();
390
+
391
+ /* Perform the page update, and set up WAL data about it */
392
+ btree -> execPlaceToPage (btree , stack -> buffer , stack ,
393
+ insertdata , updateblkno ,
394
+ ptp_workspace , payloadrdata );
395
+
362
396
MarkBufferDirty (stack -> buffer );
363
397
364
398
/* An insert to an internal page finishes the split of the child. */
365
- if (childbuf != InvalidBuffer )
399
+ if (BufferIsValid ( childbuf ) )
366
400
{
367
401
GinPageGetOpaque (childpage )-> flags &= ~GIN_INCOMPLETE_SPLIT ;
368
402
MarkBufferDirty (childbuf );
@@ -387,7 +421,7 @@ ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
387
421
* Log information about child if this was an insertion of a
388
422
* downlink.
389
423
*/
390
- if (childbuf != InvalidBuffer )
424
+ if (BufferIsValid ( childbuf ) )
391 425
{
392
426
rdata [0 ].next = & rdata [1 ];
393
427
@@ -400,7 +434,7 @@ ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
400
434
rdata [1 ].next = & rdata [2 ];
401
435
402
436
rdata [2 ].buffer = childbuf ;
403
- rdata [2 ].buffer_std = false ;
437
+ rdata [2 ].buffer_std = true ;
404
438
rdata [2 ].data = NULL ;
405
439
rdata [2 ].len = 0 ;
406
440
rdata [2 ].next = payloadrdata ;
@@ -409,25 +443,31 @@ ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
409
443
rdata [0 ].next = payloadrdata ;
410
444
411
445
recptr = XLogInsert (RM_GIN_ID , XLOG_GIN_INSERT , rdata );
446
+
412
447
PageSetLSN (page , recptr );
413
- if (childbuf != InvalidBuffer )
448
+ if (BufferIsValid ( childbuf ) )
414
449
PageSetLSN (childpage , recptr );
415
450
}
416
451
417
452
END_CRIT_SECTION ();
418
453
419
- return true;
454
+ /* Insertion is complete. */
455
+ result = true;
420
456
}
421
- else if (rc == SPLIT )
457
+ else if (rc == GPTP_SPLIT )
422
458
{
423
- /* Didn't fit, have to split */
459
+ /*
460
+ * Didn't fit, need to split. The split has been computed in newlpage
461
+ * and newrpage, which are pointers to palloc'd pages, not associated
462
+ * with buffers. stack->buffer is not touched yet.
463
+ */
424
464
Buffer rbuffer ;
425
465
BlockNumber savedRightLink ;
426
- XLogRecData rdata [2 ];
427
466
ginxlogSplit data ;
428
467
Buffer lbuffer = InvalidBuffer ;
429
468
Page newrootpg = NULL ;
430
469
470
+ /* Get a new index page to become the right page */
431
471
rbuffer = GinNewBuffer (btree -> index );
432
472
433
473
/* During index build, count the new page */
@@ -441,52 +481,27 @@ ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
441
481
442
482
savedRightLink = GinPageGetOpaque (page )-> rightlink ;
443
483
444
- /*
445
- * newlpage and newrpage are pointers to memory pages, not associated
446
- * with buffers. stack->buffer is not touched yet.
447
- */
448
-
484
+ /* Begin setting up WAL record (which we might not use) */
449
485
data .node = btree -> index -> rd_node ;
450
486
data .rblkno = BufferGetBlockNumber (rbuffer );
451
487
data .flags = xlflags ;
452
- if (childbuf != InvalidBuffer )
488
+ if (BufferIsValid ( childbuf ) )
453
489
{
454
- Page childpage = BufferGetPage (childbuf );
455
-
456
- GinPageGetOpaque (childpage )-> flags &= ~GIN_INCOMPLETE_SPLIT ;
457
-
458
490
data .leftChildBlkno = BufferGetBlockNumber (childbuf );
459
491
data .rightChildBlkno = GinPageGetOpaque (childpage )-> rightlink ;
460
492
}
461
493
else
462
494
data .leftChildBlkno = data .rightChildBlkno = InvalidBlockNumber ;
463
495
464
- rdata [0 ].buffer = InvalidBuffer ;
465
- rdata [0 ].data = (char * ) & data ;
466
- rdata [0 ].len = sizeof (ginxlogSplit );
467
-
468
- if (childbuf != InvalidBuffer )
469
- {
470
- rdata [0 ].next = & rdata [1 ];
471
-
472
- rdata [1 ].buffer = childbuf ;
473
- rdata [1 ].buffer_std = false;
474
- rdata [1 ].data = NULL ;
475
- rdata [1 ].len = 0 ;
476
- rdata [1 ].next = payloadrdata ;
477
- }
478
- else
479
- rdata [0 ].next = payloadrdata ;
480
-
481
496
if (stack -> parent == NULL )
482
497
{
483
498
/*
484
- * split root, so we need to allocate new left page and place
485
- * pointer on root to left and right page
499
+ * splitting the root, so we need to allocate new left page and
500
+ * place pointers to left and right page on root page.
486
501
*/
487
502
lbuffer = GinNewBuffer (btree -> index );
488
503
489
- /* During index build, count the newly-added root page */
504
+ /* During index build, count the new left page */
490
505
if (buildStats )
491
506
{
492
507
if (btree -> isData )
@@ -508,9 +523,9 @@ ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
508
523
509
524
/*
510
525
* Construct a new root page containing downlinks to the new left
511
- * and right pages. (do this in a temporary copy first rather than
512
- * overwriting the original page directly, so that we can still
513
- * abort gracefully if this fails .)
526
+ * and right pages. (Do this in a temporary copy rather than
527
+ * overwriting the original page directly, since we're not in the
528
+ * critical section yet.)
514
529
*/
515
530
newrootpg = PageGetTempPage (newrpage );
516
531
GinInitPage (newrootpg , GinPageGetOpaque (newlpage )-> flags & ~(GIN_LEAF | GIN_COMPRESSED ), BLCKSZ );
@@ -521,7 +536,7 @@ ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
521
536
}
522
537
else
523
538
{
524
- /* split non-root page */
539
+ /* splitting a non-root page */
525
540
data .rrlink = savedRightLink ;
526
541
data .lblkno = BufferGetBlockNumber (stack -> buffer );
527
542
@@ -531,48 +546,70 @@ ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
531
546
}
532
547
533
548
/*
534
- * Ok , we have the new contents of the left page in a temporary copy
535
- * now (newlpage), and the newly-allocated right block has been filled
536
- * in . The original page is still unchanged.
549
+ * OK, we have the new contents of the left page in a temporary copy
550
+ * now (newlpage), and likewise for the new contents of the
551
+ * newly-allocated right block. The original page is still unchanged.
537
552
*
538
553
* If this is a root split, we also have a temporary page containing
539
- * the new contents of the root. Copy the new left page to a
540
- * newly-allocated block, and initialize the (original) root page the
541
- * new copy. Otherwise, copy over the temporary copy of the new left
542
- * page over the old left page.
554
+ * the new contents of the root.
543
555
*/
544
556
545
557
START_CRIT_SECTION ();
546
558
547
559
MarkBufferDirty (rbuffer );
548
560
MarkBufferDirty (stack -> buffer );
549
10000
- if (BufferIsValid (childbuf ))
550
- MarkBufferDirty (childbuf );
551
561
552
562
/*
553
- * Restore the temporary copies over the real buffers. But don't free
554
- * the temporary copies yet, WAL record data points to them.
563
+ * Restore the temporary copies over the real buffers.
555
564
*/
556
565
if (stack -> parent == NULL )
557
566
{
567
+ /* Splitting the root, three pages to update */
558
568
MarkBufferDirty (lbuffer );
559
- memcpy (BufferGetPage ( stack -> buffer ) , newrootpg , BLCKSZ );
569
+ memcpy (page , newrootpg , BLCKSZ );
560
570
memcpy (BufferGetPage (lbuffer ), newlpage , BLCKSZ );
561
571
memcpy (BufferGetPage (rbuffer ), newrpage , BLCKSZ );
562
572
}
563
573
else
564
574
{
565
- memcpy (BufferGetPage (stack -> buffer ), newlpage , BLCKSZ );
575
+ /* Normal split, only two pages to update */
576
+ memcpy (page , newlpage , BLCKSZ );
566
577
memcpy (BufferGetPage (rbuffer ), newrpage , BLCKSZ );
567
578
}
568
579
580
+ /* We also clear childbuf's INCOMPLETE_SPLIT flag, if passed */
581
+ if (BufferIsValid (childbuf ))
582
+ {
583
+ GinPageGetOpaque (childpage )-> flags &= ~GIN_INCOMPLETE_SPLIT ;
584
+ MarkBufferDirty (childbuf );
585
+ }
586
+
569
587
/* write WAL record */
570
588
if (RelationNeedsWAL (btree -> index ))
571
589
{
590
+ XLogRecData rdata [2 ];
572
591
XLogRecPtr recptr ;
573
592
593
+ rdata [0 ].buffer = InvalidBuffer ;
594
+ rdata [0 ].data = (char * ) & data ;
595
+ rdata [0 ].len = sizeof (ginxlogSplit );
596
+
597
+ if (BufferIsValid (childbuf ))
598
+ {
599
+ rdata [0 ].next = & rdata [1 ];
600
+
601
+ rdata [1 ].buffer = childbuf ;
602
+ rdata [1 ].buffer_std = true;
603
+ rdata [1 ].data = NULL ;
604
+ rdata [1 ].len = 0 ;
605
+ rdata [1 ].next = payloadrdata ;
606
+ }
607
+ else
608
+ rdata [0 ].next = payloadrdata ;
609
+
574
610
recptr = XLogInsert (RM_GIN_ID , XLOG_GIN_SPLIT , rdata );
575
- PageSetLSN (BufferGetPage (stack -> buffer ), recptr );
611
+
612
+ PageSetLSN (page , recptr );
576
613
PageSetLSN (BufferGetPage (rbuffer ), recptr );
577
614
if (stack -> parent == NULL )
578
615
PageSetLSN (BufferGetPage (lbuffer ), recptr );
@@ -582,33 +619,31 @@ ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
582
619
END_CRIT_SECTION ();
583
620
584
621
/*
585
- * We can release the lock on the right page now, but keep the
586
- * original buffer locked.
622
+ * We can release the locks/pins on the new pages now, but keep
623
+ * stack->buffer locked. childbuf doesn't get unlocked either.
587
624
*/
588
625
UnlockReleaseBuffer (rbuffer );
589
626
if (stack -> parent == NULL )
590
627
UnlockReleaseBuffer (lbuffer );
591
628
592
- pfree (newlpage );
593
- pfree (newrpage );
594
- if (newrootpg )
595
- pfree (newrootpg );
596
-
597
629
/*
598
630
* If we split the root, we're done. Otherwise the split is not
599
631
* complete until the downlink for the new page has been inserted to
600
632
* the parent.
601
633
*/
602
- if (stack -> parent == NULL )
603
- return true;
604
- else
605
- return false;
634
+ result = (stack -> parent == NULL );
606
635
}
607
636
else
608
637
{
609
- elog (ERROR , "unknown return code from GIN placeToPage method: %d" , rc );
610
- return false; /* keep compiler quiet */
638
+ elog (ERROR , "invalid return code from GIN placeToPage method: %d" , rc );
639
+ result = false; /* keep compiler quiet */
611
640
}
641
+
642
+ /* Clean up temp context */
643
+ MemoryContextSwitchTo (oldCxt );
644
+ MemoryContextDelete (tmpCxt );
645
+
646
+ return result ;
612
647
}
613
648
614
649
/*
0 commit comments