8000 Fix longstanding crash-safety bug with newly-created-or-reset sequences. · sureandrew/postgres@8468bcc · GitHub
[go: up one dir, main page]

Skip to content

Commit 8468bcc

Browse files
committed
Fix longstanding crash-safety bug with newly-created-or-reset sequences.
If a crash occurred immediately after the first nextval() call for a serial column, WAL replay would restore the sequence to a state in which it appeared that no nextval() had been done, thus allowing the first sequence value to be returned again by the next nextval() call; as reported in bug #6748 from Xiangming Mei. More generally, the problem would occur if an ALTER SEQUENCE was executed on a freshly created or reset sequence. (The manifestation with serial columns was introduced in 8.2 when we added an ALTER SEQUENCE OWNED BY step to serial column creation.) The cause is that sequence creation attempted to save one WAL entry by writing out a WAL record that made it appear that the first nextval() had already happened (viz, with is_called = true), while marking the sequence's in-database state with log_cnt = 1 to show that the first nextval() need not emit a WAL record. However, ALTER SEQUENCE would emit a new WAL entry reflecting the actual in-database state (with is_called = false). Then, nextval would allocate the first sequence value and set is_called = true, but it would trust the log_cnt value and not emit any WAL record. A crash at this point would thus restore the sequence to its post-ALTER state, causing the next nextval() call to return the first sequence value again. To fix, get rid of the idea of logging an is_called status different from reality. This means that the first nextval-driven WAL record will happen at the first nextval call not the second, but the marginal cost of that is pretty negligible. In addition, make sure that ALTER SEQUENCE resets log_cnt to zero in any case where it touches sequence parameters that affect future nextval results. This will result in some user-visible changes in the contents of a sequence's log_cnt column, as reflected in the patch's regression test changes; but no application should be depending on that anyway, since it was already true that log_cnt changes rather unpredictably depending on checkpoint timing. In addition, make some basically-cosmetic improvements to get rid of sequence.c's undesirable intimacy with page layout details. It was always really trying to WAL-log the contents of the sequence tuple, so we should have it do that directly using a HeapTuple's t_data and t_len, rather than backing into it with some magic assumptions about where the tuple would be on the sequence's page. Back-patch to all supported branches.
1 parent 038c36b commit 8468bcc

File tree

2 files changed

+89
-56
lines changed

2 files changed

+89
-56
lines changed

src/backend/commands/sequence.c

Lines changed: 88 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@
3636
/*
3737
* We don't want to log each fetching of a value from a sequence,
3838
* so we pre-log a few fetches in advance. In the event of
39-
* crash we can lose as much as we pre-logged.
39+
* crash we can lose (skip over) as many values as we pre-logged.
4040
*/
4141
#define SEQ_LOG_VALS 32
4242

@@ -70,7 +70,7 @@ typedef struct SeqTableData
7070
int64 cached; /* last value already cached for nextval */
7171
/* if last != cached, we have not used up all the cached values */
7272
int64 increment; /* copy of sequence's increment field */
73-
/* note that increment is zero until we first do read_info() */
73+
/* note that increment is zero until we first do read_seq_tuple() */
7474
} SeqTableData;
7575

7676
typedef SeqTableData *SeqTable;
@@ -86,7 +86,8 @@ static SeqTableData *last_used_seq = NULL;
8686
static int64 nextval_internal(Oid relid);
8787
static Relation open_share_lock(SeqTable seq);
8888
static void init_sequence(Oid relid, SeqTable *p_elm, Relation *p_rel);
89-
static Form_pg_sequence read_info(SeqTable elm, Relation rel, Buffer *buf);
89+
static Form_pg_sequence read_seq_tuple(SeqTable elm, Relation rel,
90+
Buffer *buf, HeapTuple seqtuple);
9091
static void init_params(List *options, bool isInit,
9192
Form_pg_sequence new, List **owned_by);
9293
static void do_setval(Oid relid, int64 next, bool iscalled);
@@ -171,7 +172,7 @@ DefineSequence(CreateSeqStmt *seq)
171172
case SEQ_COL_LOG:
172173
coldef->typename = makeTypeNameFromOid(INT8OID, -1);
173174
coldef->colname = "log_cnt";
174-
value[i - 1] = Int64GetDatum((int64) 1);
175+
value[i - 1] = Int64GetDatum((int64) 0);
175176
break;
176177
case SEQ_COL_CYCLE:
177178
coldef->typename = makeTypeNameFromOid(BOOLOID, -1);
@@ -267,12 +268,6 @@ DefineSequence(CreateSeqStmt *seq)
267268
xl_seq_rec xlrec;
268269
XLogRecPtr recptr;
269270
XLogRecData rdata[2];
270-
Form_pg_sequence newseq = (Form_pg_sequence) GETSTRUCT(tuple);
271-
272-
/* We do not log first nextval call, so "advance" sequence here */
273-
/* Note we are scribbling on local tuple, not the disk buffer */
274-
newseq->is_called = true;
275-
newseq->log_cnt = 0;
276271

