58
58
#include "replication/walsender.h"
59
59
#include "replication/syncrep.h"
60
60
#include "storage/fd.h"
61
+ #include "storage/ipc.h"
61
62
#include "storage/predicate.h"
62
63
#include "storage/procarray.h"
63
64
#include "storage/sinvaladt.h"
@@ -85,25 +86,25 @@ int max_prepared_xacts = 0;
85
86
*
86
87
* The lifecycle of a global transaction is:
87
88
*
88
- * 1. After checking that the requested GID is not in use, set up an
89
- * entry in the TwoPhaseState->prepXacts array with the correct XID and GID ,
90
- * with locking_xid = my own XID and valid = false .
89
+ * 1. After checking that the requested GID is not in use, set up an entry in
90
+ * the TwoPhaseState->prepXacts array with the correct GID and valid = false ,
91
+ * and mark it as locked by my backend .
91
92
*
92
93
* 2. After successfully completing prepare, set valid = true and enter the
93
94
* contained PGPROC into the global ProcArray.
94
95
*
95
- * 3. To begin COMMIT PREPARED or ROLLBACK PREPARED, check that the entry
96
- * is valid and its locking_xid is no longer active, then store my current
97
- * XID into locking_xid . This prevents concurrent attempts to commit or
98
- * rollback the same prepared xact.
96
+ * 3. To begin COMMIT PREPARED or ROLLBACK PREPARED, check that the entry is
97
+ * valid and not locked, then mark the entry as locked by storing my current
98
+ * backend ID into locking_backend . This prevents concurrent attempts to
99
+ * commit or rollback the same prepared xact.
99
100
*
100
101
* 4. On completion of COMMIT PREPARED or ROLLBACK PREPARED, remove the entry
101
102
* from the ProcArray and the TwoPhaseState->prepXacts array and return it to
102
103
* the freelist.
103
104
*
104
105
* Note that if the preparing transaction fails between steps 1 and 2, the
105
- * entry will remain in prepXacts until recycled. We can detect recyclable
106
- * entries by checking for valid = false and locking_xid no longer active .
106
+ * entry must be removed so that the GID and the GlobalTransaction struct
107
+ * can be reused. See AtAbort_Twophase() .
107
108
*
108
109
* typedef struct GlobalTransactionData *GlobalTransaction appears in
109
110
* twophase.h
@@ -117,8 +118,8 @@ typedef struct GlobalTransactionData
117
<
8000
code>118 TimestampTz prepared_at ; /* time of preparation */
118
119
XLogRecPtr prepare_lsn ; /* XLOG offset of prepare record */
119
120
Oid owner ; /* ID of user that executed the xact */
120
- TransactionId locking_xid ; /* top-level XID of backend working on xact */
121
- bool valid ; /* TRUE if fully prepared */
121
+ BackendId locking_backend ; /* backend currently working on the xact */
122
+ bool valid ; /* TRUE if PGPROC entry is in proc array */
122
123
char gid [GIDSIZE ]; /* The GID assigned to the prepared xact */
123
124
} GlobalTransactionData ;
124
125
@@ -143,6 +144,12 @@ typedef struct TwoPhaseStateData
143
144
144
145
static TwoPhaseStateData * TwoPhaseState ;
145
146
147
+ /*
148
+ * Global transaction entry currently locked by us, if any.
149
+ */
150
+ static GlobalTransaction MyLockedGxact = NULL ;
151
+
152
+ static bool twophaseExitRegistered = false;
146
153
147
154
static void RecordTransactionCommitPrepared (TransactionId xid ,
148
155
int nchildren ,
@@ -159,6 +166,7 @@ static void RecordTransactionAbortPrepared(TransactionId xid,
159
166
RelFileNode * rels );
160
167
static void ProcessRecords (char * bufptr , TransactionId xid ,
161
168
const TwoPhaseCallback callbacks []);
169
+ static void RemoveGXact (GlobalTransaction gxact );
162
170
163
171
164
172
/*
@@ -228,6 +236,74 @@ TwoPhaseShmemInit(void)
228
236
Assert (found );
229
237
}
230
238
239
+ /*
240
+ * Exit hook to unlock the global transaction entry we're working on.
241
+ */
242
+ static void
243
+ AtProcExit_Twophase (int code , Datum arg )
244
+ {
245
+ /* same logic as abort */
246
+ AtAbort_Twophase ();
247
+ }
248
+
249
+ /*
250
+ * Abort hook to unlock the global transaction entry we're working on.
251
+ */
252
+ void
253
+ AtAbort_Twophase (void )
254
+ {
255
+ if (MyLockedGxact == NULL )
256
+ return ;
257
+
258
+ /*
259
+ * What to do with the locked global transaction entry? If we were in
260
+ * the process of preparing the transaction, but haven't written the WAL
261
+ * record and state file yet, the transaction must not be considered as
262
+ * prepared. Likewise, if we are in the process of finishing an
263
+ * already-prepared transaction, and fail after having already written
264
+ * the 2nd phase commit or rollback record to the WAL, the transaction
265
+ * should not be considered as prepared anymore. In those cases, just
266
+ * remove the entry from shared memory.
267
+ *
268
+ * Otherwise, the entry must be left in place so that the transaction
269
+ * can be finished later, so just unlock it.
270
+ *
271
+ * If we abort during prepare, after having written the WAL record, we
272
+ * might not have transfered all locks and other state to the prepared
273
+ * transaction yet. Likewise, if we abort during commit or rollback,
274
+ * after having written the WAL record, we might not have released
275
+ * all the resources held by the transaction yet. In those cases, the
276
+ * in-memory state can be wrong, but it's too late to back out.
277
+ */
278
+ if (!MyLockedGxact -> valid )
279
+ {
280
+ RemoveGXact (MyLockedGxact );
281
+ }
282
+ else
283
+ {
284
+ LWLockAcquire (TwoPhaseStateLock , LW_EXCLUSIVE );
285
+
286
+ MyLockedGxact -> locking_backend = InvalidBackendId ;
287
+
288
+ LWLockRelease (TwoPhaseStateLock );
289
+ }
290
+ MyLockedGxact = NULL ;
291
+ }
292
+
293
+ /*
294
+ * This is called after we have finished transfering state to the prepared
295
+ * PGXACT entry.
296
+ */
297
+ void
298
+ PostPrepare_Twophase ()
299
+ {
300
+ LWLockAcquire (TwoPhaseStateLock , LW_EXCLUSIVE );
301
+ MyLockedGxact -> locking_backend = InvalidBackendId ;
302
+ LWLockRelease (TwoPhaseStateLock );
303
+
304
+ MyLockedGxact = NULL ;
305
+ }
306
+
231
307
232
308
/*
233
309
* MarkAsPreparing
@@ -257,29 +333,15 @@ MarkAsPreparing(TransactionId xid, const char *gid,
257
333
errmsg ("prepared transactions are disabled" ),
258
334
errhint ("Set max_prepared_transactions to a nonzero value." )));
259
335
260
- LWLockAcquire (TwoPhaseStateLock , LW_EXCLUSIVE );
261
-
262
- /*
263
- * First, find and recycle any gxacts that failed during prepare. We do
264
- * this partly to ensure we don't mistakenly say their GIDs are still
265
- * reserved, and partly so we don't fail on out-of-slots unnecessarily.
266
- */
267
- for (i = 0 ; i < TwoPhaseState -> numPrepXacts ; i ++ )
336
+ /* on first call, register the exit hook */
337
+ if (!twophaseExitRegistered )
268
338
{
269
- gxact = TwoPhaseState -> prepXacts [i ];
270
- if (!gxact -> valid && !TransactionIdIsActive (gxact -> locking_xid ))
271
- {
272
- /* It's dead Jim ... remove from the active array */
273
- TwoPhaseState -> numPrepXacts -- ;
274
- TwoPhaseState -> prepXacts [i ] = TwoPhaseState -> prepXacts [TwoPhaseState -> numPrepXacts ];
275
- /* and put it back in the freelist */
276
- gxact -> proc .links .next = (SHM_QUEUE * ) TwoPhaseState -> freeGXacts ;
277
- TwoPhaseState -> freeGXacts = gxact ;
278
- /* Back up index count too, so we don't miss scanning one */
279
- i -- ;
280
- }
339
+ on_shmem_exit (AtProcExit_Twophase , 0 );
340
+ twophaseExitRegistered = true;
281
341
}
282
342
343
+ LWLockAcquire (TwoPhaseStateLock , LW_EXCLUSIVE );
344
+
283
345
/* Check for conflicting GID */
284
346
for (i = 0 ; i < TwoPhaseState -> numPrepXacts ; i ++ )
285
347
{
@@ -333,14 +395,20 @@ MarkAsPreparing(TransactionId xid, const char *gid,
333
395
gxact -> prepare_lsn .xlogid = 0 ;
334
396
gxact -> prepare_lsn .xrecoff = 0 ;
335
397
gxact -> owner = owner ;
336
- gxact -> locking_xid = xid ;
398
+ gxact -> locking_backend = MyBackendId ;
337
399
gxact -> valid = false;
338
400
strcpy (gxact -> gid , gid );
339
401
340
402
/* And insert it into the active array */
341
403
Assert (TwoPhaseState -> numPrepXacts < max_prepared_xacts );
342
404
TwoPhaseState -> prepXacts [TwoPhaseState -> numPrepXacts ++ ] = gxact ;
343
405
406
+ /*
407
+ * Remember that we have this GlobalTransaction entry locked for us.
408
+ * If we abort after this, we must release it.
409
+ */
410
+ MyLockedGxact = gxact ;
411
+
344
412
LWLockRelease (TwoPhaseStateLock );
345
413
346
414
return gxact ;
@@ -400,6 +468,13 @@ LockGXact(const char *gid, Oid user)
400
468
{
401
469
int i ;
402
470
471
+ /* on first call, register the exit hook */
472
+ if (!twophaseExitRegistered )
473
+ {
474
+ on_shmem_exit (AtProcExit_Twophase , 0 );
475
+ twophaseExitRegistered = true;
476
+ }
477
+
403
478
LWLockAcquire (TwoPhaseStateLock , LW_EXCLUSIVE );
404
479
405
480
for (i = 0 ; i < TwoPhaseState -> numPrepXacts ; i ++ )
@@ -413,15 +488,11 @@ LockGXact(const char *gid, Oid user)
413
488
continue ;
414
489
415
490
/* Found it, but has someone else got it locked? */
416
- if (TransactionIdIsValid (gxact -> locking_xid ))
417
- {
418
- if (TransactionIdIsActive (gxact -> locking_xid ))
419
- ereport (ERROR ,
420
- (errcode (ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE ),
421
- errmsg ("prepared transaction with identifier \"%s\" is busy" ,
422
- gid )));
423
- gxact -> locking_xid = InvalidTransactionId ;
424
- }
491
+ if (gxact -> locking_backend != InvalidBackendId )
492
+ ereport (ERROR ,
493
+ (errcode (ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE ),
494
+ errmsg ("prepared transaction with identifier \"%s\" is busy" ,
495
+ gid )));
425
496
426
497
if (user != gxact -> owner && !superuser_arg (user ))
427
498
ereport (ERROR ,
@@ -442,7 +513,8 @@ LockGXact(const char *gid, Oid user)
442
513
errhint ("Connect to the database where the transaction was prepared to finish it." )));
443
514
444
515
/* OK for me to lock it */
445
- gxact -> locking_xid = GetTopTransactionId ();
516
+ gxact -> locking_backend = MyBackendId ;
517
+ MyLockedGxact = gxact ;
446
518
447
519
LWLockRelease (TwoPhaseStateLock );
448
520
@@ -1070,6 +1142,13 @@ EndPrepare(GlobalTransaction gxact)
1070
1142
*/
1071
1143
MyProc -> inCommit = false;
1072
1144
1145
+ /*
1146
+ * Remember that we have this GlobalTransaction entry locked for us. If
1147
+ * we crash after this point, it's too late to abort, but we must unlock
1148
+ * it so that the prepared transaction can be committed or rolled back.
1149
+ */
1150
+ MyLockedGxact = gxact ;
1151
+
1073
1152
END_CRIT_SECTION ();
1074
1153
1075
1154
/*
@@ -1312,8 +1391,9 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
1312
1391
1313
1392
/*
1314
1393
* In case we fail while running the callbacks, mark the gxact invalid so
1315
- * no one else will try to commit/rollback, and so it can be recycled
1316
- * properly later. It is still locked by our XID so it won't go away yet.
1394
+ * no one else will try to commit/rollback, and so it will be recycled
1395
+ * if we fail after this point. It is still locked by our backend so it
1396
+ * won't go away yet.
1317
1397
*
1318
1398
* (We assume it's safe to do this without taking TwoPhaseStateLock.)
1319
1399
*/
@@ -1377,6 +1457,7 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
1377
1457
RemoveTwoPhaseFile (xid , true);
1378
1458
1379
1459
RemoveGXact (gxact );
1460
+ MyLockedGxact = NULL ;
1380
1461
1381
1462
pfree (buf );
1382
1463
}
0 commit comments