8000 Add mtm.get_last_csn function · postgrespro/postgres_cluster@e0862d9 · GitHub
[go: up one dir, main page]

Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit e0862d9

Browse files
committed
Add mtm.get_last_csn function
1 parent 75b574f commit e0862d9

File tree

4 files changed

+20
-5
lines changed

4 files changed

+20
-5
lines changed

contrib/mmts/multimaster--1.0.sql

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,10 @@ CREATE FUNCTION mtm.get_csn(tid xid) RETURNS bigint
3131
AS 'MODULE_PATHNAME','mtm_get_csn'
3232
LANGUAGE C;
3333

34+
CREATE FUNCTION mtm.get_last_csn() RETURNS bigint
35+
AS 'MODULE_PATHNAME','mtm_get_last_csn'
36+
LANGUAGE C;
37+
3438

3539
CREATE TYPE mtm.node_state AS ("id" integer, "disabled" bool, "disconnected" bool, "catchUp" bool, "slotLag" bigint, "avgTransDelay" bigint, "lastStatusChange" timestamp, "oldestSnapshot" bigint, "connStr" text);
3640

contrib/mmts/multimaster.c

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,7 @@ PG_FUNCTION_INFO_V1(mtm_poll_node);
108108
PG_FUNCTION_INFO_V1(mtm_recover_node);
109109
PG_FUNCTION_INFO_V1(mtm_get_snapshot);
110110
PG_FUNCTION_INFO_V1(mtm_get_csn);
111+
PG_FUNCTION_INFO_V1(mtm_get_last_csn);
111112
PG_FUNCTION_INFO_V1(mtm_get_nodes_state);
112113
PG_FUNCTION_INFO_V1(mtm_get_cluster_state);
113114
PG_FUNCTION_INFO_V1(mtm_get_cluster_info);
@@ -823,11 +824,11 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
823824
if (ts != NULL) {
824825
if (commit) {
825826
Assert(ts->status == TRANSACTION_STATUS_UNKNOWN);
826-
ts->status = TRANSACTION_STATUS_COMMITTED;
827827
if (x->csn > ts->csn) {
828828
ts->csn = x->csn;
829829
MtmSyncClock(ts->csn);
830830
}
831+
ts->status = TRANSACTION_STATUS_COMMITTED;
831832
} else {
832833
ts->status = TRANSACTION_STATUS_ABORTED;
833834
}
@@ -1462,6 +1463,7 @@ static void MtmInitialize()
14621463
Mtm->recoverySlot = 0;
14631464
Mtm->locks = GetNamedLWLockTranche(MULTIMASTER_NAME);
14641465
Mtm->csn = MtmGetCurrentTime();
1466+
Mtm->lastCsn = INVALID_CSN;
14651467
Mtm->oldestXid = FirstNormalTransactionId;
14661468
Mtm->nLiveNodes = MtmNodes;
14671469
Mtm->nAllNodes = MtmNodes;
@@ -2295,6 +2297,12 @@ mtm_get_snapshot(PG_FUNCTION_ARGS)
22952297
PG_RETURN_INT64(MtmTx.snapshot);
22962298
}
22972299

2300+
Datum
2301+
mtm_get_last_csn(PG_FUNCTION_ARGS)
2302+
{
2303+
PG_RETURN_INT64(Mtm->lastCsn);
2304+
}
2305+
22982306
Datum
22992307
mtm_get_csn(PG_FUNCTION_ARGS)
23002308
{

contrib/mmts/multimaster.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,8 @@ typedef struct
172172
int nActiveTransactions; /* Nunmber of active 2PC transactions */
173173
int nConfigChanges; /* Number of cluster configuration changes */
174174
int64 timeShift; /* Local time correction */
175-
csn_t csn; /* Last obtained CSN: used to provide unique acending CSNs based on system time */
175+
csn_t csn; /* Last obtained timestamp: used to provide unique acending CSNs based on system time */
176+
csn_t lastCsn; /* CSN of last committed transaction */
176177
MtmTransState* votingTransactions; /* L1-list of replicated transactions sendings notifications to coordinator.
177178
This list is used to pass information to mtm-sender BGW */
178179
MtmTransState* transListHead; /* L1 list of all finished transactions present in xid2state hash.

contrib/mmts/tests/dtmacid.cpp

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -130,8 +130,8 @@ void* reader(void* arg)
130130
while ((c2 = random() % conns.size()) == c1);
131131
work txn1(*conns[c1]);
132132
work txn2(*conns[c2]);
133-
result r1 = txn1.exec("select v,xmin,xmax,mtm.get_csn(xmin) from t order by u");
134-
result r2 = txn2.exec("select v,xmin,xmax,mtm.get_csn(xmin) from t order by u");
133+
result r1 = txn1.exec("select v,xmin,xmax,mtm.get_csn(xmin),mtm.get_csn(xmax),mtm.get_snapshot(),mtm.get_last_csn() from t order by u");
134+
result r2 = txn2.exec("select v,xmin,xmax,mtm.get_csn(xmin),mtm.get_csn(xmax),mtm.get_snapshot(),mtm.get_last_csn() from t order by u");
135135
int delta = 0;
136136
for (int i=0; i < cfg.nAccounts; i++) {
137137
int diff = r1[i][0].as(int()) - r2[i][0].as(int());
@@ -140,7 +140,9 @@ void* reader(void* arg)
140140
delta = diff;
141141
if (delta < 0) lt++; else gt++;
142142
} else if (delta != diff) {
143-
printf("Inconsistency found for record %d: [%d,%d]->%ld vs [%d,%d]->%ld\n", i, r1[i][1].as(int()), r1[i][2].as(int()), r1[i][3].as(int64_t()), r2[i][1].as(int()), r2[i][2].as(int()), r2[i][3].as(int64_t()));
143+
printf("Inconsistency found for record %d: [%d,%d]->[%ld,%ld] (snapshot %ld, last CSN %ld) vs. [%d,%d]->[%ld,%ld] (snapshot %ld, last CSN %ld)\n", i,
144+
r1[i][1].as(int()), r1[i][2].as(int()), r1[i][3].as(int64_t()), r1[i][4].as(int64_t()), r1[i][5].as(int64_t()), r1[i][6].as(int64_t()),
145+
r2[i][1].as(int()), r2[i][2].as(int()), r2[i][3].as(int64_t()), r2[i][4].as(int64_t()), r2[i][5].as(int64_t()), r2[i][6].as(int64_t()));
144146
}
145147
}
146148
}

0 commit comments

Comments
 (0)
0