8000 Merge pull request #417 from supabase/fix/publications-all-tables-to-… · rogervaas/postgres-meta@bc54900 · GitHub
[go: up one dir, main page]

Skip to content

Commit bc54900

Browse files
authored
Merge pull request supabase#417 from supabase/fix/publications-all-tables-to-list-of-tables
fix(publications): support list of tables <-> all tables on update
2 parents 49be99e + 9609e31 commit bc54900

File tree

4 files changed

+165
-87
lines changed

4 files changed

+165
-87
lines changed

src/lib/PostgresMetaPublications.ts

Lines changed: 100 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -66,17 +66,17 @@ export default class PostgresMetaPublications {
6666
publish_update = false,
6767
publish_delete = false,
6868
publish_truncate = false,
69-
tables,
69+
tables = null,
7070
}: {
7171
name: string
7272
publish_insert?: boolean
7373
publish_update?: boolean
7474
publish_delete?: boolean
7575
publish_truncate?: boolean
76-
tables?: string[]
76+
tables?: string[] | null
7777
}): Promise<PostgresMetaResult<PostgresPublication>> {
7878
let tableClause: string
79-
if (tables === undefined) {
79+
if (tables === undefined || tables === null) {
8080
tableClause = 'FOR ALL TABLES'
8181
} else if (tables.length === 0) {
8282
tableClause = ''
@@ -127,87 +127,106 @@ CREATE PUBLICATION ${ident(name)} ${tableClause}
127127
publish_update?: boolean
128128
publish_delete?: boolean
129129
publish_truncate?: boolean
130-
tables?: string[]
130+
tables?: string[] | null
131131
}
132132
): Promise<PostgresMetaResult<PostgresPublication>> {
133-
const { data: old, error } = await this.retrieve({ id })
134-
if (error) {
135-
return { data: null, error }
136-
}
137-
138-
// Need to work around the limitations of the SQL. Can't add/drop tables from
139-
// a publication with FOR ALL TABLES. Can't use the SET TABLE clause without
140-
// at least one table.
141-
//
142-
// new tables
143-
//
144-
// | undefined | string[] |
145-
// ---------|-----------|-----------------|
146-
// null | '' | 400 Bad Request |
147-
// old tables ---------|-----------|-----------------|
148-
// object[] | '' | See below |
149-
//
150-
// new tables
151-
//
152-
// | [] | [...] |
153-
// ---------|-----------|-----------------|
154-
// [] | '' | SET TABLE |
155-
// old tables ---------|-----------|-----------------|
156-
// [...] | DROP all | SET TABLE |
157-
//
158-
let tableSql: string
159-
if (tables === undefined) {
160-
tableSql = ''
161-
} else if (old!.tables === null) {
162-
throw new Error('Tables cannot be added to or dropped from FOR ALL TABLES publications')
163-
} else if (tables.length > 0) {
164-
tableSql = `ALTER PUBLICATION ${ident(old!.name)} SET TABLE ${tables
165-
.map((t) => {
166-
if (!t.includes('.')) {
167-
return ident(t)
168-
}
169-
170-
const [schema, ...rest] = t.split('.')
171-
const table = rest.join('.')
172-
return `${ident(schema)}.${ident(table)}`
173-
})
174-
.join(',')};`
175-
} else if (old!.tables.length === 0) {
176-
tableSql = ''
177-
} else {
178-
tableSql = `ALTER PUBLICATION ${ident(old!.name)} DROP TABLE ${old!.tables
179-
.map(
180-
(table: { schema: string; name: string }) => `${ident(table.schema)}.${ident(table.name)}`
133+
const sql = `
134+
do $$
135+
declare
136+
id oid := ${literal(id)};
137+
old record;
138+
new_name text := ${name === undefined ? null : literal(name)};
139+
new_owner text := ${owner === undefined ? null : literal(owner)};
140+
new_publish_insert bool := ${publish_insert ?? null};
141+
new_publish_update bool := ${publish_update ?? null};
142+
new_publish_delete bool := ${publish_delete ?? null};
143+
new_publish_truncate bool := ${publish_truncate ?? null};
144+
new_tables text := ${
145+
tables === undefined
146+
? null
147+
: literal(
148+
tables === null
149+
? 'all tables'
150+
: tables
151+
.map((t) => {
152+
if (!t.includes('.')) {
153+
return ident(t)
154+
}
155+
156+
const [schema, ...rest] = t.split('.')
157+
const table = rest.join('.')
158+
return `${ident(schema)}.${ident(table)}`
159+
})
160+
.join(',')
181161
)
182-
.join(',')};`
183-
}
184-
185-
let publishOps = []
186-
if (publish_insert ?? old!.publish_insert) publishOps.push('insert')
187-
if (publish_update ?? old!.publish_update) publishOps.push('update')
188-
if (publish_delete ?? old!.publish_delete) publishOps.push('delete')
189-
if (publish_truncate ?? old!.publish_truncate) publishOps.push('truncate')
190-
const publishSql = `ALTER PUBLICATION ${ident(old!.name)} SET (publish = '${publishOps.join(
191-
','
192-
)}');`
193-
194-
const ownerSql =
195-
owner === undefined ? '' : `ALTER PUBLICATION ${ident(old!.name)} OWNER TO ${ident(owner)};`
196-
197-
const nameSql =
198-
name === undefined || name === old!.name
199-
? ''
200-
: `ALTER PUBLICATION ${ident(old!.name)} RENAME TO ${ident(name)};`
201-
202-
// nameSql must be last
203-
const sql = `BEGIN; ${tableSql} ${publishSql} ${ownerSql} ${nameSql} COMMIT;`
204-
{
205-
const { error } = await this.query(sql)
206-
if (error) {
207-
return { data: null, error }
208-
}
209-
}
210-
return await this.retrieve({ id })
162+
};
163+
begin
164+
select * into old from pg_publication where oid = id;
165+
if old is null then
166+
raise exception 'Cannot find publication with id %', id;
167+
end if;
168+
169+
if new_tables is null then
170+
null;
171+
elsif new_tables = 'all tables' then
172+
if old.puballtables then
173+
null;
174+
else
175+
-- Need to recreate because going from list of tables <-> all tables with alter is not possible.
176+
execute(format('drop publication %1$I; create publication %1$I for all tables;', old.pubname));
177+
end if;
178+
else
179+
if old.puballtables then
180+
-- Need to recreate because going from list of tables <-> all tables with alter is not possible.
181+
execute(format('drop publication %1$I; create publication %1$I;', old.pubname));
182+
elsif exists(select from pg_publication_rel where prpubid = id) then
183+
execute(
184+
format(
185+
'alter publication %I drop table %s',
186+
old.pubname,
187+
(select string_agg(prrelid::regclass::text, ', ') from pg_publication_rel where prpubid = id)
188+
)
189+
);
190+
end if;
191+
192+
-- At this point the publication must have no tables.
193+
194+
if new_tables != '' then
195+
execute(format('alter publication %I add table %s', old.pubname, new_tables));
196+
end if;
197+
end if;
198+
199+
execute(
200+
format(
201+
'alter publication %I set (publish = %L);',
202+
old.pubname,
203+
concat_ws(
204+
', ',
205+
case when coalesce(new_publish_insert, old.pubinsert) then 'insert' end,
206+
case when coalesce(new_publish_update, old.pubupdate) then 'update' end,
207+
case when coalesce(new_publish_delete, old.pubdelete) then 'delete' end,
208+
case when coalesce(new_publish_truncate, old.pubtruncate) then 'truncate' end
209+
)
210+
)
211+
);
212+
213+
execute(format('alter publication %I owner to %I;', old.pubname, coalesce(new_owner, old.pubowner::regrole::name)));
214+
215+
-- Using the same name in the rename clause gives an error, so only do it if the new name is different.
216+
if new_name is not null and new_name != old.pubname then
217+
execute(format('alter publication %I rename to %I;', old.pubname, coalesce(new_name, old.pubname)));
218+
end if;
219+
220+
-- We need to retrieve the publication later, so we need a way to uniquely identify which publication this is.
221+
-- We can't rely on id because it gets changed if it got recreated.
222+
-- We use a temp table to store the unique name - DO blocks can't return a value.
223+
create temp table pg_meta_publication_tmp (name) on commit drop as values (coalesce(new_name, old.pubname));
224+
end $$;
225+
226+
with publications as (${publicationsSql}) select * from publications where name = (select name from pg_meta_publication_tmp);
227+
`
228+
const { data, error } = await this.query(sql)
229+
return { data: data[0], error }
211230
}
212231

213232
async remove(id: number): Promise<PostgresMetaResult<PostgresPublication>> {

src/lib/db.ts

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,13 +35,19 @@ export const init: (config: PoolConfig) => {
3535
try {
3636
if (!pool) {
3737
const pool = new Pool(config)
38-
const { rows } = await pool.query(sql)
38+
let res = await pool.query(sql)
39+
if (Array.isArray(res)) {
40+
res = res.reverse().find((x) => x.rows.length !== 0) ?? { rows: [] }
41+
}
3942
await pool.end()
40-
return { data: rows, error: null }
43+
return { data: res.rows, error: null }
4144
}
4245

43-
const { rows } = await pool.query(sql)
44-
return { data: rows, error: null }
46+
let res = await pool.query(sql)
47+
if (Array.isArray(res)) {
48+
res = res.reverse().find((x) => x.rows.length !== 0) ?? { rows: [] }
49+
}
50+
return { data: res.rows, error: null }
4551
} catch (e: any) {
4652
return { data: null, error: { message: e.message } }
4753
}

src/lib/sql/publications.sql

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
SELECT
22
p.oid :: int8 AS id,
33
p.pubname AS name,
4-
r.rolname AS owner,
4+
p.pubowner::regrole::text AS owner,
55
p.pubinsert AS publish_insert,
66
p.pubupdate AS publish_update,
77
p.pubdelete AS publish_delete,
@@ -34,4 +34,3 @@ FROM
3434
WHERE
3535
pr.prpubid = p.oid
3636
) AS pr ON 1 = 1
37-
JOIN pg_roles AS r ON p.pubowner = r.oid

test/lib/publications.ts

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -213,3 +213,57 @@ test('FOR ALL TABLES', async () => {
213213
)
214214
await pgMeta.publications.remove(res.data!.id)
215215
})
216+
217+
test('update no tables -> all tables', async () => {
218+
const { data } = await pgMeta.publications.create({
219+
name: 'pub',
220+
tables: [],
221+
})
222+
const res = await pgMeta.publications.update(data!.id, { tables: null })
223+
expect(cleanNondet(res)).toMatchInlineSnapshot(
224+
{ data: { id: expect.any(Number) } },
225+
`
226+
Object {
227+
"data": Object {
228+
"id": Any<Number>,
229+
"name": "pub",
230+
"owner": "postgres",
231+
"publish_delete": false,
232+
"publish_insert": false,
233+
"publish_truncate": false,
234+
"publish_update": false,
235+
"tables": null,
236+
},
237+
"error": null,
238+
}
239+
`
240+
)
241+
await pgMeta.publications.remove(res.data!.id)
242+
})
243+
244+
test('update all tables -> no tables', async () => {
245+
const { data } = await pgMeta.publications.create({
246+
name: 'pub',
247+
tables: null,
248+
})
249+
const res = await pgMeta.publications.update(data!.id, { tables: [] })
250+
expect(cleanNondet(res)).toMatchInlineSnapshot(
251+
{ data: { id: expect.any(Number) } },
252+
`
253+
Object {
254+
"data": Object {
255+
"id": Any<Number>,
256+
"name": "pub",
257+
"owner": "postgres",
258+
"publish_delete": false,
259+
"publish_insert": false,
260+
"publish_truncate": false,
261+
"publish_update": false,
262+
"tables": Array [],
263+
},
264+
"error": null,
265+
}
266+
`
267+
)
268+
await pgMeta.publications.remove(res.data!.id)
269+
})

0 commit comments

Comments
 (0)
0