8000 Add loging of filtered transactions in logical decoder · postgrespro/postgres_cluster@7d99788 · GitHub
[go: up one dir, main page]

Skip to content

Commit 7d99788

Browse files
committed
Add loging of filtered transactions in logical decoder
1 parent b958316 commit 7d99788

File tree

5 files changed

+105
-55
lines changed

5 files changed

+105
-55
lines changed

contrib/mmts/multimaster.c

Lines changed: 67 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ typedef struct {
8383
bool isReplicated; /* transaction on replica */
8484
bool isDistributed; /* transaction performed INSERT/UPDATE/DELETE and has to be replicated to other nodes */
8585
bool isPrepared; /* transaction is perpared at first stage of 2PC */
86+
bool isSuspended; /* prepared transaction is suspended because coordinator node is switch to offline */
8687
bool isTransactionBlock; /* is transaction block */
8788
bool containsDML; /* transaction contains DML statements */
8889
XidStatus status; /* transaction status */
@@ -712,7 +713,7 @@ MtmXactCallback(XactEvent event, void *arg)
712713
}
713714

714715
/*
715-
* Check if this is "normal" user trnsaction which should be distributed to other nodes
716+
* Check if this is "normal" user transaction which should be distributed to other nodes
716717
*/
717718
static bool
718719
MtmIsUserTransaction()
@@ -734,6 +735,7 @@ MtmResetTransaction()
734735
x->gtid.xid = InvalidTransactionId;
735736
x->isDistributed = false;
736737
x->isPrepared = false;
738+
x->isSuspended = false;
737739
x->isTwoPhase = false;
738740
x->csn =
739741
x->status = TRANSACTION_STATUS_UNKNOWN;
@@ -763,6 +765,7 @@ MtmBeginTransaction(MtmCurrentTrans* x)
763765
x->isReplicated = MtmIsLogicalReceiver;
764766
x->isDistributed = MtmIsUserTransaction();
765767
x->isPrepared = false;
768+
x->isSuspended = false;
766769
x->isTwoPhase = false;
767770
x->isTransactionBlock = IsTransactionBlock();
768771
/* Application name can be changed usnig PGAPPNAME environment variable */
@@ -1004,14 +1007,18 @@ MtmPostPrepareTransaction(MtmCurrentTrans* x)
10041007
A3E2 }
10051008
if (ts->status != TRANSACTION_STATUS_ABORTED && !ts->votingCompleted) {
10061009
if (ts->isPrepared) {
1007-
elog(ERROR, "Commit of distributed transaction %s is suspended because node is switched to %s mode", ts->gid, MtmNodeStatusMnem[Mtm->status]);
1008-
}
1009-
if (Mtm->status != MTM_ONLINE) {
1010-
elog(WARNING, "Commit of distributed transaction is canceled because node is switched to %s mode", MtmNodeStatusMnem[Mtm->status]);
1011-
} else {
1012-
elog(WARNING, "Commit of distributed transaction is canceled because cluster configuration was changed");
1010+
// GetNewTransactionId(false); /* force increment of transaction counter */
1011+
// elog(ERROR, "Commit of distributed transaction %s is suspended because node is switched to %s mode", ts->gid, MtmNodeStatusMnem[Mtm->status]);
1012+
elog(WARNING, "Commit of distributed transaction %s is suspended because node is switched to %s mode", ts->gid, MtmNodeStatusMnem[Mtm->status]);
1013+
x->isSuspended = true;
1014+
} else {
1015+
if (Mtm->status != MTM_ONLINE) {
1016+
elog(WARNING, "Commit of distributed transaction is canceled because node is switched to %s mode", MtmNodeStatusMnem[Mtm->status]);
1017+
} else {
1018+
elog(WARNING, "Commit of distributed transaction is canceled because cluster configuration was changed");
1019+
}
1020+
MtmAbortTransaction(ts);
10131021
}
1014-
MtmAbortTransaction(ts);
10151022
}
10161023
x->status = ts->status;
10171024
MTM_LOG3("%d: Result of vote: %d", MyProcPid, ts->status);
@@ -1078,14 +1085,18 @@ MtmCommitPreparedTransaction(MtmCurrentTrans* x)
10781085
}
10791086
if (ts->status != TRANSACTION_STATUS_ABORTED && !ts->votingCompleted) {
10801087
if (ts->isPrepared) {
1081-
elog(ERROR, "Commit of distributed transaction %s is suspended because node is switched to %s mode", ts->gid, MtmNodeStatusMnem[Mtm->status]);
1082-
}
1083-
if (Mtm->status != MTM_ONLINE) {
1084-
elog(WARNING, "Commit of distributed transaction is canceled because node is switched to %s mode", MtmNodeStatusMnem[Mtm->status]);
1085-
} else {
1086-
elog(WARNING, "Commit of distributed transaction is canceled because cluster configuration was changed");
1088+
// GetNewTransactionId(false); /* force increment of transaction counter */
1089+
// elog(ERROR, "Commit of distributed transaction %s is suspended because node is switched to %s mode", ts->gid, MtmNodeStatusMnem[Mtm->status]);
1090+
elog(WARNING, "Commit of distributed transaction %s is suspended because node is switched to %s mode", ts->gid, MtmNodeStatusMnem[Mtm->status]);
1091+
x->isSuspended = true;
1092+
} else {
1093+
if (Mtm->status != MTM_ONLINE) {
1094+
elog(WARNING, "Commit of distributed transaction is canceled because node is switched to %s mode", MtmNodeStatusMnem[Mtm->status]);
1095+
} else {
1096+
elog(WARNING, "Commit of distributed transaction is canceled because cluster configuration was changed");
1097+
}
1098+
MtmAbortTransaction(ts);
10871099
}
1088-
MtmAbortTransaction(ts);
10891100
}
10901101
x->status = ts->status;
10911102
x->xid = ts->xid;
@@ -1293,6 +1304,7 @@ static void MtmStartRecovery()
12931304
MtmLock(LW_EXCLUSIVE);
12941305
BIT_SET(Mtm->disabledNodeMask, MtmNodeId-1);
12951306
MtmSwitchClusterMode(MTM_RECOVERY);
1307+
Mtm->recoveredLSN = InvalidXLogRecPtr;
12961308
MtmUnlock();
12971309
}
12981310

