8000 Support INSERT ... ON CONFLICT {UPDATE | IGNORE} · petergeoghegan/postgres@ba7bcd1 · GitHub
[go: up one dir, main page]

Skip to content

Commit ba7bcd1

Browse files
author
Peter Geoghegan
committed
Support INSERT ... ON CONFLICT {UPDATE | IGNORE}
This non-standard INSERT clause allows DML statement authors to specify that in the event of each of any of the tuples being inserted duplicating an existing tuple in terms of a value or set of values constrained by a unique index, an alternative path may be taken. The statement may alternatively IGNORE the tuple being inserted without raising an error, or go to UPDATE the existing tuple whose value is duplicated by a value within one single tuple proposed for insertion. The implementation loops until either an insert or an UPDATE/IGNORE occurs. No existing tuple may be affected more than once per INSERT. This is implemented using a new infrastructure called "speculative insertion". (The approach to "Value locking" presenting here follows design postgres#2, as described on https://wiki.postgresql.org/wiki/Value_locking). Alternatively, we may go to UPDATE, using the EvalPlanQual() mechanism to execute a special auxiliary plan. READ COMMITTED isolation level is permitted to UPDATE a tuple even where no version is visible to the command's MVCC snapshot. Similarly, any query predicate associated with the UPDATE portion of the new statement need only satisfy an already locked, conclusively committed and visible conflict tuple. When the predicate isn't satisfied, the tuple is still locked, which implies that at READ COMMITTED, a tuple may be locked without any version being visible to the command's MVCC snapshot. Users specify a single unique index to take the alternative path on, which is inferred from a set of user-supplied column names (or expressions). This is mandatory for the ON CONFLICT UPDATE variant, which should address concerns about spuriously taking an incorrect alternative ON CONFLICT path (i.e. the wrong unique index is used for arbitration of whether or not to take the alternative path) due to there being more than one would-be unique violation. Previous revisions of the patch didn't mandate this. However, we may still IGNORE based on the first would-be unique violation detected, on the assumption that it doesn't particularly matter where it originated from for that variant (iff the user didn't make a point of indicated his or her intent). The auxiliary ModifyTable plan used by the UPDATE portion of the new statement is not formally a subplan of its parent INSERT ModifyTable plan. Rather, it's an independently planned subquery, whose execution is tightly driven by its parent. Special auxiliary state pertaining to the auxiliary UPDATE is tracked by its parent through all stages of query execution. The optimizer imposes some restrictions on child auxiliary UPDATE plans, which make the plans comport with their parent to the extent required during the executor stage. One user-visible consequences of this is that the special auxiliary UPDATE query cannot have subselects within its targetlist or WHERE clause. UPDATEs may not reference any other table, and UPDATE FROM is disallowed. INSERT's RETURNING clause continues to only project tuples actually inserted.
1 parent e291312 commit ba7bcd1

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

53 files changed

+1956
-115
lines changed

contrib/pg_stat_statements/pg_stat_statements.c

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2198,6 +2198,9 @@ JumbleQuery(pgssJumbleState *jstate, Query *query)
21982198
JumbleRangeTable(jstate, query->rtable);
21992199
JumbleExpr(jstate, (Node *) query->jointree);
22002200
JumbleExpr(jstate, (Node *) query->targetList);
2201+
JumbleExpr(jstate, (Node *) query->arbiterExpr);
2202+
if (query->onConflict)
2203+
JumbleQuery(jstate, (Query *) query->onConflict);
22012204
JumbleExpr(jstate, (Node *) query->returningList);
22022205
JumbleExpr(jstate, (Node *) query->groupClause);
22032206
JumbleExpr(jstate, query->havingQual);

contrib/postgres_fdw/deparse.c

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -834,8 +834,8 @@ appendWhereClause(StringInfo buf,
834834
void
835835
deparseInsertSql(StringInfo buf, PlannerInfo *root,
836836
Index rtindex, Relation rel,
837-
List *targetAttrs, List *returningList,
838-
List **retrieved_attrs)
837+
List *targetAttrs, bool ignore,
838+
List *returningList, List **retrieved_attrs)
839839
{
840840
AttrNumber pindex;
841841
bool first;
@@ -879,6 +879,9 @@ deparseInsertSql(StringInfo buf, PlannerInfo *root,
879879
else
880880
appendStringInfoString(buf, " DEFAULT VALUES");
881881

882+
if (ignore)
883+
appendStringInfoString(buf, " ON CONFLICT IGNORE");
884+
882885
deparseReturningList(buf, root, rtindex, rel,
883886
rel->trigdesc && rel->trigdesc->trig_insert_after_row,
884887
returningList, retrieved_attrs);

contrib/postgres_fdw/postgres_fdw.c

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1166,6 +1166,7 @@ postgresPlanForeignModify(PlannerInfo *root,
11661166
List *targetAttrs = NIL;
11671167
List *returningList = NIL;
11681168
List *retrieved_attrs = NIL;
1169+
bool ignore = false;
11691170

11701171
initStringInfo(&sql);
11711172

@@ -1215,14 +1216,25 @@ postgresPlanForeignModify(PlannerInfo *root,
12151216
if (plan->returningLists)
12161217
returningList = (List *) list_nth(plan->returningLists, subplan_index);
12171218

1219+
if (root->parse->arbiterExpr)
1220+
ereport(ERROR,
1221+
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1222+
errmsg("postgres_fdw does not support ON CONFLICT unique index inference")));
1223+
else if (plan->spec == SPEC_INSERT)
1224+
ereport(ERROR,
1225+
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1226+
errmsg("postgres_fdw does not support ON CONFLICT UPDATE")));
1227+
else if (plan->spec == SPEC_IGNORE)
1228+
ignore = true;
1229+
12181230
/*
12191231
* Construct the SQL command string.
12201232
*/
12211233
switch (operation)
12221234
{
12231235
case CMD_INSERT:
12241236
deparseInsertSql(&sql, root, resultRelation, rel,
1225-
targetAttrs, returningList,
1237+
targetAttrs, ignore, returningList,
12261238
&retrieved_attrs);
12271239
break;
12281240
case CMD_UPDATE:

contrib/postgres_fdw/postgres_fdw.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ extern void appendWhereClause(StringInfo buf,
6060
List **params);
6161
extern void deparseInsertSql(StringInfo buf, PlannerInfo *root,
6262
Index rtindex, Relation rel,
63-
List *targetAttrs, List *returningList,
63+
List *targetAttrs, bool ignore, List *returningList,
6464
List **retrieved_attrs);
6565
extern void deparseUpdateSql(StringInfo buf, PlannerInfo *root,
6666
Index rtindex, Relation rel,

src/backend/access/heap/heapam.c

Lines changed: 75 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -2048,6 +2048,9 @@ FreeBulkInsertState(BulkInsertState bistate)
20482048
* This causes rows to be frozen, which is an MVCC violation and
20492049
* requires explicit options chosen by user.
20502050
*
2051+
* If HEAP_INSERT_SPECULATIVE is specified, the MyProc->specInsert fields
2052+
* are filled.
2053+
*
20512054
* Note that these options will be applied when inserting into the heap's
20522055
* TOAST table, too, if the tuple requires any out-of-line data.
20532056
*
@@ -2216,6 +2219,13 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid,
22162219

22172220
END_CRIT_SECTION();
22182221

2222+
/*
2223+
* Let others know that we speculatively inserted this tuple, before
2224+
* releasing the buffer lock.
2225+
*/
2226+
if (options & HEAP_INSERT_SPECULATIVE)
2227+
SetSpeculativeInsertion(relation->rd_node, &heaptup->t_self);
2228+
22192229
UnlockReleaseBuffer(buffer);
22202230
if (vmbuffer != InvalidBuffer)
22212231
ReleaseBuffer(vmbuffer);
@@ -2655,11 +2665,17 @@ xmax_infomask_changed(uint16 new_infomask, uint16 old_infomask)
26552665
* (the last only for HeapTupleSelfUpdated, since we
26562666
* cannot obtain cmax from a combocid generated by another transaction).
26572667
* See comments for struct HeapUpdateFailureData for additional info.
2668+
*
2669+
* If 'killspeculative' is true, we're "super-deleting" a tuple we just
2670+
* inserted in the same command. Instead of the normal visibility checks, we
2671+
* check that the tuple was inserted by the current transaction and given
2672+
* command id. Also, instead of setting its xmax, we set xmin to invalid,
2673+
* making it immediately appear as dead to everyone.
26582674
*/
26592675
HTSU_Result
26602676
heap_delete(Relation relation, ItemPointer tid,
26612677
CommandId cid, Snapshot crosscheck, bool wait,
2662-
HeapUpdateFailureData *hufd)
2678+
HeapUpdateFailureData *hufd, bool killspeculative)
26632679
{
26642680
HTSU_Result result;
26652681
TransactionId xid = GetCurrentTransactionId();
@@ -2717,7 +2733,16 @@ heap_delete(Relation relation, ItemPointer tid,
27172733
tp.t_self = *tid;
27182734

27192735
l1:
2720-
result = HeapTupleSatisfiesUpdate(&tp, cid, buffer);
2736+
if (!killspeculative)
2737+
result = HeapTupleSatisfiesUpdate(&tp, cid, buffer);
2738+
else
2739+
{
2740+
if (tp.t_data->t_choice.t_heap.t_xmin != xid ||
2741+
tp.t_data->t_choice.t_heap.t_field3.t_cid != cid)
2742+
elog(ERROR, "attempted to kill a tuple inserted by another transaction or command");
2743+
result = HeapTupleMayBeUpdated;
2744+
}
2745+
27212746

27222747
if (result == HeapTupleInvisible)
27232748
{
@@ -2865,12 +2890,15 @@ heap_delete(Relation relation, ItemPointer tid,
28652890
* using our own TransactionId below, since some other backend could
28662891
* incorporate our XID into a MultiXact immediately afterwards.)
28672892
*/
2868-
MultiXactIdSetOldestMember();
2893+
if (!killspeculative)
2894+
{
2895+
MultiXactIdSetOldestMember();
28692896

2870-
compute_new_xmax_infomask(HeapTupleHeaderGetRawXmax(tp.t_data),
2871-
tp.t_data->t_infomask, tp.t_data->t_infomask2,
2872-
xid, LockTupleExclusive, true,
2873-
&new_xmax, &new_infomask, &new_infomask2);
2897+
compute_new_xmax_infomask(HeapTupleHeaderGetRawXmax(tp.t_data),
2898+
tp.t_data->t_infomask, tp.t_data->t_infomask2,
2899+
xid, LockTupleExclusive, true,
2900+
&new_xmax, &new_infomask, &new_infomask2);
2901+
}
28742902

28752903
START_CRIT_SECTION();
28762904

@@ -2897,8 +2925,21 @@ heap_delete(Relation relation, ItemPointer tid,
28972925
tp.t_data->t_infomask |= new_infomask;
28982926
tp.t_data->t_infomask2 |= new_infomask2;
28992927
HeapTupleHeaderClearHotUpdated(tp.t_data);
2900-
HeapTupleHeaderSetXmax(tp.t_data, new_xmax);
2901-
HeapTupleHeaderSetCmax(tp.t_data, cid, iscombo);
2928+
/*
2929+
* When killing a speculatively-inserted tuple, we set xmin to invalid
2930+
* instead of setting xmax, to make the tuple clearly invisible to
2931+
* everyone. In particular, we want HeapTupleSatisfiesDirty() to regard
2932+
* the tuple as dead, so that another backend inserting a duplicate key
2933+
* value won't unnecessarily wait for our transaction to finish.
2934+
*/
2935+
if (killspeculative)
2936+
HeapTupleHeaderSetXmin(tp.t_data, InvalidTransactionId);
2937+
else
2938+
{
2939+
HeapTupleHeaderSetXmax(tp.t_data, new_xmax);
2940+
HeapTupleHeaderSetCmax(tp.t_data, cid, iscombo);
2941+
}
2942+
29022943
/* Make sure there is no forward chain link in t_ctid */
29032944
tp.t_data->t_ctid = tp.t_self;
29042945

@@ -2915,7 +2956,11 @@ heap_delete(Relation relation, ItemPointer tid,
29152956
if (RelationIsAccessibleInLogicalDecoding(relation))
29162957
log_heap_new_cid(relation, &tp);
29172958

2918-
xlrec.flags = all_visible_cleared ? XLOG_HEAP_ALL_VISIBLE_CLEARED : 0;
2959+
xlrec.flags = 0;
2960+
if (all_visible_cleared)
2961+
xlrec.flags |= XLOG_HEAP_ALL_VISIBLE_CLEARED;
2962+
if (killspeculative)
2963+
xlrec.flags |= XLOG_HEAP_KILLED_SPECULATIVE_TUPLE;
29192964
xlrec.infobits_set = compute_infobits(tp.t_data->t_infomask,
29202965
tp.t_data->t_infomask2);
29212966
xlrec.target.node = relation->rd_node;
@@ -3031,7 +3076,7 @@ simple_heap_delete(Relation relation, ItemPointer tid)
30313076
result = heap_delete(relation, tid,
30323077
GetCurrentCommandId(true), InvalidSnapshot,
30333078
true /* wait for commit */ ,
3034-
&hufd);
3079+
&hufd, false);
30353080
switch (result)
30363081
{
30373082
case HeapTupleSelfUpdated:
@@ -4117,15 +4162,16 @@ get_mxact_status_for_lock(LockTupleMode mode, bool is_update)
41174162
*
41184163
* Function result may be:
41194164
* HeapTupleMayBeUpdated: lock was successfully acquired
4165+
* HeapTupleInvisible: lock failed because tuple instantaneously invisible
41204166
* HeapTupleSelfUpdated: lock failed because tuple updated by self
41214167
* HeapTupleUpdated: lock failed because tuple updated by other xact
41224168
* HeapTupleWouldBlock: lock couldn't be acquired and wait_policy is skip
41234169
*
4124-
* In the failure cases, the routine fills *hufd with the tuple's t_ctid,
4125-
* t_xmax (resolving a possible MultiXact, if necessary), and t_cmax
4126-
* (the last only for HeapTupleSelfUpdated, since we
4127-
* cannot obtain cmax from a combocid generated by another transaction).
4128-
* See comments for struct HeapUpdateFailureData for additional info.
4170+
* In the failure cases other than HeapTupleInvisible, the routine fills *hufd
4171+
* with the tuple's t_ctid, t_xmax (resolving a possible MultiXact, if
4172+
* necessary), and t_cmax (the last only for HeapTupleSelfUpdated, since we
4173+
* cannot obtain cmax from a combocid generated by another transaction). See
4174+
* comments for struct HeapUpdateFailureData for additional info.
41294175
*
41304176
* See README.tuplock for a thorough explanation of this mechanism.
41314177
*/
@@ -4162,8 +4208,15 @@ heap_lock_tuple(Relation relation, HeapTuple tuple,
41624208

41634209
if (result == HeapTupleInvisible)
41644210
{
4165-
UnlockReleaseBuffer(*buffer);
4166-
elog(ERROR, "attempted to lock invisible tuple");
4211+
LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);
4212+
4213+
/*
4214+
* This is possible, but only when locking a tuple for speculative
4215+
* insertion. We return this value here rather than throwing an error
4216+
* in order to give that case the opportunity to throw a more specific
4217+
* error.
4218+
*/
4219+
return HeapTupleInvisible;
41674220
}
41684221
else if (result == HeapTupleBeingUpdated)
41694222
{
@@ -7492,7 +7545,10 @@ heap_xlog_delete(XLogRecPtr lsn, XLogRecord *record)
74927545
HeapTupleHeaderClearHotUpdated(htup);
74937546
fix_infomask_from_infobits(xlrec->infobits_set,
74947547
&htup->t_infomask, &htup->t_infomask2);
7495-
HeapTupleHeaderSetXmax(htup, xlrec->xmax);
7548+
if (xlrec->flags & XLOG_HEAP_KILLED_SPECULATIVE_TUPLE)
7549+
HeapTupleHeaderSetXmin(htup, InvalidTransactionId);
7550+
else
7551+
HeapTupleHeaderSetXmax(htup, xlrec->xmax);
74967552
HeapTupleHeaderSetCmax(htup, FirstCommandId, false);
74977553

74987554
/* Mark the page as a candidate for pruning */

src/backend/access/nbtree/nbtinsert.c

Lines changed: 25 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
#include "miscadmin.h"
2323
#include "storage/lmgr.h"
2424
#include "storage/predicate.h"
25+
#include "storage/procarray.h"
26+
#include "utils/inval.h"
2527
#include "utils/tqual.h"
2628

2729

@@ -51,7 +53,8 @@ static Buffer _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf);
5153
static TransactionId _bt_check_unique(Relation rel, IndexTuple itup,
5254
Relation heapRel, Buffer buf, OffsetNumber offset,
5355
ScanKey itup_scankey,
54-
IndexUniqueCheck checkUnique, bool *is_unique);
56+
IndexUniqueCheck checkUnique, bool *is_unique,
57+
uint32 *speculativeToken);
5558
static void _bt_findinsertloc(Relation rel,
5659
Buffer *bufptr,
5760
OffsetNumber *offsetptr,
@@ -160,16 +163,26 @@ _bt_doinsert(Relation rel, IndexTuple itup,
160163
if (checkUnique != UNIQUE_CHECK_NO)
161164
{
162165
TransactionId xwait;
166+
uint32 speculativeToken;
163167

164168
offset = _bt_binsrch(rel, buf, natts, itup_scankey, false);
165169
xwait = _bt_check_unique(rel, itup, heapRel, buf, offset, itup_scankey,
166-
checkUnique, &is_unique);
170+
checkUnique, &is_unique, &speculativeToken);
167171

168172
if (TransactionIdIsValid(xwait))
169173
{
170174
/* Have to wait for the other guy ... */
171175
_bt_relbuf(rel, buf);
172-
XactLockTableWait(xwait, rel, &itup->t_tid, XLTW_InsertIndex);
176+
/*
177+
* If it's a speculative insertion, wait for it to finish (ie.
178+
* to go ahead with the insertion, or kill the tuple). Otherwise
179+
* wait for the transaction to finish as usual.
180+
*/
181+
if (speculativeToken)
182+
SpeculativeInsertionWait(xwait, speculativeToken);
183+
else
184+
XactLockTableWait(xwait, rel, &itup->t_tid, XLTW_InsertIndex);
185+
173186
/* start over... */
174187
_bt_freestack(stack);
175188
goto top;
@@ -211,9 +224,12 @@ _bt_doinsert(Relation rel, IndexTuple itup,
211224
* also point to end-of-page, which means that the first tuple to check
212225
* is the first tuple on the next page.
213226
*
214-
* Returns InvalidTransactionId if there is no conflict, else an xact ID
215-
* we must wait for to see if it commits a conflicting tuple. If an actual
216-
* conflict is detected, no return --- just ereport().
227+
* Returns InvalidTransactionId if there is no conflict, else an xact ID we
228+
* must wait for to see if it commits a conflicting tuple. If an actual
229+
* conflict is detected, no return --- just ereport(). If an xact ID is
230+
* returned, and the conflicting tuple still has a speculative insertion in
231+
* progress, *speculativeToken is set to non-zero, and the caller can wait for
232+
* the verdict on the insertion using SpeculativeInsertionWait().
217233
*
218234
* However, if checkUnique == UNIQUE_CHECK_PARTIAL, we always return
219235
* InvalidTransactionId because we don't want to wait. In this case we
@@ -223,7 +239,8 @@ _bt_doinsert(Relation rel, IndexTuple itup,
223239
static TransactionId
224240
_bt_check_unique(Relation rel, IndexTuple itup, Relation heapRel,
225241
Buffer buf, OffsetNumber offset, ScanKey itup_scankey,
226-
IndexUniqueCheck checkUnique, bool *is_unique)
242+
IndexUniqueCheck checkUnique, bool *is_unique,
243+
uint32 *speculativeToken)
227244
{
228245
TupleDesc itupdesc = RelationGetDescr(rel);
229246
int natts = rel->rd_rel->relnatts;
@@ -340,6 +357,7 @@ _bt_check_unique(Relation rel, IndexTuple itup, Relation heapRel,
340357
if (nbuf != InvalidBuffer)
341358
_bt_relbuf(rel, nbuf);
342359
/* Tell _bt_doinsert to wait... */
360+
*speculativeToken = SnapshotDirty.speculativeToken;
343361
return xwait;
344362
}
345363

0 commit comments

Comments
 (0)
0