277272
xlrec.node = rel->rd_node;
278273
rdata[0].data = (char *) &xlrec;
@@ -314,7 +309,7 @@ AlterSequence(AlterSeqStmt *stmt)
314309
SeqTable elm;
315310
Relation seqrel;
316311
Buffer buf;
317-
Page page;
312+
HeapTupleData seqtuple;
318313
Form_pg_sequence seq;
319314
FormData_pg_sequence new;
320315
List *owned_by;
@@ -329,8 +324,7 @@ AlterSequence(AlterSeqStmt *stmt)
329324
stmt->sequence->relname);
330325

331326
/* lock page' buffer and read tuple into new sequence structure */
332-
seq = read_info(elm, seqrel, &buf);
333-
page = BufferGetPage(buf);
327+
seq = read_seq_tuple(elm, seqrel, &buf, &seqtuple);
334328

335329
/* Copy old values of options into workspace */
336330
memcpy(&new, seq, sizeof(FormData_pg_sequence));
@@ -343,10 +337,10 @@ AlterSequence(AlterSeqStmt *stmt)
343337
elm->cached = elm->last;
344338

345339
/* Now okay to update the on-disk tuple */
346-
memcpy(seq, &new, sizeof(FormData_pg_sequence));
347-
348340
START_CRIT_SECTION();
349341

342+
memcpy(seq, &new, sizeof(FormData_pg_sequence));
343+
350344
MarkBufferDirty(buf);
351345

352346
/* XLOG stuff */
@@ -355,16 +349,16 @@ AlterSequence(AlterSeqStmt *stmt)
355349
xl_seq_rec xlrec;
356350
XLogRecPtr recptr;
357351
XLogRecData rdata[2];
352+
Page page = BufferGetPage(buf);
358353

359354
xlrec.node = seqrel->rd_node;
360355
rdata[0].data = (char *) &xlrec;
361356
rdata[0].len = sizeof(xl_seq_rec);
362357
rdata[0].buffer = InvalidBuffer;
363358
rdata[0].next = &(rdata[1]);
364359

365-
rdata[1].data = (char *) page + ((PageHeader) page)->pd_upper;
366-
rdata[1].len = ((PageHeader) page)->pd_special -
367-
((PageHeader) page)->pd_upper;
360+
rdata[1].data = (char *) seqtuple.t_data;
361+
rdata[1].len = seqtuple.t_len;
368362
rdata[1].buffer = InvalidBuffer;
369363
rdata[1].next = NULL;
370364

@@ -419,6 +413,7 @@ nextval_internal(Oid relid)
419413
Relation seqrel;
420414
Buffer buf;
421415
Page page;
416+
HeapTupleData seqtuple;
422417
Form_pg_sequence seq;
423418
int64 incby,
424419
maxv,
@@ -453,7 +448,7 @@ nextval_internal(Oid relid)
453448
}
454449

455450
/* lock page' buffer and read tuple */
456-
seq = read_info(elm, seqrel, &buf);
451+
seq = read_seq_tuple(elm, seqrel, &buf, &seqtuple);
457452
page = BufferGetPage(buf);
458453

459454
last = next = result = seq->last_value;
@@ -465,9 +460,8 @@ nextval_internal(Oid relid)
465460

