8000 Publication where clause by ololobus · Pull Request #10 · ololobus/postgres · GitHub
[go: up one dir, main page]

Skip to content

Publication where clause #10

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 13 commits into
base: master
Choose a base branch
from
Open
9 changes: 6 additions & 3 deletions doc/src/sgml/ref/alter_publication.sgml
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ PostgreSQL documentation

<refsynopsisdiv>
<synopsis>
ALTER PUBLICATION <replaceable class="parameter">name</replaceable> ADD TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [, ...]
ALTER PUBLICATION <replaceable class="parameter">name</replaceable> SET TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [, ...]
ALTER PUBLICATION <replaceable class="parameter">name</replaceable> ADD TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [ WHERE ( <replaceable class="parameter">expression</replaceable> ) ] [, ...]
ALTER PUBLICATION <replaceable class="parameter">name</replaceable> SET TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [ WHERE ( <replaceable class="parameter">expression</replaceable> ) ] [, ...]
ALTER PUBLICATION <replaceable class="parameter">name</replaceable> DROP TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [, ...]
ALTER PUBLICATION <replaceable class="parameter">name</replaceable> SET ( <replaceable class="parameter">publication_parameter</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... ] )
ALTER PUBLICATION & 8000 lt;replaceable class="parameter">name</replaceable> OWNER TO { <replaceable>new_owner</replaceable> | CURRENT_USER | SESSION_USER }
Expand Down Expand Up @@ -91,7 +91,10 @@ ALTER PUBLICATION <replaceable class="parameter">name</replaceable> RENAME TO <r
table name, only that table is affected. If <literal>ONLY</literal> is not
specified, the table and all its descendant tables (if any) are
affected. Optionally, <literal>*</literal> can be specified after the table
name to explicitly indicate that descendant tables are included.
name to explicitly indicate that descendant tables are included. If the
optional <literal>WHERE</literal> clause is specified, rows that do not
satisfy the <replaceable class="parameter">expression</replaceable> will
not be published. Note that parentheses are required around the expression.
</para>
</listitem>
</varlistentry>
Expand Down
14 changes: 12 additions & 2 deletions doc/src/sgml/ref/create_publication.sgml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ PostgreSQL documentation
<refsynopsisdiv>
<synopsis>
CREATE PUBLICATION <replaceable class="parameter">name</replaceable>
[ FOR TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [, ...]
[ FOR TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [ WHERE ( <replaceable class="parameter">expression</replaceable> ) ] [, ...]
| FOR ALL TABLES ]
[ WITH ( <replaceable class="parameter">publication_parameter</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... ] ) ]

Expand Down Expand Up @@ -68,7 +68,10 @@ CREATE PUBLICATION <replaceable class="parameter">name</replaceable>
that table is added to the publication. If <literal>ONLY</literal> is not
specified, the table and all its descendant tables (if any) are added.
Optionally, <literal>*</literal> can be specified after the table name to
explicitly indicate that descendant tables are included.
explicitly indicate that descendant tables are included. If the optional
<literal>WHERE</literal> clause is specified, rows that do not satisfy
the <replaceable class="parameter">expression</replaceable> will not be
published. Note that parentheses are required around the expression.
</para>

<para>
Expand Down Expand Up @@ -183,6 +186,13 @@ CREATE PUBLICATION mypublication FOR TABLE users, departments;
</programlisting>
</para>

<para>
Create a publication that publishes all changes from active departments:
<programlisting>
CREATE PUBLICATION active_departments FOR TABLE departments WHERE (active IS TRUE);
</programlisting>
</para>

<para>
Create a publication that publishes all changes in all tables:
<programlisting>
Expand Down
46 changes: 41 additions & 5 deletions src/backend/catalog/pg_publication.c
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@
#include "catalog/pg_publication.h"
#include "catalog/pg_publication_rel.h"

#include "parser/parse_clause.h"
#include "parser/parse_collate.h"
#include "parser/parse_relation.h"

