8000 Adds the pgmq extension to support queuing by olirice · Pull Request #1120 · supabase/postgres · GitHub
[go: up one dir, main page]

Skip to content

Adds the pgmq extension to support queuing #1120

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 17 commits into from
Oct 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ansible/files/postgresql_config/supautils.conf.j2
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ supautils.policy_grants = '{"postgres":["auth.audit_log_entries","auth.identitie
# full list: address_standardizer, address_standardizer_data_us, adminpack, amcheck, autoinc, bloom, btree_gin, btree_gist, citext, cube, dblink, dict_int, dict_xsyn, earthdistance, file_fdw, fuzzystrmatch, hstore, http, hypopg, index_advisor, insert_username, intagg, intarray, isn, lo, ltree, moddatetime, old_snapshot, orioledb, pageinspect, pg_buffercache, pg_cron, pg_freespacemap, pg_graphql, pg_hashids, pg_jsonschema, pg_net, pg_prewarm, pg_repack, pg_stat_monitor, pg_stat_statements, pg_surgery, pg_tle, pg_trgm, pg_visibility, pg_walinspect, pgaudit, pgcrypto, pgjwt, pgroonga, pgroonga_database, pgrouting, pgrowlocks, pgsodium, pgstattuple, pgtap, plcoffee, pljava, plls, plpgsql, plpgsql_check, plv8, postgis, postgis_raster, postgis_sfcgal, postgis_tiger_geocoder, postgis_topology, postgres_fdw, refint, rum, seg, sslinfo, supabase_vault, supautils, tablefunc, tcn, timescaledb, tsm_system_rows, tsm_system_time, unaccent, uuid-ossp, vector, wrappers, xml2
# omitted because may be unsafe: adminpack, amcheck, file_fdw, lo, old_snapshot, pageinspect, pg_buffercache, pg_freespacemap, pg_surgery, pg_visibility
# omitted because deprecated: intagg, xml2
supautils.privileged_extensions = 'address_standardizer, address_standardizer_data_us, autoinc, bloom, btree_gin, btree_gist, citext, cube, dblink, dict_int, dict_xsyn, earthdistance, fuzzystrmatch, hstore, http, hypopg, index_advisor, insert_username, intarray, isn, ltree, moddatetime, orioledb, pg_cron, pg_graphql, pg_hashids, pg_jsonschema, pg_net, pg_repack, pg_stat_monitor, pg_stat_statements, pg_tle, pg_trgm, pg_walinspect, pgaudit, pgcrypto, pgjwt, pg_prewarm, pgroonga, pgroonga_database, pgrouting, pgrowlocks, pgstattuple, pgsodium, pgtap, plcoffee, pljava, plls, plpgsql, plpgsql_check, plv8, postgis, postgis_raster, postgis_sfcgal, postgis_tiger_geocoder, postgis_topology, postgres_fdw, refint, rum, seg, sslinfo, supabase_vault, supautils, tablefunc, tcn, timescaledb, tsm_system_rows, tsm_system_time, unaccent, uuid-ossp, vector, wrappers'
supautils.privileged_extensions = 'address_standardizer, address_standardizer_data_us, autoinc, bloom, btree_gin, btree_gist, citext, cube, dblink, dict_int, dict_xsyn, earthdistance, fuzzystrmatch, hstore, http, hypopg, index_advisor, insert_username, intarray, isn, ltree, moddatetime, orioledb, pg_cron, pg_graphql, pg_hashids, pg_jsonschema, pg_net, pg_partman, pg_repack, pg_stat_monitor, pg_stat_statements, pg_tle, pg_trgm, pg_walinspect, pgaudit, pgcrypto, pgjwt, pg_prewarm, pgmq, pgroonga, pgroonga_database, pgrouting, pgrowlocks, pgstattuple, pgsodium, pgtap, plcoffee, pljava, plls, plpgsql, plpgsql_check, plv8, postgis, postgis_raster, postgis_sfcgal, postgis_tiger_geocoder, postgis_topology, postgres_fdw, refint, rum, seg, sslinfo, supabase_vault, supautils, tablefunc, tcn, timescaledb, tsm_system_rows, tsm_system_time, unaccent, uuid-ossp, vector, wrappers'
supautils.privileged_extensions_custom_scripts_path = '/etc/postgresql-custom/extension-custom-scripts'
supautils.privileged_extensions_superuser = 'supabase_admin'
supautils.privileged_role = 'postgres'
Expand Down
2 changes: 1 addition & 1 deletion common-nix.vars.pkr.hcl
Original file line number Diff line number Diff line change
@@ -1 +1 @@
postgres-version = "15.8.1.002"
postgres-version = "15.8.1.003"
2 changes: 2 additions & 0 deletions flake.nix
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@
./nix/ext/pgroonga.nix
./nix/ext/index_advisor.nix
./nix/ext/wal2json.nix
./nix/ext/pgmq.nix
./nix/ext/pg_repack.nix
./nix/ext/pg-safeupdate.nix
./nix/ext/plpgsql-check.nix
Expand All @@ -119,6 +120,7 @@
./nix/ext/pg_hashids.nix
./nix/ext/pgsodium.nix
./nix/ext/pg_graphql.nix
./nix/ext/pg_partman.nix
./nix/ext/pg_stat_monitor.nix
./nix/ext/pg_jsonschema.nix
./nix/ext/pgvector.nix
Expand Down
34 changes: 34 additions & 0 deletions nix/ext/pg_partman.nix
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
{ lib, stdenv, fetchFromGitHub, postgresql }:

stdenv.mkDerivation rec {
pname = "pg_partman";
version = "5.1.0";

buildInputs = [ postgresql ];

src = fetchFromGitHub {
owner = "pgpartman";
repo = pname;
rev = "refs/tags/v${version}";
sha256 = "sha256-GrVOJ5ywZMyqyDroYDLdKkXDdIJSDGhDfveO/ZvrmYs=";
};

installPhase = ''
mkdir -p $out/{lib,share/postgresql/extension}

cp src/*${postgresql.dlSuffix} $out/lib
cp updates/* $out/share/postgresql/extension
cp -r sql/* $out/share/postgresql/extension
cp *.control $out/share/postgresql/extension
'';

meta = with lib; {
description = "Partition management extension for PostgreSQL";
homepage = "https://github.com/pgpartman/pg_partman";
changelog = "https://github.com/pgpartman/pg_partman/blob/v${version}/CHANGELOG.md";
maintainers = with maintainers; [ samrose ];
platforms = postgresql.meta.platforms;
license = licenses.postgresql;
broken = versionOlder postgresql.version "14";
};
}
33 changes: 33 additions & 0 deletions nix/ext/pgmq.nix
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
{ lib, stdenv, fetchFromGitHub, postgresql }:

stdenv.mkDerivation rec {
pname = "pgmq";
version = "1.4.4";
buildInputs = [ postgresql ];
src = fetchFromGitHub {
owner = "tembo-io";
repo = pname;
rev = "v${version}";
hash = "sha256-z+8/BqIlHwlMnuIzMz6eylmYbSmhtsNt7TJf/CxbdVw=";
};

buildPhase = ''
cd pgmq-extension
'';

installPhase = ''
mkdir -p $out/{lib,share/postgresql/extension}

mv sql/pgmq.sql $out/share/postgresql/extension/pgmq--${version}.sql
cp sql/*.sql $out/share/postgresql/extension
cp *.control $out/share/postgresql/extension
'';

meta = with lib; {
description = "A lightweight message queue. Like AWS SQS and RSMQ but on Postgres.";
homepage = "https://github.com/tembo-io/pgmq";
maintainers = with maintainers; [ olirice ];
platforms = postgresql.meta.platforms;
license = licenses.postgresql;
};
}
375 changes: 252 additions & 123 deletions nix/tests/expected/extensions_sql_interface.out

Large diffs are not rendered by default.

101 changes: 101 additions & 0 deletions nix/tests/expected/pg_partman.out
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
create schema if not exists partman_test;
/*
Simple Time Based: 1 Partition Per Day

For native partitioning, you must start with a parent table that has already been set up to be partitioned in the desired type. Currently pg_partman only supports the RANGE type of partitioning (both for time & id). You cannot turn a non-partitioned table into the parent table of a partitioned set, which can make migration a challenge. This document will show you some techniques for how to manage this later. For now, we will start with a brand new table in this example. Any non-unique indexes can also be added to the parent table in PG11+ and they will automatically be created on all child tables.
*/
create table partman_test.time_taptest_table(
col1 int,
col2 text default 'stuff',
col3 timestamptz not null default now()
)
partition by range (col3);
create index on partman_test.time_tap (col3);
ERROR: relation "partman_test.time_tap" does not exist
/*
Unique indexes (including primary keys) cannot be created on a natively partitioned parent unless they include the partition key. For time-based partitioning that generally doesn't work out since that would limit only a single timestamp value in each child table. pg_partman helps to manage this by using a template table to manage properties that currently are not supported by native partitioning. Note that this does not solve the issue of the constraint not being enforced across the entire partition set. See the main documentation to see which properties are managed by the template.

Manually create the template table first so that when we run create_parent() the initial child tables that are created will have a primary key. If you do not supply a template table to pg_partman, it will create one for you in the schema that you installed the extension to. However properties you add to that template are only then applied to newly created child tables after that point. You will have to retroactively apply those properties manually to any child tables that already existed.
*/
create table partman_test.time_taptest_table_template (like partman_test.time_taptest_table);
alter table partman_test.time_taptest_table_template add primary key (col1);
/*
Review tables in the partman_test schema
*/
select
table_name,
table_type
from
information_schema.tables
where
table_schema = 'partman_test';
table_name | table_type
-----------------------------+------------
time_taptest_table | BASE TABLE
time_taptest_table_template | BASE TABLE
(2 rows)

select public.create_parent(
p_parent_table := 'partman_test.time_taptest_table',
p_control := 'col3',
p_interval := '1 day',
p_template_table := 'partman_test.time_taptest_table_template'
);
create_parent
---------------
t
(1 row)

/*
Review tables in the partman_test schema, which should now include daily partitions
*/
select
-- dates in partition names are variable, so reduced to the prefix
substring(table_name, 1, 21) as table_prefix,
table_type
from
information_schema.tables
where
table_schema = 'partman_test'
order by
table_name;
table_prefix | table_type
-----------------------+------------
time_taptest_table | BASE TABLE
time_taptest_table_de | BASE TABLE
time_taptest_table_p2 | BASE TABLE
time_taptest_table_p2 | BASE TABLE
time_taptest_table_p2 | BASE TABLE
time_taptest_table_p2 | BASE TABLE
time_taptest_table_p2 | BASE TABLE
time_taptest_table_p2 | BASE TABLE
time_taptest_table_p2 | BASE TABLE
time_taptest_table_p2 | BASE TABLE
time_taptest_table_p2 | BASE TABLE
time_taptest_table_te | BASE TABLE
(12 rows)

/*
Confirm maintenance proc runs without issue
*/
call public.run_maintenance_proc();
/*
Make sure the background worker is NOT enabled.
This is intentional. We document using pg_cron to schedule calls to
public.run_maintenance_proc(). That is consistent with other providers.
*/
select
application_name
from
pg_stat_activity
where
application_name = 'pg_partman_bgw';
application_name
------------------
(0 rows)

-- Cleanup
drop schema partman_test cascade;
NOTICE: drop cascades to 2 other objects
DETAIL: drop cascades to table partman_test.time_taptest_table
drop cascades to table partman_test.time_taptest_table_template
143 changes: 143 additions & 0 deletions nix/tests/expected/pgmq.out
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
-- Test the standard flow
select
pgmq.create('Foo');
create
--------

(1 row)

select
*
from
pgmq.send(
queue_name:='Foo',
msg:='{"foo": "bar1"}'
);
send
------
1
(1 row)

-- Test queue is not case sensitive
select
msg_id,
read_ct,
message
from
pgmq.send(
queue_name:='foo', -- note: lowercase useage
msg:='{"foo": "bar2"}',
delay:=5
);
ERROR: column "msg_id" does not exist
LINE 2: msg_id,
^
select
msg_id,
read_ct,
message
from
pgmq.read(
queue_name:='Foo',
vt:=30,
qty:=2
);
msg_id | read_ct | message
--------+---------+-----------------
1 | 1 | {"foo": "bar1"}
(1 row)

select
msg_id,
read_ct,
message
from
pgmq.pop('Foo');
msg_id | read_ct | message
--------+---------+---------
(0 rows)

-- Archive message with msg_id=2.
select
pgmq.archive(
queue_name:='Foo',
msg_id:=2
);
archive
---------
f
(1 row)

select
pgmq.create('my_queue');
create
--------

(1 row)

select
pgmq.send_batch(
queue_name:='my_queue',
msgs:=array['{"foo": "bar3"}','{"foo": "bar4"}','{"foo": "bar5"}']::jsonb[]
);
send_batch
------------
1
2
3
(3 rows)

select
pgmq.archive(
queue_name:='my_queue',
msg_ids:=array[3, 4, 5]
);
archive
---------
3
(1 row)

select
pgmq.delete('my_queue', 6);
delete
--------
f
(1 row)

select
pgmq.drop_queue('my_queue');
drop_queue
------------
t
(1 row)

select
pgmq.create_partitioned(
'my_partitioned_queue',
'5 seconds',
'10 seconds'
);
create_partitioned
--------------------

(1 row)

-- Make sure SQLI enabling characters are blocked
select pgmq.create('F--oo');
ERROR: queue name contains invalid characters: $, ;, --, or \'
CONTEXT: PL/pgSQL function pgmq.format_table_name(text,text) line 5 at RAISE
PL/pgSQL function pgmq.create_non_partitioned(text) line 3 during statement block local variable initialization
SQL statement "SELECT pgmq.create_non_partitioned(queue_name)"
PL/pgSQL function pgmq."create"(text) line 3 at PERFORM
select pgmq.create('F$oo');
ERROR: queue name contains invalid characters: $, ;, --, or \'
CONTEXT: PL/pgSQL function pgmq.format_table_name(text,text) line 5 at RAISE
PL/pgSQL function pgmq.create_non_partitioned(text) line 3 during statement block local variable initialization
SQL statement "SELECT pgmq.create_non_partitioned(queue_name)"
PL/pgSQL function pgmq."create"(text) line 3 at PERFORM
select pgmq.create($$F'oo$$);
ERROR: queue name contains invalid characters: $, ;, --, or \'
CONTEXT: PL/pgSQL function pgmq.format_table_name(text,text) line 5 at RAISE
PL/pgSQL function pgmq.create_non_partitioned(text) line 3 during statement block local variable initialization
SQL statement "SELECT pgmq.create_non_partitioned(queue_name)"
PL/pgSQL function pgmq."create"(text) line 3 at PERFORM
12 changes: 12 additions & 0 deletions nix/tests/migrations/pgmq.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
select
pgmq.create('Foo');

select
*
from
pgmq.send(
queue_name:='Foo',
msg:='{"foo": "bar1"}'
);


Loading
Loading
0