466461
if (!seq->is_called)
467462
{
468-
rescnt++; /* last_value if not called */
463+
rescnt++; /* return last_value if not is_called */
469464
fetch--;
470-
log--;
471465
}
472466

473467
/*
@@ -480,7 +474,7 @@ nextval_internal(Oid relid)
480474
* checkpoint would fail to advance the sequence past the logged values.
481475
* In this case we may as well fetch extra values.
482476
*/
483-
if (log < fetch)
477+
if (log < fetch || !seq->is_called)
484478
{
485479
/* forced log to satisfy local demand for values */
486480
fetch = log = fetch + SEQ_LOG_VALS;
@@ -571,8 +565,18 @@ nextval_internal(Oid relid)
571565

572566
last_used_seq = elm;
573567

568+
/* ready to change the on-disk (or really, in-buffer) tuple */
574569
START_CRIT_SECTION();
575570

571+
/*
572+
* We must mark the buffer dirty before doing XLogInsert(); see notes in
573+
* SyncOneBuffer(). However, we don't apply the desired changes just yet.
574+
* This looks like a violation of the buffer update protocol, but it is
575+
* in fact safe because we hold exclusive lock on the buffer. Any other
576+
* process, including a checkpoint, that tries to examine the buffer
577+
* contents will block until we release the lock, and then will see the
578+
* final state that we install below.
579+
*/
576580
MarkBufferDirty(buf);
577581

578582
/* XLOG stuff */
@@ -582,20 +586,26 @@ nextval_internal(Oid relid)
582586
XLogRecPtr recptr;
583587
XLogRecData rdata[2];
584588

585-
xlrec.node = seqrel->rd_node;
586-
rdata[0].data = (char *) &xlrec;
587-
rdata[0].len = sizeof(xl_seq_rec);
588-
rdata[0].buffer = InvalidBuffer;
589-
rdata[0].next = &(rdata[1]);
589+
/*
590+
* We don't log the current state of the tuple, but rather the state
591+
* as it would appear after "log" more fetches. This lets us skip
592+
* that many future WAL records, at the cost that we lose those
593+
* sequence values if we crash.
594+
*/
590595

591596
/* set values that will be saved in xlog */
592597
seq->last_value = next;
593598
seq->is_called = true;
594599
seq->log_cnt = 0;
595600

596-
rdata[1].data = (char *) page + ((PageHeader) page)->pd_upper;
597-
rdata[1].len = ((PageHeader) page)->pd_special -
598-
((PageHeader) page)->pd_upper;
601+
xlrec.node = seqrel->rd_node;
602+
rdata[0].data = (char *) &xlrec;
603+
rdata[0].len = sizeof(xl_seq_rec);
604+
rdata[0].buffer = InvalidBuffer;
605+
rdata[0].next = &(rdata[1]);
606+
607+
rdata[1].data = (char *) seqtuple.t_data;
608+
rdata[1].len = seqtuple.t_len;
599609
rdata[1].buffer = InvalidBuffer;
600610
rdata[1].next = NULL;
601611

@@ -605,7 +615,7 @@ nextval_internal(Oid relid)
605615
PageSetTLI(page, ThisTimeLineID);
606616
}
607617

608-
/* update on-disk data */
618+
/* Now update sequence tuple to the intended final state */
609619
seq->last_value = last; /* last fetched number */
610620
seq->is_called = true;
611621
seq->log_cnt = log; /* how much is logged */
@@ -706,6 +716,7 @@ do_setval(Oid relid, int64 next, bool iscalled)
706716
SeqTable elm;
707717
Relation seqrel;
708718
Buffer buf;
719+
HeapTupleData seqtuple;
709720
Form_pg_sequence seq;
710721

711722
/* open and AccessShareLock sequence */
@@ -718,7 +729,7 @@ do_setval(Oid relid, int64 next, bool iscalled)
718729
RelationGetRelationName(seqrel))));
719730

720731
/* lock page' buffer and read tuple */
721-
seq = read_info(elm, seqrel, &buf);
732+
seq = read_seq_tuple(elm, seqrel, &buf, &seqtuple);
722733

