1616
1717#include "access/gin_private.h"
1818#include "miscadmin.h"
19+ #include "utils/memutils.h"
1920#include "utils/rel.h"
2021
2122static void ginFindParents (GinBtree btree , GinBtreeStack * stack );
@@ -309,28 +310,46 @@ ginFindParents(GinBtree btree, GinBtreeStack *stack)
309310 * Insert a new item to a page.
310311 *
311312 * Returns true if the insertion was finished. On false, the page was split and
312- * the parent needs to be updated. (a root split returns true as it doesn't
313- * need any further action by the caller to complete)
313+ * the parent needs to be updated. (A root split returns true as it doesn't
314+ * need any further action by the caller to complete. )
314315 *
315- * When inserting a downlink to a internal page, 'childbuf' contains the
316+ * When inserting a downlink to an internal page, 'childbuf' contains the
316317 * child page that was split. Its GIN_INCOMPLETE_SPLIT flag will be cleared
317- * atomically with the insert. Also, the existing item at the given location
318- * is updated to point to ' updateblkno' .
318+ * atomically with the insert. Also, the existing item at offset stack->off
319+ * in the target page is updated to point to updateblkno.
319320 *
320321 * stack->buffer is locked on entry, and is kept locked.
322+ * Likewise for childbuf, if given.
321323 */
322324static bool
323325ginPlaceToPage (GinBtree btree , GinBtreeStack * stack ,
324326 void * insertdata , BlockNumber updateblkno ,
325327 Buffer childbuf , GinStatsData * buildStats )
326328{
327329 Page page = BufferGetPage (stack -> buffer );
328- XLogRecData * payloadrdata ;
330+ bool result ;
329331 GinPlaceToPageRC rc ;
330332 uint16 xlflags = 0 ;
331333 Page childpage = NULL ;
332334 Page newlpage = NULL ,
333335 newrpage = NULL ;
336+ void * ptp_workspace = NULL ;
337+ XLogRecData payloadrdata [10 ];
338+ MemoryContext tmpCxt ;
339+ MemoryContext oldCxt ;
340+
341+ /*
342+ * We do all the work of this function and its subfunctions in a temporary
343+ * memory context. This avoids leakages and simplifies APIs, since some
344+ * subfunctions allocate storage that has to survive until we've finished
345+ * the WAL insertion.
346+ */
347+ tmpCxt = AllocSetContextCreate (CurrentMemoryContext ,
348+ "ginPlaceToPage temporary context" ,
349+ ALLOCSET_DEFAULT_MINSIZE ,
350+ ALLOCSET_DEFAULT_INITSIZE ,
351+ ALLOCSET_DEFAULT_MAXSIZE );
352+ oldCxt = MemoryContextSwitchTo (tmpCxt );
334353
335354 if (GinPageIsData (page ))
336355 xlflags |= GIN_INSERT_ISDATA ;
@@ -348,21 +367,36 @@ ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
348367 }
349368
350369 /*
351- * Try to put the incoming tuple on the page. placeToPage will decide if
352- * the page needs to be split.
370+ * See if the incoming tuple will fit on the page. beginPlaceToPage will
371+ * decide if the page needs to be split, and will compute the split
372+ * contents if so. See comments for beginPlaceToPage and execPlaceToPage
373+ * functions for more details of the API here.
353374 */
354- rc = btree -> placeToPage (btree , stack -> buffer , stack ,
355- insertdata , updateblkno ,
356- & payloadrdata , & newlpage , & newrpage );
357- if (rc == UNMODIFIED )
358- return true;
359- else if (rc == INSERTED )
375+ rc = btree -> beginPlaceToPage (btree , stack -> buffer , stack ,
376+ insertdata , updateblkno ,
377+ & ptp_workspace ,
378+ & newlpage , & newrpage ,
379+ payloadrdata );
380+
381+ if (rc == GPTP_NO_WORK )
360382 {
361- /* placeToPage did START_CRIT_SECTION() */
383+ /* Nothing to do */
384+ result = true;
385+ }
386+ else if (rc == GPTP_INSERT )
387+ {
388+ /* It will fit, perform the insertion */
389+ START_CRIT_SECTION ();
390+
391+ /* Perform the page update, and set up WAL data about it */
392+ btree -> execPlaceToPage (btree , stack -> buffer , stack ,
393+ insertdata , updateblkno ,
394+ ptp_workspace , payloadrdata );
395+
362396 MarkBufferDirty (stack -> buffer );
363397
364398 /* An insert to an internal page finishes the split of the child. */
365- if (childbuf != InvalidBuffer )
399+ if (BufferIsValid ( childbuf ) )
366400 {
367401 GinPageGetOpaque (childpage )-> flags &= ~GIN_INCOMPLETE_SPLIT ;
368402 MarkBufferDirty (childbuf );
@@ -387,7 +421,7 @@ ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
387421 * Log information about child if this was an insertion of a
388422 * downlink.
389423 */
390- if (childbuf != InvalidBuffer )
424+ if (BufferIsValid ( childbuf ) )
391425 {
392426 rdata [0 ].next = & rdata [1 ];
393427
@@ -400,7 +434,7 @@ ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
400434 rdata [1 ].next = & rdata [2 ];
401435
402436 rdata [2 ].buffer = childbuf ;
403- rdata [2 ].buffer_std = false ;
437+ rdata [2 ].buffer_std = true ;
404438 rdata [2 ].data = NULL ;
405439 rdata [2 ].len = 0 ;
406440 rdata [2 ].next = payloadrdata ;
@@ -409,25 +443,31 @@ ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
409443 rdata [0 ].next = payloadrdata ;
410444
411445 recptr = XLogInsert (RM_GIN_ID , XLOG_GIN_INSERT , rdata );
446+
412447 PageSetLSN (page , recptr );
413- if (childbuf != InvalidBuffer )
448+ if (BufferIsValid ( childbuf ) )
414449 PageSetLSN (childpage , recptr );
415450 }
416451
417452 END_CRIT_SECTION ();
418453
419- return true;
454+ /* Insertion is complete. */
455+ result = true;
420456 }
421- else if (rc == SPLIT )
457+ else if (rc == GPTP_SPLIT )
422458 {
423- /* Didn't fit, have to split */
459+ /*
460+ * Didn't fit, need to split. The split has been computed in newlpage
461+ * and newrpage, which are pointers to palloc'd pages, not associated
462+ * with buffers. stack->buffer is not touched yet.
463+ */
424464 Buffer rbuffer ;
425465 BlockNumber savedRightLink ;
426- XLogRecData rdata [2 ];
427466 ginxlogSplit data ;
428467 Buffer lbuffer = InvalidBuffer ;
429468 Page newrootpg = NULL ;
430469
470+ /* Get a new index page to become the right page */
431471 rbuffer = GinNewBuffer (btree -> index );
432472
433473 /* During index build, count the new page */
@@ -441,52 +481,27 @@ ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
441481
442482 savedRightLink = GinPageGetOpaque (page )-> rightlink ;
443483
444- /*
445- * newlpage and newrpage are pointers to memory pages, not associated
446- * with buffers. stack->buffer is not touched yet.
447- */
448-
484+ /* Begin setting up WAL record (which we might not use) */
449485 data .node = btree -> index -> rd_node ;
450486 data .rblkno = BufferGetBlockNumber (rbuffer );
451487 data .flags = xlflags ;
452- if (childbuf != InvalidBuffer )
488+ if (BufferIsValid ( childbuf ) )
453489 {
454- Page childpage = BufferGetPage (childbuf );
455-
456- GinPageGetOpaque (childpage )-> flags &= ~GIN_INCOMPLETE_SPLIT ;
457-
458490 data .leftChildBlkno = BufferGetBlockNumber (childbuf );
459491 data .rightChildBlkno = GinPageGetOpaque (childpage )-> rightlink ;
460492 }
461493 else
462494 data .leftChildBlkno = data .rightChildBlkno = InvalidBlockNumber ;
463495
464- rdata [0 ].buffer = InvalidBuffer ;
465- rdata [0 ].data = (char * ) & data ;
466- rdata [0 ].len = sizeof (ginxlogSplit );
467-
468- if (childbuf != InvalidBuffer )
469- {
470- rdata [0 ].next = & rdata [1 ];
471-
472- rdata [1 ].buffer = childbuf ;
473- rdata [1 ].buffer_std = false;
474- rdata [1 ].data = NULL ;
475- rdata [1 ].len = 0 ;
476- rdata [1 ].next = payloadrdata ;
477- }
478- else
479- rdata [0 ].next = payloadrdata ;
480-
481496 if (stack -> parent == NULL )
482497 {
483498 /*
484- * split root, so we need to allocate new left page and place
485- * pointer on root to left and right page
499+ * splitting the root, so we need to allocate new left page and
500+ * place pointers to left and right page on root page.
486501 */
487502 lbuffer = GinNewBuffer (btree -> index );
488503
489- /* During index build, count the newly-added root page */
504+ /* During index build, count the new left page */
490505 if (buildStats )
491506 {
492507 if (btree -> isData )
@@ -508,9 +523,9 @@ ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
508523
509524 /*
510525 * Construct a new root page containing downlinks to the new left
511- * and right pages. (do this in a temporary copy first rather than
512- * overwriting the original page directly, so that we can still
513- * abort gracefully if this fails .)
526+ * and right pages. (Do this in a temporary copy rather than
527+ * overwriting the original page directly, since we're not in the
528+ * critical section yet .)
514529 */
515530 newrootpg = PageGetTempPage (newrpage );
516531 GinInitPage (newrootpg , GinPageGetOpaque (newlpage )-> flags & ~(GIN_LEAF | GIN_COMPRESSED ), BLCKSZ );
@@ -521,7 +536,7 @@ ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
521536 }
522537 else
523538 {
524- /* split non-root page */
539+ /* splitting a non-root page */
525540 data .rrlink = savedRightLink ;
526541 data .lblkno = BufferGetBlockNumber (stack -> buffer );
527542
@@ -531,48 +546,70 @@ ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
531546 }
532547
533548 /*
534- * Ok , we have the new contents of the left page in a temporary copy
535- * now (newlpage), and the newly-allocated right block has been filled
536- * in . The original page is still unchanged.
549+ * OK , we have the new contents of the left page in a temporary copy
550+ * now (newlpage), and likewise for the new contents of the
551+ * newly-allocated right block . The original page is still unchanged.
537552 *
538553 * If this is a root split, we also have a temporary page containing
539- * the new contents of the root. Copy the new left page to a
540- * newly-allocated block, and initialize the (original) root page the
541- * new copy. Otherwise, copy over the temporary copy of the new left
542- * page over the old left page.
554+ * the new contents of the root.
543555 */
544556
545557 START_CRIT_SECTION ();
546558
547559 MarkBufferDirty (rbuffer );
548560 MarkBufferDirty (stack -> buffer );
549- if (BufferIsValid (childbuf ))
550- MarkBufferDirty (childbuf );
551561
552562 /*
553- * Restore the temporary copies over the real buffers. But don't free
554- * the temporary copies yet, WAL record data points to them.
563+ * Restore the temporary copies over the real buffers.
555564 */
556565 if (stack -> parent == NULL )
557566 {
567+ /* Splitting the root, three pages to update */
558568 MarkBufferDirty (lbuffer );
559- memcpy (BufferGetPage ( stack -> buffer ) , newrootpg , BLCKSZ );
569+ memcpy (page , newrootpg , BLCKSZ );
560570 memcpy (BufferGetPage (lbuffer ), newlpage , BLCKSZ );
561571 memcpy (BufferGetPage (rbuffer ), newrpage , BLCKSZ );
562572 }
563573 else
564574 {
565- memcpy (BufferGetPage (stack -> buffer ), newlpage , BLCKSZ );
575+ /* Normal split, only two pages to update */
576+ memcpy (page , newlpage , BLCKSZ );
566577 memcpy (BufferGetPage(rbuffer ), newrpage , BLCKSZ );
567578 }
568579
580+ /* We also clear childbuf's INCOMPLETE_SPLIT flag, if passed */
581+ if (BufferIsValid (childbuf ))
582+ {
583+ GinPageGetOpaque (childpage )-> flags &= ~GIN_INCOMPLETE_SPLIT ;
584+ MarkBufferDirty (childbuf );
585+ }
586+
569587 /* write WAL record */
570588 if (RelationNeedsWAL (btree -> index ))
571589 {
590+ XLogRecData rdata [2 ];
572591 XLogRecPtr recptr ;
573592
593+ rdata [0 ].buffer = InvalidBuffer ;
594+ rdata [0 ].data = (char * ) & data ;
595+ rdata [0 ].len = sizeof (ginxlogSplit );
596+
597+ if (BufferIsValid (childbuf ))
598+ {
599+ rdata [0 ].next = & rdata [1 ];
600+
601+ rdata [1 ].buffer = childbuf ;
602+ rdata [1 ].buffer_std = true;
603+ rdata [1 ].data = NULL ;
604+ rdata [1 ].len = 0 ;
605+ rdata [1 ].next = payloadrdata ;
606+ }
607+ else
608+ rdata [0 ].next = payloadrdata ;
609+
574610 recptr = XLogInsert (RM_GIN_ID , XLOG_GIN_SPLIT , rdata );
575- PageSetLSN (BufferGetPage (stack -> buffer ), recptr );
611+
612+ PageSetLSN (page , recptr );
576613 PageSetLSN (BufferGetPage (rbuffer ), recptr );
577614 if (stack -> parent == NULL )
578615 PageSetLSN (BufferGetPage (lbuffer ), recptr );
@@ -582,33 +619,31 @@ ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
582619 END_CRIT_SECTION ();
583620
584621 /*
585- * We can release the lock on the right page now, but keep the
586- * original buffer locked.
622+ * We can release the locks/pins on the new pages now, but keep
623+ * stack-> buffer locked. childbuf doesn't get unlocked either .
587624 */
588625 UnlockReleaseBuffer (rbuffer );
589626 if (stack -> parent == NULL )
590627 UnlockReleaseBuffer (lbuffer );
591628
592- pfree (newlpage );
593- pfree (newrpage );
594- if (newrootpg )
595- pfree (newrootpg );
596-
597629 /*
598630 * If we split the root, we're done. Otherwise the split is not
599631 * complete until the downlink for the new page has been inserted to
600632 * the parent.
601633 */
602- if (stack -> parent == NULL )
603- return true;
604- else
605- return false;
634+ result = (stack -> parent == NULL );
606635 }
607636 else
608637 {
609- elog (ERROR , "unknown return code from GIN placeToPage method: %d" , rc );
610- return false; /* keep compiler quiet */
638+ elog (ERROR , "invalid return code from GIN placeToPage method: %d" , rc );
639+ result = false; /* keep compiler quiet */
611640 }
641+
642+ /* Clean up temp context */
643+ MemoryContextSwitchTo (oldCxt );
644+ MemoryContextDelete (tmpCxt );
645+
646+ return result ;
612647}
613648
614649/*
0 commit comments