8000 Prevent CREATE TABLE LIKE/INHERITS from (mis) copying whole-row Vars. · sureandrew/postgres@188a0a0 · GitHub
[go: up one dir, main page]

Skip to content

Commit 188a0a0

Browse files
committed
Prevent CREATE TABLE LIKE/INHERITS from (mis) copying whole-row Vars.
If a CHECK constraint or index definition contained a whole-row Var (that is, "table.*"), an attempt to copy that definition via CREATE TABLE LIKE or table inheritance produced incorrect results: the copied Var still claimed to have the rowtype of the source table, rather than the created table. For the LIKE case, it seems reasonable to just throw error for this situation, since the point of LIKE is that the new table is not permanently coupled to the old, so there's no reason to assume its rowtype will stay compatible. In the inheritance case, we should ideally allow such constraints, but doing so will require nontrivial refactoring of CREATE TABLE processing (because we'd need to know the OID of the new table's rowtype before we adjust inherited CHECK constraints). In view of the lack of previous complaints, that doesn't seem worth the risk in a back-patched bug fix, so just make it throw error for the inheritance case as well. Along the way, replace change_varattnos_of_a_node() with a more robust function map_variable_attnos(), which is capable of being extended to handle insertion of ConvertRowtypeExpr whenever we get around to fixing the inheritance case nicely, and in the meantime it returns a failure indication to the caller so that a helpful message with some context can be thrown. Also, this code will do the right thing with subselects (if we ever allow them in CHECK or indexes), and it range-checks varattnos before using them to index into the map array. Per report from Sergey Konoplev. Back-patch to all supported branches.
1 parent ef0f9dd commit 188a0a0

File tree

5 files changed

+212
-127
lines changed

5 files changed

+212
-127
lines changed

src/backend/commands/tablecmds.c

Lines changed: 24 additions & 109 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@
5757
#include "parser/parser.h"
5858
#include "rewrite/rewriteDefine.h"
5959
#include "rewrite/rewriteHandler.h"
60+
#include "rewrite/rewriteManip.h"
6061
#include "storage/smgr.h"
6162
#include "utils/acl.h"
6263
#include "utils/builtins.h"
@@ -170,7 +171,6 @@ static void MergeConstraintsIntoExisting(Relation child_rel, Relation parent_rel
170171
static void MergeAttributesIntoExisting(Relation child_rel, Relation parent_rel);
171172
static void add_nonduplicate_constraint(Constraint *cdef,
172173
ConstrCheck *check, int *ncheck);
173-
static bool change_varattnos_walker(Node *node, const AttrNumber *newattno);
174174
static void StoreCatalogInheritance(Oid relationId, List *supers);
175175
static void StoreCatalogInheritance1(Oid relationId, Oid parentOid,
176176
int16 seqNumber, Relation inhRelation);
@@ -868,7 +868,7 @@ MergeAttributes(List *schema, List *supers, bool istemp,
868868
* parents after the first one, nor if we have dropped columns.)
869869
*/
870870
newattno = (AttrNumber *)
871-
palloc(tupleDesc->natts * sizeof(AttrNumber));
871+
palloc0(tupleDesc->natts * sizeof(AttrNumber));
872872

873873
for (parent_attno = 1; parent_attno <= tupleDesc->natts;
874874
parent_attno++)
@@ -882,15 +882,7 @@ MergeAttributes(List *schema, List *supers, bool istemp,
882882
* Ignore dropped columns in the parent.
883883
*/
884884
if (attribute->attisdropped)
885-
{
886-
/*
887-
* change_varattnos_of_a_node asserts that this is greater
888-
* than zero, so if anything tries to use it, we should find
889-
* out.
890-
*/
891-
newattno[parent_attno - 1] = 0;
892-
continue;
893-
}
885+
continue; /* leave newattno entry as zero */
894886

895887
/*
896888
* Does it conflict with some previously inherited column?
@@ -1000,13 +992,31 @@ MergeAttributes(List *schema, List *supers, bool istemp,
1000992
{
1001993
Constraint *cdef = makeNode(Constraint);
1002994
Node *expr;
995+
bool found_whole_row;
996+
997+
/* Adjust Vars to match new table's column numbering */
998+
expr = map_variable_attnos(stringToNode(check[i].ccbin),
999+
1, 0,
1000+
newattno, tupleDesc->natts,
1001+
&found_whole_row);
1002+
1003+
/*
1004+
* For the moment we have to reject whole-row variables.
1005+
* We could convert them, if we knew the new table's rowtype
1006+
* OID, but that hasn't been assigned yet.
1007+
*/
1008+
if (found_whole_row)
1009+
ereport(ERROR,
1010+
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1011+
errmsg("cannot convert whole-row table reference"),
1012+
errdetail("Constraint \"%s\" contains a whole-row reference to table \"%s\".",
1013+
check[i].ccname,
1014+
RelationGetRelationName(relation))));
10031015

10041016
cdef->contype = CONSTR_CHECK;
10051017
cdef->name = pstrdup(check[i].ccname);
10061018
cdef->raw_expr = NULL;
1007-
/* adjust varattnos of ccbin here */
1008-
expr = stringToNode(check[i].ccbin);
1009-
change_varattnos_of_a_node(expr, newattno);
1019+
10101020
cdef->cooked_expr = nodeToString(expr);
10111021
constraints = lappend(constraints, cdef);
10121022
}
@@ -1160,101 +1170,6 @@ add_nonduplicate_constraint(Constraint *cdef, ConstrCheck *check, int *ncheck)
11601170
}
11611171

