@@ -46,10 +46,9 @@ typedef struct
46
46
{
47
47
LWLockId hashLock ;
48
48
LWLockId xidLock ;
49
- TransactionId minXid ;
50
- TransactionId nextXid ;
51
- size_t nReservedXids ;
52
- SnapshotData activeSnapshot ;
49
+ TransactionId minXid ; /* XID of oldest transaction visible by any active transaction (local or global) */
50
+ TransactionId nextXid ; /* next XID for local transaction */
51
+ size_t nReservedXids ; /* number of XIDs reserved for local transactions */
53
52
} DtmState ;
54
53
55
54
@@ -60,15 +59,12 @@ void _PG_init(void);
60
59
void _PG_fini (void );
61
60
62
61
static Snapshot DtmGetSnapshot (Snapshot snapshot );
63
- static void DtmMergeSnapshots (Snapshot dst , Snapshot src );
64
- static void DtmMergeWithActiveSnapshot (Snapshot snapshot );
65
62
static void DtmMergeWithGlobalSnapshot (Snapshot snapshot );
66
63
static XidStatus DtmGetTransactionStatus (TransactionId xid , XLogRecPtr * lsn );
67
64
static void DtmSetTransactionStatus (TransactionId xid , int nsubxids , TransactionId * subxids , XidStatus status , XLogRecPtr lsn );
68
65
static void DtmUpdateRecentXmin (Snapshot snapshot );
69
66
static void DtmInitialize (void );
70
67
static void DtmXactCallback (XactEvent event , void * arg );
71
- static bool DtmTransactionIdIsInProgress (TransactionId xid );
72
68
static TransactionId DtmGetNextXid (void );
73
69
static TransactionId DtmGetNewTransactionId (bool isSubXact );
74
70
static TransactionId DtmGetOldestXmin (Relation rel , bool ignoreVacuum );
@@ -91,7 +87,7 @@ static bool DtmGlobalXidAssigned;
91
87
static int DtmLocalXidReserve ;
92
88
static int DtmCurcid ;
93
89
static Snapshot DtmLastSnapshot ;
94
- static TransactionManager DtmTM = { DtmGetTransactionStatus , DtmSetTransactionStatus , DtmGetSnapshot , DtmGetNewTransactionId , DtmGetOldestXmin , DtmTransactionIdIsInProgress , DtmGetGlobalTransactionId };
90
+ static TransactionManager DtmTM = { DtmGetTransactionStatus , DtmSetTransactionStatus , DtmGetSnapshot , DtmGetNewTransactionId , DtmGetOldestXmin , TransactionIdIsRunning , DtmGetGlobalTransactionId };
95
91
96
92
97
93
#define XTM_TRACE (fmt , ...)
@@ -156,63 +152,23 @@ static bool TransactionIdIsInDoubt(TransactionId xid)
156
152
return false;
157
153
}
158
154
159
-
160
- static void DtmMergeSnapshots (Snapshot dst , Snapshot src )
161
- {
162
- int i , j , n ;
163
- TransactionId prev ;
164
-
165
- if (src -> xmin < dst -> xmin ) {
166
- dst -> xmin = src -> xmin ;
167
- }
168
-
169
- n = dst -> xcnt ;
170
- Assert (src -> xcnt + n <= GetMaxSnapshotXidCount ());
171
- memcpy (dst -> xip + n , src -> xip , src -> xcnt * sizeof (TransactionId ));
172
- n += src -> xcnt ;
173
-
174
- qsort (dst -> xip , n , sizeof (TransactionId ), xidComparator );
175
- prev = InvalidTransactionId ;
176
-
177
- for (i = 0 , j = 0 ; i < n && dst -> xip [i ] < dst -> xmax ; i ++ ) {
178
- if (dst -> xip [i ] != prev ) {
179
- dst -> xip [j ++ ] = prev = dst -> xip [i ];
180
- }
181
- }
182
- dst -> xcnt = j ;
183
- }
184
-
185
- static void DtmMergeWithActiveSnapshot (Snapshot dst )
186
- {
187
- int i , j ;
188
- XLogRecPtr lsn ;
189
- Snapshot src = & dtm -> activeSnapshot ;
190
-
191
- LWLockAcquire (dtm -> xidLock , LW_EXCLUSIVE );
192
- for (i = 0 , j = 0 ; i < src -> xcnt ; i ++ ) {
193
- if (!TransactionIdIsInSnapshot (src -> xip [i ], dst )
194
- && DtmGetTransactionStatus (src -> xip [i ], & lsn ) == TRANSACTION_STATUS_IN_PROGRESS )
195
- {
196
- src -> xip [j ++ ] = src -> xip [i ];
197
- }
198
- }
199
- src -> xcnt = j ;
200
- if (j != 0 ) {
201
- src -> xmin = src -> xip [0 ];
202
- DtmMergeSnapshots (dst , src );
203
- }
204
- LWLockRelease (dtm -> xidLock );
205
- }
206
-
155
+ /* Merge local and global snapshots.
156
+ * Produce most restricted (conservative) snapshot which treate transaction as in-progress if is is marked as in-progress
157
+ * either in local, either in global snapshots
158
+ */
207
159
static void DtmMergeWithGlobalSnapshot (Snapshot dst )
208
160
{
209
- int i ;
161
+ int i , j , n ;
210
162
TransactionId xid ;
211
163
Snapshot src = & DtmSnapshot ;
212
164
213
165
Assert (TransactionIdIsValid (src -> xmin ) && TransactionIdIsValid (src -> xmax ));
214
166
215
- GetLocalSnapshot :
167
+ GetLocalSnapshot :
168
+ /*
169
+ * Check that global and local snapshots are consistent: transactions marked as completed in global snapohsot
170
+ * should be completed locally
171
+ */
216
172
dst = GetLocalSnapshotData (dst );
217
173
for (i = 0 ; i < dst -> xcnt ; i ++ ) {
218
174
if (TransactionIdIsInDoubt (dst -> xip [i ])) {
@@ -229,11 +185,32 @@ static void DtmMergeWithGlobalSnapshot(Snapshot dst)
229
185
230
186
if (src -> xmax < dst -> xmax ) dst -> xmax = src -> xmax ;
231
187
232
- DtmMergeSnapshots (dst , src );
188
+ if (src -> xmin < dst -> xmin ) {
189
+ dst -> xmin = src -> xmin ;
190
+ }
191
+
192
+ n = dst -> xcnt ;
193
+ Assert (src -> xcnt + n <= GetMaxSnapshotXidCount ());
194
+ memcpy (dst -> xip + n , src -> xip , src -> xcnt * sizeof (TransactionId ));
195
+ n += src -> xcnt ;
196
+
197
+ qsort (dst -> xip , n , sizeof (TransactionId ), xidComparator );
198
+ xid = InvalidTransactionId ;
199
+
200
+ for (i = 0 , j = 0 ; i < n && dst -> xip [i ] < dst -> xmax ; i ++ ) {
201
+ if (dst -> xip [i ] != xid ) {
202
+ dst -> xip [j ++ ] = xid = dst -> xip [i ];
203
+ }
204
+ }
205
+ dst -> xcnt = j ;
233
206
234
207
DumpSnapshot (dst , "merged" );
235
208
}
236
209
210
+ /*
211
+ * Get oldest Xid visible by any active transaction (global or local)
212
+ * Take in account global Xmin received from DTMD
213
+ */
237
214
static TransactionId DtmGetOldestXmin (Relation rel , bool ignoreVacuum )
238
215
{
239
216
TransactionId localXmin = GetOldestLocalXmin (rel , ignoreVacuum );
@@ -253,14 +230,16 @@ static TransactionId DtmGetOldestXmin(Relation rel, bool ignoreVacuum)
253
230
return localXmin ;
254
231
}
255
232
233
+ /*
234
+ * Update local Recent*Xmin variables taken in account MinXmin received from DTMD
235
+ */
256
236
static void DtmUpdateRecentXmin (Snapshot snapshot )
257
237
{
258
- TransactionId xmin = dtm -> minXid ;//DtmSnapshot.xmin;
238
+ TransactionId xmin = dtm -> minXid ;
259
239
XTM_INFO ("XTM: DtmUpdateRecentXmin global xmin=%d, snapshot xmin %d\n" , dtm -> minXid , DtmSnapshot .xmin );
260
240
261
241
if (TransactionIdIsValid (xmin )) {
262
242
xmin -= vacuum_defer_cleanup_age ;
263
- //xmin = FirstNormalTransactionId;
264
243
if (!TransactionIdIsNormal (xmin )) {
265
244
xmin = FirstNormalTransactionId ;
266
245
}
@@ -277,6 +256,10 @@ static void DtmUpdateRecentXmin(Snapshot snapshot)
277
256
}
278
257
}
279
258
259
+ /*
260
+ * Get new XID. For global transaction is it previsly set by dtm_begin_transaction or dtm_join_transaction.
261
+ * Local transactions are using range of local Xids obtains from DTM.
262
+ */
280
263
static TransactionId DtmGetNextXid ()
281
264
{
282
265
TransactionId xid ;
@@ -285,18 +268,8 @@ static TransactionId DtmGetNextXid()
285
268
XTM_INFO ("Use global XID %d\n" , DtmNextXid );
286
269
xid = DtmNextXid ;
287
270
288
- #ifdef SUPPORT_LOCAL_TRANSACTIONS
289
- {
290
- TransactionId * p ;
291
- p = bsearch (& DtmNextXid , dtm -> activeSnapshot .xip , dtm -> activeSnapshot .xcnt , sizeof (TransactionId ), xidComparator );
292
- if (p != NULL ) {
293
- dtm -> activeSnapshot .xcnt -= 1 ;
294
- memcpy (p , p + 1 , (dtm -> activeSnapshot .xcnt - (p - dtm -> activeSnapshot .xip ))* sizeof (TransactionId ));
295
- }
296
- }
297
- #endif
298
-
299
271
if (TransactionIdPrecedesOrEquals (ShmemVariableCache -> nextXid , xid )) {
272
+ /* Advance ShmemVariableCache->nextXid formward until new Xid */
300
273
while (TransactionIdPrecedes (ShmemVariableCache -> nextXid , xid )) {
301
274
XTM_INFO ("Extend CLOG for global transaction to %d\n" , ShmemVariableCache -> nextXid );
302
275
ExtendCLOG (ShmemVariableCache -> nextXid );
@@ -308,21 +281,20 @@ static TransactionId DtmGetNextXid()
308
281
}
309
282
} else {
310
283
if (dtm -> nReservedXids == 0 ) {
311
- dtm -> nReservedXids = DtmGlobalReserve (ShmemVariableCache -> nextXid , DtmLocalXidReserve , & dtm -> nextXid , & dtm -> activeSnapshot );
284
+ dtm -> nReservedXids = DtmGlobalReserve (ShmemVariableCache -> nextXid , DtmLocalXidReserve , & dtm -> nextXid );
312
285
Assert (dtm -> nReservedXids > 0 );
313
286
Assert (TransactionIdFollowsOrEquals (dtm -> nextXid , ShmemVariableCache -> nextXid ));
314
287
288
+ /* Advance ShmemVariableCache->nextXid formward until new Xid */
315
289
while (TransactionIdPrecedes (ShmemVariableCache -> nextXid , dtm -> nextXid )) {
316
290
XTM_INFO ("Extend CLOG for local transaction to %d\n" , ShmemVariableCache -> nextXid );
317
291
ExtendCLOG (ShmemVariableCache -> nextXid );
318
292
ExtendCommitTs (ShmemVariableCache -> nextXid );
319
293
ExtendSUBTRANS (ShmemVariableCache -> nextXid );
320
294
TransactionIdAdvance (ShmemVariableCache -> nextXid );
321
295
}
322
- Assert (ShmemVariableCache -> nextXid == dtm -> nextXid );
323
- } else {
324
- Assert (ShmemVariableCache -> nextXid == dtm -> nextXid );
325
- }
296
+ }
297
+ Assert (ShmemVariableCache -> nextXid == dtm -> nextXid );
326
298
xid = dtm -> nextXid ++ ;
327
299
dtm -> nReservedXids -= 1 ;
328
300
XTM_INFO ("Obtain new local XID %d\n" , xid );
@@ -337,13 +309,16 @@ DtmGetGlobalTransactionId()
337
309
return DtmNextXid ;
338
310
}
339
311
312
+ /*
313
+ * We have to cut&paste copde of GetNewTransactionId from varsup because we change way of advancing ShmemVariableCache->nextXid
314
+ */
340
315
TransactionId
341
316
DtmGetNewTransactionId (bool isSubXact )
342
317
{
343
318
TransactionId xid ;
344
319
345
320
XTM_INFO ("%d: GetNewTransactionId\n" , getpid ());
346
- Assert (!DtmGlobalXidAssigned );
321
+ Assert (!DtmGlobalXidAssigned ); /* We should not assign new Xid if we do not use previous one */
347
322
348
323
/*
349
324 * Workers synchronize transaction state at the beginning of each parallel
@@ -544,53 +519,42 @@ DtmGetNewTransactionId(bool isSubXact)
544
519
}
545
520
546
521
547
- static bool DtmTransactionIdIsInProgress (TransactionId xid )
548
- {
549
- XLogRecPtr lsn ;
550
- if (TransactionIdIsRunning (xid )) {
551
- return true;
552
- }
553
- #ifdef SUPPORT_LOCAL_TRANSACTIONS
554
- else if (DtmGetTransactionStatus (xid , & lsn ) == TRANSACTION_STATUS_IN_PROGRESS ) {
555
- bool globallyStarted ;
556
- LWLockAcquire (dtm -> xidLock , LW_SHARED );
557
- globallyStarted = bsearch (& xid , dtm -> activeSnapshot .xip , dtm -> activeSnapshot .xcnt , sizeof (TransactionId ), xidComparator ) != NULL ;
558
- LWLockRelease (dtm -> xidLock );
559
- return globallyStarted ;
560
- }
561
- #endif
562
- return false;
563
- }
564
-
565
-
566
522
static Snapshot DtmGetSnapshot (Snapshot snapshot )
567
523
{
568
524
if (DtmGlobalXidAssigned ) {
525
+ /* If DtmGlobalXidAssigned is set, we are in transaction performing dtm_begin_transaction or dtm_join_transaction
526
+ * which PRECEDS actual transaction for which Xid is received.
527
+ * This transaction doesn't need to take in accountn global snapshot
528
+ */
569
529
return GetLocalSnapshotData (snapshot );
570
530
}
571
- if (TransactionIdIsValid (DtmNextXid ) /*&& IsMVCCSnapshot(snapshot)*/ && snapshot != & CatalogSnapshotData ) {
531
+ if (TransactionIdIsValid (DtmNextXid ) && snapshot != & CatalogSnapshotData ) {
572
532
if (!DtmHasGlobalSnapshot && (snapshot != DtmLastSnapshot || DtmCurcid != snapshot -> curcid )) {
573
533
DtmGlobalGetSnapshot (DtmNextXid , & DtmSnapshot , & dtm -> minXid );
574
534
}
575
535
DtmCurcid = snapshot -> curcid ;
576
536
DtmLastSnapshot = snapshot ;
577
537
DtmMergeWithGlobalSnapshot (snapshot );
578
538
if (!IsolationUsesXactSnapshot ()) {
539
+ /* Use single global snapshot during all transaction for repeatable read isolation level,
540
+ * but obtain new global snapshot each time it is requested for read committed isolation level
541
+ */
579
542
DtmHasGlobalSnapshot = false;
580
543
}
581
544
} else {
545
+ /* For local transactions and catalog snapshots use default GetSnapshotData implementation */
582
546
snapshot = GetLocalSnapshotData (snapshot );
583
547
}
584
- #ifdef SUPPORT_LOCAL_TRANSACTIONS
585
- DtmMergeWithActiveSnapshot (snapshot );
586
- #endif
587
548
DtmUpdateRecentXmin (snapshot );
588
549
CurrentTransactionSnapshot = snapshot ;
589
550
return snapshot ;
590
551
}
591
552
592
553
static XidStatus DtmGetTransactionStatus (TransactionId xid , XLogRecPtr * lsn )
593
554
{
555
+ /* Because of global snapshots we can ask for status of transaction which is not yet started locally: so we have
556
+ * to compare xid with ShmemVariableCache->nextXid before accessing CLOG
557
+ */
594
558
XidStatus status = xid >= ShmemVariableCache -> nextXid
595
559
? TRANSACTION_STATUS_IN_PROGRESS
596
560
: CLOGTransactionIdGetStatus (xid , lsn );
@@ -603,8 +567,6 @@ static void DtmSetTransactionStatus(TransactionId xid, int nsubxids, Transaction
603
567
XTM_INFO ("%d: DtmSetTransactionStatus %u = %u\n" , getpid (), xid , status );
604
568
if (!RecoveryInProgress ()) {
605
569
if (!DtmGlobalXidAssigned && TransactionIdIsValid (DtmNextXid )) {
606
- /* Already should be IN_PROGRESS */
607
- /* CLOGTransactionIdSetTreeStatus(xid, nsubxids, subxids, TRANSACTION_STATUS_IN_PROGRESS, lsn); */
608
570
CurrentTransactionSnapshot = NULL ;
609
571
if (status == TRANSACTION_STATUS_ABORTED ) {
610
572
CLOGTransactionIdSetTreeStatus (xid , nsubxids , subxids , status , lsn );
@@ -613,6 +575,7 @@ static void DtmSetTransactionStatus(TransactionId xid, int nsubxids, Transaction
613
575
return ;
614
576
} else {
615
577
XTM_INFO ("Begin commit transaction %d\n" , xid );
578
+ /* Mark transaction as on-doubt in xid_in_doubt hash table */
616
579
LWLockAcquire (dtm -> hashLock , LW_EXCLUSIVE );
617
580
hash_search (xid_in_doubt , & DtmNextXid , HASH_ENTER , NULL );
618
581
LWLockRelease (dtm -> hashLock );
@@ -656,9 +619,6 @@ static void DtmInitialize()
656
619
dtm -> xidLock = LWLockAssign ();
657
620
dtm -> nReservedXids = 0 ;
658
621
dtm -> minXid = InvalidTransactionId ;
659
- dtm -> activeSnapshot .xip = (TransactionId * )ShmemAlloc (GetMaxSnapshotXidCount () * sizeof (TransactionId ));
660
- dtm -> activeSnapshot .subxip = (TransactionId * )ShmemAlloc (GetMaxSnapshotSubxidCount () * sizeof (TransactionId ));
661
-
662
622
RegisterXactCallback (DtmXactCallback , NULL );
663
623
}
664
624
LWLockRelease (AddinShmemInitLock );
@@ -684,13 +644,23 @@ DtmXactCallback(XactEvent event, void *arg)
684
644
XTM_INFO ("%d: DtmXactCallbackevent=%d isGlobal=%d, nextxid=%d\n" , getpid (), event , DtmGlobalXidAssigned , DtmNextXid );
685
645
if (event == XACT_EVENT_COMMIT || event == XACT_EVENT_ABORT ) {
686
646
if (DtmGlobalXidAssigned ) {
647
+ /* DtmGlobalXidAssigned is set when Xid for global transaction is recieved.
648
+ * But it happens in separate local transaction preceding this global transaction at this backend.
649
+ * So this variable is used as indicator that we are still in local transaction preceeding global transaction.
650
+ * When this local transaction is completed we are ready to assign Xid to global transaction.
651
+ */
687
652
DtmGlobalXidAssigned = false;
688
653
} else if (TransactionIdIsValid (DtmNextXid )) {
689
654
if (event == XACT_EVENT_COMMIT ) {
655
+ /* Now transaction status is already written in CLOG, so we can remove information about it from hash table */
690
656
LWLockAcquire (dtm -> hashLock , LW_EXCLUSIVE );
691
657
hash_search (xid_in_doubt , & DtmNextXid , HASH_REMOVE , NULL );
692
658
LWLockRelease (dtm -> hashLock );
693
659
} else {
660
+ /* Transaction at the node can be aborted because of transaction failure at some other node
661
+ * before it starts doing anything and assigned Xid, in this case Postgres is not calling SetTransactionStatus,
662
+ * so we have to send report to DTMD here
663
+ */
694
664
if (!TransactionIdIsValid (GetCurrentTransactionIdIfAny ())) {
695
665
DtmGlobalSetTransStatus (DtmNextXid , TRANSACTION_STATUS_ABORTED , false);
696
666
}
0 commit comments