10000 Filter applied transactions · postgrespro/postgres_cluster@ab52513 · GitHub
[go: up one dir, main page]

Skip to content

Commit ab52513

Browse files
committed
Filter applied transactions
1 parent cab8515 commit ab52513

File tree

6 files changed

+143
-50
lines changed

6 files changed

+143
-50
lines changed

contrib/mmts/arbiter.c

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -313,12 +313,14 @@ static void MtmSetSocketOptions(int sd)
313313

314314
static void MtmCheckResponse(MtmArbiterMessage* resp)
315315
{
316-
if (BIT_CHECK(resp->disabledNodeMask, MtmNodeId-1) && !BIT_CHECK(Mtm->disabledNodeMask, resp->node-1)) {
316+
if (BIT_CHECK(resp->disabledNodeMask, MtmNodeId-1)
317+
&& !BIT_CHECK(Mtm->disabledNodeMask, resp->node-1)
318+
&& Mtm->status != MTM_RECOVERY
319+
&& Mtm->nodes[MtmNodeId-1].lastStatusChangeTime + MSEC_TO_USEC(MtmNodeDisableDelay) < MtmGetSystemTime())
320+
{
317321
elog(WARNING, "Node %d thinks that I was dead, while I am %s (message %s)", resp->node, MtmNodeStatusMnem[Mtm->status], messageKindText[resp->code]);
318-
if (Mtm->status != MTM_RECOVERY) {
319-
BIT_SET(Mtm->disabledNodeMask, MtmNodeId-1);
320-
MtmSwitchClusterMode(MTM_RECOVERY);
321-
}
322+
BIT_SET(Mtm->disabledNodeMask, MtmNodeId-1);
323+
MtmSwitchClusterMode(MTM_RECOVERY);
322324
}
323325
}
324326

@@ -565,7 +567,7 @@ static bool MtmSendToNode(int node, void const* buf, int size)
565567
static int MtmReadFromNode(int node, void* buf, int buf_size)
566568
{
567569
int rc = MtmReadSocket(sockets[node], buf, buf_size);
568-
if (rc < 0) {
570+
if (rc <= 0) {
569571
elog(WARNING, "Arbiter failed to read from node=%d, rc=%d, errno=%d", node+1, rc, errno);
570572
MtmDisconnect(node);
571573
}
@@ -961,6 +963,7 @@ static void MtmReceiver(Datum arg)
961963
elog(WARNING, "Ignore response for unexisted transaction %d from node %d", msg->dxid, node);
962964
continue;
963965
}
966+
Assert(msg->code == MSG_ABORTED || strcmp(msg->gid, ts->gid) == 0);
964967
if (BIT_CHECK(ts->votedMask, node-1)) {
965968
elog(WARNING, "Receive deteriorated %s response for transaction %d (%s) from node %d",
966969
messageKindText[msg->code], ts->xid, ts->gid, node);
@@ -994,6 +997,8 @@ static void MtmReceiver(Datum arg)
994997
MtmWakeUpBackend(ts);
995998
} else {
996999
Assert(ts->status == TRANSACTION_STATUS_IN_PROGRESS);
1000+
MTM_LOG1("Transaction %s is prepared (status=%d participants=%lx disabled=%lx, voted=%lx)",
1001+
ts->gid, ts->status, ts->participantsMask, Mtm->disabledNodeMask, ts->votedMask);
9971002
ts->isPrepared = true;
9981003
if (ts->isTwoPhase) {
9991004
MtmWakeUpBackend(ts);
@@ -1052,9 +1057,11 @@ static void MtmReceiver(Datum arg)
10521057
ts->csn = MtmAssignCSN();
10531058
MtmAdjustSubtransactions(ts);
10541059
MtmSend2PCMessage(ts, MSG_PRECOMMITTED);
1055-
} else {
1056-
Assert(ts->status == TRANSACTION_STATUS_ABORTED);
1060+
} else if (ts->status == TRANSACTION_STATUS_ABORTED) {
10571061
MtmSend2PCMessage(ts, MSG_ABORTED);
1062+
} else {
1063+
elog(WARNING, "Transaction %s is already %s",
1064+
ts->gid, ts->status == TRANSACTION_STATUS_COMMITTED ? "committed" : "prepared");
10581065
}
10591066
break;
10601067
default:

contrib/mmts/multimaster.c

Lines changed: 97 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
#include "miscadmin.h"
1616

1717
#include "libpq-fe.h"
18+
#include "lib/stringinfo.h"
19+
#include "libpq/pqformat.h"
1820
#include "common/username.h"
1921

2022
#include "postmaster/postmaster.h"
@@ -926,7 +928,9 @@ MtmVotingCompleted(MtmTransState* ts)
926928
ts->votingCompleted = true;
927929
ts->status = TRANSACTION_STATUS_UNKNOWN;
928930
return true;
929-
} else {
931+
} else {
932+
MTM_LOG1("Transaction %s is considered as prepared (status=%d participants=%lx disabled=%lx, voted=%lx)",
933+
ts->gid, ts->status, ts->participantsMask, Mtm->disabledNodeMask, ts->votedMask);
930934
ts->isPrepared = true;
931935
if (ts->isTwoPhase) {
932936
ts->votingCompleted = true;
@@ -980,9 +984,10 @@ MtmPostPrepareTransaction(MtmCurrentTrans* x)
980984
MtmResetTransaction();
981985
} else {
982986
int result = 0;
983-
987+
int nConfigChanges = Mtm->nConfigChanges;
984988
/* Wait votes from all nodes until: */
985-
while (!MtmVotingCompleted(ts))
989+
while (!MtmVotingCompleted(ts)
990+
&& (ts->isPrepared || nConfigChanges == Mtm->nConfigChanges))
986991
{
987992
MtmUnlock();
988993
MTM_TXTRACE(x, "PostPrepareTransaction WaitLatch Start");
@@ -998,8 +1003,15 @@ MtmPostPrepareTransaction(MtmCurrentTrans* x)
9981003
MtmLock(LW_EXCLUSIVE);
9991004
}
10001005
if (ts->status != TRANSACTION_STATUS_ABORTED && !ts->votingCompleted) {
1006+
if (ts->isPrepared) {
1007+
elog(ERROR, "Commit of distributed transaction %s is suspended because node is switched to %s mode", ts->gid, MtmNodeStatusMnem[Mtm->status]);
1008+
}
1009+
if (Mtm->status != MTM_ONLINE) {
1010+
elog(WARNING, "Commit of distributed transaction is canceled because node is switched to %s mode", MtmNodeStatusMnem[Mtm->status]);
1011+
} else {
1012+
elog(WARNING, "Commit of distributed transaction is canceled because cluster configuration was changed");
1013+
}
10011014
MtmAbortTransaction(ts);
1002-
elog(WARNING, "Commit of distributed transaction is canceled because node is switched to %s mode", MtmNodeStatusMnem[Mtm->status]);
10031015
}
10041016
x->status = ts->status;
10051017
MTM_LOG3("%d: Result of vote: %d", MyProcPid, ts->status);
@@ -1032,6 +1044,7 @@ MtmCommitPreparedTransaction(MtmCurrentTrans* x)
10321044
elog(WARNING, "Global transaciton ID '%s' is not found", x->gid);
10331045
} else {
10341046
int result = 0;
1047+
int nConfigChanges = Mtm->nConfigChanges;
10351048

10361049
Assert(tm->state != NULL);
10371050
MTM_LOG3("Commit prepared transaction %d with gid='%s'", x->xid, x->gid);
@@ -1046,7 +1059,8 @@ MtmCommitPreparedTransaction(MtmCurrentTrans* x)
10461059
MtmSend2PCMessage(ts, MSG_PRECOMMIT);
10471060

10481061
/* Wait votes from all nodes until: */
1049-
while (!MtmVotingCompleted(ts))
1062+
while (!MtmVotingCompleted(ts)
1063+
&& (ts->isPrepared || nConfigChanges == Mtm->nConfigChanges))
10501064
{
10511065
MtmUnlock();
10521066
MTM_TXTRACE(x, "CommitPreparedTransaction WaitLatch Start");
@@ -1063,8 +1077,15 @@ MtmCommitPreparedTransaction(MtmCurrentTrans* x)
10631077
}
10641078
}
10651079
if (ts->status != TRANSACTION_STATUS_ABORTED && !ts->votingCompleted) {
1080+
if (ts->isPrepared) {
1081+
elog(ERROR, "Commit of distributed transaction %s is suspended because node is switched to %s mode", ts->gid, MtmNodeStatusMnem[Mtm->status]);
1082+
}
1083+
if (Mtm->status != MTM_ONLINE) {
1084+
elog(WARNING, "Commit of distributed transaction is canceled because node is switched to %s mode", MtmNodeStatusMnem[Mtm->status]);
1085+
} else {
1086+
elog(WARNING, "Commit of distributed transaction is canceled because cluster configuration was changed");
1087+
}
10661088
MtmAbortTransaction(ts);
1067-
elog(WARNING, "Commit of distributed transaction is canceled because node is switched to %s mode", MtmNodeStatusMnem[Mtm->status]);
10681089
}
10691090
x->status = ts->status;
10701091
x->xid = ts->xid;
@@ -1166,11 +1187,14 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
11661187
}
11671188
ts->status = TRANSACTION_STATUS_ABORTED;
11681189
ts->isLocal = true;
1190+
ts->isPrepared = false;
11691191
ts->snapshot = x->snapshot;
1192+
ts->isTwoPhase = x->isTwoPhase;
11701193
ts->csn = MtmAssignCSN();
11711194
ts->gtid = x->gtid;
11721195
ts->nSubxids = 0;
11731196
ts->votingCompleted = true;
1197+
strcpy(ts->gid, x->gid);
11741198
if (ts->isActive) {
11751199
ts->isActive = false;
11761200
Assert(Mtm->nActiveTransactions != 0);
@@ -1226,8 +1250,9 @@ void MtmSend2PCMessage(MtmTransState* ts, MtmMessageCode cmd)
12261250
int i;
12271251
for (i = 0; i < Mtm->nAllNodes; i++)
12281252
{
1229-
if (BIT_CHECK(ts->participantsMask & ~Mtm->disabledNodeMask, i) && TransactionIdIsValid(ts->xids[i]))
1253+
if (BIT_CHECK(ts->participantsMask & ~Mtm->disabledNodeMask, i))
12301254
{
1255+
Assert(TransactionIdIsValid(ts->xids[i]));
12311256
msg.node = i+1;
12321257
msg.dxid = ts->xids[i];
12331258
MtmSendMessage(&msg);
@@ -1655,7 +1680,7 @@ MtmCheckClusterLock()
16551680
continue;
16561681
} else {
16571682
/* All lockers have synchronized their logs */
1658-
/* Remove lock and mark them as rceovered */
1683+
/* Remove lock and mark them as recovered */
16591684
MTM_LOG1("Complete recovery of %d nodes (node mask %lx)", Mtm->nLockers, (long) Mtm->nodeLockerMask);
16601685
Assert(Mtm->walSenderLockerMask == 0);
16611686
Assert((Mtm->nodeLockerMask & Mtm->disabledNodeMask) == Mtm->nodeLockerMask);
@@ -2082,6 +2107,8 @@ static void MtmInitialize()
20822107
Mtm->nodes[i].timeline = 0;
20832108
}
20842109
Mtm->nodes[MtmNodeId-1].originId = DoNotReplicateId;
2110+
/* All transactions originating from the current node should be ignored during recovery */
2111+
Mtm->nodes[MtmNodeId-1].restartLsn = (XLogRecPtr)PG_UINT64_MAX;
20852112
PGSemaphoreCreate(&Mtm->sendSemaphore);
20862113
PGSemaphoreReset(&Mtm->sendSemaphore);
20872114
SpinLockInit(&Mtm->spinlock);
@@ -2806,12 +2833,7 @@ void MtmFinishPreparedTransaction(MtmTransState* ts, bool commit)
28062833
Assert(!IsTransactionState());
28072834
MtmResetTransaction();
28082835
StartTransactionCommand();
2809-
#if 0
2810-
if (Mtm->nodes[MtmNodeId-1].originId == InvalidRepOriginId) {
2811-
/* This dummy origin is used for local commits/aborts which should not be replicated */
2812-
Mtm->nodes[MtmNodeId-1].originId = replorigin_create(psprintf(MULTIMASTER_SLOT_PATTERN, MtmNodeId));
2813-
}
2814-
#endif
2836+
28152837
MtmBeginSession(MtmNodeId);
28162838
MtmSetCurrentTransactionCSN(ts->csn);
28172839
MtmSetCurrentTransactionGID(ts->gid);
@@ -2828,7 +2850,6 @@ void MtmFinishPreparedTransaction(MtmTransState* ts, bool commit)
28282850
*/
28292851
MtmReplicationMode MtmGetReplicationMode(int nodeId, sig_atomic_t volatile* shutdown)
28302852
{
2831-
int i;
28322853
bool recovery = false;
28332854

28342855
while (Mtm->status != MTM_CONNECTED && Mtm->status != MTM_ONLINE)
@@ -2850,9 +2871,6 @@ MtmReplicationMode MtmGetReplicationMode(int nodeId, sig_atomic_t volatile* shut
28502871
Mtm->nReceivers = 0;
28512872
Mtm->recoveryCount += 1;
28522873
Mtm->pglogicalNodeMask = 0;
2853-
for (i = 0; i < Mtm->nAllNodes; i++) {
2854-
Mtm->nodes[i].restartLsn = InvalidXLogRecPtr;
2855-
}
28562874
MtmUnlock();
28572875
return REPLMODE_RECOVERY;
28582876
}
@@ -3069,6 +3087,67 @@ MtmReplicationRowFilterHook(struct PGLogicalRowFilterArgs* args)
30693087
return isDistributed;
30703088
}
30713089

3090+
bool MtmFilterTransaction(char* record, int size)
3091+
{
3092+
StringInfoData s;
3093+
uint8 flags;
3094+
XLogRecPtr origin_lsn;
3095+
XLogRecPtr end_lsn;
3096+
int replication_node;
3097+
int origin_node;
3098+
char const* gid = "";
3099+
bool duplicate;
3100+
3101+
s.data = record;
3102+
s.len = size;
3103+
s.maxlen = -1;
3104+
s.cursor = 0;
3105+
3106+
Assert(pq_getmsgbyte(&s) == 'C');
3107+
flags = pq_getmsgbyte(&s); /* flags */
3108+
replication_node = pq_getmsgbyte(&s);
3109+
3110+
/* read fields */
3111+
pq_getmsgint64(&s); /* commit_lsn */
3112+
end_lsn = pq_getmsgint64(&s); /* end_lsn */
3113+
pq_getmsgint64(&s); /* commit_time */
3114+
3115+
origin_node = pq_getmsgbyte(&s);
3116+
origin_lsn = pq_getmsgint64(&s);
3117+
3118+
Assert(replication_node == MtmReplicationNodeId &&
3119+
origin_node != 0 &&
3120+
(Mtm->status == MTM_RECOVERY || origin_node == replication_node));
3121+
3122+
switch(PGLOGICAL_XACT_EVENT(flags))
3123+
{
3124+
case PGLOGICAL_PREPARE:
3125+
case PGLOGICAL_ABORT_PREPARED:
3126+
gid = pq_getmsgstring(&s);
3127+
break;
3128+
case PGLOGICAL_COMMIT_PREPARED:
3129+
pq_getmsgint64(&s); /* CSN */
3130+
gid = pq_getmsgstring(&s);
3131+
break;
3132+
default:
3133+
break;
3134+
}
3135+
duplicate = Mtm->status == MTM_RECOVERY && origin_lsn != InvalidXLogRecPtr && origin_lsn <= Mtm->nodes[origin_node-1].restartLsn;
3136+
3137+
MTM_LOG1("%s transaction %s from node %d lsn %lx, origin node %d, original lsn=%lx, current lsn=%lx",
3138+
duplicate ? "Ignore" : "Apply", gid, replication_node, end_lsn, origin_node, origin_lsn, Mtm->nodes[origin_node-1].restartLsn);
3139+
if (Mtm->status == MTM_RECOVERY) {
3140+
if (Mtm->nodes[origin_node-1].restartLsn < origin_lsn) {
3141+
Mtm->nodes[origin_node-1].restartLsn = origin_lsn;
3142+
}
3143+
} else {
3144+
if (Mtm->nodes[replication_node-1].restartLsn < end_lsn) {
3145+
Mtm->nodes[replication_node-1].restartLsn = end_lsn;
3146+
}
3147+
}
3148+
return duplicate;
3149+
}
3150+
30723151
void MtmSetupReplicationHooks(struct PGLogicalHooks* hooks)
30733152
{
30743153
hooks->startup_hook = MtmReplicationStartupHook;

contrib/mmts/multimaster.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -360,4 +360,6 @@ extern void MtmBeginSession(int nodeId);
360360
extern void MtmEndSession(int nodeId, bool unlock);
361361
extern void MtmFinishPreparedTransaction(MtmTransState* ts, bool commit);
362362
extern void MtmRollbackPreparedTransaction(char const* gid);
363+
extern bool MtmFilterTransaction(char* record, int size);
364+
363365
#endif

contrib/mmts/pglogical_apply.c

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -344,10 +344,10 @@ process_remote_begin(StringInfo s)
344344
Assert(gtid.node > 0);
345345

346346
MTM_LOG2("REMOTE begin node=%d xid=%d snapshot=%ld", gtid.node, gtid.xid, snapshot);
347+
MtmResetTransaction();
347348
#if 1
348349
if (BIT_CHECK(Mtm->disabledNodeMask, gtid.node-1)) {
349350
elog(WARNING, "Ignore transaction %d from disabled node %d", gtid.xid, gtid.node);
350-
MtmResetTransaction();
351351
return false;
352352
}
353353
#endif
@@ -603,9 +603,6 @@ process_remote_commit(StringInfo in)
603603
origin_node = pq_getmsgbyte(in);
604604
origin_lsn = pq_getmsgint64(in);
605605

606-
if (Mtm->nodes[origin_node-1].restartLsn < origin_lsn) {
607-
Mtm->nodes[origin_node-1].restartLsn = origin_lsn;
608-
}
609606
if (origin_node != MtmReplicationNodeId) {
610607
replorigin_advance(Mtm->nodes[origin_node-1].originId, origin_lsn, GetXLogInsertRecPtr(),
611608
false /* backward */ , false /* WAL */ );

contrib/mmts/pglogical_receiver.c

Lines changed: 27 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -339,6 +339,9 @@ pglogical_receiver_main(Datum main_arg)
339339
MTM_LOG1("Start logical receiver at position %lx from node %d", originStartPos, nodeId);
340340
} else {
341341
originStartPos = replorigin_get_progress(originId, false);
342+
if (Mtm->nodes[nodeId-1].restartLsn < originStartPos) {
343+
Mtm->nodes[nodeId-1].restartLsn = originStartPos;
344+
}
342345
MTM_LOG1("Restart logical receiver at position %lx with origin=%d from node %d", originStartPos, originId, nodeId);
343346
}
344347
Mtm->nodes[nodeId-1].originId = originId;
@@ -535,27 +538,32 @@ pglogical_receiver_main(Datum main_arg)
535538
ByteBufferAppend(&buf, stmt, rc - hdr_len);
536539
if (stmt[0] == 'C') /* commit */
537540
{
538-
if (spill_file >= 0) {
539-
ByteBufferAppend(&buf, ")", 1);
540-
pq_sendbyte(&spill_info, '(');
541-
pq_sendint(&spill_info, buf.used, 4);
542-
MtmSpillToFile(spill_file, buf.data, buf.used);
543-
MtmCloseSpillFile(spill_file);
544-
MtmExecute(spill_info.data, spill_info.len);
545-
spill_file = -1;
546-
resetStringInfo(&spill_info);
547-
} else {
548-
if (MtmPreserveCommitOrder && buf.used == rc - hdr_len) {
549-
/* Perform commit-prepared and rollback-prepared requested directly in receiver */
550-
timestamp_t stop, start = MtmGetSystemTime();
551-
MtmExecutor(buf.data, buf.used);
552-
stop = MtmGetSystemTime();
553-
if (stop - start > USECS_PER_SEC) {
554-
elog(WARNING, "Commit of prepared transaction takes %ld usec, flags=%x", stop - start, stmt[1]);
541+
if (!MtmFilterTransaction(stmt, rc - hdr_len)) {
542+
if (spill_file >= 0) {
543+
ByteBufferAppend(&buf, ")", 1);
544+
pq_sendbyte(&spill_info, '(');
545+
pq_sendint(&spill_info, buf.used, 4);
546+
MtmSpillToFile(spill_file, buf.data, buf.used);
547+
MtmCloseSpillFile(spill_file);
548+
MtmExecute(spill_info.data, spill_info.len);
549+
spill_file = -1;
550+
resetStringInfo(&spill_info);
551+
} else {
552+
if (MtmPreserveCommitOrder && buf.used == rc - hdr_len) {
553+
/* Perform commit-prepared and rollback-prepared requested directly in receiver */
554+
timestamp_t stop, start = MtmGetSystemTime();
555+
MtmExecutor(buf.data, buf.used);
556+
stop = MtmGetSystemTime();
557+
if (stop - start > USECS_PER_SEC) {
558+
elog(WARNING, "Commit of prepared transaction takes %ld usec, flags=%x", stop - start, stmt[1]);
559+
}
560+
} else {
561+
MtmExecute(buf.data, buf.used);
555562
}
556-
} else {
557-
MtmExecute(buf.data, buf.used);
558563
}
564+
} else if (spill_file >= 0) {
565+
MtmCloseSpillFile(spill_file);
566+
spill_file = -1;
559567
}
560568
ByteBufferReset(&buf);
561569
}

contrib/mmts/tests2/docker-entrypoint.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,6 @@ if [ "$1" = 'postgres' ]; then
6464
CONNSTRS='dbname=postgres user=postgres host=node1, dbname=postgres user=postgres host=node2, dbname=postgres user=postgres host=node3'
6565
RAFT_PEERS='1:node1:6666, 2:node2:6666, 3:node3:6666'
6666

67-
# log_line_prefix = '%t: '
6867

6968
cat <<-EOF >> $PGDATA/postgresql.conf
7069
listen_addresses='*'
@@ -80,6 +79,7 @@ if [ "$1" = 'postgres' ]; then
8079
log_checkpoints = on
8180
checkpoint_timeout = 30
8281
log_autovacuum_min_duration = 0
82+
log_line_prefix = '%t: '
8383
8484
multimaster.workers = 4
8585
multimaster.max_nodes = 3

0 commit comments

Comments
 (0)
0