@@ -1604,6 +1616,7 @@ bool MtmRecoveryCaughtUp(int nodeId, XLogRecPtr slotLSN)
16041616
MTM_LOG1("%d: node %d is caugth-up without locking cluster", MyProcPid, nodeId);
16051617
/* We are lucky: caugth-up without locking cluster! */
16061618
}
1619+
Mtm->recoveredLSN = walLSN;
16071620
MtmEnableNode(nodeId);
16081621
Mtm->nConfigChanges += 1;
16091622
caughtUp = true;
@@ -2075,6 +2088,7 @@ static void MtmInitialize()
20752088
Mtm->walSenderLockerMask = 0;
20762089
Mtm->nodeLockerMask = 0;
20772090
Mtm->reconnectMask = 0;
2091+
Mtm->recoveredLSN = InvalidXLogRecPtr;
20782092
Mtm->nLockers = 0;
20792093
Mtm->nActiveTransactions = 0;
20802094
Mtm->votingTransactions = NULL;
@@ -2102,13 +2116,14 @@ static void MtmInitialize()
21022116
Mtm->nodes[i].con = MtmConnections[i];
21032117
Mtm->nodes[i].flushPos = 0;
21042118
Mtm->nodes[i].lastHeartbeat = 0;
2105-
Mtm->nodes[i].restartLsn = 0;
2119+
Mtm->nodes[i].restartLSN = InvalidXLogRecPtr;
21062120
Mtm->nodes[i].originId = InvalidRepOriginId;
21072121
Mtm->nodes[i].timeline = 0;
2122+
Mtm->nodes[i].recoveredLSN = InvalidXLogRecPtr;
21082123
}
21092124
Mtm->nodes[MtmNodeId-1].originId = DoNotReplicateId;
21102125
/* All transaction originated from the current node should be ignored during recovery */
2111-
Mtm->nodes[MtmNodeId-1].restartLsn = (XLogRecPtr)PG_UINT64_MAX;
2126+
Mtm->nodes[MtmNodeId-1].restartLSN = (XLogRecPtr)PG_UINT64_MAX;
21122127
PGSemaphoreCreate(&Mtm->sendSemaphore);
21132128
PGSemaphoreReset(&Mtm->sendSemaphore);
21142129
SpinLockInit(&Mtm->spinlock);
@@ -2850,18 +2865,21 @@ void MtmFinishPreparedTransaction(MtmTransState* ts, bool commit)
28502865
*/
28512866
MtmReplicationMode MtmGetReplicationMode(int nodeId, sig_atomic_t volatile* shutdown)
28522867
{
2853-
bool recovery = false;
2868+
MtmReplicationMode mode = REPLMODE_OPEN_EXISTED;
28542869

2855-
while (Mtm->status != MTM_CONNECTED && Mtm->status != MTM_ONLINE)
2870+
while ((Mtm->status != MTM_CONNECTED && Mtm->status != MTM_ONLINE) || BIT_CHECK(Mtm->disabledNodeMask, nodeId-1))
28562871
{
28572872
if (*shutdown)
28582873
{
28592874
return REPLMODE_EXIT;
28602875
}
2861-
MTM_LOG2("%d: receiver slot mode %s", MyProcPid, MtmNodeStatusMnem[Mtm->status]);
28622876
MtmLock(LW_EXCLUSIVE);
2877+
if (BIT_CHECK(Mtm->disabledNodeMask, nodeId-1)) {
2878+
mode = REPLMODE_CREATE_NEW;
2879+
}
2880+
MTM_LOG2("%d: receiver slot mode %s", MyProcPid, MtmNodeStatusMnem[Mtm->status]);
28632881
if (Mtm->status == MTM_RECOVERY) {
2864-
recovery = true;
2882+
mode = REPLMODE_RECOVERED;
28652883
if ((Mtm->recoverySlot == 0 && (Mtm->donorNodeId == MtmNodeId || Mtm->donorNodeId == nodeId))
28662884
|| Mtm->recoverySlot == nodeId)
28672885
{
@@ -2879,13 +2897,14 @@ MtmReplicationMode MtmGetReplicationMode(int nodeId, sig_atomic_t volatile* shut
28792897
/* delay opening of other slots until recovery is completed */
28802898
MtmSleep(STATUS_POLL_DELAY);
28812899
}
2882-
if (recovery) {
2900+
if (mode == REPLMODE_RECOVERED) {
28832901
MTM_LOG1("%d: Restart replication from node %d after end of recovery", MyProcPid, nodeId);
2902+
} else if (mode == REPLMODE_CREATE_NEW) {
2903+
MTM_LOG1("%d: Start replication from recovered node %d", MyProcPid, nodeId);
28842904
} else {
28852905
MTM_LOG1("%d: Continue replication from node %d", MyProcPid, nodeId);
28862906
}
2887-
/* After recovery completion we need to drop all other slots to avoid receive of redundant data */
2888-
return recovery ? REPLMODE_RECOVERED : REPLMODE_NORMAL;
2907+
return mode;
28892908
}
28902909

28912910
static bool MtmIsBroadcast()
@@ -2964,7 +2983,7 @@ MtmReplicationStartupHook(struct PGLogicalStartupHookArgs* args)
29642983
MtmIsRecoverySession = true;
29652984
} else if (strcmp(strVal(elem->arg), "recovered") == 0) {
29662985
recoveryCompleted = true;
2967-
} else if (strcmp(strVal(elem->arg), "normal") != 0) {
2986+
} else if (strcmp(strVal(elem->arg), "open_existed") != 0 && strcmp(strVal(elem->arg), "create_new") != 0) {
29682987
elog(ERROR, "Illegal recovery mode %s", strVal(elem->arg));
29692988
}
29702989
} else {
@@ -2976,14 +2995,20 @@ MtmReplicationStartupHook(struct PGLogicalStartupHookArgs* args)
29762995
} else {
29772996
elog(ERROR, "Restart position is not specified");
29782997
}
2998+
} else if (strcmp("mtm_recovered_pos", elem->defname) == 0) {
2999+
if (elem->arg != NULL && strVal(elem->arg) != NULL) {
3000+
sscanf(strVal(elem->arg), "%lx", &Mtm->nodes[MtmReplicationNodeId-1].recoveredLSN);
3001+
} else {
3002+
elog(ERROR, "Recovered position is not specified");
3003+
}
29793004
}
2980 F438 3005
}
29813006
MtmLock(LW_EXCLUSIVE);
29823007
if (MtmIsRecoverySession) {
29833008
MTM_LOG1("%d: Node %d start recovery of node %d at position %lx", MyProcPid, MtmNodeId, MtmReplicationNodeId, recoveryStartPos);
29843009
Assert(MyReplicationSlot != NULL);
29853010
if (recoveryStartPos < MyReplicationSlot->data.restart_lsn) {
2986-
elog(ERROR, "Specified recovery start position %lx is beyond restart lsn %lx", recoveryStartPos, MyReplicationSlot->data.restart_lsn);
3011+
elog(WARNING, "Specified recovery start position %lx is beyond restart lsn %lx", recoveryStartPos, MyReplicationSlot->data.restart_lsn);
29873012
}
29883013
if (!BIT_CHECK(Mtm->disabledNodeMask, MtmReplicationNodeId-1)) {
29893014
MtmDisableNode(MtmReplicationNodeId);
@@ -3132,17 +3157,17 @@ bool MtmFilterTransaction(char* record, int size)
31323157
default:
31333158
break;
31343159
}
3135-
duplicate = Mtm->status == MTM_RECOVERY && origin_lsn != InvalidXLogRecPtr && origin_lsn <= Mtm->nodes[origin_node-1].restartLsn;
3160+
duplicate = Mtm->status == MTM_RECOVERY && origin_lsn != InvalidXLogRecPtr && origin_lsn <= Mtm->nodes[origin_node-1].restartLSN;
31363161

31373162
MTM_LOG1("%s transaction %s from node %d lsn %lx, origin node %d, original lsn=%lx, current lsn=%lx",
3138-
duplicate ? "Ignore" : "Apply", gid, replication_node, end_lsn, origin_node, origin_lsn, Mtm->nodes[origin_node-1].restartLsn);
3163+
duplicate ? "Ignore" : "Apply", gid, replication_node, end_lsn, origin_node, origin_lsn, Mtm->nodes[origin_node-1].restartLSN);
31393164
if (Mtm->status == MTM_RECOVERY) {
3140-
if (Mtm->nodes[origin_node-1].restartLsn < origin_lsn) {
3141-
Mtm->nodes[origin_node-1].restartLsn = origin_lsn;
3165+
if (Mtm->nodes[origin_node-1].restartLSN < origin_lsn) {
3166+
Mtm->nodes[origin_node-1].restartLSN = origin_lsn;
31423167
}
31433168
} else {
3144-
if (Mtm->nodes[replication_node-1].restartLsn < end_lsn) {
3145-
Mtm->nodes[replication_node-1].restartLsn = end_lsn;
3169+
if (Mtm->nodes[replication_node-1].restartLSN < end_lsn) {
3170+
Mtm->nodes[replication_node-1].restartLSN = end_lsn;
31463171
}
31473172
}
31483173
return duplicate;
@@ -3757,12 +3782,16 @@ static bool MtmTwoPhaseCommit(MtmCurrentTrans* x)
37573782
/* ??? Should we do explicit rollback */
37583783
} else {
37593784
CommitTransactionCommand();
3760- 10000
StartTransactionCommand();
3761-
if (MtmGetCurrentTransactionStatus() == TRANSACTION_STATUS_ABORTED) {
3762-
FinishPreparedTransaction(x->gid, false);
3763-
elog(ERROR, "Transaction %s is aborted by DTM", x->gid);
3764-
} else {
3765-
FinishPreparedTransaction(x->gid, true);
3785+
if (x->isSuspended) {
3786+
elog(WARNING, "Transaction %s is left in prepared state because coordinator onde is not online", x->gid);
3787+
} else {
3788+
StartTransactionCommand();
3789+
if (x->status == TRANSACTION_STATUS_ABORTED) {
3790+
FinishPreparedTransaction(x->gid, false);
3791+
elog(ERROR, "Transaction %s is aborted by DTM", x->gid);
3792+
} else {
3793+
FinishPreparedTransaction(x->gid, true);
3794+
}
37663795
}
37673796
}
37683797
return true;

contrib/mmts/multimaster.h

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -126,10 +126,11 @@ typedef enum
126126

127127
typedef enum
128128
{
129-
REPLMODE_EXIT, /* receiver should exit */
130-
REPLMODE_RECOVERED, /* recovery of node is completed so drop old slot and restart replication from the current position in WAL */
131-
REPLMODE_RECOVERY, /* perform recorvery of the node by applying all data from the slot from specified point */
132-
REPLMODE_NORMAL /* normal mode: use existed slot or create new one and start receiving data from it from the specified position */
129+
REPLMODE_EXIT, /* receiver should exit */
130+
REPLMODE_RECOVERED, /* recovery of receiver node is completed so drop old slot and restart replication from the current position in WAL */
131+
REPLMODE_RECOVERY, /* perform recorvery of the node by applying all data from the slot from specified point */
132+
REPLMODE_CREATE_NEW, /* destination node is recovered: drop old slot and restart from roveredLsn position */
133+
REPLMODE_OPEN_EXISTED /* normal mode: use existed slot or create new one and start receiving data from it from the rememered position */
133134
} MtmReplicationMode;
134135

135136
typedef struct
@@ -188,12 +189,13 @@ typedef struct
188189
int receiverPid;
189190
XLogRecPtr flushPos;
190191
csn_t oldestSnapshot; /* Oldest snapshot used by active transactions at this node */
191-
XLogRecPtr restartLsn;
192+
XLogRecPtr restartLSN;
192193
RepOriginId originId;
193194
int timeline;
194195
void* lockGraphData;
195196
int lockGraphAllocated;
196197
int lockGraphUsed;
198+
XLogRecPtr recoveredLSN;
197199
} MtmNodeInfo;
198200

199201
typedef struct MtmTransState
@@ -264,6 +266,7 @@ typedef struct
264266
uint64 gcCount; /* Number of global transactions performed since last GC */
265267
MtmMessageQueue* sendQueue; /* Messages to be sent by arbiter sender */
266268
MtmMessageQueue* freeQueue; /* Free messages */
269+
XLogRecPtr recoveredLSN; /* LSN at the moment of recovery completion */
267270
BgwPool pool; /* Pool of background workers for applying logical replication patches */
268271
MtmNodeInfo nodes[1]; /* [Mtm->nAllNodes]: per-node data */
269272
} MtmState;

