eXplicitely set restart LSN · postgrespro/postgres_cluster@6c22363 · GitHub
[go: up one dir, main page]

Skip to content

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 6c22363

Browse files
committed
eXplicitely set restart LSN
1 parent 1a0e6ba commit 6c22363

File tree

6 files changed

+48
-46
lines changed

6 files changed

+48
-46
lines changed

contrib/mmts/arbiter.c

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -912,7 +912,14 @@ static void MtmTransReceiver(Datum arg)
912912
switch (msg->code) {
913913
case MSG_READY:
914914
MTM_TXTRACE(ts, "MtmTransReceiver got MSG_READY");
915+
if (ts->status == TRANSACTION_STATUS_COMMITTED) {
916+
elog(WARNING, "Receive READY response for already committed transaction %d from node %d",
917+
ts->xid, msg->node);
918+
continue;
919+
}
915920
if (ts->nVotes >= Mtm->nLiveNodes) {
921+
elog(WARNING, "Receive deteriorated READY response for transaction %d (%s) from node %d",
922+
ts->xid, ts->gid, msg->node);
916923
MtmAbortTransaction(ts);
917924
MtmWakeUpBackend(ts);
918925
} else {
@@ -956,6 +963,8 @@ static void MtmTransReceiver(Datum arg)
956963
case MSG_PREPARED:
957964
MTM_TXTRACE(ts, "MtmTransReceiver got MSG_PREPARED");
958965
if (ts->nVotes >= Mtm->nLiveNodes) {
966+
elog(WARNING, "Receive deteriorated PREPARED response for transaction %d (%s) from node %d",
967+
ts->xid, ts->gid, msg->node);
959968
MtmAbortTransaction(ts);
960969
MtmWakeUpBackend(ts);
961970
} else {

contrib/mmts/multimaster.c

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -388,16 +388,17 @@ MtmInitializeSequence(int64* start, int64* step)
388388

389389
csn_t MtmTransactionSnapshot(TransactionId xid)
390390
{
391-
MtmTransState* ts;
392391
csn_t snapshot = INVALID_CSN;
393-
392+
394393
MtmLock(LW_SHARED);
395-
ts = hash_search(MtmXid2State, &xid, HASH_FIND, NULL);
396-
if (ts != NULL && !ts->isLocal) {
397-
snapshot = ts->snapshot;
394+
if (Mtm->status == MTM_ONLINE) {
395+
MtmTransState* ts = hash_search(MtmXid2State, &xid, HASH_FIND, NULL);
396+
if (ts != NULL && !ts->isLocal) {
397+
snapshot = ts->snapshot;
398+
Assert(ts->gtid.node == MtmNodeId || MtmIsRecoverySession);
399+
}
398400
}
399401
MtmUnlock();
400-
401402
return snapshot;
402403
}
403404

@@ -1007,7 +1008,7 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
10071008
}
10081009
}
10091010
if (!commit && x->isReplicated && TransactionIdIsValid(x->gtid.xid)) {
1010-
Assert(Mtm->status != MTM_RECOVERY);
1011+
Assert(Mtm->status != MTM_RECOVERY || Mtm->recoverySlot != MtmNodeId);
10111012
/*
10121013
* Send notification only if ABORT happens during transaction processing at replicas,
10131014
* do not send notification if ABORT is received from master
@@ -2455,29 +2456,32 @@ MtmReplicationMode MtmGetReplicationMode(int nodeId, sig_atomic_t volatile* shut
24552456
return REPLMODE_EXIT;
24562457
}
24572458
MTM_LOG2("%d: receiver slot mode %s", MyProcPid, MtmNodeStatusMnem[Mtm->status]);
2459+
MtmLock(LW_EXCLUSIVE);
24582460
if (Mtm->status == MTM_RECOVERY) {
24592461
recovery = true;
24602462
if (Mtm->recoverySlot == 0 || Mtm->recoverySlot == nodeId) {
24612463
/* Choose for recovery first available slot */
2462-
MTM_LOG1("Start recovery from node %d", nodeId);
2464+
elog(WARNING, "Process %d starts recovery from node %d", MyProcPid, nodeId);
24632465
Mtm->recoverySlot = nodeId;
24642466
Mtm->nReceivers = 0;
24652467
Mtm->recoveryCount += 1;
24662468
Mtm->pglogicalNodeMask = 0;
2467-
FinishAllPreparedTransactions(false);
24682469
for (i = 0; i < Mtm->nAllNodes; i++) {
2469-
Mtm->nodes[i].restartLsn = 0;
2470+
Mtm->nodes[i].restartLsn = InvalidXLogRecPtr;
24702471
}
2472+
MtmUnlock();
2473+
FinishAllPreparedTransactions(false);
24712474
return REPLMODE_RECOVERY;
24722475
}
24732476
}
2477+
MtmUnlock();
24742478
/* delay opening of other slots until recovery is completed */
24752479
MtmSleep(STATUS_POLL_DELAY);
24762480
}
24772481
if (recovery) {
2478-
MTM_LOG1("Recreate replication slot for node %d after end of recovery", nodeId);
2482+
MTM_LOG1("%d: Restart replication for node %d after end of recovery", MyProcPid, nodeId);
24792483
} else {
2480-
MTM_LOG2("%d: Reuse replication slot for node %d", MyProcPid, nodeId);
2484+
MTM_LOG1("%d: Continue replication slot for node %d", MyProcPid, nodeId);
24812485
}
24822486
/* After recovery completion we need to drop all other slots to avoid receive of redundant data */
24832487
return recovery ? REPLMODE_RECOVERED : REPLMODE_NORMAL;

