8000 Publication where clause by ololobus · Pull Request #10 · ololobus/postgres · GitHub
[go: up one dir, main page]

Skip to content

Publication where clause #10

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 13 commits into
base: master
Choose a base branch
from
Open
Prev Previous commit
Next Next commit
Row filtering for logical replication
When you define or modify a publication you optionally can filter rows
to be published using a WHERE condition. This condition is any
expression that evaluates to boolean. Only those rows that
satisfy the WHERE condition will be sent to subscribers.
  • Loading branch information
Euler Taveira authored and ololobus committed Aug 25, 2019
commit 375fb2c11601e5542346bbd909ee94f8b4b8d450
9 changes: 6 additions & 3 deletions doc/src/sgml/ref/alter_publication.sgml
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ PostgreSQL documentation

<refsynopsisdiv>
<synopsis>
ALTER PUBLICATION <replaceable class="parameter">name</replaceable> ADD TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [, ...]
ALTER PUBLICATION <replaceable class="parameter">name</replaceable> SET TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [, ...]
ALTER PUBLICATION <replaceable class="parameter">name</replaceable> ADD TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [ WHERE ( <replaceable class="parameter">expression</replaceable> ) ] [, ...]
ALTER PUBLICATION <replaceable class="parameter">name</replaceable> SET TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [ WHERE ( <replaceable class="parameter">expression</replaceable> ) ] [, ...]
ALTER PUBLICATION <replaceable class="parameter">name</replaceable> DROP TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [, ...]
ALTER PUBLICATION <replaceable class="parameter">name</replaceable> SET ( <replaceable class="parameter">publication_parameter</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... ] )
ALTER PUBLICATION <replaceable class="parameter">name</replaceable> OWNER TO { <replaceable>new_owner</replaceable> | CURRENT_USER | SESSION_USER }
Expand Down Expand Up @@ -91,7 +91,10 @@ ALTER PUBLICATION <replaceable class="parameter">name</replaceable> RENAME TO <r
table name, only that table is affected. If <literal>ONLY</literal> is not
specified, the table and all its descendant tables (if any) are
affected. Optionally, <literal>*</literal> can be specified after the table
name to explicitly indicate that descendant tables are included.
name to explicitly indicate that descendant tables are included. If the
optional <literal>WHERE</literal> clause is specified, rows that do not
satisfy the <replaceable class="parameter">expression</replaceable> will
not be published. Note that parentheses are required around the expression.
</para>
</listitem>
</varlistentry>
Expand Down
14 changes: 12 additions & 2 deletions doc/src/sgml/ref/create_publication.sgml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ PostgreSQL documentation
<refsynopsisdiv>
<synopsis>
CREATE PUBLICATION <replaceable class="parameter">name</replaceable>
[ FOR TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [, ...]
[ FOR TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [ WHERE ( <replaceable class="parameter">expression</replaceable> ) ] [, ...]
| FOR ALL TABLES ]
[ WITH ( <replaceable class="parameter">publication_parameter</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... ] ) ]

Expand Down Expand Up @@ -68,7 +68,10 @@ CREATE PUBLICATION <replaceable class="parameter">name</replaceable>
that table is added to the publication. If <literal>ONLY</literal> is not
specified, the table and all its descendant tables (if any) are added.
Optionally, <literal>*</literal> can be specified after the table name to
explicitly indicate that descendant tables are included.
explicitly indicate that descendant tables are included. If the optional
<literal>WHERE</literal> clause is specified, rows that do not satisfy
the <replaceable class="parameter">expression</replaceable> will not be
published. Note that parentheses are required around the expression.
</para>

<para>
Expand Down Expand Up @@ -183,6 +186,13 @@ CREATE PUBLICATION mypublication FOR TABLE users, departments;
</programlisting>
</para>

<para>
Create a publication that publishes all changes from active departments:
<programlisting>
CREATE PUBLICATION active_departments FOR TABLE departments WHERE (active IS TRUE);
</programlisting>
</para>