723734
if ((next < seq->min_value) || (next > seq->max_value))
724735
{
@@ -746,8 +757,13 @@ do_setval(Oid relid, int64 next, bool iscalled)
746757
/* In any case, forget any future cached numbers */
747758
elm->cached = elm->last;
748759

760+
/* ready to change the on-disk (or really, in-buffer) tuple */
749761
START_CRIT_SECTION();
750762

763+
seq->last_value = next; /* last fetched number */
764+
seq->is_called = iscalled;
765+
seq->log_cnt = 0;
766+
751767
MarkBufferDirty(buf);
752768

753769
/* XLOG stuff */
@@ -764,14 +780,8 @@ do_setval(Oid relid, int64 next, bool iscalled)
764780
rdata[0].buffer = InvalidBuffer;
765781
rdata[0].next = &(rdata[1]);
766782

767-
/* set values that will be saved in xlog */
768-
seq->last_value = next;
769-
seq->is_called = true;
770-
seq->log_cnt = 0;
771-
772-
rdata[1].data = (char *) page + ((PageHeader) page)->pd_upper;
773-
rdata[1].len = ((PageHeader) page)->pd_special -
774-
((PageHeader) page)->pd_upper;
783+
rdata[1].data = (char *) seqtuple.t_data;
784+
rdata[1].len = seqtuple.t_len;
775785
rdata[1].buffer = InvalidBuffer;
776786
rdata[1].next = NULL;
777787

@@ -781,11 +791,6 @@ do_setval(Oid relid, int64 next, bool iscalled)
781791
PageSetTLI(page, ThisTimeLineID);
782792
}
783793

784-
/* save info in sequence relation */
785-
seq->last_value = next; /* last fetched number */
786-
seq->is_called = iscalled;
787-
seq->log_cnt = (iscalled) ? 0 : 1;
788-
789794
END_CRIT_SECTION();
790795

791796
UnlockReleaseBuffer(buf);
@@ -925,13 +930,20 @@ init_sequence(Oid relid, SeqTable *p_elm, Relation *p_rel)
925930
}
926931

927932

928-
/* Given an opened relation, lock the page buffer and find the tuple */
933+
/*
934+
* Given an opened sequence relation, lock the page buffer and find the tuple
935+
*
936+
* *buf receives the reference to the pinned-and-ex-locked buffer
937+
* *seqtuple receives the reference to the sequence tuple proper
938+
* (this arg should point to a local variable of type HeapTupleData)
939+
*
940+
* Function's return value points to the data payload of the tuple
941+
*/
929942
static Form_pg_sequence
930-
read_info(SeqTable elm, Relation rel, Buffer *buf)
943+
read_seq_tuple(SeqTable elm, Relation rel, Buffer *buf, HeapTuple seqtuple)
931944
{
932945
PageHeader page;
933946
ItemId lp;
934-
HeapTupleData tuple;
935947
sequence_magic *sm;
936948
Form_pg_sequence seq;
937949

@@ -947,7 +959,10 @@ read_info(SeqTable elm, Relation rel, Buffer *buf)
947959

948960
lp = PageGetItemId(page, FirstOffsetNumber);
949961
Assert(ItemIdIsNormal(lp));
950-
tuple.t_data = (HeapTupleHeader) PageGetItem((Page) page, lp);
962+
963+
/* Note we currently only bother to set these two fields of *seqtuple */
964+
seqtuple->t_data = (HeapTupleHeader) PageGetItem((Page) page, lp);
965+
seqtuple->t_len = ItemIdGetLength(lp);
951966

952967
/*
953968
* Previous releases of Postgres neglected to prevent SELECT FOR UPDATE
@@ -957,15 +972,15 @@ read_info(SeqTable elm, Relation rel, Buffer *buf)
957972
* hint bit update, ie, don't bother to WAL-log it, since we can certainly
958973
* do this again if the update gets lost.
959974
*/
960-
if (HeapTupleHeaderGetXmax(tuple.t_data) != InvalidTransactionId)
975+
if (HeapTupleHeaderGetXmax(seqtuple->t_data) != InvalidTransactionId)
961976
{
962-
HeapTupleHeaderSetXmax(tuple.t_data, InvalidTransactionId);
963-
tuple.t_data->t_infomask &= ~HEAP_XMAX_COMMITTED;
964-
tuple.t_data->t_infomask |= HEAP_XMAX_INVALID;
977+
HeapTupleHeaderSetXmax(seqtuple->t_data, InvalidTransactionId);
978+
seqtuple->t_data->t_infomask &= ~HEAP_XMAX_COMMITTED;
979+
seqtuple->t_data->t_infomask |= HEAP_XMAX_INVALID;
965980
SetBufferCommitInfoNeedsSave(*buf);
966981
}
967982

968-
seq = (Form_pg_sequence) GETSTRUCT(&tuple);
983+
seq = (Form_pg_sequence) GETSTRUCT(seqtuple);
969984

970985
/* this is a handy place to update our copy of the increment */
971986
elm->increment = seq->increment_by;
@@ -1065,6 +1080,13 @@ init_params(List *options, bool isInit,
10651080
defel->defname);
10661081
}
10671082

