@@ -920,7 +920,8 @@ static void LocalSetXLogInsertAllowed(void);
920
920
static void CreateEndOfRecoveryRecord (void );
921
921
static XLogRecPtr CreateOverwriteContrecordRecord (XLogRecPtr aborted_lsn );
922
922
static void CheckPointGuts (XLogRecPtr checkPointRedo , int flags );
923
- static void KeepLogSeg (XLogRecPtr recptr , XLogSegNo * logSegNo );
923
+ static void KeepLogSeg (XLogRecPtr recptr , XLogRecPtr slotsMinLSN ,
924
+ XLogSegNo * logSegNo );
924
925
static XLogRecPtr XLogGetReplicationSlotMinimumLSN (void );
925
926
926
927
static void AdvanceXLInsertBuffer (XLogRecPtr upto , bool opportunistic );
@@ -8921,6 +8922,7 @@ CreateCheckPoint(int flags)
8921
8922
XLogRecPtr last_important_lsn ;
8922
8923
VirtualTransactionId * vxids ;
8923
8924
int nvxids ;
8925
+ XLogRecPtr slotsMinReqLSN ;
8924
8926
8925
8927
/*
8926
8928
* An end-of-recovery checkpoint is really a shutdown checkpoint, just
@@ -9140,6 +9142,15 @@ CreateCheckPoint(int flags)
9140
9142
*/
9141
9143
END_CRIT_SECTION ();
9142
9144
9145
+ /*
9146
+ * Get the current minimum LSN to be used later in the WAL segment
9147
+ * cleanup. We may clean up only WAL segments, which are not needed
9148
+ * according to synchronized LSNs of replication slots. The slot's LSN
9149
+ * might be advanced concurrently, so we call this before
9150
+ * CheckPointReplicationSlots() synchronizes replication slots.
9151
+ */
9152
+ slotsMinReqLSN = XLogGetReplicationSlotMinimumLSN ();
9153
+
9143
9154
/*
9144
9155
* In some cases there are groups of actions that must all occur on one
9145
9156
* side or the other of a checkpoint record. Before flushing the
@@ -9304,15 +9315,23 @@ CreateCheckPoint(int flags)
9304
9315
* prevent the disk holding the xlog from growing full.
9305
9316
*/
9306
9317
XLByteToSeg (RedoRecPtr , _logSegNo , wal_segment_size );
9307
- KeepLogSeg (recptr , & _logSegNo );
9318
+ KeepLogSeg (recptr , slotsMinReqLSN , & _logSegNo );
9308
9319
if (InvalidateObsoleteReplicationSlots (_logSegNo ))
9309
9320
{
9321
+ /*
9322
+ * Recalculate the current minimum LSN to be used in the WAL segment
9323
+ * cleanup. Then, we must synchronize the replication slots again in
9324
+ * order to make this LSN safe to use.
9325
+ */
9326
+ slotsMinReqLSN = XLogGetReplicationSlotMinimumLSN ();
9327
+ CheckPointReplicationSlots ();
9328
+
9310
9329
/*
9311
9330
* Some slots have been invalidated; recalculate the old-segment
9312
9331
* horizon, starting again from RedoRecPtr.
9313
9332
*/
9314
9333
XLByteToSeg (RedoRecPtr , _logSegNo , wal_segment_size );
9315
- KeepLogSeg (recptr , & _logSegNo );
9334
+ KeepLogSeg (recptr , slotsMinReqLSN , & _logSegNo );
9316
9335
}
9317
9336
_logSegNo -- ;
9318
9337
RemoveOldXlogFiles (_logSegNo , RedoRecPtr , recptr );
@@ -9534,6 +9553,7 @@ CreateRestartPoint(int flags)
9534
9553
XLogRecPtr endptr ;
9535
9554
XLogSegNo _logSegNo ;
9536
9555
TimestampTz xtime ;
9556
+ XLogRecPtr slotsMinReqLSN ;
9537
9557
9538
9558
/*
9539
9559
* Acquire CheckpointLock to ensure only one restartpoint or checkpoint
@@ -9623,6 +9643,15 @@ CreateRestartPoint(int flags)
9623
9643
MemSet (& CheckpointStats , 0 , sizeof (CheckpointStats ));
9624
9644
CheckpointStats .ckpt_start_t = GetCurrentTimestamp ();
9625
9645
9646
+ /*
9647
+ * Get the current minimum LSN to be used later in the WAL segment
9648
+ * cleanup. We may clean up only WAL segments, which are not needed
9649
+ * according to synchronized LSNs of replication slots. The slot's LSN
9650
+ * might be advanced concurrently, so we call this before
9651
+ * CheckPointReplicationSlots() synchronizes replication slots.
9652
+ */
9653
+ slotsMinReqLSN = XLogGetReplicationSlotMinimumLSN ();
9654
+
9626
9655
if (log_checkpoints )
9627
9656
LogCheckpointStart (flags , true);
9628
9657
@@ -9708,15 +9737,23 @@ CreateRestartPoint(int flags)
9708
9737
receivePtr = GetWalRcvFlushRecPtr (NULL , NULL );
9709
9738
replayPtr = GetXLogReplayRecPtr (& replayTLI );
9710
9739
endptr = (receivePtr < replayPtr ) ? replayPtr : receivePtr ;
9711
- KeepLogSeg (endptr , & _logSegNo );
9740
+ KeepLogSeg (endptr , slotsMinReqLSN , & _logSegNo );
9712
9741
if (InvalidateObsoleteReplicationSlots (_logSegNo ))
9713
9742
{
9743
+ /*
9744
+ * Recalculate the current minimum LSN to be used in the WAL segment
9745
+ * cleanup. Then, we must synchronize the replication slots again in
9746
+ * order to make this LSN safe to use.
9747
+ */
9748
+ slotsMinReqLSN = XLogGetReplicationSlotMinimumLSN ();
9749
+ CheckPointReplicationSlots ();
9750
+
9714
9751
/*
9715
9752
* Some slots have been invalidated; recalculate the old-segment
9716
9753
* horizon, starting again from RedoRecPtr.
9717
9754
*/
9718
9755
XLByteToSeg (RedoRecPtr , _logSegNo , wal_segment_size );
9719
- KeepLogSeg (endptr , & _logSegNo );
9756
+ KeepLogSeg (endptr , slotsMinReqLSN , & _logSegNo );
9720
9757
}
9721
9758
_logSegNo -- ;
9722
9759
@@ -9818,6 +9855,7 @@ GetWALAvailability(XLogRecPtr targetLSN)
9818
9855
XLogSegNo oldestSegMaxWalSize ; /* oldest segid kept by max_wal_size */
9819
9856
XLogSegNo oldestSlotSeg ; /* oldest segid kept by slot */
9820
9857
uint64 keepSegs ;
9858
+ XLogRecPtr slotsMinReqLSN ;
9821
9859
9822
9860
/*
9823
9861
* slot does not reserve WAL. Either deactivated, or has never been active
@@ -9831,8 +9869,9 @@ GetWALAvailability(XLogRecPtr targetLSN)
9831
9869
* oldestSlotSeg to the current segment.
9832
9870
*/
9833
9871
currpos = GetXLogWriteRecPtr ();
9872
+ slotsMinReqLSN = XLogGetReplicationSlotMinimumLSN ();
9834
9873
XLByteToSeg (currpos , oldestSlotSeg , wal_segment_size );
9835
- KeepLogSeg (currpos , & oldestSlotSeg );
9874
+ KeepLogSeg (currpos , slotsMinReqLSN , & oldestSlotSeg );
9836
9875
9837
9876
/*
9838
9877
* Find the oldest extant segment file. We get 1 until checkpoint removes
@@ -9893,7 +9932,7 @@ GetWALAvailability(XLogRecPtr targetLSN)
9893
9932
* invalidation is optionally done here, instead.
9894
9933
*/
9895
9934
static void
9896
- KeepLogSeg (XLogRecPtr recptr , XLogSegNo * logSegNo )
9935
+ KeepLogSeg (XLogRecPtr recptr , XLogRecPtr slotsMinReqLSN , XLogSegNo * logSegNo )
9897
9936
{
9898
9937
XLogSegNo currSegNo ;
9899
9938
XLogSegNo segno ;
@@ -9906,7 +9945,7 @@ KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo)
9906
9945
* Calculate how many segments are kept by slots first, adjusting for
9907
9946
* max_slot_wal_keep_size.
9908
9947
*/
9909
- keep = XLogGetReplicationSlotMinimumLSN () ;
9948
+ keep = slotsMinReqLSN ;
9910
9949
if (keep != InvalidXLogRecPtr && keep < recptr )
9911
9950
{
9912
9951
XLByteToSeg (keep , segno , wal_segment_size );
0 commit comments