#include "utils/array.h"
#include "utils/builtins.h"
#include "utils/catcache.h"
Expand Down Expand Up @@ -149,18 +153,21 @@ pg_relation_is_publishable(PG_FUNCTION_ARGS)
* Insert new publication / relation mapping.
*/
ObjectAddress
publication_add_relation(Oid pubid, Relation targetrel,
publication_add_relation(Oid pubid, PublicationRelationQual *targetrel,
bool if_not_exists)
{
Relation rel;
HeapTuple tup;
Datum values[Natts_pg_publication_rel];
bool nulls[Natts_pg_publication_rel];
Oid relid = RelationGetRelid(targetrel);
Oid relid = RelationGetRelid(targetrel->relation);
Oid prrelid;
Publication *pub = GetPublication(pubid);
ObjectAddress myself,
referenced;
ParseState *pstate;
RangeTblEntry *rte;
Node *whereclause;

rel = table_open(PublicationRelRelationId, RowExclusiveLock);

Expand All @@ -180,10 +187,27 @@ publication_add_relation(Oid pubid, Relation targetrel,
ereport(ERROR,
(errcode(ERRCODE_DUPLICATE_OBJECT),
errmsg("relation \"%s\" is already member of publication \"%s\"",
RelationGetRelationName(targetrel), pub->name)));
RelationGetRelationName(targetrel->relation), pub->name)));
}

check_publication_add_relation(targetrel);
check_publication_add_relation(targetrel->relation);

/* Set up a pstate to parse with */
pstate = make_parsestate(NULL);
pstate->p_sourcetext = nodeToString(targetrel->whereClause);

rte = addRangeTableEntryForRelation(pstate, targetrel->relation,
AccessShareLock,
NULL, false, false);
addRTEtoQuery(pstate, rte, false, true, true);

whereclause = transformWhereClause(pstate,
copyObject(targetrel->whereClause),
EXPR_KIND_PUBLICATION_WHERE,
"PUBLICATION");

/* Fix up collation information */
assign_expr_collations(pstate, whereclause);

/* Form a tuple. */
memset(values, 0, sizeof(values));
Expand All @@ -197,6 +221,12 @@ publication_add_relation(Oid pubid, Relation targetrel,
values[Anum_pg_publication_rel_prrelid - 1] =
ObjectIdGetDatum(relid);

/* Add row filter, if available */
if (whereclause)
values[Anum_pg_publication_rel_prrowfilter - 1] = CStringGetTextDatum(nodeToString(whereclause));
else
nulls[Anum_pg_publication_rel_prrowfilter - 1] = true;

tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);

/* Insert tuple into catalog. */
Expand All @@ -213,11 +243,17 @@ publication_add_relation(Oid pubid, Relation targetrel,
ObjectAddressSet(referenced, RelationRelationId, relid);
recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);

/* Add dependency on the objects mentioned in the row filter expression */
if (whereclause)
recordDependencyOnExpr(&myself, whereclause, pstate->p_rtable, DEPENDENCY_NORMAL);

free_parsestate(pstate);

/* Close the table. */
table_close(rel, RowExclusiveLock);

/* Invalidate relcache so that publication info is rebuilt. */
CacheInvalidateRelcache(targetrel);
CacheInvalidateRelcache(targetrel->relation);

return myself;
}
Expand Down
85 changes: 66 additions & 19 deletions src/backend/commands/publicationcmds.c
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,7 @@ AlterPublicationTables(AlterPublicationStmt *stmt, Relation rel,
List *rels = NIL;
Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup);
Oid pubid = pubform->oid;
ListCell *xpto;

/* Check that user is allowed to manipulate the publication tables. */
if (pubform->puballtables)
Expand All @@ -352,6 +353,37 @@ AlterPublicationTables(AlterPublicationStmt *stmt, Relation rel,

Assert(list_length(stmt->tables) > 0);

foreach(xpto, stmt->tables)
{
PublicationTable *t = lfirst(xpto);

if (t->whereClause == NULL)
elog(DEBUG3, "publication \"%s\" has no WHERE clause", NameStr(pubform->pubname));
else
elog(DEBUG3, "publication \"%s\" has WHERE clause", NameStr(pubform->pubname));
}

/*
* ALTER PUBLICATION ... DROP TABLE cannot contain a WHERE clause. Use
* publication_table_list node (that accepts a WHERE clause) but forbid the
* WHERE clause in it. The use of relation_expr_list node just for the
* DROP TABLE part does not worth the trouble.
*/
if (stmt->tableAction == DEFELEM_DROP)
{
ListCell *lc;

foreach(lc, stmt->tables)
{
PublicationTable *t = lfirst(lc);
if (t->whereClause)
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("cannot use a WHERE clause for removing table from publication \"%s\"",
NameStr(pubform->pubname))));
}
}

rels = OpenTableList(stmt->tables);