<para>
Create a publication that publishes all changes in all tables:
<programlisting>
Expand Down
46 changes: 41 additions & 5 deletions src/backend/catalog/pg_publication.c
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@
#include "catalog/pg_publication.h"
#include "catalog/pg_publication_rel.h"

#include "parser/parse_clause.h"
#include "parser/parse_collate.h"
#include "parser/parse_relation.h"

#include "utils/array.h"
#include "utils/builtins.h"
#include "utils/catcache.h"
Expand Down Expand Up @@ -149,18 +153,21 @@ pg_relation_is_publishable(PG_FUNCTION_ARGS)
* Insert new publication / relation mapping.
*/
ObjectAddress
publication_add_relation(Oid pubid, Relation targetrel,
publication_add_relation(Oid pubid, PublicationRelationQual *targetrel,
bool if_not_exists)
{
Relation rel;
HeapTuple tup;
Datum values[Natts_pg_publication_rel];
bool nulls[Natts_pg_publication_rel];
Oid relid = RelationGetRelid(targetrel);
Oid relid = RelationGetRelid(targetrel->relation);
Oid prrelid;
Publication *pub = GetPublication(pubid);
ObjectAddress myself,
referenced;
ParseState *pstate;
RangeTblEntry *rte;
Node *whereclause;

rel = table_open(PublicationRelRelationId, RowExclusiveLock);

Expand All @@ -180,10 +187,27 @@ publication_add_relation(Oid pubid, Relation targetrel,
ereport(ERROR,
(errcode(ERRCODE_DUPLICATE_OBJECT),
errmsg("relation \"%s\" is already member of publication \"%s\"",
RelationGetRelationName(targetrel), pub->name)));
RelationGetRelationName(targetrel->relation), pub->name)));
}

check_publication_add_relation(targetrel);
check_publication_add_relation(targetrel->relation);

/* Set up a pstate to parse with */
pstate = make_parsestate(NULL);
pstate->p_sourcetext = nodeToString(targetrel->whereClause);

rte = addRangeTableEntryForRelation(pstate, targetrel->relation,
AccessShareLock,
NULL, false, false);
addRTEtoQuery(pstate, rte, false, true, true);

whereclause = transformWhereClause(pstate,
copyObject(targetrel->whereClause),
EXPR_KIND_PUBLICATION_WHERE,
"PUBLICATION");

/* Fix up collation information */
assign_expr_collations(pstate, whereclause);

/* Form a tuple. */
memset(values, 0, sizeof(values));
Expand All @@ -197,6 +221,12 @@ publication_add_relation(Oid pubid, Relation targetrel,
values[Anum_pg_publication_rel_prrelid - 1] =
ObjectIdGetDatum(relid);

/* Add row filter, if available */
if (whereclause)
values[Anum_pg_publication_rel_prrowfilter - 1] = CStringGetTextDatum(nodeToString(whereclause));
else
nulls[Anum_pg_publication_rel_prrowfilter - 1] = true;

tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);

/* Insert tuple into catalog. */
Expand All @@ -213,11 +243,17 @@ publication_add_relation(Oid pubid, Relation targetrel,
ObjectAddressSet(referenced, RelationRelationId, relid);
recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);

/* Add dependency on the objects mentioned in the row filter expression */
if (whereclause)
recordDependencyOnExpr(&myself, whereclause, pstate->p_rtable, DEPENDENCY_NORMAL);

free_parsestate(pstate);

/* Close the table. */
table_close(rel, RowExclusiveLock);

/* Invalidate relcache so that publication info is rebuilt. */
CacheInvalidateRelcache(targetrel);
CacheInvalidateRelcache(targetrel->relation);

return myself;
}
Expand Down
74 changes: 55 additions & 19 deletions src/backend/commands/publicationcmds.c
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,27 @@ AlterPublicationTables(AlterPublicationStmt *stmt, Relation rel,

Assert(list_length(stmt->tables) > 0);

/*
* ALTER PUBLICATION ... DROP TABLE cannot contain a WHERE clause. Use
* publication_table_list node (that accepts a WHERE clause) but forbid the
* WHERE clause in it. The use of relation_expr_list node just for the
* DROP TABLE part does not worth the trouble.
*/
if (stmt->tableAction == DEFELEM_DROP)
{
ListCell *lc;

foreach(lc, stmt->tables)
{
PublicationTable *t = lfirst(lc);
if (t->whereClause)
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("cannot use a WHERE clause for removing table from publication \"%s\"",
NameStr(pubform->pubname))));
}
}

