@@ -83,6 +83,7 @@ typedef struct {
83
83
bool isReplicated ; /* transaction on replica */
84
84
bool isDistributed ; /* transaction performed INSERT/UPDATE/DELETE and has to be replicated to other nodes */
85
85
bool isPrepared ; /* transaction is prepared at first stage of 2PC */
86
+ bool isSuspended ; /* prepared transaction is suspended because coordinator node is switched to offline */
86
87
bool isTransactionBlock ; /* is transaction block */
87
88
bool containsDML ; /* transaction contains DML statements */
88
89
XidStatus status ; /* transaction status */
@@ -712,7 +713,7 @@ MtmXactCallback(XactEvent event, void *arg)
712
713
}
713
714
714
715
/*
715
- * Check if this is "normal" user trnsaction which should be distributed to other nodes
716
+ * Check if this is "normal" user transaction which should be distributed to other nodes
716
717
*/
717
718
static bool
718
719
MtmIsUserTransaction ()
@@ -734,6 +735,7 @@ MtmResetTransaction()
734
735
x -> gtid .xid = InvalidTransactionId ;
735
736
x -> isDistributed = false;
736
737
x -> isPrepared = false;
738
+ x -> isSuspended = false;
737
739
x -> isTwoPhase = false;
738
740
x -> csn =
739
741
x -> status = TRANSACTION_STATUS_UNKNOWN ;
@@ -763,6 +765,7 @@ MtmBeginTransaction(MtmCurrentTrans* x)
763
765
x -> isReplicated = MtmIsLogicalReceiver ;
764
766
x -> isDistributed = MtmIsUserTransaction ();
765
767
x -> isPrepared = false;
768
+ x -> isSuspended = false;
766
769
x -> isTwoPhase = false;
767
770
x -> isTransactionBlock = IsTransactionBlock ();
768
771
/* Application name can be changed using PGAPPNAME environment variable */
@@ -1004,14 +1007,18 @@ MtmPostPrepareTransaction(MtmCurrentTrans* x)
1004
1007
A3E2
}
1005
1008
if (ts -> status != TRANSACTION_STATUS_ABORTED && !ts -> votingCompleted ) {
1006
1009
if (ts -> isPrepared ) {
1007
- elog (ERROR , "Commit of distributed transaction %s is suspended because node is switched to %s mode" , ts -> gid , MtmNodeStatusMnem [Mtm -> status ]);
1008
- }
1009
- if (Mtm -> status != MTM_ONLINE ) {
1010
- elog (WARNING , "Commit of distributed transaction is canceled because node is switched to %s mode" , MtmNodeStatusMnem [Mtm -> status ]);
1011
- } else {
1012
- elog (WARNING , "Commit of distributed transaction is canceled because cluster configuration was changed" );
1010
+ // GetNewTransactionId(false); /* force increment of transaction counter */
1011
+ // elog(ERROR, "Commit of distributed transaction %s is suspended because node is switched to %s mode", ts->gid, MtmNodeStatusMnem[Mtm->status]);
1012
+ elog (WARNING , "Commit of distributed transaction %s is suspended because node is switched to %s mode" , ts -> gid , MtmNodeStatusMnem [Mtm -> status ]);
1013
+ x -> isSuspended = true;
1014
+ } else {
1015
+ if (Mtm -> status != MTM_ONLINE ) {
1016
+ elog (WARNING , "Commit of distributed transaction is canceled because node is switched to %s mode" , MtmNodeStatusMnem [Mtm -> status ]);
1017
+ } else {
1018
+ elog (WARNING , "Commit of distributed transaction is canceled because cluster configuration was changed" );
1019
+ }
1020
+ MtmAbortTransaction (ts );
1013
1021
}
1014
- MtmAbortTransaction (ts );
1015
1022
}
1016
1023
x -> status = ts -> status ;
1017
1024
MTM_LOG3 ("%d: Result of vote: %d" , MyProcPid , ts -> status );
@@ -1078,14 +1085,18 @@ MtmCommitPreparedTransaction(MtmCurrentTrans* x)
1078
1085
}
1079
1086
if (ts -> status != TRANSACTION_STATUS_ABORTED && !ts -> votingCompleted ) {
1080
1087
if (ts -> isPrepared ) {
1081
- elog (ERROR , "Commit of distributed transaction %s is suspended because node is switched to %s mode" , ts -> gid , MtmNodeStatusMnem [Mtm -> status ]);
1082
- }
1083
- if (Mtm -> status != MTM_ONLINE ) {
1084
- elog (WARNING , "Commit of distributed transaction is canceled because node is switched to %s mode" , MtmNodeStatusMnem [Mtm -> status ]);
1085
- } else {
1086
- elog (WARNING , "Commit of distributed transaction is canceled because cluster configuration was changed" );
1088
+ // GetNewTransactionId(false); /* force increment of transaction counter */
1089
+ // elog(ERROR, "Commit of distributed transaction %s is suspended because node is switched to %s mode", ts->gid, MtmNodeStatusMnem[Mtm->status]);
1090
+ elog (WARNING , "Commit of distributed transaction %s is suspended because node is switched to %s mode" , ts -> gid , MtmNodeStatusMnem [Mtm -> status ]);
1091
+ x -> isSuspended = true;
1092
+ } else {
1093
+ if (Mtm -> status != MTM_ONLINE ) {
1094
+ elog (WARNING , "Commit of distributed transaction is canceled because node is switched to %s mode" , MtmNodeStatusMnem [Mtm -> status ]);
1095
+ } else {
1096
+ elog (WARNING , "Commit of distributed transaction is canceled because cluster configuration was changed" );
1097
+ }
1098
+ MtmAbortTransaction (ts );
1087
1099
}
1088
- MtmAbortTransaction (ts );
1089
1100
}
1090
1101
x -> status = ts -> status ;
1091
1102
x -> xid = ts -> xid ;
@@ -1293,6 +1304,7 @@ static void MtmStartRecovery()
1293
1304
MtmLock (LW_EXCLUSIVE );
1294
1305
BIT_SET (Mtm -> disabledNodeMask , MtmNodeId - 1 );
1295
1306
MtmSwitchClusterMode (MTM_RECOVERY );
1307
+ Mtm -> recoveredLSN = InvalidXLogRecPtr ;
1296
1308
MtmUnlock ();
1297
1309
}
1298
1310
@@ -1604,6 +1616,7 @@ bool MtmRecoveryCaughtUp(int nodeId, XLogRecPtr slotLSN)
1604
1616
MTM_LOG1 ("%d: node %d is caugth-up without locking cluster" , MyProcPid , nodeId );
1605
1617
/* We are lucky: caught up without locking cluster! */
1606
1618
}
1619
+ Mtm -> recoveredLSN = walLSN ;
1607
1620
MtmEnableNode (nodeId );
1608
1621
Mtm -> nConfigChanges += 1 ;
1609
1622
caughtUp = true;
@@ -2075,6 +2088,7 @@ static void MtmInitialize()
2075
2088
Mtm -> walSenderLockerMask = 0 ;
2076
2089
Mtm -> nodeLockerMask = 0 ;
2077
2090
Mtm -> reconnectMask = 0 ;
2091
+ Mtm -> recoveredLSN = InvalidXLogRecPtr ;
2078
2092
Mtm -> nLockers = 0 ;
2079
2093
Mtm -> nActiveTransactions = 0 ;
2080
2094
Mtm -> votingTransactions = NULL ;
@@ -2102,13 +2116,14 @@ static void MtmInitialize()
2102
2116
Mtm -> nodes [i ].con = MtmConnections [i ];
2103
2117
Mtm -> nodes [i ].flushPos = 0 ;
2104
2118
Mtm -> nodes [i ].lastHeartbeat = 0 ;
2105
- Mtm -> nodes [i ].restartLsn = 0 ;
2119
+ Mtm -> nodes [i ].restartLSN = InvalidXLogRecPtr ;
2106
2120
Mtm -> nodes [i ].originId = InvalidRepOriginId ;
2107
2121
Mtm -> nodes [i ].timeline = 0 ;
2122
+ Mtm -> nodes [i ].recoveredLSN = InvalidXLogRecPtr ;
2108
2123
}
2109
2124
Mtm -> nodes [MtmNodeId - 1 ].originId = DoNotReplicateId ;
2110
2125
/* All transactions originating from the current node should be ignored during recovery */
2111
- Mtm -> nodes [MtmNodeId - 1 ].restartLsn = (XLogRecPtr )PG_UINT64_MAX ;
2126
+ Mtm -> nodes [MtmNodeId - 1 ].restartLSN = (XLogRecPtr )PG_UINT64_MAX ;
2112
2127
PGSemaphoreCreate (& Mtm -> sendSemaphore );
2113
2128
PGSemaphoreReset (& Mtm -> sendSemaphore );
2114
2129
SpinLockInit (& Mtm -> spinlock );
@@ -2850,18 +2865,21 @@ void MtmFinishPreparedTransaction(MtmTransState* ts, bool commit)
2850
2865
*/
2851
2866
MtmReplicationMode MtmGetReplicationMode (int nodeId , sig_atomic_t volatile * shutdown )
2852
2867
{
2853
- bool recovery = false ;
2868
+ MtmReplicationMode mode = REPLMODE_OPEN_EXISTED ;
2854
2869
2855
- while (Mtm -> status != MTM_CONNECTED && Mtm -> status != MTM_ONLINE )
2870
+ while (( Mtm -> status != MTM_CONNECTED && Mtm -> status != MTM_ONLINE ) || BIT_CHECK ( Mtm -> disabledNodeMask , nodeId - 1 ) )
2856
2871
{
2857
2872
if (* shutdown )
2858
2873
{
2859
2874
return REPLMODE_EXIT ;
2860
2875
}
2861
- MTM_LOG2 ("%d: receiver slot mode %s" , MyProcPid , MtmNodeStatusMnem [Mtm -> status ]);
2862
2876
MtmLock (LW_EXCLUSIVE );
2877
+ if (BIT_CHECK (Mtm -> disabledNodeMask , nodeId - 1 )) {
2878
+ mode = REPLMODE_CREATE_NEW ;
2879
+ }
2880
+ MTM_LOG2 ("%d: receiver slot mode %s" , MyProcPid , MtmNodeStatusMnem [Mtm -> status ]);
2863
2881
if (Mtm -> status == MTM_RECOVERY ) {
2864
- recovery = true ;
2882
+ mode = REPLMODE_RECOVERED ;
2865
2883
if ((Mtm -> recoverySlot == 0 && (Mtm -> donorNodeId == MtmNodeId || Mtm -> donorNodeId == nodeId ))
2866
2884
|| Mtm -> recoverySlot == nodeId )
2867
2885
{
@@ -2879,13 +2897,14 @@ MtmReplicationMode MtmGetReplicationMode(int nodeId, sig_atomic_t volatile* shut
2879
2897
/* delay opening of other slots until recovery is completed */
2880
2898
MtmSleep (STATUS_POLL_DELAY );
2881
2899
}
2882
- if (recovery ) {
2900
+ if (mode == REPLMODE_RECOVERED ) {
2883
2901
MTM_LOG1 ("%d: Restart replication from node %d after end of recovery" , MyProcPid , nodeId );
2902
+ } else if (mode == REPLMODE_CREATE_NEW ) {
2903
+ MTM_LOG1 ("%d: Start replication from recovered node %d" , MyProcPid , nodeId );
2884
2904
} else {
2885
2905
MTM_LOG1 ("%d: Continue replication from node %d" , MyProcPid , nodeId );
2886
2906
}
2887
- /* After recovery completion we need to drop all other slots to avoid receive of redundant data */
2888
- return recovery ? REPLMODE_RECOVERED : REPLMODE_NORMAL ;
2907
+ return mode ;
2889
2908
}
2890
2909
2891
2910
static bool MtmIsBroadcast ()
@@ -2964,7 +2983,7 @@ MtmReplicationStartupHook(struct PGLogicalStartupHookArgs* args)
2964
2983
MtmIsRecoverySession = true;
2965
2984
} else if (strcmp (strVal (elem -> arg ), "recovered" ) == 0 ) {
2966
2985
recoveryCompleted = true;
2967
- } else if (strcmp (strVal (elem -> arg ), "normal " ) != 0 ) {
2986
+ } else if (strcmp (strVal (elem -> arg ), "open_existed" ) != 0 && strcmp ( strVal ( elem -> arg ), "create_new " ) != 0 ) {
2968
2987
elog (ERROR , "Illegal recovery mode %s" , strVal (elem -> arg ));
2969
2988
}
2970
2989
} else {
@@ -2976,14 +2995,20 @@ MtmReplicationStartupHook(struct PGLogicalStartupHookArgs* args)
2976
2995
} else {
2977
2996
elog (ERROR , "Restart position is not specified" );
2978
2997
}
2998
+ } else if (strcmp ("mtm_recovered_pos" , elem -> defname ) == 0 ) {
2999
+ if (elem -> arg != NULL && strVal (elem -> arg ) != NULL ) {
3000
+ sscanf (strVal (elem -> arg ), "%lx" , & Mtm -> nodes [MtmReplicationNodeId - 1 ].recoveredLSN );
3001
+ } else {
3002
+ elog (ERROR , "Recovered position is not specified" );
3003
+ }
2979
3004
}
2980
F438
3005
}
2981
3006
MtmLock (LW_EXCLUSIVE );
2982
3007
if (MtmIsRecoverySession ) {
2983
3008
MTM_LOG1 ("%d: Node %d start recovery of node %d at position %lx" , MyProcPid , MtmNodeId , MtmReplicationNodeId , recoveryStartPos );
2984
3009
Assert (MyReplicationSlot != NULL );
2985
3010
if (recoveryStartPos < MyReplicationSlot -> data .restart_lsn ) {
2986
- elog (ERROR , "Specified recovery start position %lx is beyond restart lsn %lx" , recoveryStartPos , MyReplicationSlot -> data .restart_lsn );
3011
+ elog (WARNING , "Specified recovery start position %lx is beyond restart lsn %lx" , recoveryStartPos , MyReplicationSlot -> data .restart_lsn );
2987
3012
}
2988
3013
if (!BIT_CHECK (Mtm -> disabledNodeMask , MtmReplicationNodeId - 1 )) {
2989
3014
MtmDisableNode (MtmReplicationNodeId );
@@ -3132,17 +3157,17 @@ bool MtmFilterTransaction(char* record, int size)
3132
3157
default :
3133
3158
break ;
3134
3159
}
3135
- duplicate = Mtm -> status == MTM_RECOVERY && origin_lsn != InvalidXLogRecPtr && origin_lsn <= Mtm -> nodes [origin_node - 1 ].restartLsn ;
3160
+ duplicate = Mtm -> status == MTM_RECOVERY && origin_lsn != InvalidXLogRecPtr && origin_lsn <= Mtm -> nodes [origin_node - 1 ].restartLSN ;
3136
3161
3137
3162
MTM_LOG1 ("%s transaction %s from node %d lsn %lx, origin node %d, original lsn=%lx, current lsn=%lx" ,
3138
- duplicate ? "Ignore" : "Apply" , gid , replication_node , end_lsn , origin_node , origin_lsn , Mtm -> nodes [origin_node - 1 ].restartLsn );
3163
+ duplicate ? "Ignore" : "Apply" , gid , replication_node , end_lsn , origin_node , origin_lsn , Mtm -> nodes [origin_node - 1 ].restartLSN );
3139
3164
if (Mtm -> status == MTM_RECOVERY ) {
3140
- if (Mtm -> nodes [origin_node - 1 ].restartLsn < origin_lsn ) {
3141
- Mtm -> nodes [origin_node - 1 ].restartLsn = origin_lsn ;
3165
+ if (Mtm -> nodes [origin_node - 1 ].restartLSN < origin_lsn ) {
3166
+ Mtm -> nodes [origin_node - 1 ].restartLSN = origin_lsn ;
3142
3167
}
3143
3168
} else {
3144
- if (Mtm -> nodes [replication_node - 1 ].restartLsn < end_lsn ) {
3145
- Mtm -> nodes [replication_node - 1 ].restartLsn = end_lsn ;
3169
+ if (Mtm -> nodes [replication_node - 1 ].restartLSN < end_lsn ) {
3170
+ Mtm -> nodes [replication_node - 1 ].restartLSN = end_lsn ;
3146
3171
}
3147
3172
}
3148
3173
return duplicate ;
@@ -3757,12 +3782,16 @@ static bool MtmTwoPhaseCommit(MtmCurrentTrans* x)
3757
3782
/* ??? Should we do explicit rollback */
3758
3783
} else {
3759
3784
CommitTransactionCommand ();
3760
-
10000
StartTransactionCommand ();
3761
- if (MtmGetCurrentTransactionStatus () == TRANSACTION_STATUS_ABORTED ) {
3762
- FinishPreparedTransaction (x -> gid , false);
3763
- elog (ERROR , "Transaction %s is aborted by DTM" , x -> gid );
3764
- } else {
3765
- FinishPreparedTransaction (x -> gid , true);
3785
+ if (x -> isSuspended ) {
3786
+ elog (WARNING , "Transaction %s is left in prepared state because coordinator onde is not online" , x -> gid );
3787
+ } else {
3788
+ StartTransactionCommand ();
3789
+ if (x -> status == TRANSACTION_STATUS_ABORTED ) {
3790
+ FinishPreparedTransaction (x -> gid , false);
3791
+ elog (ERROR , "Transaction %s is aborted by DTM" , x -> gid );
3792
+ } else {
3793
+ FinishPreparedTransaction (x -> gid , true);
3794
+ }
3766
3795
}
3767
3796
}
3768
3797
return true;
0 commit comments