8000 Rewrite origin LSN calculation · postgrespro/postgres_cluster@1ae54f5 · GitHub
[go: up one dir, main page]

Skip to content

Commit 1ae54f5

Browse files
committed
Rewrite origin LSN calculation
1 parent 7d99788 commit 1ae54f5

File tree

4 files changed

+58
-41
lines changed

4 files changed

+58
-41
lines changed

contrib/mmts/multimaster.c

Lines changed: 27 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1133,6 +1133,16 @@ MtmAbortPreparedTransaction(MtmCurrentTrans* x)
11331133
}
11341134
}
11351135

1136+
static void
1137+
MtmLogAbortLogicalMessage(int nodeId, char const* gid)
1138+
{
1139+
MtmAbortLogicalMessage msg;
1140+
strcpy(msg.gid, gid);
1141+
msg.origin_node = nodeId;
1142+
msg.origin_lsn = replorigin_session_origin_lsn;
1143+
XLogFlush(LogLogicalMessage("A", (char*)&msg, sizeof msg, false));
1144+
}
1145+
11361146
static void
11371147
MtmEndTransaction(MtmCurrentTrans* x, bool commit)
11381148
{
@@ -1154,7 +1164,8 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
11541164
}
11551165
if (ts != NULL) {
11561166
if (*ts->gid)
1157-
MTM_LOG2("TRANSLOG: %s transaction %s status %d", (commit ? "commit" : "rollback"), ts->gid, ts->status);
1167+
MTM_LOG1("TRANSLOG: %s transaction git=%s xid=%d node=%d dxid=%d status %d",
1168+
(commit ? "commit" : "rollback"), ts->gid, ts->xid, ts->gtid.node, ts->gtid.xid, ts->status);
11581169
if (commit) {
11591170
if (!(ts->status == TRANSACTION_STATUS_UNKNOWN
11601171
|| (ts->status == TRANSACTION_STATUS_IN_PROGRESS && Mtm->status == MTM_RECOVERY)))
@@ -1213,7 +1224,8 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
12131224
}
12141225
MtmTransactionListAppend(ts);
12151226
if (*x->gid) {
1216-
LogLogicalMessage("A", x->gid, strlen(x->gid) + 1, false);
1227+
replorigin_session_origin_lsn = InvalidXLogRecPtr;
1228+
MtmLogAbortLogicalMessage(MtmNodeId, x->gid);
12171229
}
12181230
}
12191231
MtmSend2PCMessage(ts, MSG_ABORTED); /* send notification to coordinator */
@@ -2824,20 +2836,23 @@ void MtmReleaseRecoverySlot(int nodeId)
28242836
if (Mtm->recoverySlot == nodeId) {
28252837
Mtm->recoverySlot = 0;
28262838
}
2827-
}
2839+
}
28282840

2829-
void MtmRollbackPreparedTransaction(char const* gid)
2841+
void MtmRollbackPreparedTransaction(int nodeId, char const* gid)
28302842
{
2831-
MTM_LOG1("Abort prepared transaction %s", gid);
2832-
if (MtmExchangeGlobalTransactionStatus(gid, TRANSACTION_STATUS_ABORTED) == TRANSACTION_STATUS_UNKNOWN) {
2843+
XidStatus status = MtmExchangeGlobalTransactionStatus(gid, TRANSACTION_STATUS_ABORTED);
2844+
MTM_LOG1("Abort prepared transaction %s status %d", gid, status);
2845+
if (status == TRANSACTION_STATUS_UNKNOWN) {
28332846
MTM_LOG1("PGLOGICAL_ABORT_PREPARED commit: gid=%s #2", gid);
28342847
MtmResetTransaction();
28352848
StartTransactionCommand();
2836-
MtmBeginSession(MtmReplicationNodeId);
2849+
MtmBeginSession(nodeId);
28372850
MtmSetCurrentTransactionGID(gid);
28382851
FinishPreparedTransaction(gid, false);
28392852
CommitTransactionCommand();
2840-
MtmEndSession(MtmReplicationNodeId, true);
2853+
MtmEndSession(nodeId, true);
2854+
} else if (status == TRANSACTION_STATUS_IN_PROGRESS) {
2855+
MtmLogAbortLogicalMessage(nodeId, gid);
28412856
}
28422857
}
28432858

@@ -3157,19 +3172,11 @@ bool MtmFilterTransaction(char* record, int size)
31573172
default:
31583173
break;
31593174
}
3160-
duplicate = Mtm->status == MTM_RECOVERY && origin_lsn != InvalidXLogRecPtr && origin_lsn <= Mtm->nodes[origin_node-1].restartLSN;
3175+
//duplicate = Mtm->status == MTM_RECOVERY && origin_lsn != InvalidXLogRecPtr && origin_lsn <= Mtm->nodes[origin_node-1].restartLSN;
3176+
duplicate = origin_lsn != InvalidXLogRecPtr && origin_lsn <= Mtm->nodes[origin_node-1].restartLSN;
31613177

