8000 Support INSERT ... ON CONFLICT {UPDATE | IGNORE} · petergeoghegan/postgres@2a46bfc · GitHub
[go: up one dir, main page]

Skip to content

Commit 2a46bfc

Browse files
author
Peter Geoghegan
committed
Support INSERT ... ON CONFLICT {UPDATE | IGNORE}
This non-standard INSERT clause allows DML statement authors to specify that in the event of each of any of the tuples being inserted duplicating an existing tuple in terms of a value or set of values constrained by a unique index, an alternative path may be taken. The statement may alternatively IGNORE the tuple being inserted without raising an error, or go to UPDATE the existing tuple whose value is duplicated by a value within one single tuple proposed for insertion. The implementation loops until either an insert or an UPDATE/IGNORE occurs. No existing tuple may be affected more than once per INSERT. This is implemented using a new infrastructure called "speculative insertion". (The approach to "Value locking" presenting here follows design postgres#2, as described on https://wiki.postgresql.org/wiki/Value_locking). Alternatively, we may go to UPDATE, using the EvalPlanQual() mechanism to execute a special auxiliary plan. READ COMMITTED isolation level is permitted to UPDATE a tuple even where no version is visible to the command's MVCC snapshot. Similarly, any query predicate associated with the UPDATE portion of the new statement need only satisfy an already locked, conclusively committed and visible conflict tuple. When the predicate isn't satisfied, the tuple is still locked, which implies that at READ COMMITTED, a tuple may be locked without any version being visible to the command's MVCC snapshot. Users specify a single unique index to take the alternative path on, which is inferred from a set of user-supplied column names (or expressions). This is mandatory for the ON CONFLICT UPDATE variant, which should address concerns about spuriously taking an incorrect alternative ON CONFLICT path (i.e. the wrong unique index is used for arbitration of whether or not to take the alternative path) due to there being more than one would-be unique violation. Previous revisions of the patch didn't mandate this. However, we may still IGNORE based on the first would-be unique violation detected, on the assumption that it doesn't particularly matter where it originated from for that variant (iff the user didn't make a point of indicated his or her intent). The auxiliary ModifyTable plan used by the UPDATE portion of the new statement is not formally a subplan of its parent INSERT ModifyTable plan. Rather, it's an independently planned subquery, whose execution is tightly driven by its parent. Special auxiliary state pertaining to the auxiliary UPDATE is tracked by its parent through all stages of query execution. The implementation imposes some restrictions on child auxiliary UPDATE plans, which make the plans comport with their parent to the extent required during the executor stage. One user-visible consequences of this is that the special auxiliary UPDATE query cannot have subselects within its targetlist or WHERE clause. UPDATEs may not reference any other table, and UPDATE FROM is disallowed. INSERT's RETURNING clause projects tuples successfully inserted (in a later commit, it is made to project tuples inserted and updated, though).
1 parent 78139bb commit 2a46bfc

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

54 files changed

+1901
-122
lines changed

contrib/pg_stat_statements/pg_stat_statements.c

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2198,6 +2198,9 @@ JumbleQuery(pgssJumbleState *jstate, Query *query)
21982198
JumbleRangeTable(jstate, query->rtable);
21992199
JumbleExpr(jstate, (Node *) query->jointree);
22002200
JumbleExpr(jstate, (Node *) query->targetList);
2201+
JumbleExpr(jstate, (Node *) query->arbiterExpr);
2202+
if (query->onConflict)
2203+
JumbleQuery(jstate, (Query *) query->onConflict);
22012204
JumbleExpr(jstate, (Node *) query->returningList);
22022205
JumbleExpr(jstate, (Node *) query->groupClause);
22032206
JumbleExpr(jstate, query->havingQual);

contrib/postgres_fdw/deparse.c

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -847,8 +847,8 @@ appendWhereClause(StringInfo buf,
847847
void
848848
deparseInsertSql(StringInfo buf, PlannerInfo *root,
849849
Index rtindex, Relation rel,
850-
List *targetAttrs, List *returningList,
851-
List **retrieved_attrs)
850+
List *targetAttrs, bool ignore,
851+
List *returningList, List **retrieved_attrs)
852852
{
853853
AttrNumber pindex;
854854
bool first;
@@ -892,6 +892,9 @@ deparseInsertSql(StringInfo buf, PlannerInfo *root,
892892
else
893893
appendStringInfoString(buf, " DEFAULT VALUES");
894894

895+
if (ignore)
896+
appendStringInfoString(buf, " ON CONFLICT IGNORE");
897+
895898
deparseReturningList(buf, root, rtindex, rel,
896899
rel->trigdesc && rel->trigdesc->trig_insert_after_row,
897900
returningList, retrieved_attrs);

contrib/postgres_fdw/postgres_fdw.c

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1170,6 +1170,7 @@ postgresPlanForeignModify(PlannerInfo *root,
11701170
List *targetAttrs = NIL;
11711171
List *returningList = NIL;
11721172
List *retrieved_attrs = NIL;
1173+
bool ignore = false;
1173 F987 1174

11741175
initStringInfo(&sql);
11751176

@@ -1204,7 +1205,7 @@ postgresPlanForeignModify(PlannerInfo *root,
12041205
int col;
12051206

12061207
col = -1;
1207-
while ((col = bms_next_member(rte->modifiedCols, col)) >= 0)
1208+
while ((col = bms_next_member(rte->updatedCols, col)) >= 0)
12081209
{
12091210
/* bit numbers are offset by FirstLowInvalidHeapAttributeNumber */
12101211
AttrNumber attno = col + FirstLowInvalidHeapAttributeNumber;
@@ -1221,14 +1222,25 @@ postgresPlanForeignModify(PlannerInfo *root,
12211222
if (plan->returningLists)
12221223
returningList = (List *) list_nth(plan->returningLists, subplan_index);
12231224

1225+
if (root->parse->arbiterExpr)
1226+
ereport(ERROR,
1227+
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1228+
errmsg("postgres_fdw does not support ON CONFLICT unique index inference")));
1229+
else if (plan->spec == SPEC_INSERT)
1230+
ereport(ERROR,
1231+
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1232+
errmsg("postgres_fdw does not support ON CONFLICT UPDATE")));
1233+
else if (plan->spec == SPEC_IGNORE)
1234+
ignore = true;
1235+
12241236
/*
12251237
* Construct the SQL command string.
12261238
*/
12271239
switch (operation)
12281240
{
12291241
case CMD_INSERT:
12301242
deparseInsertSql(&sql, root, resultRelation, rel,
1231-
targetAttrs, returningList,
1243+
targetAttrs, ignore, returningList,
12321244
&retrieved_attrs);
12331245
break;
12341246
case CMD_UPDATE:

contrib/postgres_fdw/postgres_fdw.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ extern void appendWhereClause(StringInfo buf,
6060
List **params);
6161
extern void deparseInsertSql(StringInfo buf, PlannerInfo *root,
6262
Index rtindex, Relation rel,
63-
List *targetAttrs, List *returningList,
63+
List *targetAttrs, bool ignore, List *returningList,
6464
List **retrieved_attrs);
6565
extern void deparseUpdateSql(StringInfo buf, PlannerInfo *root,
6666
Index rtindex, Relation rel,

src/backend/access/heap/heapam.c

Lines changed: 75 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -2043,6 +2043,9 @@ FreeBulkInsertState(BulkInsertState bistate)
20432043
* This causes rows to be frozen, which is an MVCC violation and
20442044
* requires explicit options chosen by user.
20452045
*
2046+
* If HEAP_INSERT_SPECULATIVE is specified, the MyProc->specInsert fields
2047+
* are filled.
2048+
*
20462049
* Note that these options will be applied when inserting into the heap's
20472050
* TOAST table, too, if the tuple requires any out-of-line data.
20482051
*
@@ -2191,6 +2194,13 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid,
21912194

21922195
END_CRIT_SECTION();
21932196

2197+
/*
2198+
* Let others know that we speculatively inserted this tuple, before
2199+
* releasing the buffer lock.
2200+
*/
2201+
if (options & HEAP_INSERT_SPECULATIVE)
2202+
SetSpeculativeInsertion(relation->rd_node, &heaptup->t_self);
2203+
21942204
UnlockReleaseBuffer(buffer);
21952205
if (vmbuffer != InvalidBuffer)
21962206
ReleaseBuffer(vmbuffer);
@@ -2611,11 +2621,17 @@ xmax_infomask_changed(uint16 new_infomask, uint16 old_infomask)
26112621
* (the last only for HeapTupleSelfUpdated, since we
26122622
* cannot obtain cmax from a combocid generated by another transaction).
26132623
* See comments for struct HeapUpdateFailureData for additional info.
2624+
*
2625+
* If 'killspeculative' is true, we're "super-deleting" a tuple we just
2626+
* inserted in the same command. Instead of the normal visibility checks, we
2627+
* check that the tuple was inserted by the current transaction and given
2628+
* command id. Also, instead of setting its xmax, we set xmin to invalid,
2629+
* making it immediately appear as dead to everyone.
26142630
*/
26152631
HTSU_Result
26162632
heap_delete(Relation relation, ItemPointer tid,
26172633
CommandId cid, Snapshot crosscheck, bool wait,
2618-
HeapUpdateFailureData *hufd)
2634+
HeapUpdateFailureData *hufd, bool killspeculative)
26192635
{
26202636
HTSU_Result result;
26212637
TransactionId xid = GetCurrentTransactionId();
@@ -2673,7 +2689,16 @@ heap_delete(Relation relation, ItemPointer tid,
26732689
tp.t_self = *tid;
26742690

26752691
l1:
2676-
result = HeapTupleSatisfiesUpdate(&tp, cid, buffer);
2692+
if (!killspeculative)
2693+
result = HeapTupleSatisfiesUpdate(&tp, cid, buffer);
2694+
else
2695+
{
2696+
if (tp.t_data->t_choice.t_heap.t_xmin != xid ||
2697+
tp.t_data->t_choice.t_heap.t_field3.t_cid != cid)
2698+
elog(ERROR, "attempted to kill a tuple inserted by another transaction or command");
2699+
result = HeapTupleMayBeUpdated;
2700+
}
2701+
26772702

26782703
if (result == HeapTupleInvisible)
26792704
{
@@ -2821,12 +2846,15 @@ heap_delete(Relation relation, ItemPointer tid,
28212846
* using our own TransactionId below, since some other backend could
28222847
* incorporate our XID into a MultiXact immediately afterwards.)
28232848
*/
2824-
MultiXactIdSetOldestMember();
2849+
if (!killspeculative)
2850+
{
2851+
MultiXactIdSetOldestMember();
28252852

2826-
compute_new_xmax_infomask(HeapTupleHeaderGetRawXmax(tp.t_data),
2827-
tp.t_data->t_infomask, tp.t_data->t_infomask2,
2828-
xid, LockTupleExclusive, true,
2829-
&new_xmax, &new_infomask, &new_infomask2);
2853+
compute_new_xmax_infomask(HeapTupleHeaderGetRawXmax(tp.t_data),
2854+
tp.t_data->t_infomask, tp.t_data->t_infomask2,
2855+
xid, LockTupleExclusive, true,
2856+
&new_xmax, &new_infomask, &new_infomask2);
2857+
}
28302858

28312859
START_CRIT_SECTION();
28322860

@@ -2853,8 +2881,21 @@ heap_delete(Relation relation, ItemPointer tid,
28532881
tp.t_data->t_infomask |= new_infomask;
28542882
tp.t_data->t_infomask2 |= new_infomask2;
28552883
HeapTupleHeaderClearHotUpdated(tp.t_data);
2856-
HeapTupleHeaderSetXmax(tp.t_data, new_xmax);
2857-
HeapTupleHeaderSetCmax(tp.t_data, cid, iscombo);
2884+
/*
2885+
* When killing a speculatively-inserted tuple, we set xmin to invalid
2886+
* instead of setting xmax, to make the tuple clearly invisible to
2887+
* everyone. In particular, we want HeapTupleSatisfiesDirty() to regard
2888+
* the tuple as dead, so that another backend inserting a duplicate key
2889+
* value won't unnecessarily wait for our transaction to finish.
2890+
*/
2891+
if (killspeculative)
2892+
HeapTupleHeaderSetXmin(tp.t_data, InvalidTransactionId);
2893+
else
2894+
{
2895+
HeapTupleHeaderSetXmax(tp.t_data, new_xmax);
2896+
HeapTupleHeaderSetCmax(tp.t_data, cid, iscombo);
2897+
}
2898+
28582899
/* Make sure there is no forward chain link in t_ctid */
28592900
tp.t_data->t_ctid = tp.t_self;
28602901

@@ -2870,7 +2911,11 @@ heap_delete(Relation relation, ItemPointer tid,
28702911
if (RelationIsAccessibleInLogicalDecoding(relation))
28712912
log_heap_new_cid(relation, &tp);
28722913

2873-
xlrec.flags = all_visible_cleared ? XLOG_HEAP_ALL_VISIBLE_CLEARED : 0;
2914+
xlrec.flags = 0;
2915+
if (all_visible_cleared)
2916+
xlrec.flags |= XLOG_HEAP_ALL_VISIBLE_CLEARED;
2917+
if (killspeculative)
2918+
xlrec.flags |= XLOG_HEAP_KILLED_SPECULATIVE_TUPLE;
28742919
xlrec.infobits_set = compute_infobits(tp.t_data->t_infomask,
28752920
tp.t_data->t_infomask2);
28762921
xlrec.offnum = ItemPointerGetOffsetNumber(&tp.t_self);
@@ -2975,7 +3020,7 @@ simple_heap_delete(Relation relation, ItemPointer tid)
29753020
result = heap_delete(relation, tid,
29763021
GetCurrentCommandId(true), InvalidSnapshot,
29773022
true /* wait for commit */ ,
2978-
&hufd);
3023+
&hufd, false);
29793024
switch (result)
29803025
{
29813026
case HeapTupleSelfUpdated:
@@ -4061,15 +4106,16 @@ get_mxact_status_for_lock(LockTupleMode mode, bool is_update)
40614106
*
40624107
* Function result may be:
40634108
* HeapTupleMayBeUpdated: lock was successfully acquired
4109+
* HeapTupleInvisible: lock failed because tuple instantaneously invisible
40644110
* HeapTupleSelfUpdated: lock failed because tuple updated by self
40654111
* HeapTupleUpdated: lock failed because tuple updated by other xact
40664112
* HeapTupleWouldBlock: lock couldn't be acquired and wait_policy is skip
40674113
*
4068-
* In the failure cases, the routine fills *hufd with the tuple's t_ctid,
4069-
* t_xmax (resolving a possible MultiXact, if necessary), and t_cmax
4070-
* (the last only for HeapTupleSelfUpdated, since we
4071-
* cannot obtain cmax from a combocid generated by another transaction).
4072-
* See comments for struct HeapUpdateFailureData for additional info.
4114+
* In the failure cases other than HeapTupleInvisible, the routine fills *hufd
4115+
* with the tuple's t_ctid, t_xmax (resolving a possible MultiXact, if
4116+
* necessary), and t_cmax (the last only for HeapTupleSelfUpdated, since we
4117+
* cannot obtain cmax from a combocid generated by another transaction). See
4118+
* comments for struct HeapUpdateFailureData for additional info.
40734119
*
40744120
* See README.tuplock for a thorough explanation of this mechanism.
40754121
*/
@@ -4106,8 +4152,15 @@ heap_lock_tuple(Relation relation, HeapTuple tuple,
41064152

41074153
if (result == HeapTupleInvisible)
41084154
{
4109-
UnlockReleaseBuffer(*buffer);
4110-
elog(ERROR, "attempted to lock invisible tuple");
4155+
LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);
4156+
4157+
/*
4158+
* This is possible, but only when locking a tuple for speculative
4159+
* insertion. We return this value here rather than throwing an error
4160+
* in order to give that case the opportunity to throw a more specific
4161+
* error.
4162+
*/
4163+
return HeapTupleInvisible;
41114164
}
41124165
else if (result == HeapTupleBeingUpdated)
41134166
{
@@ -7263,7 +7316,10 @@ heap_xlog_delete(XLogReaderState *record)
72637316
HeapTupleHeaderClearHotUpdated(htup);
72647317
fix_infomask_from_infobits(xlrec->infobits_set,
72657318
&htup->t_infomask, &htup->t_infomask2);
7266-
HeapTupleHeaderSetXmax(htup, xlrec->xmax);
7319+
if (xlrec->flags & XLOG_HEAP_KILLED_SPECULATIVE_TUPLE)
7320+
HeapTupleHeaderSetXmin(htup, InvalidTransactionId);
7321+
else
7322+
HeapTupleHeaderSetXmax(htup, xlrec->xmax);
72677323
HeapTupleHeaderSetCmax(htup, FirstCommandId, false);
72687324

72697325
/* Mark the page as a candidate for pruning */

src/backend/access/nbtree/nbtinsert.c

Lines changed: 25 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
#include "miscadmin.h"
2323
#include "storage/lmgr.h"
2424
#include "storage/predicate.h"
25+
#include "storage/procarray.h"
26+
#include "utils/inval.h"
2527
#include "utils/tqual.h"
2628

2729

@@ -51,7 +53,8 @@ static Buffer _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf);
5153
static TransactionId _bt_check_unique(Relation rel, IndexTuple itup,
5254
Relation heapRel, Buffer buf, OffsetNumber offset,
5355
ScanKey itup_scankey,
54-
IndexUniqueCheck checkUnique, bool *is_unique);
56+
IndexUniqueCheck checkUnique, bool *is_unique,
57+
uint32 *speculativeToken);
5558
static void _bt_findinsertloc(Relation rel,
5659
Buffer *bufptr,
5760
OffsetNumber *offsetptr,
@@ -160,16 +163,26 @@ _bt_doinsert(Relation rel, IndexTuple itup,
160163
if (checkUnique != UNIQUE_CHECK_NO)
161164
{
162165
TransactionId xwait;
166+
uint32 speculativeToken;
163167

164168
offset = _bt_binsrch(rel, buf, natts, itup_scankey, false);
165169
xwait = _bt_check_unique(rel, itup, heapRel, buf, offset, itup_scankey,
166-
checkUnique, &is_unique);
170+
checkUnique, &is_unique, &speculativeToken);
167171

168172
if (TransactionIdIsValid(xwait))
169173
{
170174
/* Have to wait for the other guy ... */
171175
_bt_relbuf(rel, buf);
172-
XactLockTableWait(xwait, rel, &itup->t_tid, XLTW_InsertIndex);
176+
/*
177+
* If it's a speculative insertion, wait for it to finish (ie.
178+
* to go ahead with the insertion, or kill the tuple). Otherwise
179+
* wait for the transaction to finish as usual.
180+
*/
181+
if (speculativeToken)
182+
SpeculativeInsertionWait(xwait, speculativeToken);
183+
else
184+
XactLockTableWait(xwait, rel, &itup->t_tid, XLTW_InsertIndex);
185+
173186
/* start over... */
174187
_bt_freestack(stack);
175188
goto top;
@@ -211,9 +224,12 @@ _bt_doinsert(Relation rel, IndexTuple itup,
211224
* also point to end-of-page, which means that the first tuple to check
212225
* is the first tuple on the next page.
213226
*
214-
* Returns InvalidTransactionId if there is no conflict, else an xact ID
215-
* we must wait for to see if it commits a conflicting tuple. If an actual
216-
* conflict is detected, no return --- just ereport().
227+
* Returns InvalidTransactionId if there is no conflict, else an xact ID we
228+
* must wait for to see if it commits a conflicting tuple. If an actual
229+
* conflict is detected, no return --- just ereport(). If an xact ID is
230+
* returned, and the conflicting tuple still has a speculative insertion in
231+
* progress, *speculativeToken is set to non-zero, and the caller can wait for
232+
* the verdict on the insertion using SpeculativeInsertionWait().
217233
*
218234
* However, if checkUnique == UNIQUE_CHECK_PARTIAL, we always return
219235
* InvalidTransactionId because we don't want to wait. In this case we
@@ -223,7 +239,8 @@ _bt_doinsert(Relation rel, IndexTuple itup,
223239
static TransactionId
224240
_bt_check_unique(Relation rel, IndexTuple itup, Relation heapRel,
225241
Buffer buf, OffsetNumber offset, ScanKey itup_scankey,
226-
IndexUniqueCheck checkUnique, bool *is_unique)
242+
IndexUniqueCheck checkUnique, bool *is_unique,
243+
uint32 *speculativeToken)
227244
{
228245
TupleDesc itupdesc = RelationGetDescr(rel);
229246
int natts = rel->rd_rel->relnatts;
@@ -340,6 +357,7 @@ _bt_check_unique(Relation rel, IndexTuple itup, Relation heapRel,
340357
if (nbuf != InvalidBuffer)
341358
_bt_relbuf(rel, nbuf);
342359
/* Tell _bt_doinsert to wait... */
360+
*speculativeToken = SnapshotDirty.speculativeToken;
343361
return xwait;
344362
}
345363

0 commit comments

Comments
 (0)
0