8000 Add information about BGW to node status · postgrespro/postgres_cluster@294238c · GitHub
[go: up one dir, main page]

Skip to content
< 8000 header class="HeaderMktg header-logged-out js-details-container js-header Details f4 py-3" role="banner" data-is-top="true" data-color-mode=light data-light-theme=light data-dark-theme=dark>

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

8000
Appearance settings

Commit 294238c

Browse files
committed
Add information about BGW to node status
1 parent ad75913 commit 294238c

File tree

5 files changed

+23
-5
lines changed

5 files changed

+23
-5
lines changed

contrib/mmts/multimaster--1.0.sql

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ AS 'MODULE_PATHNAME','mtm_get_last_csn'
3636
LANGUAGE C;
3737

3838

39-
CREATE TYPE mtm.node_state AS ("id" integer, "disabled" bool, "disconnected" bool, "catchUp" bool, "slotLag" bigint, "avgTransDelay" bigint, "lastStatusChange" timestamp, "oldestSnapshot" bigint, "connStr" text);
39+
CREATE TYPE mtm.node_state AS ("id" integer, "disabled" bool, "disconnected" bool, "catchUp" bool, "slotLag" bigint, "avgTransDelay" bigint, "lastStatusChange" timestamp, "oldestSnapshot" bigint, "SenderPid" integer, "SenderStartTime" timestamp, "ReceiverPid" integer, "ReceiverStartTime" timestamp, "connStr" text);
4040

4141
CREATE FUNCTION mtm.get_nodes_state() RETURNS SETOF mtm.node_state
4242
AS 'MODULE_PATHNAME','mtm_get_nodes_state'

contrib/mmts/multimaster.c

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2074,7 +2074,8 @@ void MtmDropNode(int nodeId, bool dropSlot)
20742074
static void
20752075
MtmOnProcExit(int code, Datum arg)
20762076
{
2077-
if (MtmReplicationNodeId >= 0) {
2077+
if (MtmReplicationNodeId > 0) {
2078+
Mtm->nodes[MtmReplicationNodeId-1].senderPid = -1;
20782079
MTM_LOG1("WAL-sender to %d is terminated", MtmReplicationNodeId);
20792080
MtmOnNodeDisconnect(MtmReplicationNodeId);
20802081
}
@@ -2086,6 +2087,8 @@ MtmReplicationStartupHook(struct PGLogicalStartupHookArgs* args)
20862087
ListCell *param;
20872088
bool recoveryCompleted = false;
20882089
MtmIsRecoverySession = false;
2090+
Mtm->nodes[MtmReplicationNodeId-1].senderPid = MyProcPid;
2091+
Mtm->nodes[MtmReplicationNodeId-1].senderStartTime = MtmGetSystemTime();
20892092
foreach(param, args->in_params)
20902093
{
20912094
DefElem *elem = lfirst(param);
@@ -2378,7 +2381,11 @@ mtm_get_nodes_state(PG_FUNCTION_ARGS)
23782381
usrfctx->values[5] = Int64GetDatum(Mtm->transCount ? Mtm->nodes[usrfctx->nodeId-1].transDelay/Mtm->transCount : 0);
23792382
usrfctx->values[6] = TimestampTzGetDatum(time_t_to_timestamptz(Mtm->nodes[usrfctx->nodeId-1].lastStatusChangeTime/USECS_PER_SEC));
23802383
usrfctx->values[7] = Int64GetDatum(Mtm->nodes[usrfctx->nodeId-1].oldestSnapshot);
2381-
usrfctx->values[8] = CStringGetTextDatum(Mtm->nodes[usrfctx->nodeId-1].con.connStr);
2384+
usrfctx->values[8] = Int32GetDatum(Mtm->nodes[usrfctx->nodeId-1].senderPid);
2385+
usrfctx->values[9] = Int64GetDatum(Mtm->nodes[usrfctx->nodeId-1].senderStartTime);
2386+
usrfctx->values[10] = Int32GetDatum(Mtm->nodes[usrfctx->nodeId-1].receiverPid);
2387+
usrfctx->values[11] = Int64GetDatum(Mtm->nodes[usrfctx->nodeId-1].receiverStartTime);
2388+
usrfctx->values[12] = CStringGetTextDatum(Mtm->nodes[usrfctx->nodeId-1].con.connStr);
23822389
usrfctx->nodeId += 1;
23832390

23842391
SRF_RETURN_NEXT(funcctx, HeapTupleGetDatum(heap_form_tuple(usrfctx->desc, usrfctx->values, usrfctx->nulls)));

contrib/mmts/multimaster.h

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@
6060
#define Anum_mtm_local_tables_rel_name 2
6161

6262
#define Natts_mtm_cluster_state 16
63-
#define Natts_mtm_nodes_state 9
63+
#define Natts_mtm_nodes_state 13
6464

6565
typedef uint64 csn_t; /* commit serial number */
6666
#define INVALID_CSN ((csn_t)-1)
@@ -125,8 +125,12 @@ typedef struct
125125
MtmConnectionInfo con;
126126
timestamp_t transDelay;
127127
timestamp_t lastStatusChangeTime;
128+
timestamp_t receiverStartTime;
129+
timestamp_t senderStartTime;
130+
int senderPid;
131+
int receiverPid;
128132
XLogRecPtr flushPos;
129-
csn_t oldestSnapshot; /* Oldest snapshot used by active transactions at this node */
133+
csn_t oldestSnapshot; /* Oldest snapshot used by active transactions at this node */
130134
} MtmNodeInfo;
131135

132136
typedef struct MtmTransState

contrib/mmts/pglogical_proto.c

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,10 @@ pglogical_write_commit(StringInfo out, PGLogicalOutputData *data,
146146
} else {
147147
csn_t csn = MtmTransactionSnapshot(txn->xid);
148148
bool isRecovery = MtmIsRecoveredNode(MtmReplicationNodeId);
149+
/*
150+
* INVALID_CSN means replicated transaction (transaction initiated by some other nodes).
151+
* We do not need to send such transactions unless we perform recovery
152+
*/
149153
if (csn == INVALID_CSN && !isRecovery) {
150154
return;
151155
}

contrib/mmts/pglogical_receiver.c

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -226,6 +226,9 @@ pglogical_receiver_main(Datum main_arg)
226226

227227
MtmCreateSpillDirectory(nodeId);
228228

229+
Mtm->nodes[nodeId-1].senderPid = MyProcPid;
230+
Mtm->nodes[nodeId-1].senderStartTime = MtmGetSystemTime();
231+
229232
sprintf(worker_proc, "mtm_pglogical_receiver_%d_%d", MtmNodeId, nodeId);
230233

231234
/* We're now ready to receive signals */

0 commit comments

Comments
 (0)
0