Preserve required !catalog tuples while computing initial decoding sn… · thatguystone/postgres@47f896b · GitHub
[go: up one dir, main page]

Skip to content

Commit 47f896b

Browse files
committed
Preserve required !catalog tuples while computing initial decoding snapshot.
The logical decoding machinery already preserved all the required catalog tuples, which is sufficient in the course of normal logical decoding, but did not guarantee that non-catalog tuples were preserved during computation of the initial snapshot when creating a slot over the replication protocol. This could cause a corrupted initial snapshot to be exported. The time window for issues is usually not terribly large, but on a busy server it's perfectly possible to hit it. Ongoing decoding is not affected by this bug. To avoid increased overhead for the SQL API, only retain additional tuples when a logical slot is being created over the replication protocol. To do so this commit changes the signature of CreateInitDecodingContext(), but it seems unlikely that it's being used in an extension, so that's probably ok. In a drive-by fix, fix handling of ReplicationSlotsComputeRequiredXmin's already_locked argument, which should only apply to ProcArrayLock, not ReplicationSlotControlLock. Reported-By: Erik Rijkers Analyzed-By: Petr Jelinek Author: Petr Jelinek, heavily editorialized by Andres Freund Reviewed-By: Andres Freund Discussion: https://postgr.es/m/9a897b86-46e1-9915-ee4c-da02e4ff6a95@2ndquadrant.com Backport: 9.4, where logical decoding was introduced.
1 parent dba1f31 commit 47f896b

File tree

8 files changed

+66
-18
lines changed

8 files changed

+66
-18
lines changed

src/backend/replication/logical/logical.c

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,7 @@ StartupDecodingContext(List *output_plugin_options,
208208
LogicalDecodingContext *
209209
CreateInitDecodingContext(char *plugin,
210210
List *output_plugin_options,
211+
bool need_full_snapshot,
211212
XLogPageReadCB read_page,
212213
LogicalOutputPluginWriterPrepareWrite prepare_write,
213214
LogicalOutputPluginWriterWrite do_write)
@@ -310,23 +311,31 @@ CreateInitDecodingContext(char *plugin,
310311
* the slot machinery about the new limit. Once that's done the
311312
* ProcArrayLock can be released as the slot machinery now is
312313
* protecting against vacuum.
314+
*
315+
* Note that, temporarily, the data, not just the catalog, xmin has to be
316+
* reserved if a data snapshot is to be exported. Otherwise the initial
317+
* data snapshot created here is not guaranteed to be valid. After that
318+
* the data xmin doesn't need to be managed anymore and the global xmin
319+
* should be recomputed. As we are fine with losing the pegged data xmin
320+
* after crash - no chance a snapshot would get exported anymore - we can
321+
* get away with just setting the slot's
322+
* effective_xmin. ReplicationSlotRelease will reset it again.
323+
*
313324
* ----
314325
*/
315326
LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
316327

317-
slot->effective_catalog_xmin = GetOldestSafeDecodingTransactionId();
318-
slot->data.catalog_xmin = slot->effective_catalog_xmin;
328+
xmin_horizon = GetOldestSafeDecodingTransactionId(need_full_snapshot);
329+
330+
slot->effective_catalog_xmin = xmin_horizon;
331+
slot->data.catalog_xmin = xmin_horizon;
332+
if (need_full_snapshot)
333+
slot->effective_xmin = xmin_horizon;
319334

320335
ReplicationSlotsComputeRequiredXmin(true);
321336

322337
LWLockRelease(ProcArrayLock);
323338

324-
/*
325-
* tell the snapshot builder to only assemble snapshot once reaching the a
326-
* running_xact's record with the respective xmin.
327-
*/
328-
xmin_horizon = slot->data.catalog_xmin;
329-
330339
ReplicationSlotMarkDirty();
331340
ReplicationSlotSave();
332341

src/backend/replication/logical/snapbuild.c

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -553,6 +553,18 @@ SnapBuildExportSnapshot(SnapBuild *builder)
553553
* mechanism. Due to that we can do this without locks, we're only
554554
* changing our own value.
555555
*/
556+
#ifdef USE_ASSERT_CHECKING
557+
{
558+
TransactionId safeXid;
559+
560+
LWLockAcquire(ProcArrayLock, LW_SHARED);
561+
safeXid = GetOldestSafeDecodingTransactionId(true);
562+
LWLockRelease(ProcArrayLock);
563+
564+
Assert(TransactionIdPrecedesOrEquals(safeXid, snap->xmin));
565+
}
566+
#endif
567+
556568
MyPgXact->xmin = snap->xmin;
557569

558570
/* allocate in transaction context */

src/backend/replication/slot.c

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -396,6 +396,22 @@ ReplicationSlotRelease(void)
396396
SpinLockRelease(&slot->mutex);
397397
}
398398

399+
400+
/*
401+
* If slot needed to temporarily restrain both data and catalog xmin to
402+
* create the catalog snapshot, remove that temporary constraint.
403+
* Snapshots can only be exported while the initial snapshot is still
404+
* acquired.
405+
*/
406+
if (!TransactionIdIsValid(slot->data.xmin) &&
407+
TransactionIdIsValid(slot->effective_xmin))
408+
{
409+
SpinLockAcquire(&slot->mutex);
410+
slot->effective_xmin = InvalidTransactionId;
411+
SpinLockRelease(&slot->mutex);
412+
ReplicationSlotsComputeRequiredXmin(false);
413+
}
414+
399415
MyReplicationSlot = NULL;
400416

401417
/* might not have been set when we've been a plain slot */
@@ -580,6 +596,9 @@ ReplicationSlotPersist(void)
580596

581597
/*
582598
* Compute the oldest xmin across all slots and store it in the ProcArray.
599+
*
600+
* If already_locked is true, ProcArrayLock has already been acquired
601+
* exclusively.
583602
*/
584603
void
585604
ReplicationSlotsComputeRequiredXmin(bool already_locked)
@@ -590,8 +609,7 @@ ReplicationSlotsComputeRequiredXmin(bool already_locked)
590609

591610
Assert(ReplicationSlotCtl != NULL);
592611

593-
if (!already_locked)
594-
LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
612+
LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
595613

596614
for (i = 0; i < max_replication_slots; i++)
597615
{
@@ -624,8 +642,7 @@ ReplicationSlotsComputeRequiredXmin(bool already_locked)
624642
agg_catalog_xmin = effective_catalog_xmin;
625643
}
626644

627-
if (!already_locked)
628-
LWLockRelease(ReplicationSlotControlLock);
645+
LWLockRelease(ReplicationSlotControlLock);
629646

630647
ProcArraySetReplicationSlotXmin(agg_xmin, agg_catalog_xmin, already_locked);
631648
}

