8000 Gracefully handle concurrent aborts of uncommitted transactions that … · postgrespro/postgres_cluster@7dce5ca · GitHub
[go: up one dir, main page]

Skip to content

Commit 7dce5ca

Browse files
Nikhil Sontakke, arssher
Nikhil Sontakke
authored and committed
Gracefully handle concurrent aborts of uncommitted transactions that are being decoded alongside.
When a transaction aborts, its changes are considered unnecessary for other transactions. That means the changes may be either cleaned up by vacuum or removed from HOT chains (thus made inaccessible through indexes), and there may be other such consequences. When decoding committed transactions this is not an issue, and we never decode transactions that abort before the decoding starts. But for in-progress transactions - for example, when decoding prepared transactions on PREPARE (and not COMMIT PREPARED as before) - this may cause failures when the output plugin consults catalogs (both system and user-defined). We handle such failures by returning the ERRCODE_TRANSACTION_ROLLBACK sqlerrcode from the system table scan APIs to the backend decoding a specific uncommitted transaction. On receipt of such an sqlerrcode, the decoding logic aborts the ongoing decoding and returns gracefully.
1 parent 6334bec commit 7dce5ca

File tree

6 files changed

+143
-9
lines changed

6 files changed

+143
-9
lines changed

doc/src/sgml/logicaldecoding.sgml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -421,7 +421,10 @@ typedef void (*LogicalOutputPluginInit) (struct OutputPluginCallbacks *cb);
421421
ALTER TABLE user_catalog_table SET (user_catalog_table = true);
422422
CREATE TABLE another_catalog_table(data text) WITH (user_catalog_table = true);
423423
</programlisting>
424-
Any actions leading to transaction ID assignment are prohibited. That, among others,
424+
Note that access to user catalog tables or regular system catalog tables
425+
in the output plugins has to be done via the <literal>systable_*</literal> scan APIs only.
426+
Access via the <literal>heap_*</literal> scan APIs will error out.
427+
Additionally, any actions leading to transaction ID assignment are prohibited. That, among others,
425428
includes writing to tables, performing DDL changes, and
426429
calling <literal>txid_current()</literal>.
427430
</para>

