15
15
#include "miscadmin.h"
16
16
17
17
#include "libpq-fe.h"
18
+ #include "lib/stringinfo.h"
19
+ #include "libpq/pqformat.h"
18
20
#include "common/username.h"
19
21
20
22
#include "postmaster/postmaster.h"
@@ -926,7 +928,9 @@ MtmVotingCompleted(MtmTransState* ts)
926
928
ts -> votingCompleted = true;
927
929
ts -> status = TRANSACTION_STATUS_UNKNOWN ;
928
930
return true;
929
- } else {
931
+ } else {
932
+ MTM_LOG1 ("Transaction %s is considered as prepared (status=%d participants=%lx disabled=%lx, voted=%lx)" ,
933
+ ts -> gid , ts -> status , ts -> participantsMask , Mtm -> disabledNodeMask , ts -> votedMask );
930
934
ts -> isPrepared = true;
931
935
if (ts -> isTwoPhase ) {
932
936
ts -> votingCompleted = true;
@@ -980,9 +984,10 @@ MtmPostPrepareTransaction(MtmCurrentTrans* x)
980
984
MtmResetTransaction ();
981
985
} else {
982
986
int result = 0 ;
983
-
987
+ int nConfigChanges = Mtm -> nConfigChanges ;
984
988
/* Wait votes from all nodes until: */
985
- while (!MtmVotingCompleted (ts ))
989
+ while (!MtmVotingCompleted (ts )
990
+ && (ts -> isPrepared || nConfigChanges == Mtm -> nConfigChanges ))
986
991
{
987
992
MtmUnlock ();
988
993
MTM_TXTRACE (x , "PostPrepareTransaction WaitLatch Start" );
@@ -998,8 +1003,15 @@ MtmPostPrepareTransaction(MtmCurrentTrans* x)
998
1003
MtmLock (LW_EXCLUSIVE );
999
1004
}
1000
1005
if (ts -> status != TRANSACTION_STATUS_ABORTED && !ts -> votingCompleted ) {
1006
+ if (ts -> isPrepared ) {
1007
+ elog (ERROR , "Commit of distributed transaction %s is suspended because node is switched to %s mode" , ts -> gid , MtmNodeStatusMnem [Mtm -> status ]);
1008
+ }
1009
+ if (Mtm -> status != MTM_ONLINE ) {
1010
+ elog (WARNING , "Commit of distributed transaction is canceled because node is switched to %s mode" , MtmNodeStatusMnem [Mtm -> status ]);
1011
+ } else {
1012
+ elog (WARNING , "Commit of distributed transaction is canceled because cluster configuration was changed" );
1013
+ }
1001
1014
MtmAbortTransaction (ts );
1002
- elog (WARNING , "Commit of distributed transaction is canceled because node is switched to %s mode" , MtmNodeStatusMnem [Mtm -> status ]);
1003
1015
}
1004
1016
x -> status = ts -> status ;
1005
1017
MTM_LOG3 ("%d: Result of vote: %d" , MyProcPid , ts -> status );
@@ -1032,6 +1044,7 @@ MtmCommitPreparedTransaction(MtmCurrentTrans* x)
1032
1044
elog (WARNING , "Global transaciton ID '%s' is not found" , x -> gid );
1033
1045
} else {
1034
1046
int result = 0 ;
1047
+ int nConfigChanges = Mtm -> nConfigChanges ;
1035
1048
1036
1049
Assert (tm -> state != NULL );
1037
1050
MTM_LOG3 ("Commit prepared transaction %d with gid='%s'" , x -> xid , x -> gid );
@@ -1046,7 +1059,8 @@ MtmCommitPreparedTransaction(MtmCurrentTrans* x)
1046
1059
MtmSend2PCMessage (ts , MSG_PRECOMMIT );
1047
1060
1048
1061
/* Wait votes from all nodes until: */
1049
- while (!MtmVotingCompleted (ts ))
1062
+ while (!MtmVotingCompleted (ts )
1063
+ && (ts -> isPrepared || nConfigChanges == Mtm -> nConfigChanges ))
1050
1064
{
1051
1065
MtmUnlock ();
1052
1066
MTM_TXTRACE (x , "CommitPreparedTransaction WaitLatch Start" );
@@ -1063,8 +1077,15 @@ MtmCommitPreparedTransaction(MtmCurrentTrans* x)
1063
1077
}
1064
1078
}
1065
1079
if (ts -> status != TRANSACTION_STATUS_ABORTED && !ts -> votingCompleted ) {
1080
+ if (ts -> isPrepared ) {
1081
+ elog (ERROR , "Commit of distributed transaction %s is suspended because node is switched to %s mode" , ts -> gid , MtmNodeStatusMnem [Mtm -> status ]);
1082
+ }
1083
+ if (Mtm -> status != MTM_ONLINE ) {
1084
+ elog (WARNING , "Commit of distributed transaction is canceled because node is switched to %s mode" , MtmNodeStatusMnem [Mtm -> status ]);
1085
+ } else {
1086
+ elog (WARNING , "Commit of distributed transaction is canceled because cluster configuration was changed" );
1087
+ }
1066
1088
MtmAbortTransaction (ts );
1067
- elog (WARNING , "Commit of distributed transaction is canceled because node is switched to %s mode" , MtmNodeStatusMnem [Mtm -> status ]);
1068
1089
}
1069
1090
x -> status = ts -> status ;
1070
1091
x -> xid = ts -> xid ;
@@ -1166,11 +1187,14 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
1166
1187
}
1167
1188
ts -> status = TRANSACTION_STATUS_ABORTED ;
1168
1189
ts -> isLocal = true;
1190
+ ts -> isPrepared = false;
1169
1191
ts -> snapshot = x -> snapshot ;
1192
+ ts -> isTwoPhase = x -> isTwoPhase ;
1170
1193
ts -> csn = MtmAssignCSN ();
1171
1194
ts -> gtid = x -> gtid ;
1172
1195
ts -> nSubxids = 0 ;
1173
1196
ts -> votingCompleted = true;
1197
+ strcpy (ts -> gid , x -> gid );
1174
1198
if (ts -> isActive ) {
1175
1199
ts -> isActive = false;
1176
1200
Assert (Mtm -> nActiveTransactions != 0 );
@@ -1226,8 +1250,9 @@ void MtmSend2PCMessage(MtmTransState* ts, MtmMessageCode cmd)
1226
1250
int i ;
1227
1251
for (i = 0 ; i < Mtm -> nAllNodes ; i ++ )
1228
1252
{
1229
- if (BIT_CHECK (ts -> participantsMask & ~Mtm -> disabledNodeMask , i ) && TransactionIdIsValid ( ts -> xids [ i ]) )
1253
+ if (BIT_CHECK (ts -> participantsMask & ~Mtm -> disabledNodeMask , i ))
1230
1254
{
1255
+ Assert (TransactionIdIsValid (ts -> xids [i ]));
1231
1256
msg .node = i + 1 ;
1232
1257
msg .dxid = ts -> xids [i ];
1233
1258
MtmSendMessage (& msg );
@@ -1655,7 +1680,7 @@ MtmCheckClusterLock()
1655
1680
continue ;
1656
1681
} else {
1657
1682
/* All lockers are synchronized their logs */
1658
- /* Remove lock and mark them as rceovered */
1683
+ /* Remove lock and mark them as recovered */
1659
1684
MTM_LOG1 ("Complete recovery of %d nodes (node mask %lx)" , Mtm -> nLockers , (long ) Mtm -> nodeLockerMask );
1660
1685
Assert (Mtm -> walSenderLockerMask == 0 );
1661
1686
Assert ((Mtm -> nodeLockerMask & Mtm -> disabledNodeMask ) == Mtm -> nodeLockerMask );
@@ -2082,6 +2107,8 @@ static void MtmInitialize()
2082
2107
Mtm -> nodes [i ].timeline = 0 ;
2083
2108
}
2084
2109
Mtm -> nodes [MtmNodeId - 1 ].originId = DoNotReplicateId ;
2110
+ /* All transaction originated from the current node should be ignored during recovery */
2111
+ Mtm -> nodes [MtmNodeId - 1 ].restartLsn = (XLogRecPtr )PG_UINT64_MAX ;
2085
2112
PGSemaphoreCreate (& Mtm -> sendSemaphore );
2086
2113
PGSemaphoreReset (& Mtm -> sendSemaphore );
2087
2114
SpinLockInit (& Mtm -> spinlock );
@@ -2806,12 +2833,7 @@ void MtmFinishPreparedTransaction(MtmTransState* ts, bool commit)
2806
2833
Assert (!IsTransactionState ());
2807
2834
MtmResetTransaction ();
2808
2835
StartTransactionCommand ();
2809
- #if 0
2810
- if (Mtm -> nodes [MtmNodeId - 1 ].originId == InvalidRepOriginId ) {
2811
- /* This dummy origin is used for local commits/aborts which should not be replicated */
2812
- Mtm -> nodes [MtmNodeId - 1 ].originId = replorigin_create (psprintf (MULTIMASTER_SLOT_PATTERN , MtmNodeId ));
2813
- }
2814
- #endif
2836
+
2815
2837
MtmBeginSession (MtmNodeId );
2816
2838
MtmSetCurrentTransactionCSN (ts -> csn );
2817
2839
MtmSetCurrentTransactionGID (ts -> gid );
@@ -2828,7 +2850,6 @@ void MtmFinishPreparedTransaction(MtmTransState* ts, bool commit)
2828
2850
*/
2829
2851
MtmReplicationMode MtmGetReplicationMode (int nodeId , sig_atomic_t volatile * shutdown )
2830
2852
{
2831
- int i ;
2832
2853
bool recovery = false;
2833
2854
2834
2855
while (Mtm -> status != MTM_CONNECTED && Mtm -> status != MTM_ONLINE )
@@ -2850,9 +2871,6 @@ MtmReplicationMode MtmGetReplicationMode(int nodeId, sig_atomic_t volatile* shut
2850
2871
Mtm -> nReceivers = 0 ;
2851
2872
Mtm -> recoveryCount += 1 ;
2852
2873
Mtm -> pglogicalNodeMask = 0 ;
2853
- for (i = 0 ; i < Mtm -> nAllNodes ; i ++ ) {
2854
- Mtm -> nodes [i ].restartLsn = InvalidXLogRecPtr ;
2855
- }
2856
2874
MtmUnlock ();
2857
2875
return REPLMODE_RECOVERY ;
2858
2876
}
@@ -3069,6 +3087,67 @@ MtmReplicationRowFilterHook(struct PGLogicalRowFilterArgs* args)
3069
3087
return isDistributed ;
3070
3088
}
3071
3089
3090
+ bool MtmFilterTransaction (char * record , int size )
3091
+ {
3092
+ StringInfoData s ;
3093
+ uint8 flags ;
3094
+ XLogRecPtr origin_lsn ;
3095
+ XLogRecPtr end_lsn ;
3096
+ int replication_node ;
3097
+ int origin_node ;
3098
+ char const * gid = "" ;
3099
+ bool duplicate ;
3100
+
3101
+ s .data = record ;
3102
+ s .len = size ;
3103
+ s .maxlen = -1 ;
3104
+ s .cursor = 0 ;
3105
+
3106
+ Assert (pq_getmsgbyte (& s ) == 'C' );
3107
+ flags = pq_getmsgbyte (& s ); /* flags */
3108
+ replication_node = pq_getmsgbyte (& s );
3109
+
3110
+ /* read fields */
3111
+ pq_getmsgint64 (& s ); /* commit_lsn */
3112
+ end_lsn = pq_getmsgint64 (& s ); /* end_lsn */
3113
+ pq_getmsgint64 (& s ); /* commit_time */
3114
+
3115
+ origin_node = pq_getmsgbyte (& s );
3116
+ origin_lsn = pq_getmsgint64 (& s );
3117
+
3118
+ Assert (replication_node == MtmReplicationNodeId &&
3119
+ origin_node != 0 &&
3120
+ (Mtm -> status == MTM_RECOVERY || origin_node == replication_node ));
3121
+
3122
+ switch (PGLOGICAL_XACT_EVENT (flags ))
3123
+ {
3124
+ case PGLOGICAL_PREPARE :
3125
+ case PGLOGICAL_ABORT_PREPARED :
3126
+ gid = pq_getmsgstring (& s );
3127
+ break ;
3128
+ case PGLOGICAL_COMMIT_PREPARED :
3129
+ pq_getmsgint64 (& s ); /* CSN */
3130
+ gid = pq_getmsgstring (& s );
3131
+ break ;
3132
+ default :
3133
+ break ;
3134
+ }
3135
+ duplicate = Mtm -> status == MTM_RECOVERY && origin_lsn != InvalidXLogRecPtr && origin_lsn <= Mtm -> nodes [origin_node - 1 ].restartLsn ;
3136
+
3137
+ MTM_LOG1 ("%s transaction %s from node %d lsn %lx, origin node %d, original lsn=%lx, current lsn=%lx" ,
3138
+ duplicate ? "Ignore" : "Apply" , gid , replication_node , end_lsn , origin_node , origin_lsn , Mtm -> nodes [origin_node - 1 ].restartLsn );
3139
+ if (Mtm -> status == MTM_RECOVERY ) {
3140
+ if (Mtm -> nodes [origin_node - 1 ].restartLsn < origin_lsn ) {
3141
+ Mtm -> nodes [origin_node - 1 ].restartLsn = origin_lsn ;
3142
+ }
3143
+ } else {
3144
+ if (Mtm -> nodes [replication_node - 1 ].restartLsn < end_lsn ) {
3145
+ Mtm -> nodes [replication_node - 1 ].restartLsn = end_lsn ;
3146
+ }
3147
+ }
3148
+ return duplicate ;
3149
+ }
3150
+
3072
3151
void MtmSetupReplicationHooks (struct PGLogicalHooks * hooks )
3073
3152
{
3074
3153
hooks -> startup_hook = MtmReplicationStartupHook ;
0 commit comments