contrib/mmts/pglogical_apply.c

Lines changed: 5 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,6 @@ typedef struct TupleData
5959
bool changed[MaxTupleAttributeNumber];
6060
} TupleData;
6161

62-
static int MtmTransactionRecords;
6362
static bool inside_tx = false;
6463

6564
static Relation read_rel(StringInfo s, LOCKMODE mode);
@@ -529,6 +528,8 @@ MtmEndSession(bool unlock)
529528
if (replorigin_session_origin != InvalidRepOriginId) {
530529
MTM_LOG2("%d: Begin reset replorigin session for node %d: %d, progress %lx", MyProcPid, MtmReplicationNodeId, replorigin_session_origin, replorigin_session_get_progress(false));
531530
replorigin_session_origin = InvalidRepOriginId;
531+
replorigin_session_origin_lsn = InvalidXLogRecPtr;
532+
replorigin_session_origin_timestamp = 0;
532533
replorigin_session_reset();
533534
if (unlock) {
534535
MtmUnlockNode(MtmReplicationNodeId);
@@ -540,42 +541,25 @@ MtmEndSession(bool unlock)
540541
static void
541542
process_remote_commit(StringInfo in)
542543
{
543-
int i;
544544
uint8 flags;
545545
csn_t csn;
546546
const char *gid = NULL;
547547
XLogRecPtr end_lsn;
548548
XLogRecPtr origin_lsn;
549-
RepOriginId originId;
550-
int n_records;
549+
int origin_node;
551550
/* read flags */
552551
flags = pq_getmsgbyte(in);
553552
MtmReplicationNodeId = pq_getmsgbyte(in);
554553

555-
n_records = pq_getmsgint(in, 4);
556-
if (MtmTransactionRecords != n_records) {
557-
elog(ERROR, "Transaction %d flags %d contains %d records instead of %d", MtmGetCurrentTransactionId(), flags, MtmTransactionRecords, n_records);
558-
}
559-
560554
/* read fields */
561555
replorigin_session_origin_lsn = pq_getmsgint64(in); /* commit_lsn */
562556
end_lsn = pq_getmsgint64(in); /* end_lsn */
563557
replorigin_session_origin_timestamp = pq_getmsgint64(in); /* commit_time */
564558

565-
originId = (RepOriginId)pq_getmsgint(in, 2);
559+
origin_node = pq_getmsgbyte(in);
566560
origin_lsn = pq_getmsgint64(in);
561+
Mtm->nodes[origin_node-1].restartLsn = origin_lsn;
567562

568-
if (originId != InvalidRepOriginId) {
569-
for (i = 0; i < Mtm->nAllNodes; i++) {
570-
if (Mtm->nodes[i].originId == originId) {
571-
Mtm->nodes[i].restartLsn = origin_lsn;
572-
break;
573-
}
574-
}
575-
if (i == Mtm->nAllNodes) {
576-
elog(WARNING, "Failed to map origin %d", originId);
577-
}
578-
}
579563
Assert(replorigin_session_origin == InvalidRepOriginId);
580564

581565
switch(PGLOGICAL_XACT_EVENT(flags))
@@ -677,8 +661,6 @@ process_remote_insert(StringInfo s, Relation rel)
677661
ScanKey *index_keys;
678662
int i;
679663

680-
MtmTransactionRecords += 1;
681-
682664
estate = create_rel_estate(rel);
683665
newslot = ExecInitExtraTupleSlot(estate);
684666
oldslot = ExecInitExtraTupleSlot(estate);
@@ -777,8 +759,6 @@ process_remote_update(StringInfo s, Relation rel)
777759
ScanKeyData skey[INDEX_MAX_KEYS];
778760
HeapTuple remote_tuple = NULL;
779761

780-
MtmTransactionRecords += 1;
781-
782762
action = pq_getmsgbyte(s);
783763

784764
/* old key present, identifying key changed */
@@ -896,8 +876,6 @@ process_remote_delete(StringInfo s, Relation rel)
896876
ScanKeyData skey[INDEX_MAX_KEYS];
897877
bool found_old;
898878

899-
MtmTransactionRecords += 1;
900-
901879
estate = create_rel_estate(rel);
902880
oldslot = ExecInitExtraTupleSlot(estate);
903881
ExecSetSlotDescriptor(oldslot, RelationGetDescr(rel));
@@ -985,7 +963,6 @@ void MtmExecutor(int id, void* work, size_t size)
985963
}
986964
MemoryContextSwitchTo(ApplyContext);
987965
replorigin_session_origin = InvalidRepOriginId;
988-
MtmTransactionRecords = 0;
989966
PG_TRY();
990967
{
991968
while (true) {

contrib/mmts/pglogical_proto.c

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -186,14 +186,25 @@ pglogical_write_commit(StringInfo out, PGLogicalOutputData *data,
186186
pq_sendbyte(out, MtmNodeId);
187187

188188
Assert(txn->xact_action != XLOG_XACT_PREPARE || txn->xid < 1000 || MtmTransactionRecords >= 2);
189-
pq_sendint(out, MtmTransactionRecords, 4);
190-
189+
191190
/* send fixed fields */
192191
pq_sendint64(out, commit_lsn);
193192
pq_sendint64(out, txn->end_lsn);
194193
pq_sendint64(out, txn->commit_time);
195194

196-
pq_sendint(out, txn->origin_id, 2);
195+
if (txn->origin_id != InvalidRepOriginId) {
196+
int i;
197+
for (i = 0; i < Mtm->nAllNodes && Mtm->nodes[i].originId != txn->origin_id; i++);
198+
if (i == Mtm->nAllNodes) {
199+
elog(WARNING, "Failed to map origin %d", txn->origin_id);
200+
i = MtmNodeId-1;
201+
} else {
202+
//Assert(i == MtmNodeId-1 || txn->origin_lsn != InvalidXLogRecPtr);
203+
}
204+
pq_sendbyte(out, i+1);
205+
} else {
206+
pq_sendbyte(out, MtmNodeId);
207+
}
197208
pq_sendint64(out, txn->origin_lsn);
198209

199210
if (txn->xact_action == XLOG_XACT_COMMIT_PREPARED) {

contrib/mmts/pglogical_receiver.c

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -247,7 +247,7 @@ pglogical_receiver_main(Datum main_arg)
247247
{
248248
int count;
249249
ConnStatusType status;
250-
XLogRecPtr originStartPos = 0;
250+
XLogRecPtr originStartPos = InvalidXLogRecPtr;
251251

252252
/*
253253
* Determine when and how we should open replication slot.
@@ -306,8 +306,9 @@ pglogical_receiver_main(Datum main_arg)
306306
/* Start logical replication at specified position */
307307
if (mode == REPLMODE_RECOVERED) {
308308
originStartPos = Mtm->nodes[nodeId-1].restartLsn;
309+
MTM_LOG1("Restart replication from node %d from position %lx", nodeId, originStartPos);
309310
}
310-
if (originStartPos == 0) {
311+
if (originStartPos == InvalidXLogRecPtr) {
311312
StartTransactionCommand();
312313
originName = psprintf(MULTIMASTER_SLOT_PATTERN, nodeId);
313314
originId = replorigin_by_name(originName, true);

contrib/mmts/tests2/docker-entrypoint.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ if [ "$1" = 'postgres' ]; then
6161

6262
############################################################################
6363

64-
CONNSTRS='dbname=postgres host=node1, dbname=postgres host=node2, dbname=postgres host=node3'
64+
CONNSTRS='dbname=postgres user=postgres host=node1, dbname=postgres user=postgres host=node2, dbname=postgres user=postgres host=node3'
6565
RAFT_PEERS='1:node1:6666, 2:node2:6666, 3:node3:6666'
6666

6767
cat <<-EOF >> $PGDATA/postgresql.conf

0 commit comments

Comments
 (0)
0