3162-
MTM_LOG1("%s transaction %s from node %d lsn %lx, origin node %d, original lsn=%lx, current lsn=%lx",
3163-
duplicate ? "Ignore" : "Apply", gid, replication_node, end_lsn, origin_node, origin_lsn, Mtm->nodes[origin_node-1].restartLSN);
3164-
if (Mtm->status == MTM_RECOVERY) {
3165-
if (Mtm->nodes[origin_node-1].restartLSN < origin_lsn) {
3166-
Mtm->nodes[origin_node-1].restartLSN = origin_lsn;
3167-
}
3168-
} else {
3169-
if (Mtm->nodes[replication_node-1].restartLSN < end_lsn) {
3170-
Mtm->nodes[replication_node-1].restartLSN = end_lsn;
3171-
}
3172-
}
3178+
MTM_LOG1("%s transaction %s from node %d lsn %lx, flags=%x, origin node %d, original lsn=%lx, current lsn=%lx",
3179+
duplicate ? "Ignore" : "Apply", gid, replication_node, end_lsn, flags, origin_node, origin_lsn, Mtm->nodes[origin_node-1].restartLSN);
31733 341A 180
return duplicate;
31743181
}
31753182

contrib/mmts/multimaster.h

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,13 @@ typedef struct
147147
pgid_t gid; /* Global F438 transaction identifier */
148148
} MtmArbiterMessage;
149149

150+
typedef struct MtmAbortLogicalMessage
151+
{
152+
pgid_t gid;
153+
int origin_node;
154+
XLogRecPtr origin_lsn;
155+
} MtmAbortLogicalMessage;
156+
150157
typedef struct MtmMessageQueue
151158
{
152159
MtmArbiterMessage msg;
@@ -362,7 +369,7 @@ extern PGconn *PQconnectdb_safe(const char *conninfo);
362369
extern void MtmBeginSession(int nodeId);
363370
extern void MtmEndSession(int nodeId, bool unlock);
364371
extern void MtmFinishPreparedTransaction(MtmTransState* ts, bool commit);
365-
extern void MtmRollbackPreparedTransaction(char const* gid);
372+
extern void MtmRollbackPreparedTransaction(int nodeId, char const* gid);
366373
extern bool MtmFilterTransaction(char* record, int size);
367374

368375
#endif

contrib/mmts/pglogical_apply.c

Lines changed: 21 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -427,7 +427,14 @@ process_remote_message(StringInfo s)
427427
}
428428
case 'A':
429429
{
430-
MtmRollbackPreparedTransaction(messageBody);
430+
MtmAbortLogicalMessage* msg = (MtmAbortLogicalMessage*)messageBody;
431+
int origin_node = msg->origin_node;
432+
Assert(messageSize == sizeof(MtmAbortLogicalMessage));
433+
if (Mtm->nodes[origin_node-1].restartLSN < msg->origin_lsn) {
434+
Mtm->nodes[origin_node-1].restartLSN = msg->origin_lsn;
435+
}
436+
replorigin_session_origin_lsn = msg->origin_lsn;
437+
MtmRollbackPreparedTransaction(origin_node, msg->gid);
431438
standalone = true;
432439
break;
433440
}
@@ -597,17 +604,16 @@ process_remote_commit(StringInfo in)
597604

598605
/* read fields */
599606
pq_getmsgint64(in); /* commit_lsn */
600-
replorigin_session_origin_lsn = end_lsn = pq_getmsgint64(in); /* end_lsn */
607+
end_lsn = pq_getmsgint64(in); /* end_lsn */
601608
replorigin_session_origin_timestamp = pq_getmsgint64(in); /* commit_time */
602609

603610
origin_node = pq_getmsgbyte(in);
604611
origin_lsn = pq_getmsgint64(in);
605612

