8000 Refrain from duplicating data in reorderbuffers · postgres/postgres@8e5c2af · GitHub
[go: up one dir, main page]

Skip to content

Commit 8e5c2af

Browse files
committed
Refrain from duplicating data in reorderbuffers
If a walsender exits leaving data in reorderbuffers, the next walsender that tries to decode the same transaction would append its decoded data in the same spill files without truncating it first, which effectively duplicate the data. Avoid that by removing any leftover reorderbuffer spill files when a walsender starts. Backpatch to 9.4; this bug has been there from the very beginning of logical decoding. Author: Craig Ringer, revised by me Reviewed by: Álvaro Herrera, Petr Jelínek, Masahiko Sawada
1 parent 7aba4f2 commit 8e5c2af

File tree

1 file changed

+82
-55
lines changed

1 file changed

+82
-55
lines changed

src/backend/replication/logical/reorderbuffer.c

Lines changed: 82 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,9 @@ static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn
202202
static void ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
203203
char *change);
204204
static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn);
205+
static void ReorderBufferCleanupSerializedTXNs(const char *slotname);
206+
static void ReorderBufferSerializedPath(char *path, ReplicationSlot *slot,
207+
TransactionId xid, XLogSegNo segno);
205208

206209
static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap);
207210
static Snapshot ReorderBufferCopySnap(ReorderBuffer *rb, Snapshot orig_snap,
@@ -220,7 +223,8 @@ static void ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *t
220223

221224

222225
/*
223-
* Allocate a new ReorderBuffer
226+
* Allocate a new ReorderBuffer and clean out any old serialized state from
227+
* prior ReorderBuffer instances for the same slot.
224228
*/
225229
ReorderBuffer *
226230
ReorderBufferAllocate(void)
@@ -229,6 +233,8 @@ ReorderBufferAllocate(void)
229233
HASHCTL hash_ctl;
230234
MemoryContext new_ctx;
231235

236+
Assert(MyReplicationSlot != NULL);
237+
232238
/* allocate memory in own context, to have better accountability */
233239
new_ctx = AllocSetContextCreate(CurrentMemoryContext,
234240
"ReorderBuffer",
@@ -265,6 +271,13 @@ ReorderBufferAllocate(void)
265271
dlist_init(&buffer->cached_changes);
266272
slist_init(&buffer->cached_tuplebufs);
267273

274+
/*
275+
* Ensure there's no stale data from prior uses of this slot, in case some
276+
* prior exit avoided calling ReorderBufferFree. Failure to do this can
277+
* produce duplicated txns, and it's very cheap if there's nothing there.
278+
*/
279+
ReorderBufferCleanupSerializedTXNs(NameStr(MyReplicationSlot->data.name));
280+
268281
return buffer;
269282
}
270283

@@ -281,6 +294,9 @@ ReorderBufferFree(ReorderBuffer *rb)
281294
* memory context.
282295
*/
283296
MemoryContextDelete(context);
297+
298+
/* Free disk space used by unconsumed reorder buffers */
299+
ReorderBufferCleanupSerializedTXNs(NameStr(MyReplicationSlot->data.name));
284300
}
285301

