@@ -75,7 +75,6 @@ bool am_cascading_walsender = false; /* Am I cascading WAL to another standby ?
7575
7676/* User-settable parameters for walsender */
7777int max_wal_senders = 0 ; /* the maximum number of concurrent walsenders */
78- int WalSndDelay = 1000 ; /* max sleep time between some actions */
7978int replication_timeout = 60 * 1000 ; /* maximum time to send one
8079 * WAL data message */
8180
@@ -475,7 +474,7 @@ ProcessRepliesIfAny(void)
475474{
476475 unsigned char firstchar ;
477476 int r ;
478- int received = false;
477+ bool received = false;
479478
480479 for (;;)
481480 {
@@ -709,6 +708,9 @@ WalSndLoop(void)
709708 /* Loop forever, unless we get an error */
710709 for (;;)
711710 {
711+ /* Clear any already-pending wakeups */
712+ ResetLatch (& MyWalSnd -> latch );
713+
712714 /*
713715 * Emergency bailout if postmaster has died. This is to avoid the
714716 * necessity for manual cleanup of all postmaster children.
@@ -727,94 +729,112 @@ WalSndLoop(void)
727729 /* Normal exit from the walsender is here */
728730 if (walsender_shutdown_requested )
729731 {
730- /* Inform the standby that XLOG streaming was done */
732+ /* Inform the standby that XLOG streaming is done */
731733 pq_puttextmessage ('C' , "COPY 0" );
732734 pq_flush ();
733735
734736 proc_exit (0 );
735737 }
736738
739+ /* Check for input from the client */
740+ ProcessRepliesIfAny ();
741+
737742 /*
738743 * If we don't have any pending data in the output buffer, try to send
739- * some more.
744+ * some more. If there is some, we don't bother to call XLogSend
745+ * again until we've flushed it ... but we'd better assume we are not
746+ * caught up.
740747 */
741748 if (!pq_is_send_pending ())
742- {
743749 XLogSend (output_message , & caughtup );
750+ else
751+ caughtup = false;
744752
753+ /* Try to flush pending output to the client */
754+ if (pq_flush_if_writable () != 0 )
755+ break ;
756+
757+ /* If nothing remains to be sent right now ... */
758+ if (caughtup && !pq_is_send_pending ())
759+ {
745760 /*
746- * Even if we wrote all the WAL that was available when we started
747- * sending, more might have arrived while we were sending this
748- * batch. We had the latch set while sending, so we have not
749- * received any signals from that time. Let's arm the latch again,
750- * and after that check that we're still up-to-date.
761+ * If we're in catchup state, move to streaming. This is an
762+ * important state change for users to know about, since before
763+ * this point data loss might occur if the primary dies and we
764+ * need to failover to the standby. The state change is also
765+ * important for synchronous replication, since commits that
766+ * started to wait at that point might wait for some time.
751767 */
752- if (caughtup && ! pq_is_send_pending () )
768+ if (MyWalSnd -> state == WALSNDSTATE_CATCHUP )
753769 {
754- ResetLatch (& MyWalSnd -> latch );
770+ ereport (DEBUG1 ,
771+ (errmsg ("standby \"%s\" has now caught up with primary" ,
772+ application_name )));
773+ WalSndSetState (WALSNDSTATE_STREAMING );
774+ }
755775
776+ /*
777+ * When SIGUSR2 arrives, we send any outstanding logs up to the
778+ * shutdown checkpoint record (i.e., the latest record) and exit.
779+ * This may be a normal termination at shutdown, or a promotion,
780+ * the walsender is not sure which.
781+ */
782+ if (walsender_ready_to_stop )
783+ {
784+ /* ... let's just be real sure we're caught up ... */
756785 XLogSend (output_message , & caughtup );
786+ if (caughtup && !pq_is_send_pending ())
787+ {
788+ walsender_shutdown_requested = true;
789+ continue ; /* don't want to wait more */
790+ }
757791 }
758792 }
759793
760- /* Flush pending output to the client */
761- if (pq_flush_if_writable () != 0 )
762- break ;
763-
764794 /*
765- * When SIGUSR2 arrives, we send any outstanding logs up to the
766- * shutdown checkpoint record (i.e., the latest record) and exit.
767- * This may be a normal termination at shutdown, or a promotion,
768- * the walsender is not sure which.
795+ * We don't block if not caught up, unless there is unsent data
796+ * pending in which case we'd better block until the socket is
797+ * write-ready. This test is only needed for the case where XLogSend
798+ * loaded a subset of the available data but then pq_flush_if_writable
799+ * flushed it all --- we should immediately try to send more.
769800 */
770- if (walsender_ready_to_stop && !pq_is_send_pending ())
771- {
772- XLogSend (output_message , & caughtup );
773- ProcessRepliesIfAny ();
774- if (caughtup && !pq_is_send_pending ())
775- walsender_shutdown_requested = true;
776- }
777-
778- if ((caughtup || pq_is_send_pending ()) &&
779- !got_SIGHUP &&
780- !walsender_shutdown_requested )
801+ if (caughtup || pq_is_send_pending ())
781802 {
782803 TimestampTz finish_time = 0 ;
783- long sleeptime ;
804+ long sleeptime = -1 ;
784805 int wakeEvents ;
785806
786- /* Reschedule replication timeout */
807+ wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH |
808+ WL_SOCKET_READABLE ;
809+ if (pq_is_send_pending ())
810+ wakeEvents |= WL_SOCKET_WRITEABLE ;
811+
812+ /* Determine time until replication timeout */
787813 if (replication_timeout > 0 )
788814 {
789815 long secs ;
790816 int usecs ;
791817
792818 finish_time = TimestampTzPlusMilliseconds (last_reply_timestamp ,
793- replication_timeout );
819+ replication_timeout );
794820 TimestampDifference (GetCurrentTimestamp (),
795821 finish_time , & secs , & usecs );
796822 sleeptime = secs * 1000 + usecs / 1000 ;
797- if (WalSndDelay < sleeptime )
798- sleeptime = WalSndDelay ;
799- }
800- else
801- {
802- /*
803- * XXX: Without timeout, we don't really need the periodic
804- * wakeups anymore, WaitLatchOrSocket should reliably wake up
805- * as soon as something interesting happens.
806- */
807- sleeptime = WalSndDelay ;
823+ /* Avoid Assert in WaitLatchOrSocket if timeout is past */
824+ if (sleeptime < 0 )
825+ sleeptime = 0 ;
826+ wakeEvents |= WL_TIMEOUT ;
808827 }
809828
810- /* Sleep */
811- wakeEvents = WL_LATCH_SET | WL_SOCKET_READABLE | WL_TIMEOUT ;
812- if (pq_is_send_pending ())
813- wakeEvents |= WL_SOCKET_WRITEABLE ;
829+ /* Sleep until something happens or replication timeout */
814830 WaitLatchOrSocket (& MyWalSnd -> latch , wakeEvents ,
815831 MyProcPort -> sock , sleeptime );
816832
817- /* Check for replication timeout */
833+ /*
834+ * Check for replication timeout. Note we ignore the corner case
835+ * possibility that the client replied just as we reached the
836+ * timeout ... he's supposed to reply *before* that.
837+ */
818838 if (replication_timeout > 0 &&
819839 GetCurrentTimestamp () >= finish_time )
820840 {
@@ -828,24 +848,6 @@ WalSndLoop(void)
828848 break ;
829849 }
830850 }
831-
832- /*
833- * If we're in catchup state, see if its time to move to streaming.
834- * This is an important state change for users, since before this
835- * point data loss might occur if the primary dies and we need to
836- * failover to the standby. The state change is also important for
837- * synchronous replication, since commits that started to wait at that
838- * point might wait for some time.
839- */
840- if (MyWalSnd -> state == WALSNDSTATE_CATCHUP && caughtup )
841- {
842- ereport (DEBUG1 ,
843- (errmsg ("standby \"%s\" has now caught up with primary" ,
844- application_name )));
845- WalSndSetState (WALSNDSTATE_STREAMING );
846- }
847-
848- ProcessRepliesIfAny ();
849851 }
850852
851853 /*
0 commit comments