src/backend/access/heap/heapam.c

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1834,6 +1834,17 @@ heap_update_snapshot(HeapScanDesc scan, Snapshot snapshot)
18341834
HeapTuple
18351835
heap_getnext(HeapScanDesc scan, ScanDirection direction)
18361836
{
1837+
/*
1838+
* We don't expect direct calls to heap_getnext with valid
1839+
* CheckXidAlive for regular tables. Track that below.
1840+
*/
1841+
if (unlikely(TransactionIdIsValid(CheckXidAlive) &&
1842+
!(IsCatalogRelation(scan->rs_rd) ||
1843+
RelationIsUsedAsCatalogTable(scan->rs_rd))))
1844+
ereport(ERROR,
1845+
(errcode(ERRCODE_INVALID_TRANSACTION_STATE),
1846+
errmsg("improper heap_getnext call")));
1847+
18371848
/* Note: no locking manipulations needed */
18381849

18391850
HEAPDEBUG_1; /* heap_getnext( info ) */
@@ -1914,6 +1925,16 @@ heap_fetch(Relation relation,
19141925
OffsetNumber offnum;
19151926
bool valid;
19161927

1928+
/*
1929+
* We don't expect direct calls to heap_fetch with valid
1930+
* CheckXidAlive for regular tables. Track that below.
1931+
*/
1932+
if (unlikely(TransactionIdIsValid(CheckXidAlive) &&
1933+
!(IsCatalogRelation(relation) || RelationIsUsedAsCatalogTable(relation))))
1934+
ereport(ERROR,
1935+
(errcode(ERRCODE_INVALID_TRANSACTION_STATE),
1936+
errmsg("improper heap_fetch call")));
1937+
19171938
/*
19181939
* Fetch and pin the appropriate page of the relation.
19191940
*/
@@ -2046,6 +2067,16 @@ heap_hot_search_buffer(ItemPointer tid, Relation relation, Buffer buffer,
20462067
bool valid;
20472068
bool skip;
20482069

2070+
/*
2071+
* We don't expect direct calls to heap_hot_search_buffer with
2072+
* valid CheckXidAlive for regular tables. Track that below.
2073+
*/
2074+
if (unlikely(TransactionIdIsValid(CheckXidAlive) &&
2075+
!(IsCatalogRelation(relation) || RelationIsUsedAsCatalogTable(relation))))
2076+
ereport(ERROR,
2077+
(errcode(ERRCODE_INVALID_TRANSACTION_STATE),
2078+
errmsg("improper heap_hot_search_buffer call")));
2079+
20492080
/* If this is not the first call, previous call returned a (live!) tuple */
20502081
if (all_dead)
20512082
*all_dead = first_call;
@@ -2187,6 +2218,16 @@ heap_hot_search(ItemPointer tid, Relation relation, Snapshot snapshot,
21872218
Buffer buffer;
21882219
HeapTupleData heapTuple;
21892220

2221+
/*
2222+
* We don't expect direct calls to heap_hot_search with
2223+
* valid CheckXidAlive for regular tables. Track that below.
2224+
*/
2225+
if (unlikely(TransactionIdIsValid(CheckXidAlive) &&
2226+
!(IsCatalogRelation(relation) || RelationIsUsedAsCatalogTable(relation))))
2227+
ereport(ERROR,
2228+
(errcode(ERRCODE_INVALID_TRANSACTION_STATE),
2229+
errmsg("improper heap_hot_search call")));
2230+
21902231
buffer = ReadBuffer(relation, ItemPointerGetBlockNumber(tid));
21912232
LockBuffer(buffer, BUFFER_LOCK_SHARE);
21922233
result = heap_hot_search_buffer(tid, relation, buffer, snapshot,
@@ -2216,6 +2257,16 @@ heap_get_latest_tid(Relation relation,
22162257
ItemPointerData ctid;
22172258
TransactionId priorXmax;
22182259

2260+
/*
2261+
* We don't expect direct calls to heap_get_latest_tid with valid
2262+
* CheckXidAlive for regular tables. Track that below.
2263+
*/
2264+
if (unlikely(TransactionIdIsValid(CheckXidAlive) &&
2265+
!(IsCatalogRelation(relation) || RelationIsUsedAsCatalogTable(relation))))
2266+
ereport(ERROR,
2267+
(errcode(ERRCODE_INVALID_TRANSACTION_STATE),
2268+
errmsg("improper heap_get_latest_tid call")));
2269+
22192270
/* this is to avoid Assert failures on bad input */
22202271
if (!ItemPointerIsValid(tid))
22212272
return;

src/backend/access/index/genam.c

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
#include "lib/stringinfo.h"
2626
#include "miscadmin.h"
2727
#include "storage/bufmgr.h"
28+
#include "storage/procarray.h"
2829
#include "utils/acl.h"
2930
#include "utils/builtins.h"
3031
#include "utils/lsyscache.h"
@@ -437,6 +438,17 @@ systable_getnext(SysScanDesc sysscan)
437438
else
438439
htup = heap_getnext(sysscan->scan, ForwardScanDirection);
439440

441+
/*
442+
* If CheckXidAlive is valid, then we check if it aborted. If it did, we
443+
* error out
444+
*/
445+
if (TransactionIdIsValid(CheckXidAlive) &&
446+
!TransactionIdIsInProgress(CheckXidAlive) &&
447+
!TransactionIdDidCommit(CheckXidAlive))
448+
ereport(ERROR,
449+
(errcode(ERRCODE_TRANSACTION_ROLLBACK),
450+
errmsg("transaction aborted during system catalog scan")));
451+
440452
return htup;
441453
}
442454

@@ -490,6 +502,18 @@ systable_recheck_tuple(SysScanDesc sysscan, HeapTuple tup)
490502
result = HeapTupleSatisfiesVisibility(tup, freshsnap, scan->rs_cbuf);
491503
LockBuffer(scan->rs_cbuf, BUFFER_LOCK_UNLOCK);
492504
}
505+
506+
/*
507+
* If CheckXidAlive is valid, then we check if it aborted. If it did, we
508+
* error out
509+
*/
510+
if (TransactionIdIsValid(CheckXidAlive) &&
511+
!TransactionIdIsInProgress(CheckXidAlive) &&
512+
!TransactionIdDidCommit(CheckXidAlive))
513+
ereport(ERROR,
514+
(errcode(ERRCODE_TRANSACTION_ROLLBACK),
515+
errmsg("transaction aborted during system catalog scan")));
516+
493517
return result;
494518
}
495519

@@ -607,6 +631,17 @@ systable_getnext_ordered(SysScanDesc sysscan, ScanDirection direction)
607631
if (htup && sysscan->iscan->xs_recheck)
608632
elog(ERROR, "system catalog scans with lossy index conditions are not implemented");
609633

634+
/*
635+
* If CheckXidAlive is valid, then we check if it aborted. If it did, we
636+
* error out
637+
*/
638+
if (TransactionIdIsValid(CheckXidAlive) &&
639+
!TransactionIdIsInProgress(CheckXidAlive) &&
640+
!TransactionIdDidCommit(CheckXidAlive))
641+
ereport(ERROR,
642+
(errcode(ERRCODE_TRANSACTION_ROLLBACK),
643+
errmsg("transaction aborted during system catalog scan")));
644+
610645
return htup;
611646
}
612647

src/backend/replication/logical/reorderbuffer.c

Lines changed: 27 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -636,7 +636,7 @@ ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid,
636636
txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
637637

638638
/* setup snapshot to allow catalog access */
639-
SetupHistoricSnapshot(snapshot_now, NULL);
639+
SetupHistoricSnapshot(snapshot_now, NULL, xid);
640640
PG_TRY();
641641
{
642642
rb->message(rb, txn, lsn, false, prefix, message_size, message);
@@ -1442,6 +1442,7 @@ ReorderBufferCommitInternal(ReorderBufferTXN *txn,
14421442
volatile CommandId command_id = FirstCommandId;
14431443
bool using_subtxn;
14441444
ReorderBufferIterTXNState *volatile iterstate = NULL;
1445+
MemoryContext ccxt = CurrentMemoryContext;
14451446

14461447
txn->final_lsn = commit_lsn;
14471448
txn->end_lsn = end_lsn;
@@ -1468,7 +1469,7 @@ ReorderBufferCommitInternal(ReorderBufferTXN *txn,
14681469
ReorderBufferBuildTupleCidHash(rb, txn);
14691470

14701471
/* setup the initial snapshot */
1471-
SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
1472+
SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash, xid);
14721473

14731474
/*
14741475
* Decoding needs access to syscaches et al., which in turn use
@@ -1709,7 +1710,7 @@ ReorderBufferCommitInternal(ReorderBufferTXN *txn,
17091710

17101711

17111712
/* and continue with the new one */
1712-
SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
1713+
SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash, xid);
17131714
break;
17141715

17151716
case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
@@ -1729,7 +1730,7 @@ ReorderBufferCommitInternal(ReorderBufferTXN *txn,
17291730
snapshot_now->curcid = command_id;
17301731

17311732
TeardownHistoricSnapshot(false);
1732-
SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
1733+
SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash, xid);
17331734

17341735
/*
17351736
* Every time the CommandId is incremented, we could
@@ -1814,6 +1815,20 @@ ReorderBufferCommitInternal(ReorderBufferTXN *txn,
18141815
PG_CATCH();
18151816
{
18161817
/* TODO: Encapsulate cleanup from the PG_TRY and PG_CATCH blocks */
1818+
MemoryContext ecxt = MemoryContextSwitchTo(ccxt);
1819+
ErrorData *errdata = CopyErrorData();
1820+
1821+
/*
1822+
* if the catalog scan access returned an error of
1823+
* rollback, then abort on the other side as well
1824+
*/
1825+
if (errdata->sqlerrcode == ERRCODE_TRANSACTION_ROLLBACK)
1826+
{
1827+
elog(LOG, "stopping decoding of %s (%u)",
1828+
txn->gid[0] != '\0'? txn->gid:"", txn->xid);
1829+
rb->abort(rb, txn, commit_lsn);
1830+
}
1831+
18171832
if (iterstate)
18181833
ReorderBufferIterTXNFinish(rb, iterstate);
18191834

@@ -1837,7 +1852,14 @@ ReorderBufferCommitInternal(ReorderBufferTXN *txn,
18371852
/* remove potential on-disk data, and deallocate */
18381853
ReorderBufferCleanupTXN(rb, txn);
18391854

1840-
PG_RE_THROW();
1855+
/* re-throw only if it's not an abort */
1856+
if (errdata->sqlerrcode != ERRCODE_TRANSACTION_ROLLBACK)
1857+
{
1858+
MemoryContextSwitchTo(ecxt);
1859+
PG_RE_THROW();
1860+
}
1861+
else
1862+
FlushErrorState();
18411863
}
18421864
PG_END_TRY();
18431865
}

src/backend/utils/time/snapmgr.c

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,13 @@ static Snapshot SecondarySnapshot = NULL;
152152
static Snapshot CatalogSnapshot = NULL;
153153
static Snapshot HistoricSnapshot = NULL;
154154

155+
/*
156+
* An xid value pointing to a possibly ongoing or a prepared transaction.
157+
* Currently used in logical decoding. It's possible that such transactions
158+
* can get aborted while the decoding is ongoing.
159+
*/
160+
TransactionId CheckXidAlive = InvalidTransactionId;
161+
155162
/*
156163
* These are updated by GetSnapshotData. We initialize them this way
157164
* for the convenience of TransactionIdIsInProgress: even in bootstrap
@@ -2000,10 +2007,14 @@ MaintainOldSnapshotTimeMapping(TimestampTz whenTaken, TransactionId xmin)
20002007
* Setup a snapshot that replaces normal catalog snapshots that allows catalog
20012008
* access to behave just like it did at a certain point in the past.
20022009
*
2010+
* If a valid xid is passed in, we check if it is uncommitted and track it in
2011+
* CheckXidAlive. This is to re-check XID status while accessing catalog.
2012+
*
20032013
* Needed for logical decoding.
20042014
*/
20052015
void
2006-
SetupHistoricSnapshot(Snapshot historic_snapshot, HTAB *tuplecids)
2016+
SetupHistoricSnapshot(Snapshot historic_snapshot, HTAB *tuplecids,
2017+
TransactionId snapshot_xid)
20072018
{
20082019
Assert(historic_snapshot != NULL);
20092020

@@ -2012,8 +2023,17 @@ SetupHistoricSnapshot(Snapshot historic_snapshot, HTAB *tuplecids)
20122023

20132024
/* setup (cmin, cmax) lookup hash */
20142025
tuplecid_data = tuplecids;
2015-
}
20162026

2027+
/*
2028+
* setup CheckXidAlive if it's not committed yet. We don't check
2029+
* if the xid aborted. That will happen during catalog access.
2030+
*/
2031+
if (TransactionIdIsValid(snapshot_xid) &&
2032+
!TransactionIdDidCommit(snapshot_xid))
2033+
CheckXidAlive = snapshot_xid;
2034+
else
2035+
CheckXidAlive = InvalidTransactionId;
2036+
}
20172037

20182038
/*
20192039
* Make catalog snapshots behave normally again.
@@ -2023,6 +2043,7 @@ TeardownHistoricSnapshot(bool is_error)
20232043
{
20242044
HistoricSnapshot = NULL;
20252045
tuplecid_data = NULL;
2046+
CheckXidAlive = InvalidTransactionId;
20262047
}
20272048

20282049
bool

src/include/utils/snapmgr.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,8 +103,10 @@ extern char *ExportSnapshot(Snapshot snapshot);
103103

104104
/* Support for catalog timetravel for logical decoding */
105105
struct HTAB;
106+
extern TransactionId CheckXidAlive;
106107
extern struct HTAB *HistoricSnapshotGetTupleCids(void);
107-
extern void SetupHistoricSnapshot(Snapshot snapshot_now, struct HTAB *tuplecids);
108+
extern void SetupHistoricSnapshot(Snapshot snapshot_now, struct HTAB *tuplecids,
109+
TransactionId snapshot_xid);
108110
extern void TeardownHistoricSnapshot(bool is_error);
109111
extern bool HistoricSnapshotActive(void);
110112

0 commit comments

Comments
 (0)
0