rels = OpenTableList(stmt->tables);

if (stmt->tableAction == DEFELEM_ADD)
Expand All @@ -373,9 +394,9 @@ AlterPublicationTables(AlterPublicationStmt *stmt, Relation rel,

foreach(newlc, rels)
{
Relation newrel = (Relation) lfirst(newlc);
PublicationRelationQual *newrel = (PublicationRelationQual *) lfirst(newlc);

if (RelationGetRelid(newrel) == oldrelid)
if (RelationGetRelid(newrel->relation) == oldrelid)
{
found = true;
break;
Expand All @@ -384,7 +405,8 @@ AlterPublicationTables(AlterPublicationStmt *stmt, Relation rel,

if (!found)
{
Relation oldrel = table_open(oldrelid,
PublicationRelationQual *oldrel = palloc(sizeof(PublicationRelationQual));
oldrel->relation = table_open(oldrelid,
ShareUpdateExclusiveLock);

delrels = lappend(delrels, oldrel);
Expand Down Expand Up @@ -510,16 +532,22 @@ OpenTableList(List *tables)
List *relids = NIL;
List *rels = NIL;
ListCell *lc;
PublicationRelationQual *relqual;

/*
* Open, share-lock, and check all the explicitly-specified relations
*/
foreach(lc, tables)
{
RangeVar *rv = castNode(RangeVar, lfirst(lc));
bool recurse = rv->inh;
Relation rel;
Oid myrelid;
// RangeVar *rv = castNode(RangeVar, lfirst(lc));
// bool recurse = rv->inh;
// Relation rel;
// Oid myrelid;
PublicationTable *t = lfirst(lc);
RangeVar *rv = t->relation;
Relation rel;
bool recurse = rv->inh;
Oid myrelid;

/* Allow query cancel in case this takes a long time */
CHECK_FOR_INTERRUPTS();
Expand All @@ -539,8 +567,10 @@ OpenTableList(List *tables)
table_close(rel, ShareUpdateExclusiveLock);
continue;
}

rels = lappend(rels, rel);
relqual = palloc(sizeof(PublicationRelationQual));
relqual->relation = rel;
relqual->whereClause = t->whereClause;
rels = lappend(rels, relqual);
relids = lappend_oid(relids, myrelid);

/* Add children of this rel, if requested */
Expand Down Expand Up @@ -568,7 +598,11 @@ OpenTableList(List *tables)

/* find_all_inheritors already got lock */
rel = table_open(childrelid, NoLock);
rels = lappend(rels, rel);
relqual = palloc(sizeof(PublicationRelationQual));
relqual->relation = rel;
/* child inherits WHERE clause from parent */
relqual->whereClause = t->whereClause;
rels = lappend(rels, relqual);
relids = lappend_oid(relids, childrelid);
}
}
Expand All @@ -589,10 +623,12 @@ CloseTableList(List *rels)

foreach(lc, rels)
{
Relation rel = (Relation) lfirst(lc);
PublicationRelationQual *rel = (PublicationRelationQual *) lfirst(lc);

table_close(rel, NoLock);
table_close(rel->relation, NoLock);
}

list_free_deep(rels);
}

/*
Expand All @@ -608,13 +644,13 @@ PublicationAddTables(Oid pubid, List *rels, bool if_not_exists,

foreach(lc, rels)
{
Relation rel = (Relation) lfirst(lc);
PublicationRelationQual *rel = (PublicationRelationQual *) lfirst(lc);
ObjectAddress obj;

/* Must be owner of the table or superuser. */
if (!pg_class_ownercheck(RelationGetRelid(rel), GetUserId()))
aclcheck_error(ACLCHECK_NOT_OWNER, get_relkind_objtype(rel->rd_rel->relkind),
RelationGetRelationName(rel));
if (!pg_class_ownercheck(RelationGetRelid(rel->relation), GetUserId()))
aclcheck_error(ACLCHECK_NOT_OWNER, get_relkind_objtype(rel->relation->rd_rel->relkind),
RelationGetRelationName(rel->relation));

obj = publication_add_relation(pubid, rel, if_not_exists);
if (stmt)
Expand All @@ -640,8 +676,8 @@ PublicationDropTables(Oid pubid, List *rels, bool missing_ok)

foreach(lc, rels)
{
Relation rel = (Relation) lfirst(lc);
Oid relid = RelationGetRelid(rel);
PublicationRelationQual *rel = (PublicationRelationQual *) lfirst(lc);
Oid relid = RelationGetRelid(rel->relation);

prid = GetSysCacheOid2(PUBLICATIONRELMAP, Anum_pg_publication_rel_oid,
ObjectIdGetDatum(relid),
Expand All @@ -654,7 +690,7 @@ PublicationDropTables(Oid pubid, List *rels, bool missing_ok)
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_OBJECT),
errmsg("relation \"%s\" is not part of the publication",
RelationGetRelationName(rel))));
RelationGetRelationName(rel->relation))));
}

