@@ -430,7 +430,7 @@ LWLockAcquire(LWLockId lockid, LWLockMode mode)
430430 elog (PANIC , "cannot wait without a PGPROC structure" );
431431
432432 proc -> lwWaiting = true;
433- proc -> lwExclusive = ( mode == LW_EXCLUSIVE ) ;
433+ proc -> lwWaitMode = mode ;
434434 proc -> lwWaitLink = NULL ;
435435 if (lock -> head == NULL )
436436 lock -> head = proc ;
@@ -564,6 +564,144 @@ LWLockConditionalAcquire(LWLockId lockid, LWLockMode mode)
564564 return !mustwait ;
565565}
566566
567+ /*
568+ * LWLockWaitUntilFree - Wait until a lock is free
569+ *
570+ * The semantics of this function are a bit funky. If the lock is currently
571+ * free, it is acquired in the given mode, and the function returns true. If
572+ * the lock isn't immediately free, the function waits until it is released
573+ * and returns false, but does not acquire the lock.
574+ *
575+ * This is currently used for WALWriteLock: when a backend flushes the WAL,
576+ * holding WALWriteLock, it can flush the commit records of many other
577+ * backends as a side-effect. Those other backends need to wait until the
578+ * fl
55C6
ush finishes, but don't need to acquire the lock anymore. They can just
579+ * wake up, observe that their records have already been flushed, and return.
580+ */
581+ bool
582+ LWLockWaitUntilFree (LWLockId lockid , LWLockMode mode )
583+ {
584+ volatile LWLock * lock = & (LWLockArray [lockid ].lock );
585+ PGPROC * proc = MyProc ;
586+ bool mustwait ;
587+ int extraWaits = 0 ;
588+
589+ PRINT_LWDEBUG ("LWLockWaitUntilFree" , lockid , lock );
590+
591+ /* Ensure we will have room to remember the lock */
592+ if (num_held_lwlocks >= MAX_SIMUL_LWLOCKS )
593+ elog (ERROR , "too many LWLocks taken" );
594+
595+ /*
596+ * Lock out cancel/die interrupts until we exit the code section protected
597+ * by the LWLock. This ensures that interrupts will not interfere with
598+ * manipulations of data structures in shared memory.
599+ */
600+ HOLD_INTERRUPTS ();
601+
602+ /* Acquire mutex. Time spent holding mutex should be short! */
603+ SpinLockAcquire (& lock -> mutex );
604+
605+ /* If I can get the lock, do so quickly. */
606+ if (mode == LW_EXCLUSIVE )
607+ {
608+ if (lock -> exclusive == 0 && lock -> shared == 0 )
609+ {
610+ lock -> exclusive ++ ;
611+ mustwait = false;
612+ }
613+ else
614+ mustwait = true;
615+ }
616+ else
617+ {
618+ if (lock -> exclusive == 0 )
619+ {
620+ lock -> shared ++ ;
621+ mustwait = false;
622+ }
623+ else
624+ mustwait = true;
625+ }
626+
627+ if (mustwait )
628+ {
629+ /*
630+ * Add myself to wait queue.
631+ *
632+ * If we don't have a PGPROC structure, there's no way to wait. This
633+ * should never occur, since MyProc should only be null during shared
634+ * memory initialization.
635+ */
636+ if (proc == NULL )
637+ elog (PANIC , "cannot wait without a PGPROC structure" );
638+
639+ proc -> lwWaiting = true;
640+ proc -> lwWaitMode = LW_WAIT_UNTIL_FREE ;
641+ proc -> lwWaitLink = NULL ;
642+ if (lock -> head == NULL )
643+ lock -> head = proc ;
644+ else
645+ lock -> tail -> lwWaitLink = proc ;
646+ lock -> tail = proc ;
647+
648+ /* Can release the mutex now */
649+ SpinLockRelease (& lock -> mutex );
650+
651+ /*
652+ * Wait until awakened. Like in LWLockAcquire, be prepared for bogus
653+ * wakups, because we share the semaphore with ProcWaitForSignal.
654+ */
655+ LOG_LWDEBUG ("LWLockWaitUntilFree" , lockid , "waiting" );
656+
657+ #ifdef LWLOCK_STATS
658+ block_counts [lockid ]++ ;
659+ #endif
660+
661+ TRACE_POSTGRESQL_LWLOCK_WAIT_START (lockid , mode );
662+
663+ for (;;)
664+ {
665+ /* "false" means cannot accept cancel/die interrupt here. */
666+ PGSemaphoreLock (& proc -> sem , false);
667+ if (!proc -> lwWaiting )
668+ break ;
669+ extraWaits ++ ;
670+ }
671+
672+ TRACE_POSTGRESQL_LWLOCK_WAIT_DONE (lockid , mode );
673+
674+ LOG_LWDEBUG ("LWLockWaitUntilFree" , lockid , "awakened" );
675+ }
676+ else
677+ {
678+ /* We are done updating shared state of the lock itself. */
679+ SpinLockRelease (& lock -> mutex );
680+ }
681+
682+ /*
683+ * Fix the process wait semaphore's count for any absorbed wakeups.
684+ */
685+ while (extraWaits -- > 0 )
686+ PGSemaphoreUnlock (& proc -> sem );
687+
688+ if (mustwait )
689+ {
690+ /* Failed to get lock, so release interrupt holdoff */
691+ RESUME_INTERRUPTS ();
692+ LOG_LWDEBUG ("LWLockWaitUntilFree" , lockid , "failed" );
693+ TRACE_POSTGRESQL_LWLOCK_WAIT_UNTIL_FREE_FAIL (lockid , mode );
694+ }
695+ else
696+ {
697+ /* Add lock to list of locks held by this backend */
698+ held_lwlocks [num_held_lwlocks ++ ] = lockid ;
699+ TRACE_POSTGRESQL_LWLOCK_WAIT_UNTIL_FREE (lockid , mode );
700+ }
701+
702+ return !mustwait ;
703+ }
704+
567705/*
568706 * LWLockRelease - release a previously acquired lock
569707 */
@@ -618,20 +756,36 @@ LWLockRelease(LWLockId lockid)
618756 /*
619757 * Remove the to-be-awakened PGPROCs from the queue. If the front
620758 * waiter wants exclusive lock, awaken him only. Otherwise awaken
621- * as many waiters as want shared access.
759+ * as many waiters as want shared access (or just want to be
760+ * woken up when the lock becomes free without acquiring it,
761+ * ie. LWLockWaitUntilFree).
622762 */
763+ bool releaseOK = true;
764+
623765 proc =
BB94
head ;
624- if (! proc -> lwExclusive )
766+ if (proc -> lwWaitMode != LW_EXCLUSIVE )
625767 {
626768 while (proc -> lwWaitLink != NULL &&
627- !proc -> lwWaitLink -> lwExclusive )
769+ proc -> lwWaitLink -> lwWaitMode != LW_EXCLUSIVE )
770+ {
628771 proc = proc -> lwWaitLink ;
772+ if (proc -> lwWaitMode != LW_WAIT_UNTIL_FREE )
773+ releaseOK = false;
774+ }
629775 }
630776 /* proc is now the last PGPROC to be released */
631777 lock -> head = proc -> lwWaitLink ;
632778 proc -> lwWaitLink = NULL ;
633- /* prevent additional wakeups until retryer gets to run */
634- lock -> releaseOK = false;
779+ /*
780+ * Prevent additional wakeups until retryer gets to run. Backends
781+ * that are just waiting for the lock to become free don't prevent
782+ * wakeups, because they might decide that they don't want the
783+ * lock, after all.
784+ */
785+ if (proc -> lwWaitMode != LW_WAIT_UNTIL_FREE )
786+ releaseOK = false;
787+
788+ lock -> releaseOK = releaseOK ;
635789 }
636790 else
637791 {
0 commit comments