11621172

1163-
/*
1164-
* Replace varattno values in 10000 an expression tree according to the given
1165-
* map array, that is, varattno N is replaced by newattno[N-1]. It is
1166-
* caller's responsibility to ensure that the array is long enough to
1167-
* define values for all user varattnos present in the tree. System column
1168-
* attnos remain unchanged.
1169-
*
1170-
* Note that the passed node tree is modified in-place!
1171-
*/
1172-
void
1173-
change_varattnos_of_a_node(Node *node, const AttrNumber *newattno)
1174-
{
1175-
/* no setup needed, so away we go */
1176-
(void) change_varattnos_walker(node, newattno);
1177-
}
1178-
1179-
static bool
1180-
change_varattnos_walker(Node *node, const AttrNumber *newattno)
1181-
{
1182-
if (node == NULL)
1183-
return false;
1184-
if (IsA(node, Var))
1185-
{
1186-
Var *var = (Var *) node;
1187-
1188-
if (var->varlevelsup == 0 && var->varno == 1 &&
1189-
var->varattno > 0)
1190-
{
1191-
/*
1192-
* ??? the following may be a problem when the node is multiply
1193-
* referenced though stringToNode() doesn't create such a node
1194-
* currently.
1195-
*/
1196-
Assert(newattno[var->varattno - 1] > 0);
1197-
var->varattno = var->varoattno = newattno[var->varattno - 1];
1198-
}
1199-
return false;
1200-
}
1201-
return expression_tree_walker(node, change_varattnos_walker,
1202-
(void *) newattno);
1203-
}
1204-
1205-
/*
1206-
* Generate a map for change_varattnos_of_a_node from old and new TupleDesc's,
1207-
* matching according to column name.
1208-
*/
1209-
AttrNumber *
1210-
varattnos_map(TupleDesc old, TupleDesc new)
1211-
{
1212-
AttrNumber *attmap;
1213-
int i,
1214-
j;
1215-
1216-
attmap = (AttrNumber *) palloc0(sizeof(AttrNumber) * old->natts);
1217-
for (i = 1; i <= old->natts; i++)
1218-
{
1219-
if (old->attrs[i - 1]->attisdropped)
1220-
continue; /* leave the entry as zero */
1221-
1222-
for (j = 1; j <= new->natts; j++)
1223-
{
1224-
if (strcmp(NameStr(old->attrs[i - 1]->attname),
1225-
NameStr(new->attrs[j - 1]->attname)) == 0)
1226-
{
1227-
attmap[i - 1] = j;
1228-
break;
1229-
}
1230-
}
1231-
}
1232-
return attmap;
1233-
}
1234-
1235-
/*
1236-
* Generate a map for change_varattnos_of_a_node from a TupleDesc and a list
1237-
* of ColumnDefs
1238-
*/
1239-
AttrNumber *
1240-
varattnos_map_schema(TupleDesc old, List *schema)
1241-
{
1242-
AttrNumber *attmap;
1243-
int i;
1244-
1245-
attmap = (AttrNumber *) palloc0(sizeof(AttrNumber) * old->natts);
1246-
for (i = 1; i <= old->natts; i++)
1247-
{
1248-
if (old->attrs[i - 1]->attisdropped)
1249-
continue; /* leave the entry as zero */
1250-
1251-
attmap[i - 1] = findAttrByName(NameStr(old->attrs[i - 1]->attname),
1252-
schema);
1253-
}
1254-
return attmap;
1255-
}
1256-
1257-
12581173
/*
12591174
* StoreCatalogInheritance
12601175
* Updates the system catalogs with proper inheritance information.

src/backend/parser/parse_utilcmd.c

Lines changed: 70 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,8 @@ static void transformTableConstraint(ParseState *pstate,
101101
static void transformInhRelation(ParseState *pstate, CreateStmtContext *cxt,
102102
InhRelation *inhrelation);
103103
static IndexStmt *generateClonedIndexStmt(CreateStmtContext *cxt,
104-
Relation parent_index, AttrNumber *attmap);
104+
Relation source_idx,
105+
const AttrNumber *attmap, int attmap_length);
105106
static List *get_opclass(Oid opclass, Oid actual_datatype);
106107
static void transformIndexConstraints(ParseState *pstate,
107108
CreateStmtContext *cxt);
@@ -525,6 +526,7 @@ transformInhRelation(ParseState *pstate, CreateStmtContext *cxt,
525526
Relation relation;
526527
TupleDesc tupleDesc;
527528
TupleConstr *constr;
529+
AttrNumber *attmap;
528530
AclResult aclresult;
529531
bool including_defaults = false;
530532
bool including_constraints = false;
@@ -581,6 +583,13 @@ transformInhRelation(ParseState *pstate, CreateStmtContext *cxt,
581583
}
582584
}
583585

586+
/*
587+
* Initialize column number map for map_variable_attnos(). We need this
588+
* since dropped columns in the source table aren't copied, so the new
589+
* table can have different column numbers.
590+
*/
591+
attmap = (AttrNumber *) palloc0(sizeof(AttrNumber) * tupleDesc->natts);
592+
584593
/*
585594
* Insert the copied attributes into the cxt for the new table definition.
586595
*/
@@ -592,7 +601,7 @@ transformInhRelation(ParseState *pstate, CreateStmtContext *cxt,
592601
ColumnDef *def;
593602

594603
/*
595-
* Ignore dropped columns in the parent.
604+
* Ignore dropped columns in the parent. attmap entry is left zero.
596605
*/
597606
if (attribute->attisdropped)
598607
continue;
@@ -619,6 +628,8 @@ transformInhRelation(ParseState *pstate, CreateStmtContext *cxt,
619628
*/
620629
cxt->columns = lappend(cxt->columns, def);
621630

631+
attmap[parent_attno - 1] = list_length(cxt->columns);
632+
622633
/*
623634
* Copy default, if present and the default has been requested
624635
*/
@@ -652,21 +663,38 @@ transformInhRelation(ParseState *pstate, CreateStmtContext *cxt,
652663

653664
/*
654665
* Copy CHECK constraints if requested, being careful to adjust attribute
655-
* numbers
666+
* numbers so they match the child.
656667
*/
657668
if (including_constraints && tupleDesc->constr)
658669
{
659-
AttrNumber *attmap = varattnos_map_schema(tupleDesc, cxt->columns);
660670
int ccnum;
661671

662672
for (ccnum = 0; ccnum < tupleDesc->constr->num_check; ccnum++)
663673
{
664674
char *ccname = tupleDesc->constr->check[ccnum].ccname;
665675
char *ccbin = tupleDesc->constr->check[ccnum].ccbin;
666-
Node *ccbin_node = stringToNode(ccbin);
667676
Constraint *n = makeNode(Constraint);
677+
Node *ccbin_node;
678+
bool found_whole_row;
668679

669-
change_varattnos_of_a_node(ccbin_node, attmap);
680+
ccbin_node = map_variable_attnos(stringToNode(ccbin),
681+
1, 0,
682+
attmap, tupleDesc->natts,
683+
&found_whole_row);
684+
685+
/*
686+
* We reject whole-row variables because the whole point of LIKE
687+
* is that the new table's rowtype might later diverge from the
688+
* parent's. So, while translation might be possible right now,
689+
* it wouldn't be possible to guarantee it would work in future.
690+
*/
691+
if (found_whole_row)
692+
ereport(ERROR,
693+
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
694+
errmsg("cannot convert whole-row table reference"),
695+
errdetail("Constraint \"%s\" contains a whole-row reference to table \"%s\".",
696+
ccname,
697+
RelationGetRelationName(relation))));
670698

671699
n->contype = CONSTR_CHECK;
672700
n->name = pstrdup(ccname);
@@ -682,7 +710,6 @@ transformInhRelation(ParseState *pstate, CreateStmtContext *cxt,
682710
*/
683711
if (including_indexes && relation->rd_rel->relhasindex)
684712
{
685-
AttrNumber *attmap = varattnos_map_schema(tupleDesc, cxt->columns);
686713
List *parent_indexes;
687714
ListCell *l;
688715

@@ -697,7 +724,8 @@ transformInhRelation(ParseState *pstate, CreateStmtContext *cxt,
697724
parent_index = index_open(parent_index_oid, AccessShareLock);
698725

699726
/* Build CREATE INDEX statement to recreate the parent_index */
700-
index_stmt = generateClonedIndexStmt(cxt, parent_index, attmap);
727+
index_stmt = generateClonedIndexStmt(cxt, parent_index,
728+
attmap, tupleDesc->natts);
701729

702730
/* Save it in the inh_indexes list for the time being */
703731
cxt->inh_indexes = lappend(cxt->inh_indexes, index_stmt);
@@ -720,7 +748,7 @@ transformInhRelation(ParseState *pstate, CreateStmtContext *cxt,
720748
*/
721749
static IndexStmt *
722< 341A /code>750
generateClonedIndexStmt(CreateStmtContext *cxt, Relation source_idx,
723-
AttrNumber *attmap)
751+
const AttrNumber *attmap, int attmap_length)
724752
{
725753
Oid source_relid = RelationGetRelid(source_idx);
726754
HeapTuple ht_idxrel;
@@ -833,14 +861,26 @@ generateClonedIndexStmt(CreateStmtContext *cxt, Relation source_idx,
833861
{
834862
/* Expressional index */
835863
Node *indexkey;
864+
bool found_whole_row;
836865

837866
if (indexpr_item == NULL)
838867
elog(ERROR, "too few entries in indexprs list");
839868
indexkey = (Node *) lfirst(indexpr_item);
840869
indexpr_item = lnext(indexpr_item);
841870

842-
/* OK to modify indexkey since we are working on a private copy */
843-
change_varattnos_of_a_node(indexkey, attmap);
871+
/* Adjust Vars to match new table's column numbering */
872+
indexkey = map_variable_attnos(indexkey,
873+
1, 0,
874+
attmap, attmap_length,
875+
&found_whole_row);
876+
877+
/* As in transformTableLikeClause, reject whole-row variables */
878+
if (found_whole_row)
879+
ereport(ERROR,
880+
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
881+
errmsg("cannot convert whole-row table reference"),
882+
errdetail("Index \"%s\" contains a whole-row table reference.",
883+
RelationGetRelationName(source_idx))));
844884

845885
iparam->name = NULL;
846886
iparam->expr = indexkey;
@@ -891,12 +931,28 @@ generateClonedIndexStmt(CreateStmtContext *cxt, Relation source_idx,
891931
if (!isnull)
892932
{
893933
char *pred_str;
934+
Node *pred_tree;
935+
bool found_whole_row;
894936

895937
/* Convert text string to node tree */
896938
pred_str = DatumGetCString(DirectFunctionCall1(textout, datum));
897-
index->whereClause = (Node *) stringToNode(pred_str);
898-
/* Adjust attribute numbers */
899-
change_varattnos_of_a_node(index->whereClause, attmap);
939+
pred_tree = (Node *) stringToNode(pred_str);
940+
941+
/* Adjust Vars to match new table's column numbering */
942+
pred_tree = map_variable_attnos(pred_tree,
943+
1, 0,
944+
attmap, attmap_length,
945+
&found_whole_row);
946+
947+
/* As in transformTableLikeClause, reject whole-row variables */
948+
if (found_whole_row)
949+
ereport(ERROR,
950+
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
951+
errmsg("cannot convert whole-row table reference"),
952+
errdetail("Index \"%s\" contains a whole-row table reference.",
953+
RelationGetRelationName(source_idx))));
954+
955+
index->whereClause = pred_tree;
900956
}
901957

902958
/* Clean up */

0 commit comments

Comments
 (0)
0