606-
if (origin_node != MtmReplicationNodeId) {
607-
replorigin_advance(Mtm->nodes[origin_node-1].originId, origin_lsn, GetXLogInsertRecPtr(),
608-
false /* backward */ , false /* WAL */ );
613+
replorigin_session_origin_lsn = origin_node == MtmReplicationNodeId ? end_lsn : origin_lsn;
614+
if (Mtm->nodes[origin_node-1].restartLSN < replorigin_session_origin_lsn) {
615+
Mtm->nodes[origin_node-1].restartLSN = replorigin_session_origin_lsn;
609616
}
610-
611617
Assert(replorigin_session_origin == InvalidRepOriginId);
612618

613619
switch(PGLOGICAL_XACT_EVENT(flags))
@@ -617,9 +623,9 @@ process_remote_commit(StringInfo in)
617623
MTM_LOG2("%d: PGLOGICAL_COMMIT commit", MyProcPid);
618624
if (IsTransactionState()) {
619625
Assert(TransactionIdIsValid(MtmGetCurrentTransactionId()));
620-
MtmBeginSession(MtmReplicationNodeId);
626+
MtmBeginSession(origin_node);
621627
CommitTransactionCommand();
622-
MtmEndSession(MtmReplicationNodeId, true);
628+
MtmEndSession(origin_node, true);
623629
}
624630
break;
625631
}
@@ -632,12 +638,12 @@ process_remote_commit(StringInfo in)
632638
AbortCurrentTransaction();
633639
} else {
634640
/* prepare TBLOCK_INPROGRESS state for PrepareTransactionBlock() */
635-
MTM_LOG2("PGLOGICAL_PREPARE commit: gid=%s", gid);
641+
MTM_LOG1("PGLOGICAL_PREPARE commit: gid=%s", gid);
636642
BeginTransactionBlock();
637643
CommitTransactionCommand();
638644
StartTransactionCommand();
639645

640-
MtmBeginSession(MtmReplicationNodeId);
646+
MtmBeginSession(origin_node);
641647
/* PREPARE itself */
642648
MtmSetCurrentTransactionGID(gid);
643649
PrepareTransactionBlock(gid);
@@ -650,7 +656,7 @@ process_remote_commit(StringInfo in)
650656
FinishPreparedTransaction(gid, false);
651657
CommitTransactionCommand();
652658
}
653-
MtmEndSession(MtmReplicationNodeId, true);
659+
MtmEndSession(origin_node, true);
654660
}
655661
break;
656662
}
@@ -659,22 +665,22 @@ process_remote_commit(StringInfo in)
659665
Assert(!TransactionIdIsValid(MtmGetCurrentTransactionId()));
660666
csn = pq_getmsgint64(in);
661667
gid = pq_getmsgstring(in);
662-
MTM_LOG2("PGLOGICAL_COMMIT_PREPARED commit: csn=%ld, gid=%s, lsn=%ld", csn, gid, end_lsn);
668+
MTM_LOG1("PGLOGICAL_COMMIT_PREPARED commit: csn=%ld, gid=%s, lsn=%lx", csn, gid, end_lsn);
663669
MtmResetTransaction();
664670
StartTransactionCommand();
665-
MtmBeginSession(MtmReplicationNodeId);
671+
MtmBeginSession(origin_node);
666672
MtmSetCurrentTransactionCSN(csn);
667673
MtmSetCurrentTransactionGID(gid);
668674
FinishPreparedTransaction(gid, true);
669675
CommitTransactionCommand();
670-
MtmEndSession(MtmReplicationNodeId, true);
676+
MtmEndSession(origin_node, true);
671677
break;
672678
}
673679
case PGLOGICAL_ABORT_PREPARED:
674680
{
675681
Assert(!TransactionIdIsValid(MtmGetCurrentTransactionId()));
676682
gid = pq_getmsgstring(in);
677-
MtmRollbackPreparedTransaction(gid);
683+
MtmRollbackPreparedTransaction(origin_node, gid);
678684
break;
679685
}
680686
default:

contrib/mmts/pglogical_receiver.c

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -534,12 +534,9 @@ pglogical_receiver_main(Datum main_arg)
534534
MtmSpillToFile(spill_file, buf.data, buf.used);
535535
ByteBufferReset(&buf);
536536
}
537-
if (stmt[0] == 'M' && stmt[1] == 'L') {
538-
MTM_LOG3("Process deadlock message from %d", nodeId);
537+
if (stmt[0] == 'M' && (stmt[1] == 'L' || stmt[1] == 'C' || stmt[1] == 'A')) {
538+
MTM_LOG3("Process '%c' message from %d", stmt[1], nodeId);
539539
MtmExecutor(stmt, rc - hdr_len);
540-
} else if (stmt[0] == 'M' && stmt[1] == 'C') {
541-
MTM_LOG1("Process concurrent DDL message from %d", nodeId);
542-
MtmExecute(stmt, rc - hdr_len);
543540
} else {
544541
ByteBufferAppend(&buf, stmt, rc - hdr_len);
545542
if (stmt[0] == 'C') /* commit */

0 commit comments

Comments
 (0)
29CD
0