8000 Handle DROP DATABASE getting interrupted · postgres/postgres@1c38e7a · GitHub
[go: up one dir, main page]

Skip to content

Commit 1c38e7a

Browse files
committed
Handle DROP DATABASE getting interrupted
Until now, when DROP DATABASE got interrupted in the wrong moment, the removal of the pg_database row would also roll back, even though some irreversible steps have already been taken. E.g. DropDatabaseBuffers() might have thrown out dirty buffers, or files could have been unlinked. But we continued to allow connections to such a corrupted database. To fix this, mark databases invalid with an in-place update, just before starting to perform irreversible steps. As we can't add a new column in the back branches, we use pg_database.datconnlimit = -2 for this purpose. An invalid database cannot be connected to anymore, but can still be dropped. Unfortunately we can't easily add output to psql's \l to indicate that some database is invalid, it doesn't fit in any of the existing columns. Add tests verifying that a interrupted DROP DATABASE is handled correctly in the backend and in various tools. Reported-by: Evgeny Morozov <postgresql3@realityexists.net> Author: Andres Freund <andres@anarazel.de> Reviewed-by: Daniel Gustafsson <daniel@yesql.se> Reviewed-by: Thomas Munro <thomas.munro@gmail.com> Discussion: https://postgr.es/m/20230509004637.cgvmfwrbht7xm7p6@awork3.anarazel.de Discussion: https://postgr.es/m/20230314174521.74jl6ffqsee5mtug@awork3.anarazel.de Backpatch: 11-, bug present in all supported versions
1 parent 1386f09 commit 1c38e7a

File tree

16 files changed

+383
-27
lines changed

16 files changed

+383
-27
lines changed

doc/src/sgml/catalogs.sgml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2655,7 +2655,8 @@ SCRAM-SHA-256$<replaceable>&lt;iteration count&gt;</replaceable>:<replaceable>&l
26552655
<entry></entry>
26562656
<entry>
26572657
Sets maximum number of concurrent connections that can be made
2658-
to this database. -1 means no limit.
2658+
to this database. -1 means no limit, -2 indicates the database is
2659+
invalid.
26592660
</entry>
26602661
</row>
26612662

src/backend/commands/dbcommands.c

Lines changed: 87 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ createdb(ParseState *pstate, const CreatedbStmt *stmt)
138138
int encoding = -1;
139139
bool dbistemplate = false;
140140
bool dballowconnections = true;
141-
int dbconnlimit = -1;
141+
int dbconnlimit = DATCONNLIMIT_UNLIMITED;
142142
int notherbackends;
143143
int npreparedxacts;
144144
createdb_failure_params fparms;
@@ -287,7 +287,7 @@ createdb(ParseState *pstate, const CreatedbStmt *stmt)
287287
if (dconnlimit && dconnlimit->arg)
288288
{
289289
dbconnlimit = defGetInt32(dconnlimit);
290-
if (dbconnlimit < -1)
290+
if (dbconnlimit < DATCONNLIMIT_UNLIMITED)
291291
ereport(ERROR,
292292
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
293293
errmsg("invalid connection limit: %d", dbconnlimit)));
@@ -335,6 +335,16 @@ createdb(ParseState *pstate, const CreatedbStmt *stmt)
335335
errmsg("template database \"%s\" does not exist",
336336
dbtemplate)));
337337

338+
/*
339+
* If the source database was in the process of being dropped, we can't
340+
* use it as a template.
341+
*/
342+
if (database_is_invalid_oid(src_dboid))
343+
ereport(ERROR,
344+
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
345+
errmsg("cannot use invalid database \"%s\" as template", dbtemplate),
346+
errhint("Use DROP DATABASE to drop invalid databases.")));
347+
338348
/*
339349
* Permission check: to copy a DB that's not marked datistemplate, you
340350
* must be superuser or the owner thereof.
@@ -784,6 +794,7 @@ dropdb(const char *dbname, bool missing_ok)
784794
bool db_istemplate;
785795
Relation pgdbrel;
786796
HeapTuple tup;
797+
Form_pg_database datform;
787798
int notherbackends;
788799
int npreparedxacts;
789800
int nslots,
@@ -891,17 +902,6 @@ dropdb(const char *dbname, bool missing_ok)
891902
"There are %d subscriptions.",
892903
nsubscriptions, nsubscriptions)));
893904

894-
/*
895-
* Remove the database's tuple from pg_database.
896-
*/
897-
tup = SearchSysCache1(DATABASEOID, ObjectIdGetDatum(db_id));
898-
if (!HeapTupleIsValid(tup))
899-
elog(ERROR, "cache lookup failed for database %u", db_id);
900-
901-
CatalogTupleDelete(pgdbrel, &tup->t_self);
902-
903-
ReleaseSysCache(tup);
904-
905905
/*
906906
* Delete any comments or security labels associated with the database.
907907
*/
@@ -918,6 +918,32 @@ dropdb(const char *dbname, bool missing_ok)
918918
*/
919919
dropDatabaseDependencies(db_id);
920920