286302
/*
@@ -2117,7 +2133,6 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
21172133
int fd = -1;
21182134
XLogSegNo curOpenSegNo = 0;
21192135
Size spilled = 0;
2120-
char path[MAXPGPATH];
21212136

21222137
elog(DEBUG2, "spill %u changes in XID %u to disk",
21232138
(uint32) txn->nentries_mem, txn->xid);
@@ -2144,21 +2159,19 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
21442159
*/
21452160
if (fd == -1 || !XLByteInSeg(change->lsn, curOpenSegNo))
21462161
{
2147-
XLogRecPtr recptr;
2162+
char path[MAXPGPATH];
21482163

21492164
if (fd != -1)
21502165
CloseTransientFile(fd);
21512166

21522167
XLByteToSeg(change->lsn, curOpenSegNo);
2153-
XLogSegNoOffsetToRecPtr(curOpenSegNo, 0, recptr);
21542168

21552169
/*
21562170
* No need to care about TLIs here, only used during a single run,
21572171
* so each LSN only maps to a specific WAL record.
21582172
*/
2159-
sprintf(path, "pg_replslot/%s/xid-%u-lsn-%X-%X.snap",
2160-
NameStr(MyReplicationSlot->data.name), txn->xid,
2161-
(uint32) (recptr >> 32), (uint32) recptr);
2173+
ReorderBufferSerializedPath(path, MyReplicationSlot, txn->xid,
2174+
curOpenSegNo);
21622175

21632176
/* open segment, create it if necessary */
21642177
fd = OpenTransientFile(path,
@@ -2168,8 +2181,7 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
21682181
if (fd < 0)
21692182
ereport(ERROR,
21702183
(errcode_for_file_access(),
2171-
errmsg("could not open file \"%s\": %m",
2172-
path)));
2184+
errmsg("could not open file \"%s\": %m", path)));
21732185
}
21742186

21752187
ReorderBufferSerializeChange(rb, txn, fd, change);
@@ -2385,25 +2397,20 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
23852397

23862398
if (*fd == -1)
23872399
{
2388-
XLogRecPtr recptr;
23892400
char path[MAXPGPATH];
23902401

23912402
/* first time in */
23922403
if (*segno == 0)
2393-
{
23942404
XLByteToSeg(txn->first_lsn, *segno);
2395-
}
23962405

23972406
Assert(*segno != 0 || dlist_is_empty(&txn->changes));
2398-
XLogSegNoOffsetToRecPtr(*segno, 0, recptr);
23992407

24002408
/*
24012409
* No need to care about TLIs here, only used during a single run,
24022410
* so each LSN only maps to a specific WAL record.
24032411
*/
2404-
sprintf(path, "pg_replslot/%s/xid-%u-lsn-%X-%X.snap",
2405-
NameStr(MyReplicationSlot->data.name), txn->xid,
2406-
(uint32) (recptr >> 32), (uint32) recptr);
2412+
ReorderBufferSerializedPath(path, MyReplicationSlot, txn->xid,
2413+
*segno);
24072414

24082415
*fd = OpenTransientFile(path, O_RDONLY | PG_BINARY, 0);
24092416
if (*fd < 0 && errno == ENOENT)
@@ -2635,20 +2642,72 @@ ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn)
26352642
for (cur = first; cur <= last; cur++)
26362643
{
26372644
char path[MAXPGPATH];
2638-
XLogRecPtr recptr;
2639-
2640-
XLogSegNoOffsetToRecPtr(cur, 0, recptr);
26412645

2642-
sprintf(path, "pg_replslot/%s/xid-%u-lsn-%X-%X.snap",
2643-
NameStr(MyReplicationSlot->data.name), txn->xid,
2644-
(uint32) (recptr >> 32), (uint32) recptr);
2646+
ReorderBufferSerializedPath(path, MyReplicationSlot, txn->xid, cur);
26452647
if (unlink(path) != 0 && errno != ENOENT)
26462648
ereport(ERROR,
26472649
(errcode_for_file_access(),
26482650
errmsg("could not remove file \"%s\": %m", path)));
26492651
}
26502652
}
26512653

