8000 Fix bug in flushed_lsn reporting · postgrespro/postgres_cluster@b8c3075 · GitHub
[go: up one dir, main page]

Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit b8c3075

Browse files
committed
Fix bug in flushed_lsn reporting
1 parent c1f3c26 commit b8c3075

File tree

6 files changed

+80
-58
lines changed

6 files changed

+80
-58
lines changed

contrib/mmts/multimaster--1.0.sql

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ AS 'MODULE_PATHNAME','mtm_get_nodes_state'
3535
LANGUAGE C;
3636

3737
CREATE TYPE mtm.cluster_state AS ("status" text, "disabledNodeMask" bigint, "disconnectedNodeMask" bigint, "catchUpNodeMask" bigint, "liveNodes" integer, "allNodes" integer, "nActiveQueries" integer, "nPendingQueries" integer, "queueSize" bigint, "transCount" bigint, "timeShift" bigint, "recoverySlot" integer,
38-
"xidHashSize" bigint, "gidHashSize" bigint, "oldestSnapshot" bigint, "configChanges" integer);
38+
"xidHashSize" bigint, "gidHashSize" bigint, "oldestXid" integer, "configChanges" integer);
3939

4040
CREATE FUNCTION mtm.get_cluster_state() RETURNS mtm.cluster_state
4141
AS 'MODULE_PATHNAME','mtm_get_cluster_state'

contrib/mmts/multimaster.c

Lines changed: 73 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,7 @@ static int MtmMinRecoveryLag;
200200
static int MtmMaxRecoveryLag;
201201
static int Mtm2PCPrepareRatio;
202202
static int Mtm2PCMinTimeout;
203+
static int MtmGcPeriod;
203204
static bool MtmIgnoreTablesWithoutPk;
204205

205206
static ExecutorFinish_hook_type PreviousExecutorFinishHook;
@@ -342,16 +343,20 @@ csn_t MtmTransactionSnapshot(TransactionId xid)
342343
Snapshot MtmGetSnapshot(Snapshot snapshot)
343344
{
344345
snapshot = PgGetSnapshotData(snapshot);
345-
RecentGlobalDataXmin = RecentGlobalXmin = Mtm->oldestXid;//MtmAdjustOldestXid(RecentGlobalDataXmin);
346+
RecentGlobalDataXmin = RecentGlobalXmin = Mtm->oldestXid;
346347
return snapshot;
347348
}
348349

349350

350351
TransactionId MtmGetOldestXmin(Relation rel, bool ignoreVacuum)
351352
{
352353
TransactionId xmin = PgGetOldestXmin(NULL, false); /* consider all backends */
353-
xmin = MtmAdjustOldestXid(xmin);
354-
return xmin;
354+
if (TransactionIdIsValid(xmin)) {
355+
MtmLock(LW_EXCLUSIVE);
356+
xmin = MtmAdjustOldestXid(xmin);
357+
MtmUnlock();
358+
}
359+
return xmin;
355360
}
356361