921+
tup = SearchSysCacheCopy1(DATABASEOID, ObjectIdGetDatum(db_id));
922+
if (!HeapTupleIsValid(tup))
923+
elog(ERROR, "cache lookup failed for database %u", db_id);
924+
datform = (Form_pg_database) GETSTRUCT(tup);
925+
926+
/*
927+
* Except for the deletion of the catalog row, subsequent actions are not
928+
* transactional (consider DropDatabaseBuffers() discarding modified
929+
* buffers). But we might crash or get interrupted below. To prevent
930+
* accesses to a database with invalid contents, mark the database as
931+
* invalid using an in-place update.
932+
*
933+
* We need to flush the WAL before continuing, to guarantee the
934+
* modification is durable before performing irreversible filesystem
935+
* operations.
936+
*/
937+
datform->datconnlimit = DATCONNLIMIT_INVALID_DB;
938+
heap_inplace_update(pgdbrel, tup);
939+
XLogFlush(XactLastRecEnd);
940+
941+
/*
942+
* Also delete the tuple - transactionally. If this transaction commits,
943+
* the row will be gone, but if we fail, dropdb() can be invoked again.
944+
*/
945+
CatalogTupleDelete(pgdbrel, &tup->t_self);
946+
921947
/*
922948
* Drop db-specific replication slots.
923949
*/
@@ -1401,12 +1427,13 @@ AlterDatabase(ParseState *pstate, AlterDatabaseStmt *stmt, bool isTopLevel)
14011427
Oid dboid;
14021428
HeapTuple tuple,
14031429
newtuple;
1430+
Form_pg_database datform;
14041431
ScanKeyData scankey;
14051432
SysScanDesc scan;
14061433
ListCell *option;
14071434
bool dbistemplate = false;
14081435
bool dballowconnections = true;
1409-
int dbconnlimit = -1;
1436+
int dbconnlimit = DATCONNLIMIT_UNLIMITED;
14101437
DefElem *distemplate = NULL;
14111438
DefElem *dallowconnections = NULL;
14121439
DefElem *dconnlimit = NULL;
@@ -1489,7 +1516,7 @@ AlterDatabase(ParseState *pstate, AlterDatabaseStmt *stmt, bool isTopLevel)
14891516
if (dconnlimit && dconnlimit->arg)
14901517
{
14911518
dbconnlimit = defGetInt32(dconnlimit);
1492-
if (dbconnlimit < -1)
1519+
if (dbconnlimit < DATCONNLIMIT_UNLIMITED)
14931520
ereport(ERROR,
14941521
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
14951522
errmsg("invalid connection limit: %d", dbconnlimit)));
@@ -1513,8 +1540,17 @@ AlterDatabase(ParseState *pstate, AlterDatabaseStmt *stmt, bool isTopLevel)
15131540
(errcode(ERRCODE_UNDEFINED_DATABASE),
15141541
errmsg("database \"%s\" does not exist", stmt->dbname)));
15151542

1543+
datform = (Form_pg_database) GETSTRUCT(tuple);
15161544
dboid = HeapTupleGetOid(tuple);
15171545

1546+
if (database_is_invalid_form(datform))
1547+
{
1548+
ereport(FATAL,
1549+
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1550+
errmsg("cannot alter invalid database \"%s\"", stmt->dbname),
1551+
errhint("Use DROP DATABASE to drop invalid databases.")));
1552+
}
1553+
15181554
if (!pg_database_ownercheck(HeapTupleGetOid(tuple), GetUserId()))
15191555
aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
15201556
stmt->dbname);
@@ -2071,6 +2107,42 @@ get_database_name(Oid dbid)
20712107
return result;
20722108
}
20732109

