@@ -287,8 +287,13 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
287
287
{
288
288
xl_xact_parsed_prepare parsed ;
289
289
290
- /* check that output plugin is capable of twophase decoding */
291
- if (!ctx -> options .enable_twophase )
290
+ /*
291
+ * Check that output plugin is capable of twophase decoding.
292
+ * We also don't offer to do 2PC if snap is not yet consistent
293
+ * as of reading PREPARE.
294
+ */
295
+ if (!ctx -> options .enable_twophase ||
296
+ SnapBuildCurrentState (builder ) < SNAPBUILD_CONSISTENT )
292
297
{
293
298
ReorderBufferProcessXid (reorder , XLogRecGetXid (r ), buf -> origptr );
294
299
break ;
@@ -584,13 +589,23 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
584
589
TimestampTz commit_time = parsed -> xact_time ;
585
590
RepOriginId origin_id = XLogRecGetOrigin (buf -> record );
586
591
int i ;
592
+ bool reorderbuffer_has_xid ;
587
593
588
594
if (parsed -> xinfo & XACT_XINFO_HAS_ORIGIN )
589
595
{
590
596
origin_lsn = parsed -> origin_lsn ;
591
597
commit_time = parsed -> origin_timestamp ;
592
598
}
593
599
600
+ /*
601
+ * If this is COMMIT PREPARED and ReorderBuffer doesn't have this xid,
602
+ * either the plugin refused to do 2PC on this xact or we didn't have
603
+ * consistent snapshot yet during PREPARE processing. Anyway, in this case
604
+ * we don't do 2PC and replay xact fully now. We must check this early
605
+ * since invalidation addition below might add the record to the RB.
606
+ */
607
+ reorderbuffer_has_xid = ReorderBufferHasXid (ctx -> reorder , xid );
608
+
594
609
/*
595
610
* Process invalidation messages, even if we're not interested in the
596
611
* transaction's contents, since the various caches need to always be
@@ -663,10 +678,14 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
663
678
* For output plugins that do not support PREPARE-time decoding of
664
679
* two-phase transactions, we never even see the PREPARE and all two-phase
665
680
* transactions simply fall through to the second branch.
681
+ *
682
+ * We rely on existence of xid in reorderbuffer to determine was 2PC done
683
+ * or not. This is correct because we always see PREPARE before COMMIT
684
+ * PREPARED if the latter was after consistent point.
685
+ *
666
686
*/
667
687
if (TransactionIdIsValid (parsed -> twophase_xid ) &&
668
- ReorderBufferTxnIsPrepared (ctx -> reorder ,
669
- parsed -> twophase_xid , parsed -> twophase_gid ))
688
+ !reorderbuffer_has_xid )
670
689
{
671
690
Assert (xid == parsed -> twophase_xid );
672
691
/* we are processing COMMIT PREPARED */
@@ -765,7 +784,7 @@ DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
765
784
!SnapBuildXactNeedsSkip (ctx -> snapshot_builder , buf -> origptr ) &&
766
785
parsed -> dbId == ctx -> slot -> data .database &&
767
786
!FilterByOrigin (ctx , origin_id ) &&
768
- ReorderBufferTxnIsPrepared (ctx -> reorder , xid , parsed -> twophase_gid ))
787
+ ! ReorderBufferHasXid (ctx -> reorder , xid ))
769
788
{
770
789
ReorderBufferFinishPrepared (ctx -> reorder , xid , buf -> origptr , buf -> endptr ,
771
790
commit_time , origin_id , origin_lsn ,
0 commit comments