34
34
#include "access/xlogutils.h"
35
35
#include "access/xlogreader.h"
36
36
#include "access/xlogrecord.h"
37
+ #include "access/twophase.h"
37
38
38
39
#include "catalog/pg_control.h"
39
40
@@ -73,6 +74,8 @@ static void DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
73
74
xl_xact_parsed_commit * parsed , TransactionId xid );
74
75
static void DecodeAbort (LogicalDecodingContext * ctx , XLogRecordBuffer * buf ,
75
76
xl_xact_parsed_abort * parsed , TransactionId xid );
77
+ static void DecodePrepare (LogicalDecodingContext * ctx , XLogRecordBuffer * buf ,
78
+ xl_xact_parsed_prepare * parsed );
76
79
77
80
/* common function to decode tuples */
78
81
static void DecodeXLogTuple (char * data , Size len , ReorderBufferTupleBuf * tup );
@@ -281,16 +284,33 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
281
284
break ;
282
285
}
283
286
case XLOG_XACT_PREPARE :
287
+ {
288
+ xl_xact_parsed_prepare parsed ;
284
289
285
- /*
286
- * Currently decoding ignores PREPARE TRANSACTION and will just
287
- * decode the transaction when the COMMIT PREPARED is sent or
288
- * throw away the transaction's contents when a ROLLBACK PREPARED
289
- * is received. In the future we could add code to expose prepared
290
- * transactions in the changestream allowing for a kind of
291
- * distributed 2PC.
292
- */
293
- ReorderBufferProcessXid (reorder , XLogRecGetXid (r ), buf -> origptr );
290
+ /* check that output plugin is capable of twophase decoding */
291
+ if (!ctx -> options .enable_twophase )
292
+ {
293
+ ReorderBufferProcessXid (reorder , XLogRecGetXid (r ), buf -> origptr );
294
+ break ;
295
+ }
296
+
297
+ /* ok, parse it */
298
+ ParsePrepareRecord (XLogRecGetInfo (buf -> record ),
299
+ XLogRecGetData (buf -> record ), & parsed );
300
+
301
+ /* does output plugin want this particular transaction? */
302
+ if (ctx -> callbacks .filter_prepare_cb &&
303
+ ReorderBufferPrepareNeedSkip (reorder , parsed .twophase_xid ,
304
+ parsed .twophase_gid ))
305
+ {
306
+ ReorderBufferProcessXid (reorder , parsed .twophase_xid ,
307
+ buf -> origptr );
308
+ break ;
309
+ }
310
+
311
+ DecodePrepare (ctx , buf , & parsed );
312
+ break ;
313
+ }
294
314
break ;
295
315
default :
296
316
elog (ERROR , "unexpected RM_XACT_ID record type: %u" , info );
@@ -633,9 +653,90 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
633
653
buf -> origptr , buf -> endptr );
634
654
}
635
655
656
+ /*
657
+ * Decide if we're processing COMMIT PREPARED, or a regular COMMIT.
658
+ * Regular commit simply triggers a replay of transaction changes from the
659
+ * reorder buffer. For COMMIT PREPARED that however already happened at
660
+ * PREPARE time, and so we only need to notify the subscriber that the GID
661
+ * finally committed.
662
+ *
663
+ * For output plugins that do not support PREPARE-time decoding of
664
+ * two-phase transactions, we never even see the PREPARE and all two-phase
665
+ * transactions simply fall through to the second branch.
666
+ */
667
+ if (TransactionIdIsValid (parsed -> twophase_xid ) &&
668
+ ReorderBufferTxnIsPrepared (ctx -> reorder ,
669
+ parsed -> twophase_xid , parsed -> twophase_gid ))
670
+ {
671
+ Assert (xid == parsed -> twophase_xid );
672
+ /* we are processing COMMIT PREPARED */
673
+ ReorderBufferFinishPrepared (ctx -> reorder , xid , buf -> origptr , buf -> endptr ,
674
+ commit_time , origin_id , origin_lsn ,
675
+ parsed -> twophase_gid , true);
676
+ }
677
+ else
678
+ {
679
+ /* replay actions of all transaction + subtransactions in order */
680
+ ReorderBufferCommit (ctx -> reorder , xid , buf -> origptr , buf -> endptr ,
681
+ commit_time , origin_id , origin_lsn );
682
+ }
683
+ }
684
+
685
+ /*
686
+ * Decode PREPARE record. Similar logic as in COMMIT
687
+ */
688
+ static void
689
+ DecodePrepare (LogicalDecodingContext * ctx , XLogRecordBuffer * buf ,
690
+ xl_xact_parsed_prepare * parsed )
691
+ {
692
+ XLogRecPtr origin_lsn = parsed -> origin_lsn ;
693
+ TimestampTz commit_time = parsed -> origin_timestamp ;
694
+ XLogRecPtr origin_id = XLogRecGetOrigin (buf -> record );
695
+ int i ;
696
+ TransactionId xid = parsed -> twophase_xid ;
697
+
698
+ /*
699
+ * Process invalidation messages, even if we're not interested in the
700
+ * transaction's contents, since the various caches need to always be
701
+ * consistent.
702
+ */
703
+ if (parsed -> nmsgs > 0 )
704
+ {
705
+ if (!ctx -> fast_forward )
706
+ ReorderBufferAddInvalidations (ctx -> reorder , xid , buf -> origptr ,
707
+ parsed -> nmsgs , parsed -> msgs );
708
+ ReorderBufferXidSetCatalogChanges (ctx -> reorder , xid , buf -> origptr );
709
+ }
710
+
711
+ /*
712
+ * Tell the reorderbuffer about the surviving subtransactions. We need to
713
+ * do this because the main transaction itself has not committed since we
714
+ * are in the prepare phase right now. So we need to be sure the snapshot
715
+ * is setup correctly for the main transaction in case all changes
716
+ * happened in subtransanctions
717
+ */
718
+ for (i = 0 ; i < parsed -> nsubxacts ; i ++ )
719
+ {
720
+ ReorderBufferCommitChild (ctx -> reorder , xid , parsed -> subxacts [i ],
721
+ buf -> origptr , buf -> endptr );
722
+ }
723
+
724
+ if (SnapBuildXactNeedsSkip (ctx -> snapshot_builder , buf -> origptr ) ||
725
+ (parsed -> dbId != InvalidOid && parsed -> dbId != ctx -> slot -> data .database ) ||
726
+ ctx -> fast_forward || FilterByOrigin (ctx , origin_id ))
727
+ {
728
+ for (i = 0 ; i < parsed -> nsubxacts ; i ++ )
729
+ {
730
+ ReorderBufferForget (ctx -> reorder , parsed -> subxacts [i ], buf -> origptr );
731
+ }
732
+ ReorderBufferForget (ctx -> reorder , xid , buf -> origptr );
733
+
734
+ return ;
735
+ }
736
+
636
737
/* replay actions of all transaction + subtransactions in order */
637
- ReorderBufferCommit (ctx -> reorder , xid , buf -> origptr , buf -> endptr ,
638
- commit_time , origin_id , origin_lsn );
738
+ ReorderBufferPrepare (ctx -> reorder , xid , buf -> origptr , buf -> endptr ,
739
+ commit_time , origin_id , origin_lsn , parsed -> twophase_gid );
639
740
}
640
741
641
742
/*
@@ -647,6 +748,30 @@ DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
647
748
xl_xact_parsed_abort * parsed , TransactionId xid )
648
749
{
649
750
int i ;
751
+ XLogRecPtr origin_lsn = InvalidXLogRecPtr ;
752
+ TimestampTz commit_time = 0 ;
753
+ XLogRecPtr origin_id = XLogRecGetOrigin (buf -> record );
754
+
755
+ if (parsed -> xinfo & XACT_XINFO_HAS_ORIGIN )
756
+ {
757
+ origin_lsn = parsed -> origin_lsn ;
758
+ commit_time = parsed -> origin_timestamp ;
759
+ }
760
+
761
+ /*
762
+ * If it's ROLLBACK PREPARED then handle it via callbacks.
763
+ */
764
+ if (TransactionIdIsValid (xid ) &&
765
+ !SnapBuildXactNeedsSkip (ctx -> snapshot_builder , buf -> origptr ) &&
766
+ parsed -> dbId == ctx -> slot -> data .database &&
767
+ !FilterByOrigin (ctx , origin_id ) &&
768
+ ReorderBufferTxnIsPrepared (ctx -> reorder , xid , parsed -> twophase_gid ))
769
+ {
770
+ ReorderBufferFinishPrepared (ctx -> reorder , xid , buf -> origptr , buf -> endptr ,
771
+ commit_time , origin_id , origin_lsn ,
772
+ parsed -> twophase_gid , false);
773
+ return ;
774
+ }
650
775
651
776
for (i = 0 ; i < parsed -> nsubxacts ; i ++ )
652
777
{
0 commit comments