8000 WIP: Checking procglobal for active backends · pct960/postgres@a1c01bf · GitHub
[go: up one dir, main page]

Skip to content

Commit a1c01bf

Browse files
committed
WIP: Checking procglobal for active backends
1 parent 425a6d5 commit a1c01bf

File tree

6 files changed

+81
-4
lines changed

6 files changed

+81
-4
lines changed

src/backend/access/transam/xact.c

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1377,13 +1377,16 @@ RecordTransactionCommit(void)
13771377
//else
13781378
// elog(INFO, "queue empty");
13791379

1380-
elog(INFO, "Queue empty? = (%d)", SHMQueueEmpty(&(WalSndCtl->SyncRepQueue[Min(synchronous_commit, SYNC_REP_WAIT_FLUSH)])));
1380+
//elog(INFO, "Queue empty? = (%d)", SHMQueueEmpty(&(WalSndCtl->SyncRepQueue[Min(synchronous_commit, SYNC_REP_WAIT_FLUSH)])));
13811381
LWLockRelease(SyncRepLock);
13821382

1383+
elog(INFO, "Are backends waiting outside = (%d)", anyActiveBackends());
1384+
13831385
if(XLogMaxLSN > RecentFlushPtr)
13841386
{
1387+
elog(INFO, "Are backends waiting inside = (%d)", anyActiveBackends());
13851388
SyncRepWaitForLSN(XLogMaxLSN, false);
1386-
elog(INFO, "RO finished waiting for syncrepwaitforlsn!");
1389+
//elog(INFO, "RO finished waiting for syncrepwaitforlsn!");
13871390
}
13881391
//elog(INFO, "RO txn maxLSN = (%d), RecntFlushPtr value = (%d), XactMaxLSN = (%d)", XLogMaxLSN, RecentFlushPtr, XactMaxLSN);
13891392
//elog(INFO, "walsndctl->latch = (%d), XLogMaxLSN = (%d)", WalSndCtl->walsnds->latch, XLogMaxLSN);

src/backend/replication/syncrep.c

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -223,7 +223,12 @@ SyncRepWaitForLSN(XLogRecPtr lsn, bool commit /*, bool readOnlyWait */)
223223
// offsetof(PGPROC, syncRepLinks));
224224

225225
//if(!readOnlyWait || (readOnlyWait && proc))
226-
SyncRepQueueInsert(mode);
226+
SyncRepQueueInsert(mode);
227+
//if(!commit)
228+
// elog(INFO, "RO Queue length = (%d)", SyncRepGetQueueLength(mode));
229+
//else
230+
// elog(INFO, "Write Queue length = (%d)", SyncRepGetQueueLength(mode));
231+
227232
//else
228233
//{
229234
// LWLockRelease(SyncRepLock);
@@ -1248,6 +1253,27 @@ SyncRepGetQueueLength(int mode)
12481253
return queueLength;
12491254
}
12501255

1256+
bool
1257+
areBackendsWaiting()
1258+
{
1259+
uint32 allProcCount = &ProcGlobal->allProcCount;
1260+
1261+
for(int i=0;i<allProcCount;i++)
1262+
{
1263+
PGPROC *backend = GetPGProcByNumber(i);
1264+
1265+
if(backend->pid == 0 || backend->backendId == 0)
1266+
continue;
1267+
1268+
elog(INFO, "waitLSN = (%d), syncRepState = (%d), xid = (%d), xmin = (%d)", backend->waitLSN, backend->syncRepState, backend->xid, backend->xmin);
1269+
1270+
if(backend->xid != InvalidXLogRecPtr && (backend->syncRepState == SYNC_REP_NOT_WAITING || backend->syncRepState == SYNC_REP_WAITING || backend->syncRepState == SYNC_REP_WAIT_COMPLETE))
1271+
return true;
1272+
1273+
}
1274+
return false;
1275+
}
1276+
12511277
/*
12521278
* ===========================================================
12531279
* Synchronous Replication functions executed by any process

src/backend/replication/walsender.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ bool am_db_walsender = false; /* Connected to a database? */
121121
/* User-settable parameters for walsender */
122122
int max_wal_senders = 0; /* the maximum number of concurrent
123123
* walsenders */
124-
int wal_sender_timeout = 10 * 1000; /* maximum time to send one WAL
124+
int wal_sender_timeout = 60 * 1000; /* maximum time to send one WAL
125125
* data message */
126126
bool log_replication_commands = false;
127127

src/backend/storage/ipc/procarray.c

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3546,6 +3546,52 @@ MinimumActiveBackends(int min)
35463546
return count >= min;
35473547
}
35483548

3549+
bool
3550+
anyActiveBackends()
3551+
{
3552+
ProcArrayStruct *arrayP = procArray;
3553+
int count = 0;
3554+
int index;
3555+
3556+
/*
3557+
* Note: for speed, we don't acquire ProcArrayLock. This is a little bit
3558+
* bogus, but since we are only testing fields for zero or nonzero, it
3559+
* should be OK. The result is only used for heuristic purposes anyway...
3560+
*/
3561+
for (index = 0; index < arrayP->numProcs; index++)
3562+
{
3563+
int pgprocno = arrayP->pgprocnos[index];
3564+
PGPROC *proc = &allProcs[pgprocno];
3565+
3566+
/*
3567+
* Since we're not holding a lock, need to be prepared to deal with
3568+
* garbage, as someone could have incremented numProcs but not yet
3569+
* filled the structure.
3570+
*
3571+
* If someone just decremented numProcs, 'proc' could also point to a
3572+
* PGPROC entry that's no longer in the array. It still points to a
3573+
* PGPROC struct, though, because freed PGPROC entries just go to the
3574+
* free list and are recycled. Its contents are nonsense in that case,
3575+
* but that's acceptable for this function.
3576+
*/
3577+
if (pgprocno == -1)
3578+
continue; /* do not count deleted entries */
3579+
if (proc == MyProc)
3580+
continue; /* do not count myself */
3581+
if (proc->xid == InvalidTransactionId)
3582+
continue; /* do not count if no XID assigned */
3583+
if (proc->pid == 0)
3584+
continue; /* do not count prepared xacts */
3585+
//if (proc->waitLock != NULL)
3586+
// count++;
3587+
if (proc->lwWaiting)
3588+
count++;
3589+
3590+
}
3591+
3592+
return count > 0;
3593+
}
3594+
35493595
/*
35503596
* CountDBBackends --- count backends that are using specified database
35513597
*/

src/include/replication/slot.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -216,6 +216,7 @@ extern bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive);
216216
extern void ReplicationSlotsDropDBSlots(Oid dboid);
217217
extern bool InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno);
218218
extern ReplicationSlot *SearchNamedReplicationSlot(const char *name, bool need_lock);
219+
extern bool anyActiveReplicationSlots();
219220
extern int ReplicationSlotIndex(ReplicationSlot *slot);
220221
extern bool ReplicationSlotName(int index, Name name);
221222
extern void ReplicationSlotNameForTablesync(Oid suboid, Oid relid, char *syncslotname, int szslot);

src/include/storage/procarray.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ extern pid_t SignalVirtualTransaction(VirtualTransactionId vxid, ProcSignalReaso
7777
bool conflictPending);
7878

7979
extern bool MinimumActiveBackends(int min);
80+
extern bool anyActiveBackends();
8081
extern int CountDBBackends(Oid databaseid);
8182
extern int CountDBConnections(Oid databaseid);
8283
extern void CancelDBBackends(Oid databaseid, ProcSignalReason sigmode, bool conflictPending);

0 commit comments

Comments
 (0)
0