2654+
/*
2655+
* Remove any leftover serialized reorder buffers from a slot directory after a
2656+
* prior crash or decoding session exit.
2657+
*/
2658+
static void
2659+
ReorderBufferCleanupSerializedTXNs(const char *slotname)
2660+
{
2661+
DIR *spill_dir;
2662+
struct dirent *spill_de;
2663+
struct stat statbuf;
2664+
char path[MAXPGPATH * 2 + 12];
2665+
2666+
10000 sprintf(path, "pg_replslot/%s", slotname);
2667+
2668+
/* we're only handling directories here, skip if it's not ours */
2669+
if (lstat(path, &statbuf) == 0 && !S_ISDIR(statbuf.st_mode))
2670+
return;
2671+
2672+
spill_dir = AllocateDir(path);
2673+
while ((spill_de = ReadDirExtended(spill_dir, path, INFO)) != NULL)
2674+
{
2675+
/* only look at names that can be ours */
2676+
if (strncmp(spill_de->d_name, "xid", 3) == 0)
2677+
{
2678+
snprintf(path, sizeof(path),
2679+
"pg_replslot/%s/%s", slotname,
2680+
spill_de->d_name);
2681+
2682+
if (unlink(path) != 0)
2683+
ereport(ERROR,
2684+
(errcode_for_file_access(),
2685+
errmsg("could not remove file \"%s\" during removal of pg_replslot/%s/*.xid: %m",
2686+
path, slotname)));
2687+
}
2688+
}
2689+
FreeDir(spill_dir);
2690+
}
2691+
2692+
/*
2693+
* Given a replication slot, transaction ID and segment number, fill in the
2694+
* corresponding spill file into 'path', which is a caller-owned buffer of size
2695+
* at least MAXPGPATH.
2696+
*/
2697+
static void
2698+
ReorderBufferSerializedPath(char *path, ReplicationSlot *slot, TransactionId xid,
2699+
XLogSegNo segno)
2700+
{
2701+
XLogRecPtr recptr;
2702+
2703+
XLogSegNoOffsetToRecPtr(segno, 0, recptr);
2704+
2705+
snprintf(path, MAXPGPATH, "pg_replslot/%s/xid-%u-lsn-%X-%X.snap",
2706+
NameStr(MyReplicationSlot->data.name),
2707+
xid,
2708+
(uint32) (recptr >> 32), (uint32) recptr);
2709+
}
2710+
26522711
/*
26532712
* Delete all data spilled to disk after we've restarted/crashed. It will be
26542713
* recreated when the respective slots are reused.
@@ -2659,15 +2718,9 @@ StartupReorderBuffer(void)
26592718
DIR *logical_dir;
26602719
struct dirent *logical_de;
26612720

2662-
DIR *spill_dir;
2663-
struct dirent *spill_de;
2664-
26652721
logical_dir = AllocateDir("pg_replslot");
26662722
while ((logical_de = ReadDir(logical_dir, "pg_replslot")) != NULL)
26672723
{
2668-
struct stat statbuf;
2669-
char path[MAXPGPATH * 2 + 12];
2670-
26712724
if (strcmp(logical_de->d_name, ".") == 0 ||
26722725
strcmp(logical_de->d_name, "..") == 0)
26732726
continue;
@@ -2680,33 +2733,7 @@ StartupReorderBuffer(void)
26802733
* ok, has to be a surviving logical slot, iterate and delete
26812734
* everything starting with xid-*
26822735
*/
2683-
sprintf(path, "pg_replslot/%s", logical_de->d_name);
2684-
2685-
/* we're only creating directories here, skip if it's not our's */
2686-
if (lstat(path, &statbuf) == 0 && !S_ISDIR(statbuf.st_mode))
2687-
continue;
2688-
2689-
spill_dir = AllocateDir(path);
2690-
while ((spill_de = ReadDir(spill_dir, path)) != NULL)
2691-
{
2692-
if (strcmp(spill_de->d_name, ".") == 0 ||
2693-
strcmp(spill_de->d_name, "..") == 0)
2694-
continue;
2695-
2696-
/* only look at names that can be ours */
2697-
if (strncmp(spill_de->d_name, "xid", 3) == 0)
2698-
{
2699-
sprintf(path, "pg_replslot/%s/%s", logical_de->d_name,
2700-
spill_de->d_name);
2701-
2702-
if (unlink(path) != 0)
2703-
ereport(PANIC,
2704-
(errcode_for_file_access(),
2705-
errmsg("could not remove file \"%s\": %m",
2706-
path)));
2707-
}
2708-
}
2709-
FreeDir(spill_dir);
2736+
ReorderBufferCleanupSerializedTXNs(logical_de->d_name);
27102737
}
27112738
FreeDir(logical_dir);
27122739
}

0 commit comments

Comments
 (0)
0