ObjectAddressSet(obj, PublicationRelRelationId, prid);
Expand Down
26 changes: 20 additions & 6 deletions src/backend/parser/gram.y
Original file line number Diff line number Diff line change
Expand Up @@ -404,13 +404,13 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
relation_expr_list dostmt_opt_list
transform_element_list transform_type_list
TriggerTransitions TriggerReferencing
publication_name_list
publication_name_list publication_table_list
vacuum_relation_list opt_vacuum_relation_list

%type <list> group_by_list
%type <node> group_by_item empty_grouping_set rollup_clause cube_clause
%type <node> grouping_sets_clause
%type <node> opt_publication_for_tables publication_for_tables
%type <node> opt_publication_for_tables publication_for_tables publication_table_elem
%type <value> publication_name_item

%type <list> opt_fdw_options fdw_options
Expand Down Expand Up @@ -9518,7 +9518,7 @@ opt_publication_for_tables:
;

publication_for_tables:
FOR TABLE relation_expr_list
FOR TABLE publication_table_list
{
$$ = (Node *) $3;
}
Expand Down Expand Up @@ -9549,23 +9549,23 @@ AlterPublicationStmt:
n->options = $5;
$$ = (Node *)n;
}
| ALTER PUBLICATION name ADD_P TABLE relation_expr_list
| ALTER PUBLICATION name ADD_P TABLE publication_table_list
{
AlterPublicationStmt *n = makeNode(AlterPublicationStmt);
n->pubname = $3;
n->tables = $6;
n->tableAction = DEFELEM_ADD;
$$ = (Node *)n;
}
| ALTER PUBLICATION name SET TABLE relation_expr_list
| ALTER PUBLICATION name SET TABLE publication_table_list
{
AlterPublicationStmt *n = makeNode(AlterPublicationStmt);
n->pubname = $3;
n->tables = $6;
n->tableAction = DEFELEM_SET;
$$ = (Node *)n;
}
| ALTER PUBLICATION name DROP TABLE relation_expr_list
| ALTER PUBLICATION name DROP TABLE publication_table_list
{
AlterPublicationStmt *n = makeNode(AlterPublicationStmt);
n->pubname = $3;
Expand All @@ -9575,6 +9575,20 @@ AlterPublicationStmt:
}
;

publication_table_list:
publication_table_elem { $$ = list_make1($1); }
| publication_table_list ',' publication_table_elem { $$ = lappend($1, $3); }
;

publication_table_elem: relation_expr OptWhereClause
{
PublicationTable *n = makeNode(PublicationTable);
n->relation = $1;
n->whereClause = $2;
$$ = (Node *) n;
}
;

/*****************************************************************************
*
* CREATE SUBSCRIPTION name ...
Expand Down
Loading
0