357362
bool MtmXidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
@@ -446,53 +451,50 @@ bool MtmXidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
446451
static TransactionId
447452
MtmAdjustOldestXid(TransactionId xid)
448453
{
449-
if (TransactionIdIsValid(xid)) {
450-
MtmTransState *ts, *prev = NULL;
451-
int i;
452-
453-
MtmLock(LW_EXCLUSIVE);
454-
ts = (MtmTransState*)hash_search(MtmXid2State, &xid, HASH_FIND, NULL);
455-
if (ts != NULL) {
456-
csn_t oldestSnapshot = ts->snapshot;
457-
Mtm->nodes[MtmNodeId-1].oldestSnapshot = oldestSnapshot;
458-
for (i = 0; i < Mtm->nAllNodes; i++) {
459-
if (!BIT_CHECK(Mtm->disabledNodeMask, i)
460-
&& Mtm->nodes[i].oldestSnapshot < oldestSnapshot)
461-
{
462-
oldestSnapshot = Mtm->nodes[i].oldestSnapshot;
463-
}
464-
}
465-
oldestSnapshot -= MtmVacuumDelay*USECS_PER_SEC;
466-
467-
for (ts = Mtm->transListHead;
468-
ts != NULL
469-
&& ts->csn < oldestSnapshot
470-
&& TransactionIdPrecedes(ts->xid, xid)
471-
&& (ts->status == TRANSACTION_STATUS_COMMITTED ||
472-
ts->status == TRANSACTION_STATUS_ABORTED);
473-
prev = ts, ts = ts->next)
454+
int i;
455+
MtmTransState *prev = NULL;
456+
MtmTransState *ts = (MtmTransState*)hash_search(MtmXid2State, &xid, HASH_FIND, NULL);
457+
MTM_LOG1("%d: MtmAdjustOldestXid(%d): snapshot=%ld, csn=%ld, status=%d", MyProcPid, xid, ts != NULL ? ts->snapshot : 0, ts != NULL ? ts->csn : 0, ts != NULL ? ts->status : -1);
458+
Mtm->gcCount = 0;
459+
if (ts != NULL) {
460+
csn_t oldestSnapshot = ts->snapshot;
461+
Mtm->nodes[MtmNodeId-1].oldestSnapshot = oldestSnapshot;
462+
for (i = 0; i < Mtm->nAllNodes; i++) {
463+
if (!BIT_CHECK(Mtm->disabledNodeMask, i)
464+
&& Mtm->nodes[i].oldestSnapshot < oldestSnapshot)
474465
{
475-
if (prev != NULL) {
476-
/* Remove information about too old transactions */
477-
hash_search(MtmXid2State, &prev->xid, HASH_REMOVE, NULL);
478-
}
466+
oldestSnapshot = Mtm->nodes[i].oldestSnapshot;
479467
}
480-
}
481-
if (MtmUseDtm)
468+
}
469+
oldestSnapshot -= MtmVacuumDelay*USECS_PER_SEC;
470+
471+
for (ts = Mtm->transListHead;
472+
ts != NULL
473+
&& ts->csn < oldestSnapshot
474+
&& TransactionIdPrecedes(ts->xid, xid)
475+
&& (ts->status == TRANSACTION_STATUS_COMMITTED ||
476+
ts->status == TRANSACTION_STATUS_ABORTED);
477+
prev = ts, ts = ts->next)
482478
{
483479
if (prev != NULL) {
484-
Mtm->transListHead = prev;
485-
Mtm->oldestXid = xid = prev->xid;
486-
} else if (TransactionIdPrecedes(Mtm->oldestXid, xid)) {
487-
xid = Mtm->oldestXid;
488-
}
489-
} else {
490-
if (prev != NULL) {
491-
Mtm->transListHead = prev;
480+
/* Remove information about too old transactions */
481+
hash_search(MtmXid2State, &prev->xid, HASH_REMOVE, NULL);
492482
}
493483
}
494-
MtmUnlock();
495-
}
484+
}
485+
if (MtmUseDtm)
486+
{
487+
if (prev != NULL) {
488+
Mtm->transListHead = prev;
489+
Mtm->oldestXid = xid = prev->xid;
490+
} else if (TransactionIdPrecedes(Mtm->oldestXid, xid)) {
491+
xid = Mtm->oldestXid;
492+
}
493+
} else {
494+
if (prev != NULL) {
495+
Mtm->transListHead = prev;
496+
}
497+
}
496498
return xid;
497499
}
498500
/*
@@ -614,7 +616,12 @@ static void
614616
MtmBeginTransaction(MtmCurrentTrans* x)
615617
{
616618
if (x->snapshot == INVALID_CSN) {
617-
MtmLock(LW_EXCLUSIVE);
619+
TransactionId xmin = (Mtm->gcCount >= MtmGcPeriod) ? PgGetOldestXmin(NULL, false) : InvalidTransactionId; /* Get oldest xmin outside critical section */
620+
621+
MtmLock(LW_EXCLUSIVE);
622+
if (TransactionIdIsValid(xmin) && Mtm->gcCount >= MtmGcPeriod) {
623+
MtmAdjustOldestXid(xmin);
624+
}
618625
x->xid = GetCurrentTransactionIdIfAny();
619626
x->isReplicated = false;
620627
x->isDistributed = MtmIsUserTransaction();
@@ -690,7 +697,6 @@ MtmPrePrepareTransaction(MtmCurrentTrans* x)
690697
}
691698