2110+
2111+
/*
2112+
* While dropping a database the pg_database row is marked invalid, but the
2113+
* catalog contents still exist. Connections to such a database are not
2114+
* allowed.
2115+
*/
2116+
bool
2117+
database_is_invalid_form(Form_pg_database datform)
2118+
{
2119+
return datform->datconnlimit == DATCONNLIMIT_INVALID_DB;
2120+
}
2121+
2122+
2123+
/*
2124+
* Convenience wrapper around database_is_invalid_form()
2125+
*/
2126+
bool
2127+
database_is_invalid_oid(Oid dboid)
2128+
{
2129+
HeapTuple dbtup;
2130+
Form_pg_database dbform;
2131+
bool invalid;
2132+
2133+
dbtup = SearchSysCache1(DATABASEOID, ObjectIdGetDatum(dboid));
2134+
if (!HeapTupleIsValid(dbtup))
2135+
elog(ERROR, "cache lookup failed for database %u", dboid);
2136+
dbform = (Form_pg_database) GETSTRUCT(dbtup);
2137+
2138+
invalid = database_is_invalid_form(dbform);
2139+
2140+
ReleaseSysCache(dbtup);
2141+
2142+
return invalid;
2143+
}
2144+
2145+
20742146
/*
20752147
* recovery_create_dbdir()
20762148
*

src/backend/commands/vacuum.c

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1227,6 +1227,20 @@ vac_truncate_clog(TransactionId frozenXID,
12271227
Assert(TransactionIdIsNormal(datfrozenxid));
12281228
Assert(MultiXactIdIsValid(datminmxid));
12291229

1230+
/*
1231+
* If database is in the process of getting dropped, or has been
1232+
* interrupted while doing so, no connections to it are possible
1233+
* anymore. Therefore we don't need to take it into account here.
1234+
* Which is good, because it can't be processed by autovacuum either.
1235+
*/
1236+
if (database_is_invalid_form((Form_pg_database) dbform))
1237+
{
1238+
elog(DEBUG2,
1239+
"skipping invalid database \"%s\" while computing relfrozenxid",
1240+
NameStr(dbform->datname));
1241+
continue;
1242+
}
1243+
12301244
/*
12311245
* If things are working properly, no database should have a
12321246
* datfrozenxid or datminmxid that is "in the future". However, such

src/backend/postmaster/autovacuum.c

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1907,6 +1907,18 @@ get_database_list(void)
19071907
avw_dbase *avdb;
19081908
MemoryContext oldcxt;
19091909

1910+
/*
1911+
* If database has partially been dropped, we can't, nor need to,
1912+
* vacuum it.
1913+
*/
1914+
if (database_is_invalid_form(pgdatabase))
1915+
{
1916+
elog(DEBUG2,
1917+
"autovacuum: skipping invalid database \"%s\"",
1918+
NameStr(pgdatabase->datname));
1919+
continue;
1920+
}
1921+
19101922
/*
19111923
* Allocate our results in the caller's context, not the
19121924
* transaction's. We do this inside the loop, and restore the original

src/backend/utils/init/postinit.c

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -950,6 +950,7 @@ InitPostgres(const char *in_dbname, Oid dboid, const char *username,
950950
if (!bootstrap)
951951
{
952952
HeapTuple tuple;
953+
Form_pg_database datform;
953954

954955
tuple = GetDatabaseTuple(dbname);
955956
if (!HeapTupleIsValid(tuple) ||
@@ -959,6 +960,15 @@ InitPostgres(const char *in_dbname, Oid dboid, const char *username,
959960
(errcode(ERRCODE_UNDEFINED_DATABASE),
960961
errmsg("database \"%s\" does not exist", dbname),
961962
errdetail("It seems to have just been dropped or renamed.")));
963+
964+
datform = (Form_pg_database) GETSTRUCT(tuple);
965+
if (database_is_invalid_form(datform))
966+
{
967+
ereport(FATAL,
968+
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
969+
errmsg("cannot connect to invalid database \"%s\"", dbname),
970+
errhint("Use DROP DATABASE to drop invalid databases.")));
971+
}
962972
}
963973

964974
/*

src/bin/pg_dump/pg_dumpall.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1287,7 +1287,7 @@ dropDBs(PGconn *conn)
12871287
res = executeQuery(conn,
12881288
"SELECT datname "
12891289
"FROM pg_database d "
1290-
"WHERE datallowconn "
1290+
"WHERE datallowconn AND datconnlimit != -2 "
12911291
"ORDER BY datname");
12921292

12931293
if (PQntuples(res) > 0)
@@ -1389,7 +1389,7 @@ dumpDatabases(PGconn *conn)
13891389
res = executeQuery(conn,
13901390
"SELECT datname "
13911391
"FROM pg_database d "
1392-
"WHERE datallowconn "
1392+
"WHERE datallowconn AND datconnlimit != -2 "
13931393
"ORDER BY (datname <> 'template1'), datname");
13941394

13951395
for (i = 0; i < PQntuples(res); i++)

src/bin/pg_dump/t/002_pg_dump.pl

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1416,6 +1416,17 @@
14161416
},
14171417
},
14181418

1419+
'CREATE DATABASE regression_invalid...' => {
1420+
create_order => 1,
1421+
create_sql => q(
1422+
CREATE DATABASE regression_invalid;
1423+
UPDATE pg_database SET datconnlimit = -2 WHERE datname = 'regression_invalid'),
1424+
regexp => qr/^CREATE DATABASE regression_invalid/m,
1425+
not_like => {
1426+
pg_dumpall_dbprivs => 1,
1427+
},
1428+
},
1429+
14191430
'CREATE ACCESS METHOD gist2' => {
14201431
create_order => 52,
14211432
create_sql =>
@@ -3387,7 +3398,7 @@
33873398
33883399
# Start with number of command_fails_like()*2 tests below (each
33893400
# command_fails_like is actually 2 tests)
3390-
my $num_tests = 12;
3401+
my $num_tests = 14;
33913402
33923403
foreach my $run (sort keys %pgdump_runs)
33933404
{
@@ -3516,6 +3527,14 @@
35163527
'pg_dump: [archiver (db)] connection to database "qqq" failed: FATAL: database "qqq" does not exist'
35173528
);
35183529
3530+
#########################################
3531+
# Test connecting to an invalid database
3532+
3533+
command_fails_like(
3534+
[ 'pg_dump', '-p', "$port", '-d', 'regression_invalid' ],
3535+
qr/pg_dump: .* connection to database .* failed: FATAL: cannot connect to invalid database "regression_invalid"/,
3536+
'connecting to an invalid database');
3537+
35193538
#########################################
35203539
# Test connecting with an unprivileged user
35213540

src/bin/scripts/clusterdb.c

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -236,7 +236,9 @@ cluster_all_databases(ConnParams *cparams, const char *progname,
236236
int i;
237237

238238
conn = connectMaintenanceDatabase(cparams, progname, echo);
239-
result = executeQuery(conn, "SELECT datname FROM pg_database WHERE datallowconn ORDER BY 1;", progname, echo);
239+
result = executeQuery(conn,
240+
"SELECT datname FROM pg_database WHERE datallowconn AND datconnlimit <> -2 ORDER BY 1;",
241+
progname, echo);
240242
PQfinish(conn);
241243

242244
for (i = 0; i < PQntuples(result); i++)

src/bin/scripts/reindexdb.c

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -341,7 +341,9 @@ reindex_all_databases(ConnParams *cparams,
341341
int i;
342342

343343
conn = connectMaintenanceDatabase(cparams, progname, echo);
344-
result = executeQuery(conn, "SELECT datname FROM pg_database WHERE datallowconn ORDER BY 1;", progname, echo);
344+
result = executeQuery(conn,
345+
"SELECT datname FROM pg_database WHERE datallowconn AND datconnlimit <> -2 ORDER BY 1;",
346+
progname, echo);
345347
PQfinish(conn);
346348

347349
for (i = 0; i < PQntuples(result); i++)

src/bin/scripts/t/011_clusterdb_all.pl

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33

44
use PostgresNode;
55
use TestLib;
6-
use Test::More tests => 2;
6+
use Test::More tests => 4;
77

88
my $node = get_new_node('main');
99
$node->init;
@@ -17,3 +17,16 @@
1717
[ 'clusterdb', '-a' ],
1818
qr/statement: CLUSTER.*statement: CLUSTER/s,
1919
'cluster all databases');
20+
21+
$node->safe_psql(
22+
'postgres', q(
23+
CREATE DATABASE regression_invalid;
24+
UPDATE pg_database SET datconnlimit = -2 WHERE datname = 'regression_invalid';
25+
));
26+
$node->command_ok([ 'clusterdb', '-a' ],
27+
'invalid database not targeted by clusterdb -a');
28+
29+
# Doesn't quite belong here, but don't want to waste time by creating an
30+
# invalid database in 010_clusterdb.pl as well.
31+
$node->command_fails([ 'clusterdb', '-d', 'regression_invalid'],
32+
'clusterdb cannot target invalid database');

src/bin/scripts/t/050_dropdb.pl

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33

44
use PostgresNode;
55
use TestLib;
6-
use Test::More tests => 11;
6+
use Test::More tests => 12;
77

88
program_help_ok('dropdb');
99
program_version_ok('dropdb');
@@ -21,3 +21,12 @@
2121

2222
$node->command_fails([ 'dropdb', 'nonexistent' ],
2323
'fails with nonexistent database');
24+
25+
# check that invalid database can be dropped with dropdb
26+
$node->safe_psql(
27+
'postgres', q(
28+
CREATE DATABASE regression_invalid;
29+
UPDATE pg_database SET datconnlimit = -2 WHERE datname = 'regression_invalid';
30+
));
31+
$node->command_ok([ 'dropdb', 'regression_invalid' ],
32+
'invalid database can be dropped');

0 commit comments

Comments
 (0)
0