Cleanup slots during drop database · postgrespro/postgres@ff539da · GitHub

Commit ff539da

Cleanup slots during drop database
Automatically drop all logical replication slots associated with a database
when the database is dropped. Previously we threw an ERROR if a slot existed;
now we throw an ERROR only if a slot is active in the database being dropped.

Craig Ringer
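
For orientation, a minimal psql sketch of the behavior change (database and
slot names are hypothetical):

  -- in a session connected to appdb:
  SELECT pg_create_logical_replication_slot('appdb_slot', 'test_decoding');

  -- later, from another database, with no client using the slot:
  DROP DATABASE appdb;  -- previously: ERROR; now: succeeds
  SELECT slot_name FROM pg_replication_slots WHERE slot_name = 'appdb_slot';
  -- returns no rows: the slot was dropped along with the database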
1 parent 4d33a7f commit ff539da

File tree

7 files changed: +182 additions, −14 deletions

  • doc/src/sgml/func.sgml
  • doc/src/sgml/protocol.sgml
  • src/backend/commands/dbcommands.c
  • src/backend/replication/slot.c
  • src/include/replication/slot.h
  • src/test/recovery/t/006_logical_decoding.pl
  • src/test/recovery/t/010_logical_decoding_timelines.pl

    doc/src/sgml/func.sgml

    Lines changed: 2 additions & 1 deletion
@@ -18876,7 +18876,8 @@ postgres=# SELECT * FROM pg_walfile_name_offset(pg_stop_backup());
        <entry>
         Drops the physical or logical replication slot
         named <parameter>slot_name</parameter>. Same as replication protocol
-        command <literal>DROP_REPLICATION_SLOT</>.
+        command <literal>DROP_REPLICATION_SLOT</>. For logical slots, this must
+        be called when connected to the same database the slot was created on.
        </entry>
       </row>
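
    A short psql sketch of the restriction documented above (names
    hypothetical; exact error text may vary):

      -- connected to some other database:
      SELECT pg_drop_replication_slot('appdb_slot');  -- fails with an ERROR

      -- connected to appdb, where the logical slot was created:
      SELECT pg_drop_replication_slot('appdb_slot');  -- succeeds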

    doc/src/sgml/protocol.sgml

    Lines changed: 2 additions & 0 deletions
@@ -2034,6 +2034,8 @@ The commands accepted in walsender mode are:
    <para>
     Drops a replication slot, freeing any reserved server-side resources. If
     the slot is currently in use by an active connection, this command fails.
+    If the slot is a logical slot that was created in a database other than
+    the database the walsender is connected to, this command fails.
    </para>
    <variablelist>
     <varlistentry>
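
    The same restriction at the protocol level, sketched with a psql
    replication connection (connection strings and slot name are
    illustrative, error text approximate):

      $ psql "dbname=otherdb replication=database"
      otherdb=# DROP_REPLICATION_SLOT appdb_slot;
      -- fails: the logical slot was created in a different database

      $ psql "dbname=appdb replication=database"
      appdb=# DROP_REPLICATION_SLOT appdb_slot;
      -- succeeds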

    src/backend/commands/dbcommands.c

    Lines changed: 23 additions & 9 deletions
@@ -845,19 +845,22 @@ dropdb(const char *dbname, bool missing_ok)
 				 errmsg("cannot drop the currently open database")));
 
 	/*
-	 * Check whether there are, possibly unconnected, logical slots that refer
-	 * to the to-be-dropped database. The database lock we are holding
-	 * prevents the creation of new slots using the database.
+	 * Check whether there are active logical slots that refer to the
+	 * to-be-dropped database. The database lock we are holding prevents the
+	 * creation of new slots using the database or existing slots becoming
+	 * active.
 	 */
-	if (ReplicationSlotsCountDBSlots(db_id, &nslots, &nslots_active))
+	(void) ReplicationSlotsCountDBSlots(db_id, &nslots, &nslots_active);
+	if (nslots_active)
+	{
 		ereport(ERROR,
 				(errcode(ERRCODE_OBJECT_IN_USE),
-				 errmsg("database \"%s\" is used by a logical replication slot",
+				 errmsg("database \"%s\" is used by an active logical replication slot",
 						dbname),
-				 errdetail_plural("There is %d slot, %d of them active.",
-								  "There are %d slots, %d of them active.",
-								  nslots,
-								  nslots, nslots_active)));
+				 errdetail_plural("There is %d active slot",
+								  "There are %d active slots",
+								  nslots_active, nslots_active)));
+	}
 
 	/*
 	 * Check for other backends in the target database. (Because we hold the

@@ -914,6 +917,11 @@
 	 */
 	dropDatabaseDependencies(db_id);
 
+	/*
+	 * Drop db-specific replication slots.
+	 */
+	ReplicationSlotsDropDBSlots(db_id);
+
 	/*
 	 * Drop pages for this database that are in the shared buffer cache. This
 	 * is important to ensure that no remaining backend tries to write out a

@@ -2124,11 +2132,17 @@ dbase_redo(XLogReaderState *record)
 		 * InitPostgres() cannot fully re-execute concurrently. This
 		 * avoids backends re-connecting automatically to same database,
 		 * which can happen in some cases.
+		 *
+		 * This will lock out walsenders trying to connect to db-specific
+		 * slots for logical decoding too, so it's safe for us to drop slots.
 		 */
 		LockSharedObjectForSession(DatabaseRelationId, xlrec->db_id, 0, AccessExclusiveLock);
 		ResolveRecoveryConflictWithDatabase(xlrec->db_id);
 	}
 
+	/* Drop any database-specific replication slots */
+	ReplicationSlotsDropDBSlots(xlrec->db_id);
+
 	/* Drop pages for this database that are in the shared buffer cache */
 	DropDatabaseBuffers(xlrec->db_id);
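
    The user-visible effect of the dropdb() change above, sketched in psql
    (names hypothetical, messages paraphrased from the hunk):

      -- while a client such as pg_recvlogical streams from a slot in appdb:
      DROP DATABASE appdb;
      -- ERROR:  database "appdb" is used by an active logical replication slot
      -- DETAIL:  There is 1 active slot

      -- once the slot exists but is inactive:
      DROP DATABASE appdb;  -- succeeds; ReplicationSlotsDropDBSlots() removes it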

    src/backend/replication/slot.c

    Lines changed: 88 additions & 0 deletions
@@ -796,6 +796,94 @@ ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive)
 	return false;
 }
 
+/*
+ * ReplicationSlotsDropDBSlots -- Drop all db-specific slots relating to the
+ * passed database oid. The caller should hold an exclusive lock on the
+ * pg_database oid for the database to prevent creation of new slots on the db
+ * or replay from existing slots.
+ *
+ * This routine isn't as efficient as it could be - but we don't drop
+ * databases often, especially databases with lots of slots.
+ *
+ * Another session that concurrently acquires an existing slot on the target DB
+ * (most likely to drop it) may cause this function to ERROR. If that happens
+ * it may have dropped some but not all slots.
+ */
+void
+ReplicationSlotsDropDBSlots(Oid dboid)
+{
+	int			i;
+
+	if (max_replication_slots <= 0)
+		return;
+
+restart:
+	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+	for (i = 0; i < max_replication_slots; i++)
+	{
+		ReplicationSlot *s;
+		NameData	slotname;
+		int			active_pid;
+
+		s = &ReplicationSlotCtl->replication_slots[i];
+
+		/* cannot change while ReplicationSlotCtlLock is held */
+		if (!s->in_use)
+			continue;
+
+		/* only logical slots are database specific, skip */
+		if (!SlotIsLogical(s))
+			continue;
+
+		/* not our database, skip */
+		if (s->data.database != dboid)
+			continue;
+
+		/* Claim the slot, as if ReplicationSlotAcquire()ing. */
+		SpinLockAcquire(&s->mutex);
+		strncpy(NameStr(slotname), NameStr(s->data.name), NAMEDATALEN);
+		NameStr(slotname)[NAMEDATALEN - 1] = '\0';
+		active_pid = s->active_pid;
+		if (active_pid == 0)
+		{
+			MyReplicationSlot = s;
+			s->active_pid = MyProcPid;
+		}
+		SpinLockRelease(&s->mutex);
+
+		/*
+		 * We might fail here if the slot was active. Even though we hold an
+		 * exclusive lock on the database object a logical slot for that DB
+		 * can still be active if it's being dropped by a backend connected
+		 * to another DB or is otherwise acquired.
+		 *
+		 * It's an unlikely race that'll only arise from concurrent user
+		 * action, so we'll just bail out.
+		 */
+		if (active_pid)
+			elog(ERROR, "replication slot %s is in use by pid %d",
+				 NameStr(slotname), active_pid);
+
+		/*
+		 * To avoid largely duplicating ReplicationSlotDropAcquired() or
+		 * complicating it with already_locked flags for ProcArrayLock,
+		 * ReplicationSlotControlLock and ReplicationSlotAllocationLock, we
+		 * just release our ReplicationSlotControlLock to drop the slot.
+		 *
+		 * For safety we'll restart our scan from the beginning each
+		 * time we release the lock.
+		 */
+		LWLockRelease(ReplicationSlotControlLock);
+		ReplicationSlotDropAcquired();
+		goto restart;
+	}
+	LWLockRelease(ReplicationSlotControlLock);
+
+	/* recompute limits once after all slots are dropped */
+	ReplicationSlotsComputeRequiredXmin(false);
+	ReplicationSlotsComputeRequiredLSN();
+}
+
 
 /*
  * Check whether the server's configuration supports using replication

    src/include/replication/slot.h

    Lines changed: 1 addition & 0 deletions
@@ -177,6 +177,7 @@ extern void ReplicationSlotsComputeRequiredXmin(bool already_locked);
 extern void ReplicationSlotsComputeRequiredLSN(void);
 extern XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void);
 extern bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive);
+extern void ReplicationSlotsDropDBSlots(Oid dboid);
 
 extern void StartupReplicationSlots(void);
 extern void CheckPointReplicationSlots(void);

    src/test/recovery/t/006_logical_decoding.pl

    Lines changed: 38 additions & 2 deletions
@@ -7,7 +7,7 @@
 use warnings;
 use PostgresNode;
 use TestLib;
-use Test::More tests => 5;
+use Test::More tests => 16;
 
 # Initialize master node
 my $node_master = get_new_node('master');

@@ -54,7 +54,7 @@
 is($stdout_sql, $expected, 'got expected output from SQL decoding session');
 
 my $endpos = $node_master->safe_psql('postgres', "SELECT location FROM pg_logical_slot_peek_changes('test_slot', NULL, NULL) ORDER BY location DESC LIMIT 1;");
-diag "waiting to replay $endpos";
+print "waiting to replay $endpos\n";
 
 my $stdout_recv = $node_master->pg_recvlogical_upto('postgres', 'test_slot', $endpos, 10, 'include-xids' => '0', 'skip-empty-xacts' => '1');
 chomp($stdout_recv);

@@ -64,5 +64,41 @@
 chomp($stdout_recv);
 is($stdout_recv, '', 'pg_recvlogical acknowledged changes, nothing pending on slot');
 
+$node_master->safe_psql('postgres', 'CREATE DATABASE otherdb');
+
+is($node_master->psql('otherdb', "SELECT location FROM pg_logical_slot_peek_changes('test_slot', NULL, NULL) ORDER BY location DESC LIMIT 1;"), 3,
+	'replaying logical slot from another database fails');
+
+$node_master->safe_psql('otherdb', qq[SELECT pg_create_logical_replication_slot('otherdb_slot', 'test_decoding');]);
+
+# make sure you can't drop a slot while active
+my $pg_recvlogical = IPC::Run::start(['pg_recvlogical', '-d', $node_master->connstr('otherdb'), '-S', 'otherdb_slot', '-f', '-', '--start']);
+$node_master->poll_query_until('otherdb', "SELECT EXISTS (SELECT 1 FROM pg_replication_slots WHERE slot_name = 'otherdb_slot' AND active_pid IS NOT NULL)");
+is($node_master->psql('postgres', 'DROP DATABASE otherdb'), 3,
+	'dropping a DB with active logical slots fails');
+$pg_recvlogical->kill_kill;
+is($node_master->slot('otherdb_slot')->{'slot_name'}, 'otherdb_slot',
+	'logical slot still exists');
+
+$node_master->poll_query_until('otherdb', "SELECT EXISTS (SELECT 1 FROM pg_replication_slots WHERE slot_name = 'otherdb_slot' AND active_pid IS NULL)");
+is($node_master->psql('postgres', 'DROP DATABASE otherdb'), 0,
+	'dropping a DB with inactive logical slots succeeds');
+is($node_master->slot('otherdb_slot')->{'slot_name'}, undef,
+	'logical slot was actually dropped with DB');
+
+# Restarting a node with wal_level = logical that has existing
+# slots must succeed, but decoding from those slots must fail.
+$node_master->safe_psql('postgres', 'ALTER SYSTEM SET wal_level = replica');
+is($node_master->safe_psql('postgres', 'SHOW wal_level'), 'logical', 'wal_level is still logical before restart');
+$node_master->restart;
+is($node_master->safe_psql('postgres', 'SHOW wal_level'), 'replica', 'wal_level is replica');
+isnt($node_master->slot('test_slot')->{'catalog_xmin'}, '0',
+	'restored slot catalog_xmin is nonzero');
+is($node_master->psql('postgres', qq[SELECT pg_logical_slot_get_changes('test_slot', NULL, NULL);]), 3,
+	'reading from slot with wal_level < logical fails');
+is($node_master->psql('postgres', q[SELECT pg_drop_replication_slot('test_slot')]), 0,
+	'can drop logical slot while wal_level = replica');
+is($node_master->slot('test_slot')->{'catalog_xmin'}, '', 'slot was dropped');
+
 # done with the node
 $node_master->stop;
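
    The tail of this test exercises slots across a wal_level downgrade;
    roughly, in psql terms (error text approximate):

      ALTER SYSTEM SET wal_level = replica;
      -- restart the server, then:
      SHOW wal_level;                                  -- replica
      SELECT pg_logical_slot_get_changes('test_slot', NULL, NULL);
      -- ERROR:  logical decoding requires wal_level >= logical
      SELECT pg_drop_replication_slot('test_slot');    -- still allowed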

    src/test/recovery/t/010_logical_decoding_timelines.pl

    Lines changed: 28 additions & 2 deletions
@@ -15,12 +15,15 @@
 # This module uses the first approach to show that timeline following
 # on a logical slot works.
 #
+# (For convenience, it also tests some recovery-related operations
+# on logical slots).
+#
 use strict;
 use warnings;
 
 use PostgresNode;
 use TestLib;
-use Test::More tests => 10;
+use Test::More tests => 13;
 use RecursiveCopy;
 use File::Copy;
 use IPC::Run ();

@@ -50,6 +53,16 @@
 $node_master->safe_psql('postgres', "CREATE TABLE decoding(blah text);");
 $node_master->safe_psql('postgres',
 	"INSERT INTO decoding(blah) VALUES ('beforebb');");
+
+# We also want to verify that DROP DATABASE on a standby with a logical
+# slot works. This isn't strictly related to timeline following, but
+# the only way to get a logical slot on a standby right now is to use
+# the same physical copy trick, so:
+$node_master->safe_psql('postgres', 'CREATE DATABASE dropme;');
+$node_master->safe_psql('dropme',
+	"SELECT pg_create_logical_replication_slot('dropme_slot', 'test_decoding');"
+);
+
 $node_master->safe_psql('postgres', 'CHECKPOINT;');
 
 my $backup_name = 'b1';

@@ -68,6 +81,17 @@
 $node_replica->start;
 
+# If we drop 'dropme' on the master, the standby should drop the
+# db and associated slot.
+is($node_master->psql('postgres', 'DROP DATABASE dropme'), 0,
+	'dropped DB with logical slot OK on master');
+$node_master->wait_for_catchup($node_replica, 'replay', $node_master->lsn('insert'));
+is($node_replica->safe_psql('postgres', q[SELECT 1 FROM pg_database WHERE datname = 'dropme']), '',
+	'dropped DB dropme on standby');
+is($node_replica->slot('dropme_slot')->{'slot_name'}, undef,
+	'logical slot was actually dropped on standby');
+
+# Back to testing failover...
 $node_master->safe_psql('postgres',
 	"SELECT pg_create_logical_replication_slot('after_basebackup', 'test_decoding');"
 );

@@ -99,10 +123,13 @@
 cmp_ok($phys_slot->{'xmin'}, '>=', $phys_slot->{'catalog_xmin'},
 	'xmin on physical slot must not be lower than catalog_xmin');
 
+$node_master->safe_psql('postgres', 'CHECKPOINT');
+
 # Boom, crash
 $node_master->stop('immediate');
 
 $node_replica->promote;
+print "waiting for replica to come up\n";
 $node_replica->poll_query_until('postgres',
 	"SELECT NOT pg_is_in_recovery();");

@@ -154,5 +181,4 @@ BEGIN
 chomp($stdout);
 is($stdout, $final_expected_output_bb, 'got same output from walsender via pg_recvlogical on before_basebackup');
 
-# We don't need the standby anymore
 $node_replica->teardown_node();
