@@ -202,6 +202,9 @@ static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn
202
202
static void ReorderBufferRestoreChange (ReorderBuffer * rb , ReorderBufferTXN * txn ,
203
203
char * change );
204
204
static void ReorderBufferRestoreCleanup (ReorderBuffer * rb , ReorderBufferTXN * txn );
205
+ static void ReorderBufferCleanupSerializedTXNs (const char * slotname );
206
+ static void ReorderBufferSerializedPath (char * path , ReplicationSlot * slot ,
207
+ TransactionId xid , XLogSegNo segno );
205
208
206
209
static void ReorderBufferFreeSnap (ReorderBuffer * rb , Snapshot snap );
207
210
static Snapshot ReorderBufferCopySnap (ReorderBuffer * rb , Snapshot orig_snap ,
@@ -220,7 +223,8 @@ static void ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *t
220
223
221
224
222
225
/*
223
- * Allocate a new ReorderBuffer
226
+ * Allocate a new ReorderBuffer and clean out any old serialized state from
227
+ * prior ReorderBuffer instances for the same slot.
224
228
*/
225
229
ReorderBuffer *
226
230
ReorderBufferAllocate (void )
@@ -229,6 +233,8 @@ ReorderBufferAllocate(void)
229
233
HASHCTL hash_ctl ;
230
234
MemoryContext new_ctx ;
231
235
236
+ Assert (MyReplicationSlot != NULL );
237
+
232
238
/* allocate memory in own context, to have better accountability */
233
239
new_ctx = AllocSetContextCreate (CurrentMemoryContext ,
234
240
"ReorderBuffer" ,
@@ -265,6 +271,13 @@ ReorderBufferAllocate(void)
265
271
dlist_init (& buffer -> cached_changes );
266
272
slist_init (& buffer -> cached_tuplebufs );
267
273
274
+ /*
275
+ * Ensure there's no stale data from prior uses of this slot, in case some
276
+ * prior exit avoided calling ReorderBufferFree. Failure to do this can
277
+ * produce duplicated txns, and it's very cheap if there's nothing there.
278
+ */
279
+ ReorderBufferCleanupSerializedTXNs (NameStr (MyReplicationSlot -> data .name ));
280
+
268
281
return buffer ;
269
282
}
270
283
@@ -281,6 +294,9 @@ ReorderBufferFree(ReorderBuffer *rb)
281
294
* memory context.
282
295
*/
283
296
MemoryContextDelete (context );
297
+
298
+ /* Free disk space used by unconsumed reorder buffers */
299
+ ReorderBufferCleanupSerializedTXNs (NameStr (MyReplicationSlot -> data .name ));
284
300
}
285
301
286
302
/*
@@ -2117,7 +2133,6 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
2117
2133
int fd = -1 ;
2118
2134
XLogSegNo curOpenSegNo = 0 ;
2119
2135
Size spilled = 0 ;
2120
- char path [MAXPGPATH ];
2121
2136
2122
2137
elog (DEBUG2 , "spill %u changes in XID %u to disk" ,
2123
2138
(uint32 ) txn -> nentries_mem , txn -> xid );
@@ -2144,21 +2159,19 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
2144
2159
*/
2145
2160
if (fd == -1 || !XLByteInSeg (change -> lsn , curOpenSegNo ))
2146
2161
{
2147
- XLogRecPtr recptr ;
2162
+ char path [ MAXPGPATH ] ;
2148
2163
2149
2164
if (fd != -1 )
2150
2165
CloseTransientFile (fd );
2151
2166
2152
2167
XLByteToSeg (change -> lsn , curOpenSegNo );
2153
- XLogSegNoOffsetToRecPtr (curOpenSegNo , 0 , recptr );
2154
2168
2155
2169
/*
2156
2170
* No need to care about TLIs here, only used during a single run,
2157
2171
* so each LSN only maps to a specific WAL record.
2158
2172
*/
2159
- sprintf (path , "pg_replslot/%s/xid-%u-lsn-%X-%X.snap" ,
2160
- NameStr (MyReplicationSlot -> data .name ), txn -> xid ,
2161
- (uint32 ) (recptr >> 32 ), (uint32 ) recptr );
2173
+ ReorderBufferSerializedPath (path , MyReplicationSlot , txn -> xid ,
2174
+ curOpenSegNo );
2162
2175
2163
2176
/* open segment, create it if necessary */
2164
2177
fd = OpenTransientFile (path ,
@@ -2168,8 +2181,7 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
2168
2181
if (fd < 0 )
2169
2182
ereport (ERROR ,
2170
2183
(errcode_for_file_access (),
2171
- errmsg ("could not open file \"%s\": %m" ,
2172
- path )));
2184
+ errmsg ("could not open file \"%s\": %m" , path )));
2173
2185
}
2174
2186
2175
2187
ReorderBufferSerializeChange (rb , txn , fd , change );
@@ -2385,25 +2397,20 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
2385
2397
2386
2398
if (* fd == -1 )
2387
2399
{
2388
- XLogRecPtr recptr ;
2389
2400
char path [MAXPGPATH ];
2390
2401
2391
2402
/* first time in */
2392
2403
if (* segno == 0 )
2393
- {
2394
2404
XLByteToSeg (txn -> first_lsn , * segno );
2395
- }
2396
2405
2397
2406
Assert (* segno != 0 || dlist_is_empty (& txn -> changes ));
2398
- XLogSegNoOffsetToRecPtr (* segno , 0 , recptr );
2399
2407
2400
2408
/*
2401
2409
* No need to care about TLIs here, only used during a single run,
2402
2410
* so each LSN only maps to a specific WAL record.
2403
2411
*/
2404
- sprintf (path , "pg_replslot/%s/xid-%u-lsn-%X-%X.snap" ,
2405
- NameStr (MyReplicationSlot -> data .name ), txn -> xid ,
2406
- (uint32 ) (recptr >> 32 ), (uint32 ) recptr );
2412
+ ReorderBufferSerializedPath (path , MyReplicationSlot , txn -> xid ,
2413
+ * segno );
2407
2414
2408
2415
* fd = OpenTransientFile (path , O_RDONLY | PG_BINARY , 0 );
2409
2416
if (* fd < 0 && errno == ENOENT )
@@ -2635,20 +2642,72 @@ ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn)
2635
2642
for (cur = first ; cur <= last ; cur ++ )
2636
2643
{
2637
2644
char path [MAXPGPATH ];
2638
- XLogRecPtr recptr ;
2639
-
2640
- XLogSegNoOffsetToRecPtr (cur , 0 , recptr );
2641
2645
2642
- sprintf (path , "pg_replslot/%s/xid-%u-lsn-%X-%X.snap" ,
2643
- NameStr (MyReplicationSlot -> data .name ), txn -> xid ,
2644
- (uint32 ) (recptr >> 32 ), (uint32 ) recptr );
2646
+ ReorderBufferSerializedPath (path , MyReplicationSlot , txn -> xid , cur );
2645
2647
if (unlink (path ) != 0 && errno != ENOENT )
2646
2648
ereport (ERROR ,
2647
2649
(errcode_for_file_access (),
2648
2650
errmsg ("could not remove file \"%s\": %m" , path )));
2649
2651
}
2650
2652
}
2651
2653
2654
+ /*
2655
+ * Remove any leftover serialized reorder buffers from a slot directory after a
2656
+ * prior crash or decoding session exit.
2657
+ */
2658
+ static void
2659
+ ReorderBufferCleanupSerializedTXNs (const char * slotname )
2660
+ {
2661
+ DIR * spill_dir ;
2662
+ struct dirent * spill_de ;
2663
+ struct stat statbuf ;
2664
+ char path [MAXPGPATH * 2 + 12 ];
2665
+
2666
+ sprintf (path , "pg_replslot/%s" , slotname );
2667
+
2668
+ /* we're only handling directories here, skip if it's not ours */
2669
+ if (lstat (path , & statbuf ) == 0 && !S_ISDIR (statbuf .st_mode ))
2670
+ return ;
2671
+
2672
+ spill_dir = AllocateDir (path );
2673
+ while ((spill_de = ReadDirExtended (spill_dir , path , INFO )) != NULL )
2674
+ {
2675
+ /* only look at names that can be ours */
2676
+ if (strncmp (spill_de -> d_name , "xid" , 3 ) == 0 )
2677
+ {
2678
+ snprintf (path , sizeof (path ),
2679
+ "pg_replslot/%s/%s" , slotname ,
2680
+ spill_de -> d_name );
2681
+
2682
+ if (unlink (path ) != 0 )
2683
+ ereport (ERROR ,
2684
+ (errcode_for_file_access (),
2685
+ errmsg ("could not remove file \"%s\" during removal of pg_replslot/%s/*.xid: %m" ,
2686
+ path , slotname )));
2687
+ }
2688
+ }
2689
+ FreeDir (spill_dir );
2690
+ }
2691
+
2692
+ /*
2693
+ * Given a replication slot, transaction ID and segment number, fill in the
2694
+ * corresponding spill file into 'path', which is a caller-owned buffer of size
2695
+ * at least MAXPGPATH.
2696
+ */
2697
+ static void
2698
+ ReorderBufferSerializedPath (char * path , ReplicationSlot * slot , TransactionId xid ,
2699
+ XLogSegNo segno )
2700
+ {
2701
+ XLogRecPtr recptr ;
2702
+
2703
+ XLogSegNoOffsetToRecPtr (segno , 0 , recptr );
2704
+
2705
+ snprintf (path , MAXPGPATH , "pg_replslot/%s/xid-%u-lsn-%X-%X.snap" ,
2706
+ NameStr (MyReplicationSlot -> data .name ),
2707
+ xid ,
2708
+ (uint32 ) (recptr >> 32 ), (uint32 ) recptr );
2709
+ }
2710
+
2652
2711
/*
2653
2712
* Delete all data spilled to disk after we've restarted/crashed. It will be
2654
2713
* recreated when the respective slots are reused.
@@ -2659,15 +2718,9 @@ StartupReorderBuffer(void)
2659
2718
DIR * logical_dir ;
2660
2719
struct dirent * logical_de ;
2661
2720
2662
- DIR * spill_dir ;
2663
- struct dirent * spill_de ;
2664
-
2665
2721
logical_dir = AllocateDir ("pg_replslot" );
2666
2722
while ((logical_de = ReadDir (logical_dir , "pg_replslot" )) != NULL )
2667
2723
{
2668
- struct stat statbuf ;
2669
- char path [MAXPGPATH * 2 + 12 ];
2670
-
2671
2724
if (strcmp (logical_de -> d_name , "." ) == 0 ||
2672
2725
strcmp (logical_de -> d_name , ".." ) == 0 )
2673
2726
continue ;
@@ -2680,33 +2733,7 @@ StartupReorderBuffer(void)
2680
2733
* ok, has to be a surviving logical slot, iterate and delete
2681
2734
* everything starting with xid-*
2682
2735
*/
2683
- sprintf (path , "pg_replslot/%s" , logical_de -> d_name );
2684
-
2685
- /* we're only creating directories here, skip if it's not our's */
2686
- if (lstat (path , & statbuf ) == 0 && !S_ISDIR (statbuf .st_mode ))
2687
- continue ;
2688
-
2689
- spill_dir = AllocateDir (path );
2690
- while ((spill_de = ReadDir (spill_dir , path )) != NULL )
2691
- {
2692
- if (strcmp (spill_de -> d_name , "." ) == 0 ||
2693
- strcmp (spill_de -> d_name , ".." ) == 0 )
2694
- continue ;
2695
-
2696
- /* only look at names that can be ours */
2697
- if (strncmp (spill_de -> d_name , "xid" , 3 ) == 0 )
2698
- {
2699
- sprintf (path , "pg_replslot/%s/%s" , logical_de -> d_name ,
2700
- spill_de -> d_name );
2701
-
2702
- if (unlink (path ) != 0 )
2703
- ereport (PANIC ,
2704
- (errcode_for_file_access (),
2705
- errmsg ("could not remove file \"%s\": %m" ,
2706
- path )));
2707
- }
2708
- }
2709
- FreeDir (spill_dir );
2736
+ ReorderBufferCleanupSerializedTXNs (logical_de -> d_name );
2710
2737
}
2711
2738
FreeDir (logical_dir );
2712
2739
}
0 commit comments