@@ -583,7 +583,9 @@ MtmAdjustOldestXid(TransactionId xid)
583583
584584 for (ts = Mtm -> transListHead ;
585585 ts != NULL
586+ && (ts -> status == TRANSACTION_STATUS_ABORTED || ts -> status == TRANSACTION_STATUS_COMMITTED )
586587 && ts -> csn < oldestSnapshot
588+ && !ts -> isPinned
587589 && TransactionIdPrecedes (ts -> xid , xid );
588590 prev = ts , ts = ts -> next )
589591 {
@@ -653,6 +655,7 @@ static void MtmAddSubtransactions(MtmTransState* ts, TransactionId* subxids, int
653655 sts = (MtmTransState * )hash_search (MtmXid2State , & subxids [i ], HASH_ENTER , & found );
654656 Assert (!found );
655657 sts -> isActive = false;
658+ sts -> isPinned = false;
656659 sts -> status = ts -> status ;
657660 sts -> csn = ts -> csn ;
658661 sts -> votingCompleted = true;
@@ -814,6 +817,7 @@ MtmCreateTransState(MtmCurrentTrans* x)
814817 ts -> isLocal = true;
815818 ts -> isPrepared = false;
816819 ts -> isTwoPhase = x -> isTwoPhase ;
820+ ts -> isPinned = false;
817821 ts -> votingCompleted = false;
818822 if (!found ) {
819823 ts -> isEnqueued = false;
@@ -963,8 +967,13 @@ Mtm2PCVoting(MtmCurrentTrans* x, MtmTransState* ts)
963967{
964968 int result = 0 ;
965969 int nConfigChanges = Mtm -> nConfigChanges ;
966- timestamp_t elapsed , start = MtmGetSystemTime ();
967- timestamp_t deadline = 0 ;
970+ timestamp_t prepareTime = ts -> csn - ts -> snapshot ;
971+ timestamp_t timeout = Max (prepareTime + MSEC_TO_USEC (MtmMin2PCTimeout ), prepareTime * MtmMax2PCRatio /100 );
972+ timestamp_t deadline = MtmGetSystemTime () + timeout ;
973+ timestamp_t now ;
974+
975+ Assert (ts -> csn > ts -> snapshot );
976+
968977 /* Wait votes from all nodes until: */
969978 while (!MtmVotingCompleted (ts )
970979 && (ts -> isPrepared || nConfigChanges == Mtm -> nConfigChanges ))
@@ -980,19 +989,16 @@ Mtm2PCVoting(MtmCurrentTrans* x, MtmTransState* ts)
980989 if (result & WL_LATCH_SET ) {
981990 ResetLatch (& MyProc -> procLatch );
982991 }
983- elapsed = MtmGetSystemTime () - start ;
992+ now = MtmGetSystemTime ();
984993 MtmLo
8000
ck (LW_EXCLUSIVE );
985- if (deadline == 0 && ts -> votedMask != 0 ) {
986- deadline = Max (MSEC_TO_USEC (MtmMin2PCTimeout ), elapsed * MtmMax2PCRatio /100 );
987- } else {
994+ if (now > deadline ) {
988995 if (ts -> isPrepared ) {
989996 /* resend precommit message */
990997 MtmSend2PCMessage (ts , MSG_PRECOMMIT );
991998 } else {
992- if (elapsed > deadline ) {
993- elog (WARNING , "Commit of distributed transaction is canceled because of %ld msec timeout expiration" , USEC_TO_MSEC (elapsed ));
994- MtmAbortTransaction (ts );
995- }
999+ elog (WARNING , "Commit of distributed transaction is canceled because of %ld msec timeout expiration" , USEC_TO_MSEC (timeout ));
1000+ MtmAbortTransaction (ts );
1001+ break ;
9961002 }
9971003 }
9981004 }
@@ -1005,7 +1011,7 @@ Mtm2PCVoting(MtmCurrentTrans* x, MtmTransState* ts)
10051011 } else {
10061012 if (Mtm -> status != MTM_ONLINE ) {
10071013 elog (WARNING , "Commit of distributed transaction is canceled because node is switched to %s mode" , MtmNodeStatusMnem [Mtm -> status ]);
1008- } else if ( nConfigChanges != Mtm -> nConfigChanges ) {
1014+ } else {
10091015 elog (WARNING , "Commit of distributed transaction is canceled because cluster configuration was changed" );
10101016 }
10111017 MtmAbortTransaction (ts );
@@ -1202,6 +1208,7 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
12021208 ts -> status = TRANSACTION_STATUS_ABORTED ;
12031209 ts -> isLocal = true;
12041210 ts -> isPrepared = false;
1211+ ts -> isPinned = false;
12051212 ts -> snapshot = x -> snapshot ;
12061213 ts -> isTwoPhase = x -> isTwoPhase ;
12071214 ts -> csn = MtmAssignCSN ();
@@ -1280,7 +1287,7 @@ void MtmSend2PCMessage(MtmTransState* ts, MtmMessageCode cmd)
12801287 }
12811288}
12821289
1283- void MtmBroadcastPollMessage (MtmTransState * ts )
1290+ static void MtmBroadcastPollMessage (MtmTransState * ts )
12841291{
12851292 int i ;
12861293 MtmArbiterMessage m
6D40
sg ;
@@ -1293,7 +1300,7 @@ void MtmBroadcastPollMessage(MtmTransState* ts)
12931300
12941301 for (i = 0 ; i < Mtm -> nAllNodes ; i ++ )
12951302 {
1296- if (BIT_CHECK (ts -> participantsMask & ~Mtm -> disabledNodeMask & ~ ts -> votedMask , i ))
1303+ if (BIT_CHECK (ts -> participantsMask & ~Mtm -> disabledNodeMask , i ))
12971304 {
12981305 msg .node = i + 1 ;
12991306 MTM_LOG3 ("Send request for transaction %s to node %d" , msg .gid , msg .node );
@@ -1480,15 +1487,17 @@ static void MtmPollStatusOfPreparedTransactions(int disabledNodeId)
14801487 Assert (ts -> gid [0 ]);
14811488 if (ts -> status == TRANSACTION_STATUS_IN_PROGRESS ) {
14821489 elog (LOG , "Abort transaction %s because its coordinator is disabled and it is not prepared at node %d" , ts -> gid , MtmNodeId );
1483- //MtmUnlock();
1490+ ts -> isPinned = true;
1491+ MtmUnlock ();
14841492 MtmFinishPreparedTransaction (ts , false);
1485- //MtmLock(LW_EXCLUSIVE);
1493+ MtmLock (LW_EXCLUSIVE );
1494+ ts -> isPinned = false;
14861495 } else {
14871496 MTM_LOG1 ("Poll state of transaction %d (%s)" , ts -> xid , ts -> gid );
14881497 MtmBroadcastPollMessage (ts );
14891498 }
14901499 } else {
1491- MTM_LOG2 ("Skip transaction %d (%s) with status %d gtid.node=%d gtid.xid=%d votedMask=%lx" ,
1500+ MTM_LOG1 ("Skip transaction %d (%s) with status %d gtid.node=%d gtid.xid=%d votedMask=%lx" ,
14921501 ts -> xid , ts -> gid , ts -> status , ts -> gtid .node , ts -> gtid .xid , ts -> votedMask );
14931502 }
14941503 }
@@ -3214,8 +3223,13 @@ bool MtmFilterTransaction(char* record, int size)
32143223 duplicate = true;
32153224 }
32163225
3217- MTM_LOG2 ("%s transaction %s from node %d lsn %lx, flags=%x, origin node %d, original lsn=%lx, current lsn=%lx" ,
3218- duplicate ? "Ignore" : "Apply" , gid , replication_node , end_lsn , flags , origin_node , origin_lsn , restart_lsn );
3226+ if (duplicate ) {
3227+ MTM_LOG1 ("Ignore transaction %s from node %d lsn %lx, flags=%x, origin node %d, original lsn=%lx, current lsn=%lx" ,
3228+ gid , replication_node , end_lsn , flags , origin_node , origin_lsn , restart_lsn );
3229+ } else {
3230+ MTM_LOG2 ("Apply transaction %s from node %d lsn %lx, flags=%x, origin node %d, original lsn=%lx, current lsn=%lx" ,
3231+ gid , replication_node , end_lsn , flags , origin_node , origin_lsn , restart_lsn );
3232+ }
32193233 return duplicate ;
32203234}
32213235
@@ -3829,7 +3843,7 @@ static bool MtmTwoPhaseCommit(MtmCurrentTrans* x)
38293843 } else {
38303844 CommitTransactionCommand ();
38313845 if (x -> isSuspended ) {
3832- elog (WARNING , "Transaction %s is left in prepared state because coordinator onde is not online" , x -> gid );
3846+ elog (WARNING , "Transaction %s is left in prepared state because coordinator node is not online" , x -> gid );
38333847 } else {
38343848 StartTransactionCommand ();
38353849 if (x -> status == TRANSACTION_STATUS_ABORTED ) {
0 commit comments