692699
MtmLock(LW_EXCLUSIVE);
693-
694700
/*
695701
* Check if there is global multimaster lock preventing new transaction from commit to make a chance to wal-senders to catch-up.
696702
* Only "own" transactions are blacked. Transactions replicated from other nodes (including recovered transaction) should be proceeded
@@ -716,8 +722,10 @@ MtmPrePrepareTransaction(MtmCurrentTrans* x)
716722

717723
x->isPrepared = true;
718724
x->csn = ts->csn;
719-
725+
720726
Mtm->transCount += 1;
727+
Mtm->gcCount += 1;
728+
721729
MtmTransactionListAppend(ts);
722730
MtmAddSubtransactions(ts, subxids, ts->nSubxids);
723731
MTM_LOG3("%d: MtmPrePrepareTransaction prepare commit of %d (gtid.xid=%d, gtid.node=%d, CSN=%ld)",
@@ -1466,8 +1474,9 @@ static void MtmInitialize()
14661474
Mtm->transListHead = NULL;
14671475
Mtm->transListTail = &Mtm->transListHead;
14681476
Mtm->nReceivers = 0;
1469-
Mtm->timeShift = 0;
1477+
Mtm->timeShift = 0;
14701478
Mtm->transCount = 0;
1479+
Mtm->gcCount = 0;
14711480
Mtm->nConfigChanges = 0;
14721481
Mtm->localTablesHashLoaded = false;
14731482
for (i = 0; i < MtmNodes; i++) {
@@ -1600,6 +1609,21 @@ _PG_init(void)
16001609
if (!process_shared_preload_libraries_in_progress)
16011610
return;
16021611

1612+
DefineCustomIntVariable(
1613+
"multimaster.gc_period",
1614+
"Number of distributed transactions after which garbage collection is started",
1615+
"Multimaster is building xid->csn hash map which has to be cleaned to avoid hash overflow. This parameter specifies interval of invoking garbage collector for this map",
1616+
&MtmGcPeriod,
1617+
MTM_HASH_SIZE/10,
1618+
1,
1619+
INT_MAX,
1620+
PGC_BACKEND,
1621+
0,
1622+
NULL,
1623+
NULL,
1624+
NULL
1625+
);
1626+
16031627
DefineCustomIntVariable(
16041628
"multimaster.max_nodes",
16051629
"Maximal number of cluster nodes",
@@ -2339,7 +2363,7 @@ mtm_get_cluster_state(PG_FUNCTION_ARGS)
23392363
values[11] = Int32GetDatum(Mtm->recoverySlot);
23402364
values[12] = Int64GetDatum(hash_get_num_entries(MtmXid2State));
23412365
values[13] = Int64GetDatum(hash_get_num_entries(MtmGid2State));
2342-
values[14] = Int64GetDatum(Mtm->oldestSnapshot);
2366+
values[14] = Int32GetDatum(Mtm->oldestXid);
23432367
values[15] = Int32GetDatum(Mtm->nConfigChanges);
23442368

23452369
PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(desc, values, nulls)));

contrib/mmts/multimaster.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,7 @@ typedef struct
180180
MtmTransState** transListTail; /* Tail of L1 list of all finished transactionds, used to append new elements.
181181
This list is expected to be in CSN ascending order, by strict order may be violated */
182182
uint64 transCount; /* Counter of transactions perfromed by this node */
183+
uint64 gcCount; /* Number of global transactions performed since last GC */
183184
BgwPool pool; /* Pool of background workers for applying logical replication patches */
184185
MtmNodeInfo nodes[1]; /* [Mtm->nAllNodes]: per-node data */
185186
} MtmState;

contrib/mmts/pglogical_apply.c

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -75,8 +75,6 @@ static void process_remote_insert(StringInfo s, Relation rel);
7575
static void process_remote_update(StringInfo s, Relation rel);
7676
static void process_remote_delete(StringInfo s, Relation rel);
7777

78-
static int MtmReplicationNode;
79-
8078
/*
8179
* Search the index 'idxrel' for a tuple identified by 'skey' in 'rel'.
8280
*
@@ -481,8 +479,8 @@ static void
481479
MtmBeginSession(void)
482480
{
483481
char slot_name[MULTIMASTER_MAX_SLOT_NAME_SIZE];
484-
MtmLockNode(MtmReplicationNode);
485-
sprintf(slot_name, MULTIMASTER_SLOT_PATTERN, MtmReplicationNode);
482+
MtmLockNode(MtmReplicationNodeId);
483+
sprintf(slot_name, MULTIMASTER_SLOT_PATTERN, MtmReplicationNodeId);
486484
Assert(replorigin_session_origin == InvalidRepOriginId);
487485
replorigin_session_origin = replorigin_by_name(slot_name, false);
488486
MTM_LOG3("%d: Begin setup replorigin session: %d", MyProcPid, replorigin_session_origin);
@@ -498,7 +496,7 @@ MtmEndSession(bool unlock)
498496
replorigin_session_origin = InvalidRepOriginId;
499497
replorigin_session_reset();
500498
if (unlock) {
501-
MtmUnlockNode(MtmReplicationNode);
499+
MtmUnlockNode(MtmReplicationNodeId);
502500
}
503501
MTM_LOG3("%d: End reset replorigin session: %d", MyProcPid, replorigin_session_origin);
504502
}
@@ -513,7 +511,7 @@ process_remote_commit(StringInfo in)
513511
XLogRecPtr end_lsn;
514512
/* read flags */
515513
flags = pq_getmsgbyte(in);
516-
MtmReplicationNode = pq_getmsgbyte(in);
514+
MtmReplicationNodeId = pq_getmsgbyte(in);
517515

518516
/* read fields */
519517
replorigin_session_origin_lsn = pq_getmsgint64(in); /* commit_lsn */

contrib/mmts/pglogical_receiver.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ static volatile sig_atomic_t got_sighup = false;
4747

4848
/* GUC variables */
4949
static int receiver_idle_time = 0;
50-
static bool receiver_sync_mode = false;
50+
static bool receiver_sync_mode = true; /* We need sync mode to have up-to-date values of catalog_xmin in replication slots */
5151

5252
/* Worker name */
5353
static char worker_proc[BGW_MAXLEN];

contrib/mmts/tests/dtmacid.cpp

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -285,7 +285,6 @@ int main (int argc, char* argv[])
285285
for (int i = 0; i < cfg.nReaders; i++) {
286286
readers[i].wait();
287287
nSelects += readers[i].selects;
288-
nTransactions += writers[i].transactions;
289288
}
290289

291290
time_t elapsed = getCurrentTime() - start;

0 commit comments

Comments
 (0)
0