56
56
#include "pg_trace.h"
57
57
#include "pgstat.h"
58
58
#include "storage/fd.h"
59
+ #include "storage/ipc.h"
59
60
#include "storage/procarray.h"
60
61
#include "storage/smgr.h"
61
62
#include "utils/builtins.h"
@@ -81,25 +82,25 @@ int max_prepared_xacts = 0;
81
82
*
82
83
* The lifecycle of a global transaction is:
83
84
*
84
- * 1. After checking that the requested GID is not in use, set up an
85
- * entry in the TwoPhaseState->prepXacts array with the correct XID and GID ,
86
- * with locking_xid = my own XID and valid = false .
85
+ * 1. After checking that the requested GID is not in use, set up an entry in
86
+ * the TwoPhaseState->prepXacts array with the correct GID and valid = false ,
87
+ * and mark it as locked by my backend .
87
88
*
88
89
* 2. After successfully completing prepare, set valid = true and enter the
89
90
* contained PGPROC into the global ProcArray.
90
91
*
91
- * 3. To begin COMMIT PREPARED or ROLLBACK PREPARED, check that the entry
92
- * is valid and its locking_xid is no longer active, then store my current
93
- * XID into locking_xid . This prevents concurrent attempts to commit or
94
- * rollback the same prepared xact.
92
+ * 3. To begin COMMIT PREPARED or ROLLBACK PREPARED, check that the entry is
93
+ * valid and not locked, then mark the entry as locked by storing my current
94
+ * backend ID into locking_backend . This prevents concurrent attempts to
95
+ * commit or rollback the same prepared xact.
95
96
*
96
97
* 4. On completion of COMMIT PREPARED or ROLLBACK PREPARED, remove the entry
97
98
* from the ProcArray and the TwoPhaseState->prepXacts array and return it to
98
99
* the freelist.
99
100
*
100
101
* Note that if the preparing transaction fails between steps 1 and 2, the
101
- * entry will remain in prepXacts until recycled. We can detect recyclable
102
- * entries by checking for valid = false and locking_xid no longer active .
102
+ * entry must be removed so that the GID and the GlobalTransaction struct
103
+ * can be reused. See AtAbort_Twophase() .
103
104
*
104
105
* typedef struct GlobalTransactionData *GlobalTransaction appears in
105
106
* twophase.h
@@ -113,8 +114,8 @@ typedef struct GlobalTransactionData
113
114
TimestampTz prepared_at ; /* time of preparation */
114
115
XLogRecPtr prepare_lsn ; /* XLOG offset of prepare record */
115
116
Oid owner ; /* ID of user that executed the xact */
116
- TransactionId locking_xid ; /* top-level XID of backend working on xact */
117
- bool valid ; /* TRUE if fully prepared */
117
+ BackendId locking_backend ; /* backend currently working on the xact */
118
+ bool valid ; /* TRUE if PGPROC entry is in proc array */
118
119
char gid [GIDSIZE ]; /* The GID assigned to the prepared xact */
119
120
} GlobalTransactionData ;
120
121
@@ -139,6 +140,12 @@ typedef struct TwoPhaseStateData
139
140
140
141
static TwoPhaseStateData * TwoPhaseState ;
141
142
143
+ /*
144
+ * Global transaction entry currently locked by us, if any.
145
+ */
146
+ static GlobalTransaction MyLockedGxact = NULL ;
147
+
148
+ static bool twophaseExitRegistered = false;
142
149
143
150
static void RecordTransactionCommitPrepared (TransactionId xid ,
144
151
int nchildren ,
@@ -152,6 +159,7 @@ static void RecordTransactionAbortPrepared(TransactionId xid,
152
159
RelFileNode * rels );
153
160
static void ProcessRecords (char * bufptr , TransactionId xid ,
154
161
const TwoPhaseCallback callbacks []);
162
+ static void RemoveGXact (GlobalTransaction gxact );
155
163
156
164
157
165
/*
@@ -221,6 +229,74 @@ TwoPhaseShmemInit(void)
221
229
Assert (found );
222
230
}
223
231
232
+ /*
233
+ * Exit hook to unlock the global transaction entry we're working on.
234
+ */
235
+ static void
236
+ AtProcExit_Twophase (int code , Datum arg )
237
+ {
238
+ /* same logic as abort */
239
+ AtAbort_Twophase ();
240
+ }
241
+
242
+ /*
243
+ * Abort hook to unlock the global transaction entry we're working on.
244
+ */
245
+ void
246
+ AtAbort_Twophase (void )
247
+ {
248
+ if (MyLockedGxact == NULL )
249
+ return ;
250
+
251
+ /*
252
+ * What to do with the locked global transaction entry? If we were in
253
+ * the process of preparing the transaction, but haven't written the WAL
254
+ * record and state file yet, the transaction must not be considered as
255
+ * prepared. Likewise, if we are in the process of finishing an
256
+ * already-prepared transaction, and fail after having already written
257
+ * the 2nd phase commit or rollback record to the WAL, the transaction
258
+ * should not be considered as prepared anymore. In those cases, just
259
+ * remove the entry from shared memory.
260
+ *
261
+ * Otherwise, the entry must be left in place so that the transaction
262
+ * can be finished later, so just unlock it.
263
+ *
264
+ * If we abort during prepare, after having written the WAL record, we
265
+ * might not have transfered all locks and other state to the prepared
266
+ * transaction yet. Likewise, if we abort during commit or rollback,
267
+ * after having written the WAL record, we might not have released
268
+ * all the resources held by the transaction yet. In those cases, the
269
+ * in-memory state can be wrong, but it's too late to back out.
270
+ */
271
+ if (!MyLockedGxact -> valid )
272
+ {
273
+ RemoveGXact (MyLockedGxact );
274
+ }
275
+ else
276
+ {
277
+ LWLockAcquire (TwoPhaseStateLock , LW_EXCLUSIVE );
278
+
279
+ MyLockedGxact -> locking_backend = InvalidBackendId ;
280
+
281
+ LWLockRelease (TwoPhaseStateLock );
282
+ }
283
+ MyLockedGxact = NULL ;
284
+ }
285
+
286
+ /*
287
+ * This is called after we have finished transfering state to the prepared
288
+ * PGXACT entry.
289
+ */
290
+ void
291
+ PostPrepare_Twophase ()
292
+ {
293
+ LWLockAcquire (TwoPhaseStateLock , LW_EXCLUSIVE );
294
+ MyLockedGxact -> locking_backend = InvalidBackendId ;
295
+ LWLockRelease (TwoPhaseStateLock );
296
+
297
+ MyLockedGxact = NULL ;
298
+ }
299
+
224
300
225
301
/*
226
302
* MarkAsPreparing
@@ -250,29 +326,15 @@ MarkAsPreparing(TransactionId xid, const char *gid,
250
326
errmsg ("prepared transactions are disabled" ),
251
327
errhint ("Set max_prepared_transactions to a nonzero value." )));
252
328
253
- LWLockAcquire (TwoPhaseStateLock , LW_EXCLUSIVE );
254
-
255
- /*
256
- * First, find and recycle any gxacts that failed during prepare. We do
257
- * this partly to ensure we don't mistakenly say their GIDs are still
258
- * reserved, and partly so we don't fail on out-of-slots unnecessarily.
259
- */
260
- for (i = 0 ; i < TwoPhaseState -> numPrepXacts ; i ++ )
329
+ /* on first call, register the exit hook */
330
+ if (!twophaseExitRegistered )
261
331
{
262
- gxact = TwoPhaseState -> prepXacts [i ];
263
- if (!gxact -> valid && !TransactionIdIsActive (gxact -> locking_xid ))
264
- {
265
- /* It's dead Jim ... remove from the active array */
266
- TwoPhaseState -> numPrepXacts -- ;
267
- TwoPhaseState -> prepXacts [i ] = TwoPhaseState -> prepXacts [TwoPhaseState -> numPrepXacts ];
268
- /* and put it back in the freelist */
269
- gxact -> proc .links .next = (SHM_QUEUE * ) TwoPhaseState -> freeGXacts ;
270
- TwoPhaseState -> freeGXacts = gxact ;
271
- /* Back up index count too, so we don't miss scanning one */
272
- i -- ;
273
- }
332
+ on_shmem_exit (AtProcExit_Twophase , 0 );
333
+ twophaseExitRegistered = true;
274
334
}
275
335
336
+ LWLockAcquire (TwoPhaseStateLock , LW_EXCLUSIVE );
337
+
276
338
/* Check for conflicting GID */
277
339
for (i = 0 ; i < TwoPhaseState -> numPrepXacts ; i ++ )
278
340
{
@@ -326,14 +388,20 @@ MarkAsPreparing(TransactionId xid, const char *gid,
326
388
gxact -> prepare_lsn .xlogid = 0 ;
327
389
gxact -> prepare_lsn .xrecoff = 0 ;
328
390
gxact -> owner = owner ;
329
- gxact -> locking_xid = xid ;
391
+ gxact -> locking_backend = MyBackendId ;
330
392
gxact -> valid = false;
331
393
strcpy (gxact -> gid , gid );
332
394
333
395
/* And insert it into the active array */
334
396
Assert (TwoPhaseState -> numPrepXacts < max_prepared_xacts );
335
397
TwoPhaseState -> prepXacts [TwoPhaseState -> numPrepXacts ++ ] = gxact ;
336
398
399
+ /*
400
+ * Remember that we have this GlobalTransaction entry locked for us.
401
+ * If we abort after this, we must release it.
402
+ */
403
+ MyLockedGxact = gxact ;
404
+
337
405
LWLockRelease (TwoPhaseStateLock );
338
406
339
407
return gxact ;
@@ -393,6 +461,13 @@ LockGXact(const char *gid, Oid user)
393
461
{
394
462
int i ;
395
463
464
+ /* on first call, register the exit hook */
465
+ if (!twophaseExitRegistered )
466
+ {
467
+ on_shmem_exit (AtProcExit_Twophase , 0 );
468
+ twophaseExitRegistered = true;
469
+ }
470
+
396
471
LWLockAcquire (TwoPhaseStateLock , LW_EXCLUSIVE );
397
472
398
473
for (i = 0 ; i < TwoPhaseState -> numPrepXacts ; i ++ )
@@ -406,15 +481,11 @@ LockGXact(const char *gid, Oid user)
406
481
continue ;
407
482
408
483
/* Found it, but has someone else got it locked? */
409
- if (TransactionIdIsValid (gxact -> locking_xid ))
410
- {
411
- if (TransactionIdIsActive (gxact -> locking_xid ))
412
- ereport (ERROR ,
413
- (errcode (ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE ),
414
- errmsg ("prepared transaction with identifier \"%s\" is busy" ,
415
- gid )));
416
- gxact -> locking_xid = InvalidTransactionId ;
417
- }
484
+ if (gxact -> locking_backend != InvalidBackendId )
485
+ ereport (ERROR ,
486
+ (errcode (ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE ),
487
+ errmsg ("prepared transaction with identifier \"%s\" is busy" ,
488
+ gid )));
418
489
419
490
if (user != gxact -> owner && !superuser_arg (user ))
420
491
ereport (ERROR ,
@@ -435,7 +506,8 @@ LockGXact(const char *gid, Oid user)
435
506
errhint ("Connect to the database where the transaction was prepared to finish it." )));
436
507
437
508
/* OK for me to lock it */
438
- gxact -> locking_xid = GetTopTransactionId ();
509
+ gxact -> locking_backend = MyBackendId ;
510
+ MyLockedGxact = gxact ;
439
511
440
512
LWLockRelease (TwoPhaseStateLock );
441
513
@@ -1041,6 +1113,13 @@ EndPrepare(GlobalTransaction gxact)
1041
1113
*/
1042
1114
MyProc -> inCommit = false;
1043
1115
1116
+ /*
1117
+ * Remember that we have this GlobalTransaction entry locked for us. If
1118
+ * we crash after this point, it's too late to abort, but we must unlock
1119
+ * it so that the prepared transaction can be committed or rolled back.
1120
+ */
1121
+ MyLockedGxact = gxact ;
1122
+
1044
1123
END_CRIT_SECTION ();
1045
1124
1046
1125
records .tail = records .head = NULL ;
@@ -1240,8 +1319,9 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
1240
1319
1241
1320
/*
1242
1321
* In case we fail while running the callbacks, mark the gxact invalid so
1243
- * no one else will try to commit/rollback, and so it can be recycled
1244
- * properly later. It is still locked by our XID so it won't go away yet.
1322
+ * no one else will try to commit/rollback, and so it will be recycled
1323
+ * if we fail after this point. It is still locked by our backend so it
1324
+ * won't go away yet.
1245
1325
*
1246
1326
* (We assume it's safe to do this without taking TwoPhaseStateLock.)
1247
1327
*/
@@ -1291,6 +1371,7 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
1291
1371
RemoveTwoPhaseFile (xid , true);
1292
1372
1293
1373
RemoveGXact (gxact );
1374
+ MyLockedGxact = NULL ;
1294
1375
1295
1376
pfree (buf );
1296
1377
}
0 commit comments