8000 Adds the pgmq extension to support queuing (#1120) · senal88/postgres@263aa1e · GitHub
[go: up one dir, main page]

Skip to content

Commit 263aa1e

Browse files
authored
Adds the pgmq extension to support queuing (supabase#1120)
* add pgmq SQL only extension * revert accidental file inclusion * add public interface test coverage * order by namespace for tables in interface test to make timescale stable since they use 3 schemas * pgmq in prime * update to pgmq 1.4.2 * regression tests for pgmq * add pgmq migration test script * add pg_partman * remove pg_partman_bgq from test suite postgresql.conf * checkin broken test * checkin broken test 2 * bump pgmq to 1.4.4 * sync test outputs and pg_partman with 1.4.4 * add pgmq to supautils privileged extensions
1 parent 6757ba0 commit 263aa1e

File tree

13 files changed

+754
-134
lines changed

13 files changed

+754
-134
lines changed

ansible/files/postgresql_config/supautils.conf.j2

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ supautils.policy_grants = '{"postgres":["auth.audit_log_entries","auth.identitie
33
# full list: address_standardizer, address_standardizer_data_us, adminpack, amcheck, autoinc, bloom, btree_gin, btree_gist, citext, cube, dblink, dict_int, dict_xsyn, earthdistance, file_fdw, fuzzystrmatch, hstore, http, hypopg, index_advisor, insert_username, intagg, intarray, isn, lo, ltree, moddatetime, old_snapshot, orioledb, pageinspect, pg_buffercache, pg_cron, pg_freespacemap, pg_graphql, pg_hashids, pg_jsonschema, pg_net, pg_prewarm, pg_repack, pg_stat_monitor, pg_stat_statements, pg_surgery, pg_tle, pg_trgm, pg_visibility, pg_walinspect, pgaudit, pgcrypto, pgjwt, pgroonga, pgroonga_database, pgrouting, pgrowlocks, pgsodium, pgstattuple, pgtap, plcoffee, pljava, plls, plpgsql, plpgsql_check, plv8, postgis, postgis_raster, postgis_sfcgal, postgis_tiger_geocoder, postgis_topology, postgres_fdw, refint, rum, seg, sslinfo, supabase_vault, supautils, tablefunc, tcn, timescaledb, tsm_system_rows, tsm_system_time, unaccent, uuid-ossp, vector, wrappers, xml2
44
# omitted because may be unsafe: adminpack, amcheck, file_fdw, lo, old_snapshot, pageinspect, pg_buffercache, pg_freespacemap, pg_surgery, pg_visibility
55
# omitted because deprecated: intagg, xml2
6-
supautils.privileged_extensions = 'address_standardizer, address_standardizer_data_us, autoinc, bloom, btree_gin, btree_gist, citext, cube, dblink, dict_int, dict_xsyn, earthdistance, fuzzystrmatch, hstore, http, hypopg, index_advisor, insert_username, intarray, isn, ltree, moddatetime, orioledb, pg_cron, pg_graphql, pg_hashids, pg_jsonschema, pg_net, pg_repack, pg_stat_monitor, pg_stat_statements, pg_tle, pg_trgm, pg_walinspect, pgaudit, pgcrypto, pgjwt, pg_prewarm, pgroonga, pgroonga_database, pgrouting, pgrowlocks, pgstattuple, pgsodium, pgtap, plcoffee, pljava, plls, plpgsql, plpgsql_check, plv8, postgis, postgis_raster, postgis_sfcgal, postgis_tiger_geocoder, postgis_topology, postgres_fdw, refint, rum, seg, sslinfo, supabase_vault, supautils, tablefunc, tcn, timescaledb, tsm_system_rows, tsm_system_time, unaccent, uuid-ossp, vector, wrappers'
6+
supautils.privileged_extensions = 'address_standardizer, address_standardizer_data_us, autoinc, bloom, btree_gin, btree_gist, citext, cube, dblink, dict_int, dict_xsyn, earthdistance, fuzzystrmatch, hstore, http, hypopg, index_advisor, insert_username, intarray, isn, ltree, moddatetime, orioledb, pg_cron, pg_graphql, pg_hashids, pg_jsonschema, pg_net, pg_partman, pg_repack, pg_stat_monitor, pg_stat_statements, pg_tle, pg_trgm, pg_walinspect, pgaudit, pgcrypto, pgjwt, pg_prewarm, pgmq, pgroonga, pgroonga_database, pgrouting, pgrowlocks, pgstattuple, pgsodium, pgtap, plcoffee, pljava, plls, plpgsql, plpgsql_check, plv8, postgis, postgis_raster, postgis_sfcgal, postgis_tiger_geocoder, postgis_topology, postgres_fdw, refint, rum, seg, sslinfo, supabase_vault, supautils, tablefunc, tcn, timescaledb, tsm_system_rows, tsm_system_time, unaccent, uuid-ossp, vector, wrappers'
77
supautils.privileged_extensions_custom_scripts_path = '/etc/postgresql-custom/extension-custom-scripts'
88
supautils.privileged_extensions_superuser = 'supabase_admin'
99
supautils.privileged_role = 'postgres'

common-nix.vars.pkr.hcl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
postgres-version = "15.8.1.002"
1+
postgres-version = "15.8.1.003"

flake.nix

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@
103103
./nix/ext/pgroonga.nix
104104
./nix/ext/index_advisor.nix
105105
./nix/ext/wal2json.nix
106+
./nix/ext/pgmq.nix
106107
./nix/ext/pg_repack.nix
107108
./nix/ext/pg-safeupdate.nix
108109
./nix/ext/plpgsql-check.nix
@@ -119,6 +120,7 @@
119120
./nix/ext/pg_hashids.nix
120121
./nix/ext/pgsodium.nix
121122
./nix/ext/pg_graphql.nix
123+
./nix/ext/pg_partman.nix
122124
./nix/ext/pg_stat_monitor.nix
123125
./nix/ext/pg_jsonschema.nix
124126
./nix/ext/pgvector.nix

nix/ext/pg_partman.nix

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
{ lib, stdenv, fetchFromGitHub, postgresql }:
2+
3+
stdenv.mkDerivation rec {
4+
pname = "pg_partman";
5+
version = "5.1.0";
6+
7+
buildInputs = [ postgresql ];
8+
9+
src = fetchFromGitHub {
10+
owner = "pgpartman";
11+
repo = pname;
12+
rev = "refs/tags/v${version}";
13+
sha256 = "sha256-GrVOJ5ywZMyqyDroYDLdKkXDdIJSDGhDfveO/ZvrmYs=";
14+
};
15+
16+
installPhase = ''
17+
mkdir -p $out/{lib,share/postgresql/extension}
18+
19+
cp src/*${postgresql.dlSuffix} $out/lib
20+
cp updates/* $out/share/postgresql/extension
2 341A 1+
cp -r sql/* $out/share/postgresql/extension
22+
cp *.control $out/share/postgresql/extension
23+
'';
24+
25+
meta = with lib; {
26+
description = "Partition management extension for PostgreSQL";
27+
homepage = "https://github.com/pgpartman/pg_partman";
28+
changelog = "https://github.com/pgpartman/pg_partman/blob/v${version}/CHANGELOG.md";
29+
maintainers = with maintainers; [ samrose ];
30+
platforms = postgresql.meta.platforms;
31+
license = licenses.postgresql;
32+
broken = versionOlder postgresql.version "14";
33+
};
34+
}

nix/ext/pgmq.nix

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
{ lib, stdenv, fetchFromGitHub, postgresql }:
2+
3+
stdenv.mkDerivation rec {
4+
pname = "pgmq";
5+
version = "1.4.4";
6+
buildInputs = [ postgresql ];
7+
src = fetchFromGitHub {
8+
owner = "tembo-io";
9+
repo = pname;
10+
rev = "v${version}";
11+
hash = "sha256-z+8/BqIlHwlMnuIzMz6eylmYbSmhtsNt7TJf/CxbdVw=";
12+
};
13+
14+
buildPhase = ''
15+
cd pgmq-extension
16+
'';
17+
18+
installPhase = ''
19+
mkdir -p $out/{lib,share/postgresql/extension}
20+
21+
mv sql/pgmq.sql $out/share/postgresql/extension/pgmq--${version}.sql
22+
cp sql/*.sql $out/share/postgresql/extension
23+
cp *.control $out/share/postgresql/extension
24+
'';
25+
26+
meta = with lib; {
27+
description = "A lightweight message queue. Like AWS SQS and RSMQ but on Postgres.";
28+
homepage = "https://github.com/tembo-io/pgmq";
29+
maintainers = with maintainers; [ olirice ];
30+
platforms = postgresql.meta.platforms;
31+
license = licenses.postgresql;
32+
};
33+
}

nix/tests/expected/extensions_sql_interface.out

Lines changed: 252 additions & 123 deletions
Large diffs are not rendered by default.

nix/tests/expected/pg_partman.out

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
create schema if not exists partman_test;
2+
/*
3+
Simple Time Based: 1 Partition Per Day
4+
5+
For native partitioning, you must start with a parent table that has already been set up to be partitioned in the desired type. Currently pg_partman only supports the RANGE type of partitioning (both for time & id). You cannot turn a non-partitioned table into the parent table of a partitioned set, which can make migration a challenge. This document will show you some techniques for how to manage this later. For now, we will start with a brand new table in this example. Any non-unique indexes can also be added to the parent table in PG11+ and they will automatically be created on all child tables.
6+
*/
7+
create table partman_test.time_taptest_table(
8+
col1 int,
9+
col2 text default 'stuff',
10+
col3 timestamptz not null default now()
11+
)
12+
partition by range (col3);
13+
create index on partman_test.time_tap (col3);
14+
ERROR: relation "partman_test.time_tap" does not exist
15+
/*
16+
Unique indexes (including primary keys) cannot be created on a natively partitioned parent unless they include the partition key. For time-based partitioning that generally doesn't work out since that would limit only a single timestamp value in each child table. pg_partman helps to manage this by using a template table to manage properties that currently are not supported by native partitioning. Note that this does not solve the issue of the constraint not being enforced across the entire partition set. See the main documentation to see which properties are managed by the template.
17+
18+
Manually create the template table first so that when we run create_parent() the initial child tables that are created will have a primary key. If you do not supply a template table to pg_partman, it will create one for you in the schema that you installed the extension to. However properties you add to that template are only then applied to newly created child tables after that point. You will have to retroactively apply those properties manually to any child tables that already existed.
19+
*/
20+
create table partman_test.time_taptest_table_template (like partman_test.time_taptest_table);
21+
alter table partman_test.time_taptest_table_template add primary key (col1);
22+
/*
23+
Review tables in the partman_test schema
24+
*/
25+
select
26+
table_name,
27+
table_type
28+
from
29+
information_schema.tables
30+
where
31+
table_schema = 'partman_test';
32+
table_name | table_type
33+
-----------------------------+------------
34+
time_taptest_table | BASE TABLE
35+
time_taptest_table_template | BASE TABLE
36+
(2 rows)
37+
38+
select public.create_parent(
39+
p_parent_table := 'partman_test.time_taptest_table',
40+
p_control := 'col3',
41+
p_interval := '1 day',
42+
p_template_table := 'partman_test.time_taptest_table_template'
43+
);
44+
create_parent
45+
---------------
46+
t
47+
(1 row)
48+
49+
/*
50+
Review tables in the partman_test schema, which should now include daily partitions
51+
*/
52+
select
53+
-- dates in partition names are variable, so reduced to the prefix
54+
substring(table_name, 1, 21) as table_prefix,
55+
table_type
56+
from
57+
information_schema.tables
58+
where
59+
table_schema = 'partman_test'
60+
order by
61+
table_name;
62+
table_prefix | table_type
63+
-----------------------+------------
64+
time_taptest_table | BASE TABLE
65+
time_taptest_table_de | BASE TABLE
66+
time_taptest_table_p2 | BASE TABLE
67+
time_taptest_table_p2 | BASE TABLE
68+
time_taptest_table_p2 | BASE TABLE
69+
time_taptest_table_p2 | BASE TABLE
70+
time_taptest_table_p2 | BASE TABLE
71+
time_taptest_table_p2 | BASE TABLE
72+
time_taptest_table_p2 | BASE TABLE
73+
time_taptest_table_p2 | BASE TABLE
74+
time_taptest_table_p2 | BASE TABLE
75+
time_taptest_table_te | BASE TABLE
76+
(12 rows)
77+
78+
/*
79+
Confirm maintenance proc runs without issue
80+
*/
81+
call public.run_maintenance_proc();
82+
/*
83+
Make sure the background worker is NOT enabled.
84+
This is intentional. We document using pg_cron to schedule calls to
85+
public.run_maintenance_proc(). That is consistent with other providers.
86+
*/
87+
select
88+
application_name
89+
from
90+
pg_stat_activity
91+
where
92+
application_name = 'pg_partman_bgw';
93+
application_name
94+
------------------
95+
(0 rows)
96+
97+
-- Cleanup
98+
drop schema partman_test cascade;
99+
NOTICE: drop cascades to 2 other objects
100+
DETAIL: drop cascades to table partman_test.time_taptest_table
101+
drop cascades to table partman_test.time_taptest_table_template

nix/tests/expected/pgmq.out

Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
-- Test the standard flow
2+
select
3+
pgmq.create('Foo');
4+
create
5+
--------
6+
7+
(1 row)
8+
9+
select
10+
*
11+
from
12+
pgmq.send(
13+
queue_name:='Foo',
14+
msg:='{"foo": "bar1"}'
15+
);
16+
send
17+
------
18+
1
19+
(1 row)
20+
21+
-- Test queue is not case sensitive
22+
select
23+
msg_id,
24+
read_ct,
25+
message
26+
from
27+
pgmq.send(
28+
queue_name:='foo', -- note: lowercase useage
29+
msg:='{"foo": "bar2"}',
30+
delay:=5
31+
);
32+
ERROR: column "msg_id" does not exist
33+
LINE 2: msg_id,
34+
^
35+
select
36+
msg_id,
37+
read_ct,
38+
message
39+
from
40+
pgmq.read(
41+
queue_name:='Foo',
42+
vt:=30,
43+
qty:=2
44+
);
45+
msg_id | read_ct | message
46+
--------+---------+-----------------
47+
1 | 1 | {"foo": "bar1"}
48+
(1 row)
49+
50+
select
51+
msg_id,
52+
read_ct,
53+
message
54+
from
55+
pgmq.pop('Foo');
56+
msg_id | read_ct | message
57+
--------+---------+---------
58+
(0 rows)
59+
60+
-- Archive message with msg_id=2.
61+
select
62+
pgmq.archive(
63+
queue_name:='Foo',
64+
msg_id:=2
65+
);
66+
archive
67+
---------
68+
f
69+
(1 row)
70+
71+
select
72+
pgmq.create('my_queue');
73+
create
74+
--------
75+
76+
(1 row)
77+
78+
select
79+
pgmq.send_batch(
80+
queue_name:='my_queue',
81+
msgs:=array['{"foo": "bar3"}','{"foo": "bar4"}','{"foo": "bar5"}']::jsonb[]
82+
);
83+
send_batch
84+
------------
85+
1
86+
2
87+
3
88+
(3 rows)
89+
90+
select
91+
pgmq.archive(
92+
queue_name:='my_queue',
93+
msg_ids:=array[3, 4, 5]
94+
);
95+
archive
96+
---------
97+
3
98+
(1 row)
99+
100+
select
101+
pgmq.delete('my_queue', 6);
102+
delete
103+
--------
104+
f
105+
(1 row)
106+
107+
select
108+
pgmq.drop_queue('my_queue');
109+
drop_queue
110+
------------
111+
t
112+
(1 row)
113+
114+
select
115+
pgmq.create_partitioned(
116+
'my_partitioned_queue',
117+
'5 seconds',
118+
'10 seconds'
119+
);
120+
create_partitioned
121+
--------------------
122+
123+
(1 row)
124+
125+
-- Make sure SQLI enabling characters are blocked
126+
select pgmq.create('F--oo');
127+
ERROR: queue name contains invalid characters: $, ;, --, or \'
128+
CONTEXT: PL/pgSQL function pgmq.format_table_name(text,text) line 5 at RAISE
129+
PL/pgSQL function pgmq.create_non_partitioned(text) line 3 during statement block local variable initialization
130+
SQL statement "SELECT pgmq.create_non_partitioned(queue_name)"
131+
PL/pgSQL function pgmq."create"(text) line 3 at PERFORM
132+
select pgmq.create('F$oo');
133+
ERROR: queue name contains invalid characters: $, ;, --, or \'
134+
CONTEXT: PL/pgSQL function pgmq.format_table_name(text,text) line 5 at RAISE
135+
PL/pgSQL function pgmq.create_non_partitioned(text) line 3 during statement block local variable initialization
136+
SQL statement "SELECT pgmq.create_non_partitioned(queue_name)"
137+
PL/pgSQL function pgmq."create"(text) line 3 at PERFORM
138+
select pgmq.create($$F'oo$$);
139+
ERROR: queue name contains invalid characters: $, ;, --, or \'
140+
CONTEXT: PL/pgSQL function pgmq.format_table_name(text,text) line 5 at RAISE
141+
PL/pgSQL function pgmq.create_non_partitioned(text) line 3 during statement block local variable initialization
142+
SQL statement "SELECT pgmq.create_non_partitioned(queue_name)"
143+
PL/pgSQL function pgmq."create"(text) line 3 at PERFORM

nix/tests/migrations/pgmq.sql

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
select
2+
pgmq.create('Foo');
3+
4+
select
5+
*
6+
from
7+
pgmq.send(
8+
queue_name:='Foo',
9+
msg:='{"foo": "bar1"}'
10+
);
11+
12+

0 commit comments

Comments
 (0)
0