1083+
/*
1084+
* We must reset log_cnt when isInit or when changing any parameters
1085+
* that would affect future nextval allocations.
1086+
*/
1087+
if (isInit)
1088+
new->log_cnt = 0;
1089+
10681090
/* INCREMENT BY */
10691091
if (increment_by != NULL)
10701092
{
@@ -1073,6 +1095,7 @@ init_params(List *options, bool isInit,
10731095
ereport(ERROR,
10741096
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
10751097
errmsg("INCREMENT must not be zero")));
1098+
new->log_cnt = 0;
10761099
}
10771100
else if (isInit)
10781101
new->increment_by = 1;
@@ -1082,30 +1105,39 @@ init_params(List *options, bool isInit,
10821105
{
10831106
new->is_cycled = intVal(is_cycled->arg);
10841107
Assert(new->is_cycled == false || new->is_cycled == true);
1108+
new->log_cnt = 0;
10851109
}
10861110
else if (isInit)
10871111
new->is_cycled = false;
10881112

10891113
/* MAXVALUE (null arg means NO MAXVALUE) */
10901114
if (max_value != NULL && max_value->arg)
1115+
{
10911116
new->max_value = defGetInt64(max_value);
1117+
new->log_cnt = 0;
1118+
}
10921119
else if (isInit || max_value != NULL)
10931120
{
10941121
if (new->increment_by > 0)
10951122
new->max_value = SEQ_MAXVALUE; /* ascending seq */
10961123
else
10971124
new->max_value = -1; /* descending seq */
1125+
new->log_cnt = 0;
10981126
}
10991127

11001128
/* MINVALUE (null arg means NO MINVALUE) */
11011129
if (min_value != NULL && min_value->arg)
1130+
{
11021131
new->min_value = defGetInt64(min_value);
1132+
new->log_cnt = 0;
1133+
}
11031134
else if (isInit || min_value != NULL)
11041135
{
11051136
if (new->increment_by > 0)
11061137
new->min_value = 1; /* ascending seq */
11071138
else
11081139
new->min_value = SEQ_MINVALUE; /* descending seq */
1140+
new->log_cnt = 0;
11091141
}
11101142

11111143
/* crosscheck min/max */
@@ -1179,6 +1211,7 @@ init_params(List *options, bool isInit,
11791211
errmsg("CACHE (%s) must be greater than zero",
11801212
buf)));
11811213
}
1214+
new->log_cnt = 0;
11821215
}
11831216
else if (isInit)
11841217
new->cache_value = 1;
@@ -1306,7 +1339,7 @@ seq_redo(XLogRecPtr lsn, XLogRecord *record)
13061339

13071340
item = (char *) xlrec + sizeof(xl_seq_rec);
13081341
itemsz = record->xl_len - sizeof(xl_seq_rec);
1309-
itemsz = MAXALIGN(itemsz);
1342+
13101343
if (PageAddItem(page, (Item) item, itemsz,
13111344
FirstOffsetNumber, false, false) == InvalidOffsetNumber)
13121345
elog(PANIC, "seq_redo: failed to add item to page");

0 commit comments

Comments
 (0)
0