36
36
/*
37
37
* We don't want to log each fetching of a value from a sequence,
38
38
* so we pre-log a few fetches in advance. In the event of
39
- * crash we can lose as much as we pre-logged.
39
+ * crash we can lose (skip over) as many values as we pre-logged.
40
40
*/
41
41
#define SEQ_LOG_VALS 32
42
42
@@ -70,7 +70,7 @@ typedef struct SeqTableData
70
70
int64 cached ; /* last value already cached for nextval */
71
71
/* if last != cached, we have not used up all the cached values */
72
72
int64 increment ; /* copy of sequence's increment field */
73
- /* note that increment is zero until we first do read_info () */
73
+ /* note that increment is zero until we first do read_seq_tuple () */
74
74
} SeqTableData ;
75
75
76
76
typedef SeqTableData * SeqTable ;
@@ -86,7 +86,8 @@ static SeqTableData *last_used_seq = NULL;
86
86
static int64 nextval_internal (Oid relid );
87
87
static Relation open_share_lock (SeqTable seq );
88
88
static void init_sequence (Oid relid , SeqTable * p_elm , Relation * p_rel );
89
- static Form_pg_sequence read_info (SeqTable elm , Relation rel , Buffer * buf );
89
+ static Form_pg_sequence read_seq_tuple (SeqTable elm , Relation rel ,
90
+ Buffer * buf , HeapTuple seqtuple );
90
91
static void init_params (List * options , bool isInit ,
91
92
Form_pg_sequence new , List * * owned_by );
92
93
static void do_setval (Oid relid , int64 next , bool iscalled );
@@ -171,7 +172,7 @@ DefineSequence(CreateSeqStmt *seq)
171
172
case SEQ_COL_LOG :
172
173
coldef -> typename = makeTypeNameFromOid (INT8OID , -1 );
173
174
coldef -> colname = "log_cnt" ;
174
- value [i - 1 ] = Int64GetDatum ((int64 ) 1 );
175
+ value [i - 1 ] = Int64GetDatum ((int64 ) 0 );
175
176
break ;
176
177
case SEQ_COL_CYCLE :
177
178
coldef -> typename = makeTypeNameFromOid (BOOLOID , -1 );
@@ -267,12 +268,6 @@ DefineSequence(CreateSeqStmt *seq)
267
268
xl_seq_rec xlrec ;
268
269
XLogRecPtr recptr ;
269
270
XLogRecData rdata [2 ];
270
- Form_pg_sequence newseq = (Form_pg_sequence ) GETSTRUCT (tuple );
271
-
272
- /* We do not log first nextval call, so "advance" sequence here */
273
- /* Note we are scribbling on local tuple, not the disk buffer */
274
- newseq -> is_called = true;
275
- newseq -> log_cnt = 0 ;
276
271
277
272
xlrec .node = rel -> rd_node ;
278
273
rdata [0 ].data = (char * ) & xlrec ;
@@ -314,7 +309,7 @@ AlterSequence(AlterSeqStmt *stmt)
314
309
SeqTable elm ;
315
310
Relation seqrel ;
316
311
Buffer buf ;
317
- Page page ;
312
+ HeapTupleData seqtuple ;
318
313
Form_pg_sequence seq ;
319
314
FormData_pg_sequence new ;
320
315
List * owned_by ;
@@ -329,8 +324,7 @@ AlterSequence(AlterSeqStmt *stmt)
329
324
stmt -> sequence -> relname );
330
325
331
326
/* lock page's buffer and read tuple into new sequence structure */
332
- seq = read_info (elm , seqrel , & buf );
333
- page = BufferGetPage (buf );
327
+ seq = read_seq_tuple (elm , seqrel , & buf , & seqtuple );
334
328
335
329
/* Copy old values of options into workspace */
336
330
memcpy (& new , seq , sizeof (FormData_pg_sequence ));
@@ -343,10 +337,10 @@ AlterSequence(AlterSeqStmt *stmt)
343
337
elm -> cached = elm -> last ;
344
338
345
339
/* Now okay to update the on-disk tuple */
346
- memcpy (seq , & new , sizeof (FormData_pg_sequence ));
347
-
348
340
START_CRIT_SECTION ();
349
341
342
+ memcpy (seq , & new , sizeof (FormData_pg_sequence ));
343
+
350
344
MarkBufferDirty (buf );
351
345
352
346
/* XLOG stuff */
@@ -355,16 +349,16 @@ AlterSequence(AlterSeqStmt *stmt)
355
349
xl_seq_rec xlrec ;
356
350
XLogRecPtr recptr ;
357
351
XLogRecData rdata [2 ];
352
+ Page page = BufferGetPage (buf );
358
353
359
354
xlrec .node = seqrel -> rd_node ;
360
355
rdata [0 ].data = (char * ) & xlrec ;
361
356
rdata [0 ].len = sizeof (xl_seq_rec );
362
357
rdata [0 ].buffer = InvalidBuffer ;
363
358
rdata [0 ].next = & (rdata [1 ]);
364
359
365
- rdata [1 ].data = (char * ) page + ((PageHeader ) page )-> pd_upper ;
366
- rdata [1 ].len = ((PageHeader ) page )-> pd_special -
367
- ((PageHeader ) page )-> pd_upper ;
360
+ rdata [1 ].data = (char * ) seqtuple .t_data ;
361
+ rdata [1 ].len = seqtuple .t_len ;
368
362
rdata [1 ].buffer = InvalidBuffer ;
369
363
rdata [1 ].next = NULL ;
370
364
@@ -419,6 +413,7 @@ nextval_internal(Oid relid)
419
413
Relation seqrel ;
420
414
Buffer buf ;
421
415
Page page ;
416
+ HeapTupleData seqtuple ;
422
417
Form_pg_sequence seq ;
423
418
int64 incby ,
424
419
maxv ,
@@ -453,7 +448,7 @@ nextval_internal(Oid relid)
453
448
}
454
449
455
450
/* lock page's buffer and read tuple */
456
- seq = read_info (elm , seqrel , & buf );
451
+ seq = read_seq_tuple (elm , seqrel , & buf , & seqtuple );
457
452
page = BufferGetPage (buf );
458
453
459
454
last = next = result = seq -> last_value ;
@@ -465,9 +460,8 @@ nextval_internal(Oid relid)
465
460
466
461
if (!seq -> is_called )
467
462
{
468
- rescnt ++ ; /* last_value if not called */
463
+ rescnt ++ ; /* return last_value if not is_called */
469
464
fetch -- ;
470
- log -- ;
471
465
}
472
466
473
467
/*
@@ -480,7 +474,7 @@ nextval_internal(Oid relid)
480
474
* checkpoint would fail to advance the sequence past the logged values.
481
475
* In this case we may as well fetch extra values.
482
476
*/
483
- if (log < fetch )
477
+ if (log < fetch || ! seq -> is_called )
484
478
{
485
479
/* forced log to satisfy local demand for values */
486
480
fetch = log = fetch + SEQ_LOG_VALS ;
@@ -571,8 +565,18 @@ nextval_internal(Oid relid)
571
565
572
566
last_used_seq = elm ;
573
567
568
+ /* ready to change the on-disk (or really, in-buffer) tuple */
574
569
START_CRIT_SECTION ();
575
570
571
+ /*
572
+ * We must mark the buffer dirty before doing XLogInsert(); see notes in
573
+ * SyncOneBuffer(). However, we don't apply the desired changes just yet.
574
+ * This looks like a violation of the buffer update protocol, but it is
575
+ * in fact safe because we hold exclusive lock on the buffer. Any other
576
+ * process, including a checkpoint, that tries to examine the buffer
577
+ * contents will block until we release the lock, and then will see the
578
+ * final state that we install below.
579
+ */
576
580
MarkBufferDirty (buf );
577
581
578
582
/* XLOG stuff */
@@ -582,20 +586,26 @@ nextval_internal(Oid relid)
582
586
XLogRecPtr recptr ;
583
587
XLogRecData rdata [2 ];
584
588
585
- xlrec .node = seqrel -> rd_node ;
586
- rdata [0 ].data = (char * ) & xlrec ;
587
- rdata [0 ].len = sizeof (xl_seq_rec );
588
- rdata [0 ].buffer = InvalidBuffer ;
589
- rdata [0 ].next = & (rdata [1 ]);
589
+ /*
590
+ * We don't log the current state of the tuple, but rather the state
591
+ * as it would appear after "log" more fetches. This lets us skip
592
+ * that many future WAL records, at the cost that we lose those
593
+ * sequence values if we crash.
594
+ */
590
595
591
596
/* set values that will be saved in xlog */
592
597
seq -> last_value = next ;
593
598
seq -> is_called = true;
594
599
seq -> log_cnt = 0 ;
595
600
596
- rdata [1 ].data = (char * ) page + ((PageHeader ) page )-> pd_upper ;
597
- rdata [1 ].len = ((PageHeader ) page )-> pd_special -
598
- ((PageHeader ) page )-> pd_upper ;
601
+ xlrec .node = seqrel -> rd_node ;
602
+ rdata [0 ].data = (char * ) & xlrec ;
603
+ rdata [0 ].len = sizeof (xl_seq_rec );
604
+ rdata [0 ].buffer = InvalidBuffer ;
605
+ rdata [0 ].next = & (rdata [1 ]);
606
+
607
+ rdata [1 ].data = (char * ) seqtuple .t_data ;
608
+ rdata [1 ].len = seqtuple .t_len ;
599
609
rdata [1 ].buffer = InvalidBuffer ;
600
610
rdata [1 ].next = NULL ;
601
611
@@ -605,7 +615,7 @@ nextval_internal(Oid relid)
605
615
PageSetTLI (page , ThisTimeLineID );
606
616
}
607
617
608
- /* update on-disk data */
618
+ /* Now update sequence tuple to the intended final state */
609
619
seq -> last_value = last ; /* last fetched number */
610
620
seq -> is_called = true;
611
621
seq -> log_cnt = log ; /* how much is logged */
@@ -706,6 +716,7 @@ do_setval(Oid relid, int64 next, bool iscalled)
706
716
SeqTable elm ;
707
717
Relation seqrel ;
708
718
Buffer buf ;
719
+ HeapTupleData seqtuple ;
709
720
Form_pg_sequence seq ;
710
721
711
722
/* open and AccessShareLock sequence */
@@ -718,7 +729,7 @@ do_setval(Oid relid, int64 next, bool iscalled)
718
729
RelationGetRelationName (seqrel ))));
719
730
720
731
/* lock page's buffer and read tuple */
721
- seq = read_info (elm , seqrel , & buf );
732
+ seq = read_seq_tuple (elm , seqrel , & buf , & seqtuple );
722
733
723
734
if ((next < seq -> min_value ) || (next > seq -> max_value ))
724
735
{
@@ -746,8 +757,13 @@ do_setval(Oid relid, int64 next, bool iscalled)
746
757
/* In any case, forget any future cached numbers */
747
758
elm -> cached = elm -> last ;
748
759
760
+ /* ready to change the on-disk (or really, in-buffer) tuple */
749
761
START_CRIT_SECTION ();
750
762
763
+ seq -> last_value = next ; /* last fetched number */
764
+ seq -> is_called = iscalled ;
765
+ seq -> log_cnt = 0 ;
766
+
751
767
MarkBufferDirty (buf );
752
768
753
769
/* XLOG stuff */
@@ -764,14 +780,8 @@ do_setval(Oid relid, int64 next, bool iscalled)
764
780
rdata [0 ].buffer = InvalidBuffer ;
765
781
rdata [0 ].next = & (rdata [1 ]);
766
782
767
- /* set values that will be saved in xlog */
768
- seq -> last_value = next ;
769
- seq -> is_called = true;
770
- seq -> log_cnt = 0 ;
771
-
772
- rdata [1 ].data = (char * ) page + ((PageHeader ) page )-> pd_upper ;
773
- rdata [1 ].len = ((PageHeader ) page )-> pd_special -
774
- ((PageHeader ) page )-> pd_upper ;
783
+ rdata [1 ].data = (char * ) seqtuple .t_data ;
784
+ rdata [1 ].len = seqtuple .t_len ;
775
785
rdata [1 ].buffer = InvalidBuffer ;
776
786
rdata [1 ].next = NULL ;
777
787
@@ -781,11 +791,6 @@ do_setval(Oid relid, int64 next, bool iscalled)
781
791
PageSetTLI (page , ThisTimeLineID );
782
792
}
783
793
784
- /* save info in sequence relation */
785
- seq -> last_value = next ; /* last fetched number */
786
- seq -> is_called = iscalled ;
787
- seq -> log_cnt = (iscalled ) ? 0 : 1 ;
788
-
789
794
END_CRIT_SECTION ();
790
795
791
796
UnlockReleaseBuffer (buf );
@@ -925,13 +930,20 @@ init_sequence(Oid relid, SeqTable *p_elm, Relation *p_rel)
925
930
}
926
931
927
932
928
- /* Given an opened relation, lock the page buffer and find the tuple */
933
+ /*
934
+ * Given an opened sequence relation, lock the page buffer and find the tuple
935
+ *
936
+ * *buf receives the reference to the pinned-and-ex-locked buffer
937
+ * *seqtuple receives the reference to the sequence tuple proper
938
+ * (this arg should point to a local variable of type HeapTupleData)
939
+ *
940
+ * Function's return value points to the data payload of the tuple
941
+ */
929
942
static Form_pg_sequence
930
- read_info (SeqTable elm , Relation rel , Buffer * buf )
943
+ read_seq_tuple (SeqTable elm , Relation rel , Buffer * buf , HeapTuple seqtuple )
931
944
{
932
945
PageHeader page ;
933
946
ItemId lp ;
934
- HeapTupleData tuple ;
935
947
sequence_magic * sm ;
936
948
Form_pg_sequence seq ;
937
949
@@ -947,7 +959,10 @@ read_info(SeqTable elm, Relation rel, Buffer *buf)
947
959
948
960
lp = PageGetItemId (page , FirstOffsetNumber );
949
961
Assert (ItemIdIsNormal (lp ));
950
- tuple .t_data = (HeapTupleHeader ) PageGetItem ((Page ) page , lp );
962
+
963
+ /* Note we currently only bother to set these two fields of *seqtuple */
964
+ seqtuple -> t_data = (HeapTupleHeader ) PageGetItem ((Page ) page , lp );
965
+ seqtuple -> t_len = ItemIdGetLength (lp );
951
966
952
967
/*
953
968
* Previous releases of Postgres neglected to prevent SELECT FOR UPDATE
@@ -957,15 +972,15 @@ read_info(SeqTable elm, Relation rel, Buffer *buf)
957
972
* hint bit update, ie, don't bother to WAL-log it, since we can certainly
958
973
* do this again if the update gets lost.
959
974
*/
960
- if (HeapTupleHeaderGetXmax (tuple . t_data ) != InvalidTransactionId )
975
+ if (HeapTupleHeaderGetXmax (seqtuple -> t_data ) != InvalidTransactionId )
961
976
{
962
- HeapTupleHeaderSetXmax (tuple . t_data , InvalidTransactionId );
963
- tuple . t_data -> t_infomask &= ~HEAP_XMAX_COMMITTED ;
964
- tuple . t_data -> t_infomask |= HEAP_XMAX_INVALID ;
977
+ HeapTupleHeaderSetXmax (seqtuple -> t_data , InvalidTransactionId );
978
+ seqtuple -> t_data -> t_infomask &= ~HEAP_XMAX_COMMITTED ;
979
+ seqtuple -> t_data -> t_infomask |= HEAP_XMAX_INVALID ;
965
980
SetBufferCommitInfoNeedsSave (* buf );
966
981
}
967
982
968
- seq = (Form_pg_sequence ) GETSTRUCT (& tuple );
983
+ seq = (Form_pg_sequence ) GETSTRUCT (seqtuple );
969
984
970
985
/* this is a handy place to update our copy of the increment */
971
986
elm -> increment = seq -> increment_by ;
@@ -1065,6 +1080,13 @@ init_params(List *options, bool isInit,
1065
1080
defel -> defname );
1066
1081
}
1067
1082
1083
+ /*
1084
+ * We must reset log_cnt when isInit or when changing any parameters
1085
+ * that would affect future nextval allocations.
1086
+ */
1087
+ if (isInit )
1088
+ new -> log_cnt = 0 ;
1089
+
1068
1090
/* INCREMENT BY */
1069
1091
if (increment_by != NULL )
1070
1092
{
@@ -1073,6 +1095,7 @@ init_params(List *options, bool isInit,
1073
1095
ereport (ERROR ,
1074
1096
(errcode (ERRCODE_INVALID_PARAMETER_VALUE ),
1075
1097
errmsg ("INCREMENT must not be zero" )));
1098
+ new -> log_cnt = 0 ;
1076
1099
}
1077
1100
else if (isInit )
1078
1101
new -> increment_by = 1 ;
@@ -1082,30 +1105,39 @@ init_params(List *options, bool isInit,
1082
1105
{
1083
1106
new -> is_cycled = intVal (is_cycled -> arg );
1084
1107
Assert (new -> is_cycled == false || new -> is_cycled == true);
1108
+ new -> log_cnt = 0 ;
1085
1109
}
1086
1110
else if (isInit )
1087
1111
new -> is_cycled = false;
1088
1112
1089
1113
/* MAXVALUE (null arg means NO MAXVALUE) */
1090
1114
if (max_value != NULL && max_value -> arg )
1115
+ {
1091
1116
new -> max_value = defGetInt64 (max_value );
1117
+ new -> log_cnt = 0 ;
1118
+ }
1092
1119
else if (isInit || max_value != NULL )
1093
1120
{
1094
1121
if (new -> increment_by > 0 )
1095
1122
new -> max_value = SEQ_MAXVALUE ; /* ascending seq */
1096
1123
else
1097
1124
new -> max_value = -1 ; /* descending seq */
1125
+ new -> log_cnt = 0 ;
1098
1126
}
1099
1127
1100
1128
/* MINVALUE (null arg means NO MINVALUE) */
1101
1129
if (min_value != NULL && min_value -> arg )
1130
+ {
1102
1131
new -> min_value = defGetInt64 (min_value );
1132
+ new -> log_cnt = 0 ;
1133
+ }
1103
1134
else if (isInit || min_value != NULL )
1104
1135
{
1105
1136
if (new -> increment_by > 0 )
1106
1137
new -> min_value = 1 ; /* ascending seq */
1107
1138
else
1108
1139
new -> min_value = SEQ_MINVALUE ; /* descending seq */
1140
+ new -> log_cnt = 0 ;
1109
1141
}
1110
1142
1111
1143
/* crosscheck min/max */
@@ -1179,6 +1211,7 @@ init_params(List *options, bool isInit,
1179
1211
errmsg ("CACHE (%s) must be greater than zero" ,
1180
1212
buf )));
1181
1213
}
1214
+ new -> log_cnt = 0 ;
1182
1215
}
1183
1216
else if (isInit )
1184
1217
new -> cache_value = 1 ;
@@ -1306,7 +1339,7 @@ seq_redo(XLogRecPtr lsn, XLogRecord *record)
1306
1339
1307
1340
item = (char * ) xlrec + sizeof (xl_seq_rec );
1308
1341
itemsz = record -> xl_len - sizeof (xl_seq_rec );
1309
- itemsz = MAXALIGN ( itemsz );
1342
+
1310
1343
if (PageAddItem (page , (Item ) item , itemsz ,
1311
1344
FirstOffsetNumber , false, false) == InvalidOffsetNumber )
1312
1345
elog (PANIC , "seq_redo: failed to add item to page" );
0 commit comments