if (stmt->tableAction == DEFELEM_ADD)
Expand All @@ -373,9 +405,9 @@ AlterPublicationTables(AlterPublicationStmt *stmt, Relation rel,

foreach(newlc, rels)
{
Relation newrel = (Relation) lfirst(newlc);
PublicationRelationQual *newrel = (PublicationRelationQual *) lfirst(newlc);

if (RelationGetRelid(newrel) == oldrelid)
if (RelationGetRelid(newrel->relation) == oldrelid)
{
found = true;
break;
Expand All @@ -384,7 +416,8 @@ AlterPublicationTables(AlterPublicationStmt *stmt, Relation rel,

if (!found)
{
Relation oldrel = table_open(oldrelid,
PublicationRelationQual *oldrel = palloc(sizeof(PublicationRelationQual));
oldrel->relation = table_open(oldrelid,
ShareUpdateExclusiveLock);

delrels = lappend(delrels, oldrel);
Expand Down Expand Up @@ -510,16 +543,22 @@ OpenTableList(List *tables)
List *relids = NIL;
List *rels = NIL;
ListCell *lc;
PublicationRelationQual *relqual;

/*
* Open, share-lock, and check all the explicitly-specified relations
*/
foreach(lc, tables)
{
RangeVar *rv = castNode(RangeVar, lfirst(lc));
bool recurse = rv->inh;
Relation rel;
Oid myrelid;
// RangeVar *rv = castNode(RangeVar, lfirst(lc));
// bool recurse = rv->inh;
// Relation rel;
// Oid myrelid;
PublicationTable *t = lfirst(lc);
RangeVar *rv = t->relation;
Relation rel;
bool recurse = rv->inh;
Oid myrelid;

/* Allow query cancel in case this takes a long time */
CHECK_FOR_INTERRUPTS();
Expand All @@ -539,8 +578,10 @@ OpenTableList(List *tables)
table_close(rel, ShareUpdateExclusiveLock);
continue;
}

rels = lappend(rels, rel);
relqual = palloc(sizeof(PublicationRelationQual));
relqual->relation = rel;
relqual->whereClause = t->whereClause;
rels = lappend(rels, relqual);
relids = lappend_oid(relids, myrelid);

/* Add children of this rel, if requested */
Expand Down Expand Up @@ -568,7 +609,11 @@ OpenTableList(List *tables)

/* find_all_inheritors already got lock */
rel = table_open(childrelid, NoLock);
rels = lappend(rels, rel);
relqual = palloc(sizeof(PublicationRelationQual));
relqual->relation = rel;
/* child inherits WHERE clause from parent */
relqual->whereClause = t->whereClause;
rels = lappend(rels, relqual);
relids = lappend_oid(relids, childrelid);
}
}
Expand All @@ -589,10 +634,12 @@ CloseTableList(List *rels)

foreach(lc, rels)
{
Relation rel = (Relation) lfirst(lc);
PublicationRelationQual *rel = (PublicationRelationQual *) lfirst(lc);

table_close(rel, NoLock);
table_close(rel->relation, NoLock);
}

list_free_deep(rels);
}

/*
Expand All @@ -608,13 +655,13 @@ PublicationAddTables(Oid pubid, List *rels, bool if_not_exists,

foreach(lc, rels)
{
Relation rel = (Relation) lfirst(lc);
PublicationRelationQual *rel = (PublicationRelationQual *) lfirst(lc);
ObjectAddress obj;

/* Must be owner of the table or superuser. */
if (!pg_class_ownercheck(RelationGetRelid(rel), GetUserId()))
aclcheck_error(ACLCHECK_NOT_OWNER, get_relkind_objtype(rel->rd_rel->relkind),
RelationGetRelationName(rel));
if (!pg_class_ownercheck(RelationGetRelid(rel->relation 964D ), GetUserId()))
aclcheck_error(ACLCHECK_NOT_OWNER, get_relkind_objtype(rel->relation->rd_rel->relkind),
RelationGetRelationName(rel->relation));

obj = publication_add_relation(pubid, rel, if_not_exists);
if (stmt)
Expand All @@ -640,8 +687,8 @@ PublicationDropTables(Oid pubid, List *rels, bool missing_ok)

foreach(lc, rels)
{
Relation rel = (Relation) lfirst(lc);
Oid relid = RelationGetRelid(rel);
PublicationRelationQual *rel = (PublicationRelationQual *) lfirst(lc);
Oid relid = RelationGetRelid(rel->relation);

prid = GetSysCacheOid2(PUBLICATIONRELMAP, Anum_pg_publication_rel_oid,
ObjectIdGetDatum(relid),
Expand All @@ -654,7 +701,7 @@ PublicationDropTables(Oid pubid, List *rels, bool missing_ok)
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_OBJECT),
errmsg("relation \"%s\" is not part of the publication",
RelationGetRelationName(rel))));
RelationGetRelationName(rel->relation))));
}

ObjectAddressSet(obj, PublicationRelRelationId, prid);
Expand Down
Loading
0