@@ -66,17 +66,17 @@ export default class PostgresMetaPublications {
66
66
publish_update = false ,
67
67
publish_delete = false ,
68
68
publish_truncate = false ,
69
- tables,
69
+ tables = null ,
70
70
} : {
71
71
name : string
72
72
publish_insert ?: boolean
73
73
publish_update ?: boolean
74
74
publish_delete ?: boolean
75
75
publish_truncate ?: boolean
76
- tables ?: string [ ]
76
+ tables ?: string [ ] | null
77
77
} ) : Promise < PostgresMetaResult < PostgresPublication > > {
78
78
let tableClause : string
79
- if ( tables === undefined ) {
79
+ if ( tables === undefined || tables === null ) {
80
80
tableClause = 'FOR ALL TABLES'
81
81
} else if ( tables . length === 0 ) {
82
82
tableClause = ''
@@ -127,87 +127,106 @@ CREATE PUBLICATION ${ident(name)} ${tableClause}
127
127
publish_update ?: boolean
128
128
publish_delete ?: boolean
129
129
publish_truncate ?: boolean
130
- tables ?: string [ ]
130
+ tables ?: string [ ] | null
131
131
}
132
132
) : Promise < PostgresMetaResult < PostgresPublication > > {
133
- const { data : old , error } = await this . retrieve ( { id } )
134
- if ( error ) {
135
- return { data : null , error }
136
- }
137
-
138
- // Need to work around the limitations of the SQL. Can't add/drop tables from
139
- // a publication with FOR ALL TABLES. Can't use the SET TABLE clause without
140
- // at least one table.
141
- //
142
- // new tables
143
- //
144
- // | undefined | string[] |
145
- // ---------|-----------|-----------------|
146
- // null | '' | 400 Bad Request |
147
- // old tables ---------|-----------|-----------------|
148
- // object[] | '' | See below |
149
- //
150
- // new tables
151
- //
152
- // | [] | [...] |
153
- // ---------|-----------|-----------------|
154
- // [] | '' | SET TABLE |
155
- // old tables ---------|-----------|-----------------|
156
- // [...] | DROP all | SET TABLE |
157
- //
158
- let tableSql : string
159
- if ( tables === undefined ) {
160
- tableSql = ''
161
- } else if ( old ! . tables === null ) {
162
- throw new Error ( 'Tables cannot be added to or dropped from FOR ALL TABLES publications' )
163
- } else if ( tables . length > 0 ) {
164
- tableSql = `ALTER PUBLICATION ${ ident ( old ! . name ) } SET TABLE ${ tables
165
- . map ( ( t ) => {
166
- if ( ! t . includes ( '.' ) ) {
167
- return ident ( t )
168
- }
169
-
170
- const [ schema , ...rest ] = t . split ( '.' )
171
- const table = rest . join ( '.' )
172
- return `${ ident ( schema ) } .${ ident ( table ) } `
173
- } )
174
- . join ( ',' ) } ;`
175
- } else if ( old ! . tables . length === 0 ) {
176
- tableSql = ''
177
- } else {
178
- tableSql = `ALTER PUBLICATION ${ ident ( old ! . name ) } DROP TABLE ${ old ! . tables
179
- . map (
180
- ( table : { schema : string ; name : string } ) => `${ ident ( table . schema ) } .${ ident ( table . name ) } `
133
+ const sql = `
134
+ do $$
135
+ declare
136
+ id oid := ${ literal ( id ) } ;
137
+ old record;
138
+ new_name text := ${ name === undefined ? null : literal ( name ) } ;
139
+ new_owner text := ${ owner === undefined ? null : literal ( owner ) } ;
140
+ new_publish_insert bool := ${ publish_insert ?? null } ;
141
+ new_publish_update bool := ${ publish_update ?? null } ;
142
+ new_publish_delete bool := ${ publish_delete ?? null } ;
143
+ new_publish_truncate bool := ${ publish_truncate ?? null } ;
144
+ new_tables text := ${
145
+ tables === undefined
146
+ ? null
147
+ : literal (
148
+ tables === null
149
+ ? 'all tables'
150
+ : tables
151
+ . map ( ( t ) => {
152
+ if ( ! t . includes ( '.' ) ) {
153
+ return ident ( t )
154
+ }
155
+
156
+ const [ schema , ...rest ] = t . split ( '.' )
157
+ const table = rest . join ( '.' )
158
+ return `${ ident ( schema ) } .${ ident ( table ) } `
159
+ } )
160
+ . join ( ',' )
181
161
)
182
- . join ( ',' ) } ;`
183
- }
184
-
185
- let publishOps = [ ]
186
- if ( publish_insert ?? old ! . publish_insert ) publishOps . push ( 'insert' )
187
- if ( publish_update ?? old ! . publish_update ) publishOps . push ( 'update' )
188
- if ( publish_delete ?? old ! . publish_delete ) publishOps . push ( 'delete' )
189
- if ( publish_truncate ?? old ! . publish_truncate ) publishOps . push ( 'truncate' )
190
- const publishSql = `ALTER PUBLICATION ${ ident ( old ! . name ) } SET (publish = '${ publishOps . join (
191
- ','
192
- ) } ');`
193
-
194
- const ownerSql =
195
- owner === undefined ? '' : `ALTER PUBLICATION ${ ident ( old ! . name ) } OWNER TO ${ ident ( owner ) } ;`
196
-
197
- const nameSql =
198
- name === undefined || name === old ! . name
199
- ? ''
200
- : `ALTER PUBLICATION ${ ident ( old ! . name ) } RENAME TO ${ ident ( name ) } ;`
201
-
202
- // nameSql must be last
203
- const sql = `BEGIN; ${ tableSql } ${ publishSql } ${ ownerSql } ${ nameSql } COMMIT;`
204
- {
205
- const { error } = await this . query ( sql )
206
- if ( error ) {
207
- return { data : null , error }
208
- }
209
- }
210
- return await this . retrieve ( { id } )
162
+ } ;
163
+ begin
164
+ select * into old from pg_publication where oid = id;
165
+ if old is null then
166
+ raise exception 'Cannot find publication with id %', id;
167
+ end if;
168
+
169
+ if new_tables is null then
170
+ null;
171
+ elsif new_tables = 'all tables' then
172
+ if old.puballtables then
173
+ null;
174
+ else
175
+ -- Need to recreate because going from list of tables <-> all tables with alter is not possible.
176
+ execute(format('drop publication %1$I; create publication %1$I for all tables;', old.pubname));
177
+ end if;
178
+ else
179
+ if old.puballtables then
180
+ -- Need to recreate because going from list of tables <-> all tables with alter is not possible.
181
+ execute(format('drop publication %1$I; create publication %1$I;', old.pubname));
182
+ elsif exists(select from pg_publication_rel where prpubid = id) then
183
+ execute(
184
+ format(
185
+ 'alter publication %I drop table %s',
186
+ old.pubname,
187
+ (select string_agg(prrelid::regclass::text, ', ') from pg_publication_rel where prpubid = id)
188
+ )
189
+ );
190
+ end if;
191
+
192
+ -- At this point the publication must have no tables.
193
+
194
+ if new_tables != '' then
195
+ execute(format('alter publication %I add table %s', old.pubname, new_tables));
196
+ end if;
197
+ end if;
198
+
199
+ execute(
200
+ format(
201
+ 'alter publication %I set (publish = %L);',
202
+ old.pubname,
203
+ concat_ws(
204
+ ', ',
205
+ case when coalesce(new_publish_insert, old.pubinsert) then 'insert' end,
206
+ case when coalesce(new_publish_update, old.pubupdate) then 'update' end,
207
+ case when coalesce(new_publish_delete, old.pubdelete) then 'delete' end,
208
+ case when coalesce(new_publish_truncate, old.pubtruncate) then 'truncate' end
209
+ )
210
+ )
211
+ );
212
+
213
+ execute(format('alter publication %I owner to %I;', old.pubname, coalesce(new_owner, old.pubowner::regrole::name)));
214
+
215
+ -- Using the same name in the rename clause gives an error, so only do it if the new name is different.
216
+ if new_name is not null and new_name != old.pubname then
217
+ execute(format('alter publication %I rename to %I;', old.pubname, coalesce(new_name, old.pubname)));
218
+ end if;
219
+
220
+ -- We need to retrieve the publication later, so we need a way to uniquely identify which publication this is.
221
+ -- We can't rely on id because it gets changed if it got recreated.
222
+ -- We use a temp table to store the unique name - DO blocks can't return a value.
223
+ create temp table pg_meta_publication_tmp (name) on commit drop as values (coalesce(new_name, old.pubname));
224
+ end $$;
225
+
226
+ with publications as (${ publicationsSql } ) select * from publications where name = (select name from pg_meta_publication_tmp);
227
+ `
228
+ const { data, error } = await this . query ( sql )
229
+ return { data : data [ 0 ] , error }
211
230
}
212
231
213
232
async remove ( id : number ) : Promise < PostgresMetaResult < PostgresPublication > > {
0 commit comments