1 parent 4d33a7f commit ff539da
doc/src/sgml/func.sgml
@@ -18876,7 +18876,8 @@ postgres=# SELECT * FROM pg_walfile_name_offset(pg_stop_backup());
       <entry>
        Drops the physical or logical replication slot
        named <parameter>slot_name</parameter>. Same as replication protocol
-       command <literal>DROP_REPLICATION_SLOT</>.
+       command <literal>DROP_REPLICATION_SLOT</>. For logical slots, this must
+       be called when connected to the same database the slot was created on.
       </entry>
      </row>
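
To illustrate the documented restriction, a minimal psql sketch (the slot and database names are hypothetical, and the error wording is illustrative rather than quoted from the server):

    -- slot created while connected to database "mydb"
    mydb=# SELECT pg_create_logical_replication_slot('mydb_slot', 'test_decoding');

    -- from any other database, the drop is now rejected
    postgres=# SELECT pg_drop_replication_slot('mydb_slot');
    ERROR:  replication slot "mydb_slot" was not created in this database

    -- reconnect to "mydb" and the drop succeeds
    mydb=# SELECT pg_drop_replication_slot('mydb_slot');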
doc/src/sgml/protocol.sgml
@@ -2034,6 +2034,8 @@ The commands accepted in walsender mode are:
    <para>
     Drops a replication slot, freeing any reserved server-side resources. If
     the slot is currently in use by an active connection, this command fails.
+    If the slot is a logical slot that was created in a database other than
+    the database the walsender is connected to, this command fails.
    </para>
    <variablelist>
     <varlistentry>
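
As a usage sketch, walsender commands such as this one can be issued from a psql session opened with a replication connection (database and slot names are hypothetical):

    $ psql "dbname=otherdb replication=database"
    otherdb=> DROP_REPLICATION_SLOT otherdb_slot;

Run from a walsender connected to any other database, the same command now fails, per the paragraph added above.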
src/backend/commands/dbcommands.c
@@ -845,19 +845,22 @@ dropdb(const char *dbname, bool missing_ok)
				 errmsg("cannot drop the currently open database")));

	/*
-	 * Check whether there are, possibly unconnected, logical slots that refer
-	 * to the to-be-dropped database. The database lock we are holding
-	 * prevents the creation of new slots using the database.
+	 * Check whether there are active logical slots that refer to the
+	 * to-be-dropped database. The database lock we are holding prevents the
+	 * creation of new slots using the database or existing slots becoming
+	 * active.
	 */
-	if (ReplicationSlotsCountDBSlots(db_id, &nslots, &nslots_active))
+	(void) ReplicationSlotsCountDBSlots(db_id, &nslots, &nslots_active);
+	if (nslots_active)
+	{
		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_IN_USE),
-				 errmsg("database \"%s\" is used by a logical replication slot",
+				 errmsg("database \"%s\" is used by an active logical replication slot",
						dbname),
-				 errdetail_plural("There is %d slot, %d of them active.",
-								  "There are %d slots, %d of them active.",
-								  nslots,
-								  nslots, nslots_active)));
+				 errdetail_plural("There is %d active slot.",
+								  "There are %d active slots.",
+								  nslots_active, nslots_active)));
+	}

	/*
	 * Check for other backends in the target database. (Because we hold the
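
The user-visible effect of this hunk, as a psql sketch built from the message strings added above (slot and database names are hypothetical):

    -- while pg_recvlogical or another client has the slot active
    postgres=# DROP DATABASE otherdb;
    ERROR:  database "otherdb" is used by an active logical replication slot
    DETAIL:  There is 1 active slot.

    -- once the slot is merely inactive (not dropped), DROP DATABASE
    -- succeeds and removes the slot, per the dropdb() hunk below
    postgres=# DROP DATABASE otherdb;
    DROP DATABASE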
@@ -914,6 +917,11 @@ dropdb(const char *dbname, bool missing_ok)
	 */
	dropDatabaseDependencies(db_id);

+	/*
+	 * Drop db-specific replication slots.
+	 */
+	ReplicationSlotsDropDBSlots(db_id);
+
	/*
	 * Drop pages for this database that are in the shared buffer cache. This
	 * is important to ensure that no remaining backend tries to write out a
@@ -2124,11 +2132,17 @@ dbase_redo(XLogReaderState *record)
		 * InitPostgres() cannot fully re-execute concurrently. This
		 * avoids backends re-connecting automatically to same database,
		 * which can happen in some cases.
+		 *
+		 * This will lock out walsenders trying to connect to db-specific
+		 * slots for logical decoding too, so it's safe for us to drop slots.
		 */
		LockSharedObjectForSession(DatabaseRelationId, xlrec->db_id, 0, AccessExclusiveLock);
		ResolveRecoveryConflictWithDatabase(xlrec->db_id);
	}

+	/* Drop any database-specific replication slots */
+	ReplicationSlotsDropDBSlots(xlrec->db_id);
+
	/* Drop pages for this database that are in the shared buffer cache */
	DropDatabaseBuffers(xlrec->db_id);
src/backend/replication/slot.c
@@ -796,6 +796,94 @@ ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive)
	return false;
}

+/*
+ * ReplicationSlotsDropDBSlots -- Drop all db-specific slots relating to the
+ * passed database oid. The caller should hold an exclusive lock on the
+ * pg_database oid for the database to prevent creation of new slots on the db
+ * or replay from existing slots.
+ *
+ * This routine isn't as efficient as it could be - but we don't drop
+ * databases often, especially databases with lots of slots.
+ *
+ * Another session that concurrently acquires an existing slot on the target
+ * DB (most likely to drop it) may cause this function to ERROR. If that
+ * happens it may have dropped some but not all slots.
+ */
+void
+ReplicationSlotsDropDBSlots(Oid dboid)
+{
+	int		i;
+
+	if (max_replication_slots <= 0)
+		return;
+
+restart:
+	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+	for (i = 0; i < max_replication_slots; i++)
+	{
+		ReplicationSlot *s;
+		NameData	slotname;
+		int		active_pid;
+
+		s = &ReplicationSlotCtl->replication_slots[i];
+
+		/* cannot change while ReplicationSlotControlLock is held */
+		if (!s->in_use)
+			continue;
+
+		/* only logical slots are database specific, skip */
+		if (!SlotIsLogical(s))
+			continue;
+
+		/* not our database, skip */
+		if (s->data.database != dboid)
+			continue;
+
+		/* Claim the slot, as if ReplicationSlotAcquire()ing. */
+		SpinLockAcquire(&s->mutex);
+		strncpy(NameStr(slotname), NameStr(s->data.name), NAMEDATALEN);
+		NameStr(slotname)[NAMEDATALEN - 1] = '\0';
+		active_pid = s->active_pid;
+		if (active_pid == 0)
+		{
+			MyReplicationSlot = s;
+			s->active_pid = MyProcPid;
+		}
+		SpinLockRelease(&s->mutex);
+
+		/*
+		 * We might fail here if the slot was active. Even though we hold an
+		 * exclusive lock on the database object a logical slot for that DB
+		 * can still be active if it's being dropped by a backend connected
+		 * to another DB or is otherwise acquired.
+		 *
+		 * It's an unlikely race that'll only arise from concurrent user
+		 * action, so we'll just bail out.
+		 */
+		if (active_pid)
+			elog(ERROR, "replication slot %s is in use by pid %d",
+				 NameStr(slotname), active_pid);
+
+		/*
+		 * To avoid largely duplicating ReplicationSlotDropAcquired() or
+		 * complicating it with already_locked flags for ProcArrayLock,
+		 * ReplicationSlotControlLock and ReplicationSlotAllocationLock, we
+		 * just release our ReplicationSlotControlLock to drop the slot.
+		 *
+		 * For safety we'll restart our scan from the beginning each
+		 * time we release the lock.
+		 */
+		LWLockRelease(ReplicationSlotControlLock);
+		ReplicationSlotDropAcquired();
+		goto restart;
+	}
+	LWLockRelease(ReplicationSlotControlLock);
+
+	/* recompute limits once after all slots are dropped */
+	ReplicationSlotsComputeRequiredXmin(false);
+	ReplicationSlotsComputeRequiredLSN();
+}
+
/*
 * Check whether the server's configuration supports using replication
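
For background on the SlotIsLogical() filter above: only logical slots are bound to a database, which is visible in the catalog view. A psql sketch with hypothetical slot names:

    postgres=# SELECT slot_name, slot_type, database FROM pg_replication_slots;
      slot_name   | slot_type | database
    --------------+-----------+----------
     phys_slot    | physical  |
     otherdb_slot | logical   | otherdb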
src/include/replication/slot.h
@@ -177,6 +177,7 @@ extern void ReplicationSlotsComputeRequiredXmin(bool already_locked);
extern void ReplicationSlotsComputeRequiredLSN(void);
extern XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void);
extern bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive);
+extern void ReplicationSlotsDropDBSlots(Oid dboid);

extern void StartupReplicationSlots(void);
extern void CheckPointReplicationSlots(void);
src/test/recovery/t/006_logical_decoding.pl
@@ -7,7 +7,7 @@
use warnings;
use PostgresNode;
use TestLib;
-use Test::More tests => 5;
+use Test::More tests => 16;

# Initialize master node
my $node_master = get_new_node('master');
@@ -54,7 +54,7 @@
is($stdout_sql, $expected, 'got expected output from SQL decoding session');

my $endpos = $node_master->safe_psql('postgres', "SELECT location FROM pg_logical_slot_peek_changes('test_slot', NULL, NULL) ORDER BY location DESC LIMIT 1;");
-diag "waiting to replay $endpos";
+print "waiting to replay $endpos\n";

my $stdout_recv = $node_master->pg_recvlogical_upto('postgres', 'test_slot', $endpos, 10, 'include-xids' => '0', 'skip-empty-xacts' => '1');
chomp($stdout_recv);
@@ -64,5 +64,41 @@

is($stdout_recv, '', 'pg_recvlogical acknowledged changes, nothing pending on slot');

+$node_master->safe_psql('postgres', 'CREATE DATABASE otherdb');
+
+is($node_master->psql('otherdb', "SELECT location FROM pg_logical_slot_peek_changes('test_slot', NULL, NULL) ORDER BY location DESC LIMIT 1;"), 3,
+	'replaying logical slot from another database fails');
+
+$node_master->safe_psql('otherdb', qq[SELECT pg_create_logical_replication_slot('otherdb_slot', 'test_decoding');]);
+
+# make sure you can't drop a slot while active
+my $pg_recvlogical = IPC::Run::start(['pg_recvlogical', '-d', $node_master->connstr('otherdb'), '-S', 'otherdb_slot', '-f', '-', '--start']);
+$node_master->poll_query_until('otherdb', "SELECT EXISTS (SELECT 1 FROM pg_replication_slots WHERE slot_name = 'otherdb_slot' AND active_pid IS NOT NULL)");
+is($node_master->psql('postgres', 'DROP DATABASE otherdb'), 3,
+	'dropping a DB with active logical slots fails');
+$pg_recvlogical->kill_kill;
+isnt($node_master->slot('otherdb_slot')->{'slot_name'}, undef,
+	'logical slot still exists');
+
+$node_master->poll_query_until('otherdb', "SELECT EXISTS (SELECT 1 FROM pg_replication_slots WHERE slot_name = 'otherdb_slot' AND active_pid IS NULL)");
+is($node_master->psql('postgres', 'DROP DATABASE otherdb'), 0,
+	'dropping a DB with inactive logical slots succeeds');
+is($node_master->slot('otherdb_slot')->{'slot_name'}, undef,
+	'logical slot was actually dropped with DB');
+
+# Restarting a node with wal_level = logical that has existing
+# slots must succeed, but decoding from those slots must fail.
+$node_master->safe_psql('postgres', 'ALTER SYSTEM SET wal_level = replica');
+is($node_master->safe_psql('postgres', 'SHOW wal_level'), 'logical', 'wal_level is still logical before restart');
+$node_master->restart;
+is($node_master->safe_psql('postgres', 'SHOW wal_level'), 'replica', 'wal_level is replica');
+isnt($node_master->slot('test_slot')->{'catalog_xmin'}, '0',
+	'restored slot catalog_xmin is nonzero');
+is($node_master->psql('postgres', qq[SELECT pg_logical_slot_get_changes('test_slot', NULL, NULL);]), 3,
+	'reading from slot with wal_level < logical fails');
+is($node_master->psql('postgres', q[SELECT pg_drop_replication_slot('test_slot')]), 0,
+	'can drop logical slot while wal_level = replica');
+is($node_master->slot('test_slot')->{'catalog_xmin'}, '', 'slot was dropped');
+
# done with the node
$node_master->stop;
src/test/recovery/t/010_logical_decoding_timelines.pl
@@ -15,12 +15,15 @@
# This module uses the first approach to show that timeline following
# on a logical slot works.
#
+# (For convenience, it also tests some recovery-related operations
+# on logical slots).
+#
use strict;
use warnings;

use PostgresNode;
use TestLib;
-use Test::More tests => 10;
+use Test::More tests => 13;
use RecursiveCopy;
use File::Copy;
use IPC::Run ();
@@ -50,6 +53,16 @@
$node_master->safe_psql('postgres', "CREATE TABLE decoding(blah text);");
$node_master->safe_psql('postgres',
	"INSERT INTO decoding(blah) VALUES ('beforebb');");
+
+# We also want to verify that DROP DATABASE on a standby with a logical
+# slot works. This isn't strictly related to timeline following, but
+# the only way to get a logical slot on a standby right now is to use
+# the same physical copy trick, so:
+$node_master->safe_psql('postgres', 'CREATE DATABASE dropme;');
+$node_master->safe_psql('dropme',
+"SELECT pg_create_logical_replication_slot('dropme_slot', 'test_decoding');"
+);
+
$node_master->safe_psql('postgres', 'CHECKPOINT;');

my $backup_name = 'b1';
@@ -68,6 +81,17 @@
$node_replica->start;

+# If we drop 'dropme' on the master, the standby should drop the
+# db and associated slot.
+is($node_master->psql('postgres', 'DROP DATABASE dropme'), 0,
+	'dropped DB with logical slot OK on master');
+$node_master->wait_for_catchup($node_replica, 'replay', $node_master->lsn('insert'));
+is($node_replica->safe_psql('postgres', q[SELECT 1 FROM pg_database WHERE datname = 'dropme']), '',
+	'dropped DB dropme on standby');
+is($node_replica->slot('dropme_slot')->{'slot_name'}, undef,
+	'logical slot was actually dropped on standby');
+
+# Back to testing failover...
$node_master->safe_psql('postgres',
"SELECT pg_create_logical_replication_slot('after_basebackup', 'test_decoding');"
);
@@ -99,10 +123,13 @@
cmp_ok($phys_slot->{'xmin'}, '>=', $phys_slot->{'catalog_xmin'},
	'xmin on physical slot must not be lower than catalog_xmin');

+$node_master->safe_psql('postgres', 'CHECKPOINT');
+
# Boom, crash
$node_master->stop('immediate');

$node_replica->promote;
+print "waiting for replica to come up\n";
$node_replica->poll_query_until('postgres',
	"SELECT NOT pg_is_in_recovery();");
@@ -154,5 +181,4 @@ BEGIN
chomp($stdout);
is($stdout, $final_expected_output_bb, 'got same output from walsender via pg_recvlogical on before_basebackup');

-# We don't need the standby anymore
$node_replica->teardown_node();