3333#include "utils/array.h"
3434#include "utils/builtins.h"
3535#include "utils/memutils.h"
36+ #include "commands/dbcommands.h"
37+ #include "miscadmin.h"
38+ #include "postmaster/autovacuum.h"
39+ #include "storage/pmsignal.h"
40+ #include "storage/proc.h"
41+ #include "utils/syscache.h"
3642
3743#include "libdtm.h"
3844
@@ -61,6 +67,7 @@ static void DtmUpdateRecentXmin(void);
6167static void DtmInitialize (void );
6268static void DtmXactCallback (XactEvent event , void * arg );
6369static TransactionId DtmGetNextXid (void );
70+ static TransactionId DtmGetNewTransactionId (bool isSubXact );
6471
6572static bool TransactionIdIsInDtmSnapshot (TransactionId xid );
6673static bool TransactionIdIsInDoubt (TransactionId xid );
@@ -77,7 +84,7 @@ static SnapshotData DtmSnapshot = { HeapTupleSatisfiesMVCC };
7784static bool DtmHasGlobalSnapshot ;
7885static bool DtmIsGlobalTransaction ;
7986static int DtmLocalXidReserve ;
80- static TransactionManager DtmTM = { DtmGetTransactionStatus , DtmSetTransactionStatus , DtmGetSnapshot , DtmGetNextXid };
87+ static TransactionManager DtmTM = { DtmGetTransactionStatus , DtmSetTransactionStatus , DtmGetSnapshot , DtmGetNewTransactionId };
8188
8289
8390#define XTM_TRACE (fmt , ...)
@@ -190,7 +197,7 @@ static void DtmUpdateRecentXmin(void)
190197
191198 if (TransactionIdIsValid (xmin )) {
192199 xmin -= vacuum_defer_cleanup_age ;
193- xmin = FirstNormalTransactionId ;
200+ // xmin = FirstNormalTransactionId;
194201 if (!TransactionIdIsNormal (xmin )) {
195202 xmin = FirstNormalTransactionId ;
196203 }
@@ -245,6 +252,211 @@ static TransactionId DtmGetNextXid()
245252 return xid ;
246253}
247254
255+ TransactionId
256+ DtmGetNewTransactionId (bool isSubXact )
257+ {
258+ TransactionId xid ;
259+
260+ /*
261+ * Workers synchronize transaction state at the beginning of each parallel
262+ * operation, so we can't account for new XIDs after that point.
263+ */
264+ if (IsInParallelMode ())
265+ elog (ERROR , "cannot assign TransactionIds during a parallel operation" );
266+
267+ /*
268+ * During bootstrap initialization, we return the special bootstrap
269+ * transaction id.
270+ */
271+ if (IsBootstrapProcessingMode ())
272+ {
273+ Assert (!isSubXact );
274+ MyPgXact -> xid = BootstrapTransactionId ;
275+ return BootstrapTransactionId ;
276+ }
277+
278+ /* safety check, we should never get this far in a HS slave */
279+ if (RecoveryInProgress ())
280+ elog (ERROR , "cannot assign TransactionIds during recovery" );
281+
282+ LWLockAcquire (XidGenLock , LW_EXCLUSIVE );
283+ xid = DtmGetNextXid ();
284+
285+ /*----------
286+ * Check to see if it's safe to assign another XID. This protects against
287+ * catastrophic data loss due to XID wraparound. The basic rules are:
288+ *
289+ * If we're past xidVacLimit, start trying to force autovacuum cycles.
290+ * If we're past xidWarnLimit, start issuing warnings.
291+ * If we're past xidStopLimit, refuse to execute transactions, unless
292+ * we are running in single-user mode (which gives an escape hatch
293+ * to the DBA who somehow got past the earlier defenses).
294+ *
295+ * Note that this coding also appears in GetNewMultiXactId.
296+ *----------
297+ */
298+ if (TransactionIdFollowsOrEquals (xid , ShmemVariableCache -> xidVacLimit ))
299+ {
300+ /*
301+ * For safety's sake, we release XidGenLock while sending signals,
302+ * warnings, etc. This is not so much because we care about
303+ * preserving concurrency in this situation, as to avoid any
304+ * possibility of deadlock while doing get_database_name(). First,
305+ * copy all the shared values we'll need in this path.
306+ */
307+ TransactionId xidWarnLimit = ShmemVariableCache -> xidWarnLimit ;
308+ TransactionId xidStopLimit = ShmemVariableCache -> xidStopLimit ;
309+ TransactionId xidWrapLimit = ShmemVariableCache -> xidWrapLimit ;
310+ Oid oldest_datoid = ShmemVariableCache -> oldestXidDB ;
311+
312+ LWLockRelease (XidGenLock );
313+
314+ /*
315+ * To avoid swamping the postmaster with signals, we issue the autovac
316+ * request only once per 64K transaction starts. This still gives
317+ * plenty of chances before we get into real trouble.
318+ */
319+ if (IsUnderPostmaster && (xid % 65536 ) == 0 )
320+ SendPostmasterSignal (PMSIGNAL_START_AUTOVAC_LAUNCHER );
321+
322+ if (IsUnderPostmaster &&
323+ TransactionIdFollowsOrEquals (xid , xidStopLimit ))
324+ {
325+ char * oldest_datname = get_database_name (oldest_datoid );
326+
327+ /* complain even if that DB has disappeared */
328+ if (oldest_datname )
329+ ereport (ERROR ,
330+ (errcode (ERRCODE_PROGRAM_LIMIT_EXCEEDED ),
331+ errmsg ("database is not accepting commands to avoid wraparound data loss in database \"%s\"" ,
332+ oldest_datname ),
333+ errhint ("Stop the postmaster and vacuum that database in single-user mode.\n"
334+ "You might also need to commit or roll back old prepared transactions." )));
335+ else
336+ ereport (ERROR ,
337+ (errcode (ERRCODE_PROGRAM_LIMIT_EXCEEDED ),
338+ errmsg ("database is not accepting commands to avoid wraparound data loss in database with OID %u" ,
339+ oldest_datoid ),
340+ errhint ("Stop the postmaster and vacuum that database in single-user mode.\n"
341+ "You might also need to commit or roll back old prepared transactions." )));
342+ }
343+ else if (TransactionIdFollowsOrEquals (xid , xidWarnLimit ))
344+ {
345+ char * oldest_datname = get_database_name (oldest_datoid );
346+
347+ /* complain even if that DB has disappeared */
348+ if (oldest_datname )
349+ ereport (WARNING ,
350+ (errmsg ("database \"%s\" must be vacuumed within %u transactions" ,
351+ oldest_datname ,
352+ xidWrapLimit - xid ),
353+ errhint ("To avoid a database shutdown, execute a database-wide VACUUM in that database.\n"
354+ "You might also need to commit or roll back old prepared transactions." )));
355+ else
356+ ereport (WARNING ,
357+ (errmsg ("database with OID %u must be vacuumed within %u transactions" ,
358+ oldest_datoid ,
359+ xidWrapLimit - xid ),
360+ errhint ("To avoid a database shutdown, execute a database-wide VACUUM in that database.\n"
361+ "You might also need to commit or roll back old prepared transactions." )));
362+ }
363+
364+ /* Re-acquire lock and start over */
365+ LWLockAcquire (XidGenLock , LW_EXCLUSIVE );
366+ xid = DtmGetNextXid ();
367+ }
368+
369+ /*
370+ * If we are allocating the first XID of a new page of the commit log,
371+ * zero out that commit-log page before returning. We must do this while
372+ * holding XidGenLock, else another xact could acquire and commit a later
373+ * XID before we zero the page. Fortunately, a page of the commit log
374+ * holds 32K or more transactions, so we don't have to do this very often.
375+ *
376+ * Extend pg_subtrans and pg_commit_ts too.
377+ */
378+ if (TransactionIdFollowsOrEquals (xid , ShmemVariableCache -> nextXid )) {
379+ fprintf (stderr , "Extend CLOG to %d\n" , xid );
380+ ExtendCLOG (xid );
381+ ExtendCommitTs (xid );
382+ ExtendSUBTRANS (xid );
383+ }
384+ /*
385+ * Now advance the nextXid counter. This must not happen until after we
386+ * have successfully completed ExtendCLOG() --- if that routine fails, we
387+ * want the next incoming transaction to try it again. We cannot assign
388+ * more XIDs until there is CLOG space for them.
389+ */
390+ if (xid == ShmemVariableCache -> nextXid ) {
391+ TransactionIdAdvance (ShmemVariableCache -> nextXid );
392+ } else {
393+ Assert (TransactionIdPrecedes (xid , ShmemVariableCache -> nextXid ));
394+ }
395+
396+ /*
397+ * We must store the new XID into the shared ProcArray before releasing
398+ * XidGenLock. This ensures that every active XID older than
399+ * latestCompletedXid is present in the ProcArray, which is essential for
400+ * correct OldestXmin tracking; see src/backend/access/transam/README.
401+ *
402+ * XXX by storing xid into MyPgXact without acquiring ProcArrayLock, we
403+ * are relying on fetch/store of an xid to be atomic, else other backends
404+ * might see a partially-set xid here. But holding both locks at once
405+ * would be a nasty concurrency hit. So for now, assume atomicity.
406+ *
407+ * Note that readers of PGXACT xid fields should be careful to fetch the
408+ * value only once, rather than assume they can read a value multiple
409+ * times and get the same answer each time.
410+ *
411+ * The same comments apply to the subxact xid count and overflow fields.
412+ *
413+ * A solution to the atomic-store problem would
4B92
be to give each PGXACT its
414+ * own spinlock used only for fetching/storing that PGXACT's xid and
415+ * related fields.
416+ *
417+ * If there's no room to fit a subtransaction XID into PGPROC, set the
418+ * cache-overflowed flag instead. This forces readers to look in
419+ * pg_subtrans to map subtransaction XIDs up to top-level XIDs. There is a
420+ * race-condition window, in that the new XID will not appear as running
421+ * until its parent link has been placed into pg_subtrans. However, that
422+ * will happen before anyone could possibly have a reason to inquire about
423+ * the status of the XID, so it seems OK. (Snapshots taken during this
424+ * window *will* include the parent XID, so they will deliver the correct
425+ * answer later on when someone does have a reason to inquire.)
426+ */
427+ {
428+ /*
429+ * Use volatile pointer to prevent code rearrangement; other backends
430+ * could be examining my subxids info concurrently, and we don't want
431+ * them to see an invalid intermediate state, such as incrementing
432+ * nxids before filling the array entry. Note we are assuming that
433+ * TransactionId and int fetch/store are atomic.
434+ */
435+ volatile PGPROC * myproc = MyProc ;
436+ volatile PGXACT * mypgxact = MyPgXact ;
437+
438+ if (!isSubXact )
439+ mypgxact -> xid = xid ;
440+ else
441+ {
442+ int nxids = mypgxact -> nxids ;
443+
444+ if (nxids < PGPROC_MAX_CACHED_SUBXIDS )
445+ {
446+ myproc -> subxids .xids [nxids ] = xid ;
447+ mypgxact -> nxids = nxids + 1 ;
448+ }
449+ else
450+ mypgxact -> overflowed = true;
451+ }
452+ }
453+
454+ LWLockRelease (XidGenLock );
455+
456+ return xid ;
457+ }
458+
459+
248460static Snapshot DtmGetSnapshot (Snapshot snapshot )
249461{
250462
0 commit comments