-
-
Notifications
You must be signed in to change notification settings - Fork 189
Adds the pgmq extension to support queuing #1120
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
17 commits
Select commit
Hold shift + click to select a range
4cd3e1d
add pgmq SQL only extension
olirice 836c5f4
revert accidental file inclusion
olirice 408a973
add public interface test coverage
olirice e63d9eb
order by namespace for tables in interface test to make timescale sta…
olirice efbb71d
pgmq in prime
olirice 0fa765b
update to pgmq 1.4.2
olirice 8b2bc6c
regression tests for pgmq
olirice 6415640
add pgmq migration test script
olirice 107d7b6
add pg_partman
olirice 1a416c9
remove pg_partman_bgq from test suite postgresql.conf
olirice 0222a12
checkin broken test
olirice 67b8f1e
checkin broken test 2
olirice 53d79c3
bump pgmq to 1.4.4
olirice 880db79
sync test outputs and pg_partman with 1.4.4
olirice b73d7df
add pgmq to supautils privileged extensions
olirice f039826
Merge branch 'develop' into or/ext-pgmq
olirice bc21534
Merge branch 'develop' into or/ext-pgmq
olirice File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1 +1 @@ | ||
postgres-version = "15.8.1.002" | ||
postgres-version = "15.8.1.003" |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled
8000
differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,34 @@ | ||
{ lib, stdenv, fetchFromGitHub, postgresql }: | ||
|
||
stdenv.mkDerivation rec { | ||
pname = "pg_partman"; | ||
version = "5.1.0"; | ||
|
||
buildInputs = [ postgresql ]; | ||
|
||
src = fetchFromGitHub { | ||
owner = "pgpartman"; | ||
repo = pname; | ||
rev = "refs/tags/v${version}"; | ||
sha256 = "sha256-GrVOJ5ywZMyqyDroYDLdKkXDdIJSDGhDfveO/ZvrmYs="; | ||
}; | ||
|
||
installPhase = '' | ||
mkdir -p $out/{lib,share/postgresql/extension} | ||
|
||
cp src/*${postgresql.dlSuffix} $out/lib | ||
cp updates/* $out/share/postgresql/extension | ||
cp -r sql/* $out/share/postgresql/extension | ||
cp *.control $out/share/postgresql/extension | ||
''; | ||
|
||
meta = with lib; { | ||
description = "Partition management extension for PostgreSQL"; | ||
homepage = "https://github.com/pgpartman/pg_partman"; | ||
changelog = "https://github.com/pgpartman/pg_partman/blob/v${version}/CHANGELOG.md"; | ||
maintainers = with maintainers; [ samrose ]; | ||
platforms = postgresql.meta.platforms; | ||
license = licenses.postgresql; | ||
broken = versionOlder postgresql.version "14"; | ||
}; | ||
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,33 @@ | ||
{ lib, stdenv, fetchFromGitHub, postgresql }: | ||
|
||
stdenv.mkDerivation rec { | ||
pname = "pgmq"; | ||
version = "1.4.4"; | ||
buildInputs = [ postgresql ]; | ||
src = fetchFromGitHub { | ||
owner = "tembo-io"; | ||
repo = pname; | ||
rev = "v${version}"; | ||
hash = "sha256-z+8/BqIlHwlMnuIzMz6eylmYbSmhtsNt7TJf/CxbdVw="; | ||
}; | ||
|
||
buildPhase = '' | ||
cd pgmq-extension | ||
''; | ||
|
||
installPhase = '' | ||
mkdir -p $out/{lib,share/postgresql/extension} | ||
|
||
mv sql/pgmq.sql $out/share/postgresql/extension/pgmq--${version}.sql | ||
cp sql/*.sql $out/share/postgresql/extension | ||
cp *.control $out/share/postgresql/extension | ||
''; | ||
|
||
meta = with lib; { | ||
description = "A lightweight message queue. Like AWS SQS and RSMQ but on Postgres."; | ||
homepage = "https://github.com/tembo-io/pgmq"; | ||
maintainers = with maintainers; [ olirice ]; | ||
platforms = postgresql.meta.platforms; | ||
license = licenses.postgresql; | ||
}; | ||
} |
Large diffs are not rendered by default.
Oops, something went wrong.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,101 @@ | ||
create schema if not exists partman_test; | ||
/* | ||
Simple Time Based: 1 Partition Per Day | ||
|
||
For native partitioning, you must start with a parent table that has already been set up to be partitioned in the desired type. Currently pg_partman only supports the RANGE type of partitioning (both for time & id). You cannot turn a non-partitioned table into the parent table of a partitioned set, which can make migration a challenge. This document will show you some techniques for how to manage this later. For now, we will start with a brand new table in this example. Any non-unique indexes can also be added to the parent table in PG11+ and they will automatically be created on all child tables. | ||
*/ | ||
create table partman_test.time_taptest_table( | ||
col1 int, | ||
col2 text default 'stuff', | ||
col3 timestamptz not null default now() | ||
) | ||
partition by range (col3); | ||
create index on partman_test.time_tap (col3); | ||
ERROR: relation "partman_test.time_tap" does not exist | ||
/* | ||
Unique indexes (including primary keys) cannot be created on a natively partitioned parent unless they include the partition key. For time-based partitioning that generally doesn't work out since that would limit only a single timestamp value in each child table. pg_partman helps to manage this by using a template table to manage properties that currently are not supported by native partitioning. Note that this does not solve the issue of the constraint not being enforced across the entire partition set. See the main documentation to see which properties are managed by the template. | ||
|
||
Manually create the template table first so that when we run create_parent() the initial child tables that are created will have a primary key. If you do not supply a template table to pg_partman, it will create one for you in the schema that you installed the extension to. However properties you add to that template are only then applied to newly created child tables after that point. You will have to retroactively apply those properties manually to any child tables that already existed. | ||
*/ | ||
create table partman_test.time_taptest_table_template (like partman_test.time_taptest_table); | ||
alter table partman_test.time_taptest_table_template add primary key (col1); | ||
/* | ||
Review tables in the partman_test schema | ||
*/ | ||
select | ||
table_name, | ||
table_type | ||
from | ||
information_schema.tables | ||
where | ||
table_schema = 'partman_test'; | ||
table_name | table_type | ||
-----------------------------+------------ | ||
time_taptest_table | BASE TABLE | ||
time_taptest_table_template | BASE TABLE | ||
(2 rows) | ||
|
||
select public.create_parent( | ||
p_parent_table := 'partman_test.time_taptest_table', | ||
p_control := 'col3', | ||
p_interval := '1 day', | ||
p_template_table := 'partman_test.time_taptest_table_template' | ||
); | ||
create_parent | ||
--------------- | ||
t | ||
(1 row) | ||
|
||
/* | ||
Review tables in the partman_test schema, which should now include daily partitions | ||
*/ | ||
select | ||
-- dates in partition names are variable, so reduced to the prefix | ||
substring(table_name, 1, 21) as table_prefix, | ||
table_type | ||
from | ||
information_schema.tables | ||
where | ||
table_schema = 'partman_test' | ||
order by | ||
table_name; | ||
table_prefix | table_type | ||
-----------------------+------------ | ||
time_taptest_table | BASE TABLE | ||
time_taptest_table_de | BASE TABLE | ||
time_taptest_table_p2 | BASE TABLE | ||
time_taptest_table_p2 | BASE TABLE | ||
time_taptest_table_p2 | BASE TABLE | ||
time_taptest_table_p2 | BASE TABLE | ||
time_taptest_table_p2 | BASE TABLE | ||
time_taptest_table_p2 | BASE TABLE | ||
time_taptest_table_p2 | BASE TABLE | ||
time_taptest_table_p2 | BASE TABLE | ||
time_taptest_table_p2 | BASE TABLE | ||
time_taptest_table_te | BASE TABLE | ||
(12 rows) | ||
|
||
/* | ||
Confirm maintenance proc runs without issue | ||
*/ | ||
call public.run_maintenance_proc(); | ||
/* | ||
Make sure the background worker is NOT enabled. | ||
This is intentional. We document using pg_cron to schedule calls to | ||
public.run_maintenance_proc(). That is consistent with other providers. | ||
*/ | ||
select | ||
application_name | ||
from | ||
pg_stat_activity | ||
where | ||
application_name = 'pg_partman_bgw'; | ||
application_name | ||
------------------ | ||
(0 rows) | ||
|
||
-- Cleanup | ||
drop schema partman_test cascade; | ||
NOTICE: drop cascades to 2 other objects | ||
DETAIL: drop cascades to table partman_test.time_taptest_table | ||
drop cascades to table partman_test.time_taptest_table_template |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,143 @@ | ||
-- Test the standard flow | ||
select | ||
pgmq.create('Foo'); | ||
create | ||
-------- | ||
|
||
(1 row) | ||
|
||
select | ||
* | ||
from | ||
pgmq.send( | ||
queue_name:='Foo', | ||
msg:='{"foo": "bar1"}' | ||
); | ||
send | ||
------ | ||
1 | ||
(1 row) | ||
|
||
-- Test queue is not case sensitive | ||
select | ||
msg_id, | ||
read_ct, | ||
message | ||
from | ||
pgmq.send( | ||
queue_name:='foo', -- note: lowercase useage | ||
msg:='{"foo": "bar2"}', | ||
delay:=5 | ||
); | ||
ERROR: column "msg_id" does not exist | ||
LINE 2: msg_id, | ||
^ | ||
select | ||
msg_id, | ||
read_ct, | ||
message | ||
from | ||
pgmq.read( | ||
queue_name:='Foo', | ||
vt:=30, | ||
qty:=2 | ||
); | ||
msg_id | read_ct | message | ||
--------+---------+----------------- | ||
1 | 1 | {"foo": "bar1"} | ||
(1 row) | ||
|
||
select | ||
msg_id, | ||
read_ct, | ||
message | ||
from | ||
pgmq.pop('Foo'); | ||
msg_id | read_ct | message | ||
--------+---------+--------- | ||
(0 rows) | ||
|
||
-- Archive message with msg_id=2. | ||
select | ||
pgmq.archive( | ||
queue_name:='Foo', | ||
msg_id:=2 | ||
); | ||
archive | ||
--------- | ||
f | ||
(1 row) | ||
|
||
select | ||
pgmq.create('my_queue'); | ||
create | ||
-------- | ||
|
||
(1 row) | ||
|
||
select | ||
pgmq.send_batch( | ||
queue_name:='my_queue', | ||
msgs:=array['{"foo": "bar3"}','{"foo": "bar4"}','{"foo": "bar5"}']::jsonb[] | ||
); | ||
send_batch | ||
------------ | ||
1 | ||
2 | ||
3 | ||
(3 rows) | ||
|
||
select | ||
pgmq.archive( | ||
queue_name:='my_queue', | ||
msg_ids:=array[3, 4, 5] | ||
); | ||
archive | ||
--------- | ||
3 | ||
(1 row) | ||
|
||
select | ||
pgmq.delete('my_queue', 6); | ||
delete | ||
-------- | ||
f | ||
(1 row) | ||
|
||
select | ||
pgmq.drop_queue('my_queue'); | ||
drop_queue | ||
------------ | ||
t | ||
(1 row) | ||
|
||
select | ||
pgmq.create_partitioned( | ||
'my_partitioned_queue', | ||
'5 seconds', | ||
'10 seconds' | ||
); | ||
create_partitioned | ||
-------------------- | ||
|
||
(1 row) | ||
|
||
-- Make sure SQLI enabling characters are blocked | ||
select pgmq.create('F--oo'); | ||
ERROR: queue name contains invalid characters: $, ;, --, or \' | ||
CONTEXT: PL/pgSQL function pgmq.format_table_name(text,text) line 5 at RAISE | ||
PL/pgSQL function pgmq.create_non_partitioned(text) line 3 during statement block local variable initialization | ||
SQL statement "SELECT pgmq.create_non_partitioned(queue_name)" | ||
PL/pgSQL function pgmq."create"(text) line 3 at PERFORM | ||
select pgmq.create('F$oo'); | ||
ERROR: queue name contains invalid characters: $, ;, --, or \' | ||
CONTEXT: PL/pgSQL function pgmq.format_table_name(text,text) line 5 at RAISE | ||
PL/pgSQL function pgmq.create_non_partitioned(text) line 3 during statement block local variable initialization | ||
SQL statement "SELECT pgmq.create_non_partitioned(queue_name)" | ||
PL/pgSQL function pgmq."create"(text) line 3 at PERFORM | ||
select pgmq.create($$F'oo$$); | ||
ERROR: queue name contains invalid characters: $, ;, --, or \' | ||
CONTEXT: PL/pgSQL function pgmq.format_table_name(text,text) line 5 at RAISE | ||
PL/pgSQL function pgmq.create_non_partitioned(text) line 3 during statement block local variable initialization | ||
SQL statement "SELECT pgmq.create_non_partitioned(queue_name)" | ||
PL/pgSQL function pgmq."create"(text) line 3 at PERFORM |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,12 @@ | ||
select | ||
pgmq.create('Foo'); | ||
|
||
select | ||
* | ||
from | ||
pgmq.send( | ||
queue_name:='Foo', | ||
msg:='{"foo": "bar1"}' | ||
); | ||
|
||
|
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.