contrib/mmts/pglogical_proto.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,7 @@ pglogical_write_commit(StringInfo out, PGLogicalOutputData *data,
187187
Assert(false);
188188

189189
if (flags == PGLOGICAL_COMMIT || flags == PGLOGICAL_PREPARE) {
190-
Assert(txn->xid < 1000 || MtmTransactionRecords >= 2);
190+
//Assert(txn->xid < 1000 || MtmTransactionRecords != 1);
191191
// if (MtmIsFilteredTxn) {
192192
// Assert(MtmTransactionRecords == 0);
193193
// return;

contrib/mmts/pglogical_receiver.c

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -196,9 +196,10 @@ feTimestampDifference(int64 start_time, int64 stop_time,
196196
static char const* const MtmReplicationModeName[] =
197197
{
198198
"exit",
199-
"recovered", /* recovery of node is completed so drop old slot and restart replication from the current position in WAL */
200-
"recovery", /* perform recorvery of the node by applying all data from theslot from specified point */
201-
"normal" /* normal mode: use existed slot or create new one and start receiving data from it from the specified position */
199+
"recovered", /* recovery of node is completed so drop old slot and restart replication from the current position in WAL */
200+
"recovery", /* perform recorvery of the node by applying all data from theslot from specified point */
201+
"create_new", /* destination node is recovered: drop old slot and restart from roveredLsn position */
202+
"open_existed" /* normal mode: use existed slot or create new one and start receiving data from it from the rememered position */
202203
};
203204

204205
static void
@@ -275,7 +276,7 @@ pglogical_receiver_main(Datum main_arg)
275276
}
276277
timeline = Mtm->nodes[nodeId-1].timeline;
277278
count = Mtm->recoveryCount;
278-
279+
279280
/* Establish connection to remote server */
280281
conn = PQconnectdb_safe(connString);
281282
status = PQstatus(conn);
@@ -287,7 +288,9 @@ pglogical_receiver_main(Datum main_arg)
287288
}
288289

289290
query = createPQExpBuffer();
290-
if (mode == REPLMODE_NORMAL && timeline != Mtm->nodes[nodeId-1].timeline) { /* recreate slot */
291+
if ((mode == REPLMODE_OPEN_EXISTED && timeline != Mtm->nodes[nodeId-1].timeline)
292+
|| mode == REPLMODE_CREATE_NEW)
293+
{ /* recreate slot */
291294
appendPQExpBuffer(query, "DROP_REPLICATION_SLOT \"%s\"", slotName);
292295
res = PQexec(conn, query->data);
293296
PQclear(res);
@@ -320,7 +323,7 @@ pglogical_receiver_main(Datum main_arg)
320323

321324
/* Start logical replication at specified position */
322325
if (mode == REPLMODE_RECOVERED) {
323-
originStartPos = Mtm->nodes[nodeId-1].restartLsn;
326+
originStartPos = Mtm->nodes[nodeId-1].restartLSN;
324327
MTM_LOG1("Restart replication from node %d from position %lx", nodeId, originStartPos);
325328
}
326329
if (originStartPos == InvalidXLogRecPtr && !newTimeline) {
@@ -339,23 +342,26 @@ pglogical_receiver_main(Datum main_arg)
339342
MTM_LOG1("Start logical receiver at position %lx from node %d", originStartPos, nodeId);
340343
} else {
341344
originStartPos = replorigin_get_progress(originId, false);
342-
if (Mtm->nodes[nodeId-1].restartLsn < originStartPos) {
343-
Mtm->nodes[nodeId-1].restartLsn = originStartPos;
345+
if (Mtm->nodes[nodeId-1].restartLSN < originStartPos) {
346+
Mtm->nodes[nodeId-1].restartLSN = originStartPos;
344347
}
345348
MTM_LOG1("Restart logical receiver at position %lx with origin=%d from node %d", originStartPos, originId, nodeId);
346349
}
347350
Mtm->nodes[nodeId-1].originId = originId;
348351
CommitTransactionCommand();
352+
} else if (mode == REPLMODE_CREATE_NEW) {
353+
originStartPos = Mtm->nodes[nodeId-1].recoveredLSN;
349354
}
350355

351-
appendPQExpBuffer(query, "START_REPLICATION SLOT \"%s\" LOGICAL %x/%x (\"startup_params_format\" '1', \"max_proto_version\" '%d', \"min_proto_version\" '%d', \"forward_changesets\" '1', \"mtm_replication_mode\" '%s', \"mtm_restart_pos\" '%lx')",
356+
appendPQExpBuffer(query, "START_REPLICATION SLOT \"%s\" LOGICAL %x/%x (\"startup_params_format\" '1', \"max_proto_version\" '%d', \"min_proto_version\" '%d', \"forward_changesets\" '1', \"mtm_replication_mode\" '%s', \"mtm_restart_pos\" '%lx', \"mtm_recovered_pos\" '%lx')",
352357
slotName,
353358
(uint32) (originStartPos >> 32),
354359
(uint32) originStartPos,
355360
MULTIMASTER_MAX_PROTO_VERSION,
356361
MULTIMASTER_MIN_PROTO_VERSION,
357362
MtmReplicationModeName[mode],
358-
originStartPos
363+
originStartPos,
364+
Mtm->recoveredLSN
359365
);
360366
res = PQexec(conn, query->data);
361367
if (PQresultStatus(res) != PGRES_COPY_BOTH)
@@ -511,7 +517,7 @@ pglogical_receiver_main(Datum main_arg)
511517
output_written_lsn = Max(walEnd, output_written_lsn);
512518
continue;
513519
}
514-
mode = REPLMODE_NORMAL;
520+
mode = REPLMODE_OPEN_EXISTED;
515521
}
516522
MTM_LOG3("%ld: Receive message %c from node %d", MtmGetSystemTime(), stmt[0], nodeId);
517523
if (buf.used >= MtmTransSpillThreshold*MB) {

src/backend/replication/logical/decode.c

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -606,6 +606,18 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
606606
(parsed->dbId != InvalidOid && parsed->dbId != ctx->slot->data.database) ||
607607
FilterByOrigin(ctx, origin_id))
608608
{
609+
if (SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr)) {
610+
elog(LOG, "Skip transaction %d at %lx because it's origptr %lx is not in snapshot",
611+
xid, buf->endptr, buf->origptr);
612+
}
613+
if (parsed->dbId != InvalidOid && parsed->dbId != ctx->slot->data.database) {
614+
elog(LOG, "Skip transaction %d at %lx because database id is not matched",
615+
xid, buf->endptr);
616+
}
617+
if (FilterByOrigin(ctx, origin_id)) {
618+
elog(LOG, "Skip transaction %d at %lx is filtered by origin_id %d",
619+
xid, buf->endptr, origin_id);
620+
}
609621
for (i = 0; i < parsed->nsubxacts; i++)
610622
{
611623
ReorderBufferForget(ctx->reorder, parsed->subxacts[i], buf->origptr);

0 commit comments

Comments
 (0)
0