10000 Add comments to DTM and remove deteriorated code · postgrespro/postgres_cluster@d9c18ab · GitHub
[go: up one dir, main page]

Skip to content

Commit d9c18ab

Browse files
committed
Add comments to DTM and remove deteriorated code
1 parent 2d626c2 commit d9c18ab

File tree

7 files changed

+81
-181
lines changed

7 files changed

+81
-181
lines changed

contrib/pg_xtm/dtmd/src/main.c

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -224,11 +224,7 @@ static char *onreserve(void *stream, void *clientdata, cmd_t *cmd) {
224224
char head[1+16+16+1];
225225
sprintf(head, "+%016llx%016llx", minxid, maxxid);
226226

227-
Snapshot s;
228-
gen_snapshot(&s);
229-
char *snapser = snapshot_serialize(&s);
230-
231-
return destructive_concat(strdup(head), snapser);
227+
return strdup(head);
232228
}
233229

234230
static xid_t get_global_xmin() {

contrib/pg_xtm/libdtm.c

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -407,9 +407,7 @@ XidStatus DtmGlobalGetTransStatus(TransactionId xid, bool wait)
407407
// reserved, and sets the 'first' xid accordingly. The number of xids reserved
408408
// is guaranteed to be at least nXids.
409409
// In other words, *first ≥ xid and result ≥ nXids.
410-
// Also sets the 'active' snapshot, which is used as a container for the list
411-
// of active global transactions.
412-
int DtmGlobalReserve(TransactionId xid, int nXids, TransactionId *first, Snapshot active)
410+
int DtmGlobalReserve(TransactionId xid, int nXids, TransactionId *first)
413411
{
414412
bool ok;
415413
xid_t xmin, xmax;
@@ -423,7 +421,6 @@ int DtmGlobalReserve(TransactionId xid, int nXids, TransactionId *first, Snapsho
423421

424422
if (!dtm_read_hex16(dtm, &xmin)) goto failure;
425423
if (!dtm_read_hex16(dtm, &xmax)) goto failure;
426-
if (!dtm_read_snapshot(dtm, active)) goto failure;
427424

428425
*first = xmin;
429426
count = xmax - xmin + 1;

contrib/pg_xtm/libdtm.h

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,6 @@ XidStatus DtmGlobalGetTransStatus(TransactionId xid, bool wait);
3636
// reserved, and sets the 'first' xid accordingly. The number of xids reserved
3737
// is guaranteed to be at least nXids.
3838
// In other words, *first ≥ xid and result ≥ nXids.
39-
// Also sets the 'active' snapshot, which is used as a container for the list
40-
// of active global transactions.
41-
int DtmGlobalReserve(TransactionId xid, int nXids, TransactionId *first, Snapshot active);
39+
int DtmGlobalReserve(TransactionId xid, int nXids, TransactionId *first);
4240

4341
#endif

contrib/pg_xtm/pg_dtm.c

Lines changed: 76 additions & 106 deletions
Original file line numberDiff line numberDiff line change
@@ -46,10 +46,9 @@ typedef struct
4646
{
4747
LWLockId hashLock;
4848
LWLockId xidLock;
49-
TransactionId minXid;
50-
TransactionId nextXid;
51-
size_t nReservedXids;
52-
SnapshotData activeSnapshot;
49+
TransactionId minXid; /* XID of oldest transaction visible by any active transaction (local or global) */
50+
TransactionId nextXid; /* next XID for local transaction */
51+
size_t nReservedXids; /* number of XIDs reserved for local transactions */
5352
} DtmState;
5453

5554

@@ -60,15 +59,12 @@ void _PG_init(void);
6059
void _PG_fini(void);
6160

6261
static Snapshot DtmGetSnapshot(Snapshot snapshot);
63-
static void DtmMergeSnapshots(Snapshot dst, Snapshot src);
64-
static void DtmMergeWithActiveSnapshot(Snapshot snapshot);
6562
static void DtmMergeWithGlobalSnapshot(Snapshot snapshot);
6663
static XidStatus DtmGetTransactionStatus(TransactionId xid, XLogRecPtr *lsn);
6764
static void DtmSetTransactionStatus(TransactionId xid, int nsubxids, TransactionId *subxids, XidStatus status, XLogRecPtr lsn);
6865
static void DtmUpdateRecentXmin(Snapshot snapshot);
6966
static void DtmInitialize(void);
7067
static void DtmXactCallback(XactEvent event, void *arg);
71-
static bool DtmTransactionIdIsInProgress(TransactionId xid);
7268
static TransactionId DtmGetNextXid(void);
7369
static TransactionId DtmGetNewTransactionId(bool isSubXact);
7470
static TransactionId DtmGetOldestXmin(Relation rel, bool ignoreVacuum);
@@ -91,7 +87,7 @@ static bool DtmGlobalXidAssigned;
9187
static int DtmLocalXidReserve;
9288
static int DtmCurcid;
9389
static Snapshot DtmLastSnapshot;
94-
static TransactionManager DtmTM = { DtmGetTransactionStatus, DtmSetTransactionStatus, DtmGetSnapshot, DtmGetNewTransactionId, DtmGetOldestXmin, DtmTransactionIdIsInProgress, DtmGetGlobalTransactionId };
90+
static TransactionManager DtmTM = { DtmGetTransactionStatus, DtmSetTransactionStatus, DtmGetSnapshot, DtmGetNewTransactionId, DtmGetOldestXmin, TransactionIdIsRunning, DtmGetGlobalTransactionId };
9591

9692

9793
#define XTM_TRACE(fmt, ...)
@@ -156,63 +152,23 @@ static bool TransactionIdIsInDoubt(TransactionId xid)
156152
return false;
157153
}
158154

159-
160-
static void DtmMergeSnapshots(Snapshot dst, Snapshot src)
161-
{
162-
int i, j, n;
163-
TransactionId prev;
164-
165-
if (src->xmin < dst->xmin) {
166-
dst->xmin = src->xmin;
167-
}
168-
169-
n = dst->xcnt;
170-
Assert(src->xcnt + n <= GetMaxSnapshotXidCount());
171-
memcpy(dst->xip + n, src->xip, src->xcnt*sizeof(TransactionId));
172-
n += src->xcnt;
173-
174-
qsort(dst->xip, n, sizeof(TransactionId), xidComparator);
175-
prev = InvalidTransactionId;
176-
177-
for (i = 0, j = 0; i < n && dst->xip[i] < dst->xmax; i++) {
178-
if (dst->xip[i] != prev) {
179-
dst->xip[j++] = prev = dst->xip[i];
180-
}
181-
}
182-
dst->xcnt = j;
183-
}
184-
185-
static void DtmMergeWithActiveSnapshot(Snapshot dst)
186-
{
187-
int i, j;
188-
XLogRecPtr lsn;
189-
Snapshot src = &dtm->activeSnapshot;
190-
191-
LWLockAcquire(dtm->xidLock, LW_EXCLUSIVE);
192-
for (i = 0, j = 0; i < src->xcnt; i++) {
193-
if (!TransactionIdIsInSnapshot(src->xip[i], dst)
194-
&& DtmGetTransactionStatus(src->xip[i], &lsn) == TRANSACTION_STATUS_IN_PROGRESS)
195-
{
196-
src->xip[j++] = src->xip[i];
197-
}
198-
}
199-
src->xcnt = j;
200-
if (j != 0) {
201-
src->xmin = src->xip[0];
202-
DtmMergeSnapshots(dst, src);
203-
}
204-
LWLockRelease(dtm->xidLock);
205-
}
206-
155+
/* Merge local and global snapshots.
156+
* Produce most restricted (conservative) snapshot which treate transaction as in-progress if is is marked as in-progress
157+
* either in local, either in global snapshots
158+
*/
207159
static void DtmMergeWithGlobalSnapshot(Snapshot dst)
208160
{
209-
int i;
161+
int i, j, n;
210162
TransactionId xid;
211163
Snapshot src = &DtmSnapshot;
212164

213165
Assert(TransactionIdIsValid(src->xmin) && TransactionIdIsValid(src->xmax));
214166

215-
GetLocalSnapshot:
167+
GetLocalSnapshot:
168+
/*
169+
* Check that global and local snapshots are consistent: transactions marked as completed in global snapohsot
170+
* should be completed locally
171+
*/
216172
dst = GetLocalSnapshotData(dst);
217173
for (i = 0; i < dst->xcnt; i++) {
218174
if (TransactionIdIsInDoubt(dst->xip[i])) {
@@ -229,11 +185,32 @@ static void DtmMergeWithGlobalSnapshot(Snapshot dst)
229185

230186
if (src->xmax < dst->xmax) dst->xmax = src->xmax;
231187

232-
DtmMergeSnapshots(dst, src);
188+
if (src->xmin < dst->xmin) {
189+
dst->xmin = src->xmin;
190+
}
191+
192+
n = dst->xcnt;
193+
Assert(src->xcnt + n <= GetMaxSnapshotXidCount());
194+
memcpy(dst->xip + n, src->xip, src->xcnt*sizeof(TransactionId));
195+
n += src->xcnt;
196+
197+
qsort(dst->xip, n, sizeof(TransactionId), xidComparator);
198+
xid = InvalidTransactionId;
199+
200+
for (i = 0, j = 0; i < n && dst->xip[i] < dst->xmax; i++) {
201+
if (dst->xip[i] != xid) {
202+
dst->xip[j++] = xid = dst->xip[i];
203+
}
204+
}
205+
dst->xcnt = j;
233206

234207
DumpSnapshot(dst, "merged");
235208
}
236209

210+
/*
211+
* Get oldest Xid visible by any active transaction (global or local)
212+
* Take in account global Xmin received from DTMD
213+
*/
237214
static TransactionId DtmGetOldestXmin(Relation rel, bool ignoreVacuum)
238215
{
239216
TransactionId localXmin = GetOldestLocalXmin(rel, ignoreVacuum);
@@ -253,14 +230,16 @@ static TransactionId DtmGetOldestXmin(Relation rel, bool ignoreVacuum)
253230
return localXmin;
254231
}
255232

233+
/*
234+
* Update local Recent*Xmin variables taken in account MinXmin received from DTMD
235+
*/
256236
static void DtmUpdateRecentXmin(Snapshot snapshot)
257237
{
258-
TransactionId xmin = dtm->minXid;//DtmSnapshot.xmin;
238+
TransactionId xmin = dtm->minXid;
259239
XTM_INFO("XTM: DtmUpdateRecentXmin global xmin=%d, snapshot xmin %d\n", dtm->minXid, DtmSnapshot.xmin);
260240

261241
if (TransactionIdIsValid(xmin)) {
262242
xmin -= vacuum_defer_cleanup_age;
263-
//xmin = FirstNormalTransactionId;
264243
if (!TransactionIdIsNormal(xmin)) {
265244
xmin = FirstNormalTransactionId;
266245
}
@@ -277,6 +256,10 @@ static void DtmUpdateRecentXmin(Snapshot snapshot)
277256
}
278257
}
279258

259+
/*
260+
* Get new XID. For global transaction is it previsly set by dtm_begin_transaction or dtm_join_transaction.
261+
* Local transactions are using range of local Xids obtains from DTM.
262+
*/
280263
static TransactionId DtmGetNextXid()
281264
{
282265
TransactionId xid;
@@ -285,18 +268,8 @@ static TransactionId DtmGetNextXid()
285268
XTM_INFO("Use global XID %d\n", DtmNextXid);
286269
xid = DtmNextXid;
287270

288-
#ifdef SUPPORT_LOCAL_TRANSACTIONS
289-
{
290-
TransactionId* p;
291-
p = bsearch(&DtmNextXid, dtm->activeSnapshot.xip, dtm->activeSnapshot.xcnt, sizeof(TransactionId), xidComparator);
292-
if (p != NULL) {
293-
dtm->activeSnapshot.xcnt -= 1;
294-
memcpy(p, p+1, (dtm->activeSnapshot.xcnt - (p - dtm->activeSnapshot.xip))*sizeof(TransactionId));
295-
}
296-
}
297-
#endif
298-
299271
if (TransactionIdPrecedesOrEquals(ShmemVariableCache->nextXid, xid)) {
272+
/* Advance ShmemVariableCache->nextXid formward until new Xid */
300273
while (TransactionIdPrecedes(ShmemVariableCache->nextXid, xid)) {
301274
XTM_INFO("Extend CLOG for global transaction to %d\n", ShmemVariableCache->nextXid);
302275
ExtendCLOG(ShmemVariableCache->nextXid);
@@ -308,21 +281,20 @@ static TransactionId DtmGetNextXid()
308281
}
309282
} else {
310283
if (dtm->nReservedXids == 0) {
311-
dtm->nReservedXids = DtmGlobalReserve(ShmemVariableCache->nextXid, DtmLocalXidReserve, &dtm->nextXid, &dtm->activeSnapshot);
284+
dtm->nReservedXids = DtmGlobalReserve(ShmemVariableCache->nextXid, DtmLocalXidReserve, &dtm->nextXid);
312285
Assert(dtm->nReservedXids > 0);
313286
Assert(TransactionIdFollowsOrEquals(dtm->nextXid, ShmemVariableCache->nextXid));
314287

288+
/* Advance ShmemVariableCache->nextXid formward until new Xid */
315289
while (TransactionIdPrecedes(ShmemVariableCache->nextXid, dtm->nextXid)) {
316290
XTM_INFO("Extend CLOG for local transaction to %d\n", ShmemVariableCache->nextXid);
317291
ExtendCLOG(ShmemVariableCache->nextXid);
318292
ExtendCommitTs(ShmemVariableCache->nextXid);
319293
ExtendSUBTRANS(ShmemVariableCache->nextXid);
320294
TransactionIdAdvance(ShmemVariableCache->nextXid);
321295
}
322-
Assert(ShmemVariableCache->nextXid == dtm->nextXid);
323-
} else {
324-
Assert(ShmemVariableCache->nextXid == dtm->nextXid);
325-
}
296+
}
297+
Assert(ShmemVariableCache->nextXid == dtm->nextXid);
326298
xid = dtm->nextXid++;
327299
dtm->nReservedXids -= 1;
328300
XTM_INFO("Obtain new local XID %d\n", xid);
@@ -337,13 +309,16 @@ DtmGetGlobalTransactionId()
337309
return DtmNextXid;
338310
}
339311

312+
/*
313+
* We have to cut&paste copde of GetNewTransactionId from varsup because we change way of advancing ShmemVariableCache->nextXid
314+
*/
340315
TransactionId
341316
DtmGetNewTransactionId(bool isSubXact)
342317
{
343318
TransactionId xid;
344319

345320
XTM_INFO("%d: GetNewTransactionId\n", getpid());
346-
Assert(!DtmGlobalXidAssigned);
321+
Assert(!DtmGlobalXidAssigned); /* We should not assign new Xid if we do not use previous one */
347322

348323
/*
349324
* Workers synchronize transaction state at the beginning of each parallel
@@ -544,53 +519,42 @@ DtmGetNewTransactionId(bool isSubXact)
544519
}
545520

546521

547-
static bool DtmTransactionIdIsInProgress(TransactionId xid)
548-
{
549-
XLogRecPtr lsn;
550-
if (TransactionIdIsRunning(xid)) {
551-
return true;
552-
}
553-
#ifdef SUPPORT_LOCAL_TRANSACTIONS
554-
else if (DtmGetTransactionStatus(xid, &lsn) == TRANSACTION_STATUS_IN_PROGRESS) {
555-
bool globallyStarted;
556-
LWLockAcquire(dtm->xidLock, LW_SHARED);
557-
globallyStarted = bsearch(&xid, dtm->activeSnapshot.xip, dtm->activeSnapshot.xcnt, sizeof(TransactionId), xidComparator) != NULL;
558-
LWLockRelease(dtm->xidLock);
559-
return globallyStarted;
560-
}
561-
#endif
562-
return false;
563-
}
564-
565-
566522
static Snapshot DtmGetSnapshot(Snapshot snapshot)
567523
{
568524
if (DtmGlobalXidAssigned) {
525+
/* If DtmGlobalXidAssigned is set, we are in transaction performing dtm_begin_transaction or dtm_join_transaction
526+
* which PRECEDS actual transaction for which Xid is received.
527+
* This transaction doesn't need to take in accountn global snapshot
528+
*/
569529
return GetLocalSnapshotData(snapshot);
570530
}
571-
if (TransactionIdIsValid(DtmNextXid) /*&& IsMVCCSnapshot(snapshot)*/ && snapshot != &CatalogSnapshotData) {
531+
if (TransactionIdIsValid(DtmNextXid) && snapshot != &CatalogSnapshotData) {
572532
if (!DtmHasGlobalSnapshot && (snapshot != DtmLastSnapshot || DtmCurcid != snapshot->curcid)) {
573533
DtmGlobalGetSnapshot(DtmNextXid, &DtmSnapshot, &dtm->minXid);
574534
}
575535
DtmCurcid = snapshot->curcid;
576536
DtmLastSnapshot = snapshot;
577537
DtmMergeWithGlobalSnapshot(snapshot);
578538
if (!IsolationUsesXactSnapshot()) {
539+
/* Use single global snapshot during all transaction for repeatable read isolation level,
540+
* but obtain new global snapshot each time it is requested for read committed isolation level
541+
*/
579542
DtmHasGlobalSnapshot = false;
580543
}
581544
} else {
545+
/* For local transactions and catalog snapshots use default GetSnapshotData implementation */
582546
snapshot = GetLocalSnapshotData(snapshot);
583547
}
584-
#ifdef SUPPORT_LOCAL_TRANSACTIONS
585-
DtmMergeWithActiveSnapshot(snapshot);
586-
#endif
587548
DtmUpdateRecentXmin(snapshot);
588549
CurrentTransactionSnapshot = snapshot;
589550
return snapshot;
590551
}
591552

592553
static XidStatus DtmGetTransactionStatus(TransactionId xid, XLogRecPtr *lsn)
593554
{
555+
/* Because of global snapshots we can ask for status of transaction which is not yet started locally: so we have
556+
* to compare xid with ShmemVariableCache->nextXid before accessing CLOG
557+
*/
594558
XidStatus status = xid >= ShmemVariableCache->nextXid
595559
? TRANSACTION_STATUS_IN_PROGRESS
596560
: CLOGTransactionIdGetStatus(xid, lsn);
@@ -603,8 +567,6 @@ static void DtmSetTransactionStatus(TransactionId xid, int nsubxids, Transaction
603567
XTM_INFO("%d: DtmSetTransactionStatus %u = %u\n", getpid(), xid, status);
604568
if (!RecoveryInProgress()) {
605569
if (!DtmGlobalXidAssigned && TransactionIdIsValid(DtmNextXid)) {
606-
/* Already should be IN_PROGRESS */
607-
/* CLOGTransactionIdSetTreeStatus(xid, nsubxids, subxids, TRANSACTION_STATUS_IN_PROGRESS, lsn); */
608570
CurrentTransactionSnapshot = NULL;
609571
if (status == TRANSACTION_STATUS_ABORTED) {
610572
CLOGTransactionIdSetTreeStatus(xid, nsubxids, subxids, status, lsn);
@@ -613,6 +575,7 @@ static void DtmSetTransactionStatus(TransactionId xid, int nsubxids, Transaction
613575
return;
614576
} else {
615577
XTM_INFO("Begin commit transaction %d\n", xid);
578+
/* Mark transaction as on-doubt in xid_in_doubt hash table */
616579
LWLockAcquire(dtm->hashLock, LW_EXCLUSIVE);
617580
hash_search(xid_in_doubt, &DtmNextXid, HASH_ENTER, NULL);
618581
LWLockRelease(dtm->hashLock);
@@ -656,9 +619,6 @@ static void DtmInitialize()
656619
dtm->xidLock = LWLockAssign();
657620
dtm->nReservedXids = 0;
658621
dtm->minXid = InvalidTransactionId;
659-
dtm->activeSnapshot.xip = (TransactionId*)ShmemAlloc(GetMaxSnapshotXidCount() * sizeof(TransactionId));
660-
dtm->activeSnapshot.subxip = (TransactionId*)ShmemAlloc(GetMaxSnapshotSubxidCount() * sizeof(TransactionId));
661-
662622
RegisterXactCallback(DtmXactCallback, NULL);
663623
}
664624
LWLockRelease(AddinShmemInitLock);
@@ -684,13 +644,23 @@ DtmXactCallback(XactEvent event, void *arg)
684644
XTM_INFO("%d: DtmXactCallbackevent=%d isGlobal=%d, nextxid=%d\n", getpid(), event, DtmGlobalXidAssigned, DtmNextXid);
685645
if (event == XACT_EVENT_COMMIT || event == XACT_EVENT_ABORT) {
686646
if (DtmGlobalXidAssigned) {
647+
/* DtmGlobalXidAssigned is set when Xid for global transaction is recieved.
648+
* But it happens in separate local transaction preceding this global transaction at this backend.
649+
* So this variable is used as indicator that we are still in local transaction preceeding global transaction.
650+
* When this local transaction is completed we are ready to assign Xid to global transaction.
651+
*/
687652
DtmGlobalXidAssigned = false;
688653
} else if (TransactionIdIsValid(DtmNextXid)) {
689654
if (event == XACT_EVENT_COMMIT) {
655+
/* Now transaction status is already written in CLOG, so we can remove information about it from hash table */
690656
LWLockAcquire(dtm->hashLock, LW_EXCLUSIVE);
691657
hash_search(xid_in_doubt, &DtmNextXid, HASH_REMOVE, NULL);
692658
LWLockRelease(dtm->hashLock);
693659
} else {
660+
/* Transaction at the node can be aborted because of transaction failure at some other node
661+
* before it starts doing anything and assigned Xid, in this case Postgres is not calling SetTransactionStatus,
662+
* so we have to send report to DTMD here
663+
*/
694664
if (!TransactionIdIsValid(GetCurrentTransactionIdIfAny())) {
695665
DtmGlobalSetTransStatus(DtmNextXid, TRANSACTION_STATUS_ABORTED, false);
696666
}

0 commit comments

Comments
 (0)
0