8000 Offer to do 2PC only if snap is consistent. · postgrespro/postgres_cluster@f46f9d8 · GitHub
[go: up one dir, main page]

Skip to content

Commit f46f9d8

Browse files
committed
Offer to do 2PC only if snap is consistent.
1 parent f1bb569 commit f46f9d8

File tree

3 files changed

+37
-28
lines changed

src/backend/replication/logical/decode.c

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -287,8 +287,13 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
287287
{
288288
xl_xact_parsed_prepare parsed;
289289

290-
/* check that output plugin is capable of twophase decoding */
291-
if (!ctx->options.enable_twophase)
290+
/*
291+
* Check that output plugin is capable of twophase decoding.
292+
* We also don't offer to do 2PC if snap is not yet consistent
293+
* as of reading PREPARE.
294+
*/
295+
if (!ctx->options.enable_twophase ||
296+
SnapBuildCurrentState(builder) < SNAPBUILD_CONSISTENT)
292297
{
293298
ReorderBufferProcessXid(reorder, XLogRecGetXid(r), buf->origptr);
294299
break;
@@ -584,13 +589,23 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
584589
TimestampTz commit_time = parsed->xact_time;
585590
RepOriginId origin_id = XLogRecGetOrigin(buf->record);
586591
int i;
592+
bool reorderbuffer_has_xid;
587593

588594
if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN)
589595
{
590596
origin_lsn = parsed->origin_lsn;
591597
commit_time = parsed->origin_timestamp;
592598
}
593599

600+
/*
601+
* If this is COMMIT PREPARED and ReorderBuffer doesn't have this xid,
602+
* either the plugin refused to do 2PC on this xact or we didn't have
603+
* consistent snapshot yet during PREPARE processing. Anyway, in this case
604+
* we don't do 2PC and replay xact fully now. We must check this early
605+
* since invalidation addition below might add the record to the RB.
606+
*/
607+
reorderbuffer_has_xid = ReorderBufferHasXid(ctx->reorder, xid);
608+
594609
/*
595610
* Process invalidation messages, even if we're not interested in the
596611
* transaction's contents, since the various caches need to always be
@@ -663,10 +678,14 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
663678
* For output plugins that do not support PREPARE-time decoding of
664679
* two-phase transactions, we never even see the PREPARE and all two-phase
665680
* transactions simply fall through to the second branch.
681+
*
682+
* We rely on existence of xid in reorderbuffer to determine was 2PC done
683+
* or not. This is correct because we always see PREPARE before COMMIT
684+
* PREPARED if the latter was after consistent point.
685+
*
666686
*/
667687
if (TransactionIdIsValid(parsed->twophase_xid) &&
668-
ReorderBufferTxnIsPrepared(ctx->reorder,
669-
parsed->twophase_xid, parsed->twophase_gid))
688+
!reorderbuffer_has_xid)
670689
{
671690
Assert(xid == parsed->twophase_xid);
672691
/* we are processing COMMIT PREPARED */
@@ -765,7 +784,7 @@ DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
765784
!SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) &&
766785
parsed->dbId == ctx->slot->data.database &&
767786
!FilterByOrigin(ctx, origin_id) &&
768-
ReorderBufferTxnIsPrepared(ctx->reorder, xid, parsed->twophase_gid))
787+
!ReorderBufferHasXid(ctx->reorder, xid))
769788
{
770789
ReorderBufferFinishPrepared(ctx->reorder, xid, buf->origptr, buf->endptr,
771790
commit_time, origin_id, origin_lsn,

src/backend/replication/logical/reorderbuffer.c

Lines changed: 12 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1932,27 +1932,6 @@ ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid,
19321932
commit_time, origin_id, origin_lsn);
19331933
}
19341934

1935-
/*
1936-
* Check whether this transaction was sent as prepared to subscribers.
1937-
* Called while handling commit|abort prepared.
1938-
*/
1939-
bool
1940-
ReorderBufferTxnIsPrepared(ReorderBuffer *rb, TransactionId xid,
1941-
const char *gid)
1942-
{
1943-
ReorderBufferTXN *txn;
1944-
1945-
txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
1946-
false);
1947-
1948-
/*
1949-
* Always call the prepare filter. It's the job of the prepare filter to
1950-
* give us the *same* response for a given xid across multiple calls
1951-
* (including ones on restart)
1952-
*/
1953-
return !(rb->filter_prepare(rb, txn, xid, gid));
1954-
}
1955-
19561935
/*
19571936
* Send standalone xact event. This is used to handle COMMIT/ABORT PREPARED.
19581937
*/
@@ -2175,6 +2154,18 @@ ReorderBufferProcessXid(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
21752154
ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
21762155
}
21772156

2157+
/*
2158+
* Does reorderbuffer currently holds this xact?
2159+
*/
2160+
bool
2161+
ReorderBufferHasXid(ReorderBuffer *rb, TransactionId xid)
2162+
{
2163+
ReorderBufferTXN *txn;
2164+
2165+
txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, true);
2166+
return txn != NULL;
2167+
}
2168+
21782169
/*
21792170
* Add a new snapshot to this transaction that may only used after lsn 'lsn'
21802171
* because the previous snapshot doesn't describe the catalog correctly for

src/include/replication/reorderbuffer.h

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -497,14 +497,13 @@ void ReorderBufferAddInvalidations(ReorderBuffer *, TransactionId, XLogRecPtr ls
497497
void ReorderBufferImmediateInvalidation(ReorderBuffer *, uint32 ninvalidations,
498498
SharedInvalidationMessage *invalidations);
499499
void ReorderBufferProcessXid(ReorderBuffer *, TransactionId xid, XLogRecPtr lsn);
500+
bool ReorderBufferHasXid(ReorderBuffer *, TransactionId xid);
500501
void ReorderBufferXidSetCatalogChanges(ReorderBuffer *, TransactionId xid, XLogRecPtr lsn);
501502
bool ReorderBufferXidHasCatalogChanges(ReorderBuffer *, TransactionId xid);
502503
bool ReorderBufferXidHasBaseSnapshot(ReorderBuffer *, TransactionId xid);
503504

504505
bool ReorderBufferPrepareNeedSkip(ReorderBuffer *rb, TransactionId xid,
505506
const char *gid);
506-
bool ReorderBufferTxnIsPrepared(ReorderBuffer *rb, TransactionId xid,
507-
const char *gid);
508507
void ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid,
509508
XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
510509
TimestampTz commit_time,

0 commit comments

Comments
 (0)
0