src/backend/replication/slotfuncs.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -109,8 +109,8 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
109109
/*
110110
* Create logical decoding context, to build the initial snapshot.
111111
*/
112-
ctx = CreateInitDecodingContext(
113-
NameStr(*plugin), NIL,
112+
ctx = CreateInitDecodingContext(NameStr(*plugin), NIL,
113+
false, /* do not build snapshot */
114114
logical_read_local_xlog_page, NULL, NULL);
115115

116116
/* build initial snapshot, might take a while */

src/backend/replication/walsender.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -812,6 +812,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
812812
LogicalDecodingContext *ctx;
813813

814814
ctx = CreateInitDecodingContext(cmd->plugin, NIL,
815+
true, /* build snapshot */
815816
logical_read_xlog_page,
816817
WalSndPrepareWrite, WalSndWriteData);
817818

src/backend/storage/ipc/procarray.c

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1978,7 +1978,7 @@ GetOldestActiveTransactionId(void)
19781978
* that the caller will immediately use the xid to peg the xmin horizon.
19791979
*/
19801980
TransactionId
1981-
GetOldestSafeDecodingTransactionId(void)
1981+
GetOldestSafeDecodingTransactionId(bool catalogOnly)
19821982
{
19831983
ProcArrayStruct *arrayP = procArray;
19841984
TransactionId oldestSafeXid;
@@ -2001,9 +2001,17 @@ GetOldestSafeDecodingTransactionId(void)
20012001
/*
20022002
* If there's already a slot pegging the xmin horizon, we can start with
20032003
* that value, it's guaranteed to be safe since it's computed by this
2004-
* routine initially and has been enforced since.
2004+
* routine initially and has been enforced since. We can always use the
2005+
* slot's general xmin horizon, but the catalog horizon is only usable
2006+
* when only catalog data is going to be looked at.
20052007
*/
2006-
if (TransactionIdIsValid(procArray->replication_slot_catalog_xmin) &&
2008+
if (TransactionIdIsValid(procArray->replication_slot_xmin) &&
2009+
TransactionIdPrecedes(procArray->replication_slot_xmin,
2010+
oldestSafeXid))
2011+
oldestSafeXid = procArray->replication_slot_xmin;
2012+
2013+
if (catalogOnly &&
2014+
TransactionIdIsValid(procArray->replication_slot_catalog_xmin) &&
20072015
TransactionIdPrecedes(procArray->replication_slot_catalog_xmin,
20082016
oldestSafeXid))
20092017
oldestSafeXid = procArray->replication_slot_catalog_xmin;

src/include/replication/logical.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ extern void CheckLogicalDecodingRequirements(void);
7979

8080
extern LogicalDecodingContext *CreateInitDecodingContext(char *plugin,
8181
List *output_plugin_options,
82+
bool need_full_snapshot,
8283
XLogPageReadCB read_page,
8384
LogicalOutputPluginWriterPrepareWrite prepare_write,
8485
LogicalOutputPluginWriterWrite do_write);

src/include/storage/procarray.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ extern bool TransactionIdIsInProgress(TransactionId xid);
5454
extern bool TransactionIdIsActive(TransactionId xid);
5555
extern TransactionId GetOldestXmin(Relation rel, bool ignoreVacuum);
5656
extern TransactionId GetOldestActiveTransactionId(void);
57-
extern TransactionId GetOldestSafeDecodingTransactionId(void);
57+
extern TransactionId GetOldestSafeDecodingTransactionId(bool catalogOnly);
5858

5959
extern VirtualTransactionId *GetVirtualXIDsDelayingChkpt(int *nvxids);
6060
extern bool HaveVirtualXIDsDelayingChkpt(VirtualTransactionId *vxids, int nvxids);

0 commit comments

Comments
 (0)
0