8000 Allow to enable failover property for replication slots via SQL API. · postgrespro/postgres@c393308 · GitHub
[go: up one dir, main page]

Skip to content

Commit c393308

Browse files
author
Amit Kapila
committed
Allow to enable failover property for replication slots via SQL API.
This commit adds the failover property to the replication slot. The failover property indicates whether the slot will be synced to the standby servers, enabling the resumption of corresponding logical replication after failover. But note that this commit does not yet include the capability to sync the replication slot; the subsequent commits will add that capability. A new optional parameter 'failover' is added to the pg_create_logical_replication_slot() function. We will also enable to set 'failover' option for slots via the subscription commands in the subsequent commits. The value of the 'failover' flag is displayed as part of pg_replication_slots view. Author: Hou Zhijie, Shveta Malik, Ajin Cherian Reviewed-by: Peter Smith, Bertrand Drouvot, Dilip Kumar, Masahiko Sawada, Nisha Moond, Kuroda, Hayato, Amit Kapila Discussion: https://postgr.es/m/514f6f2f-6833-4539-39f1-96cd1e011f23@enterprisedb.com
1 parent 86232a4 commit c393308

File tree

16 files changed

+141
-25
lines changed

16 files changed

+141
-25
lines changed

contrib/test_decoding/expected/slot.out

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -406,3 +406,61 @@ SELECT pg_drop_replication_slot('copied_slot2_notemp');
406406

407407
(1 row)
408408

409+
-- Test failover option of slots.
410+
SELECT 'init' FROM pg_create_logical_replication_slot('failover_true_slot', 'test_decoding', false, false, true);
411+
?column?
412+
----------
413+
init
414+
(1 row)
415+
416+
SELECT 'init' FROM pg_create_logical_replication_slot('failover_false_slot', 'test_decoding', false, false, false);
417+
?column?
418+
----------
419+
init
420+
(1 row)
421+
422+
SELECT 'init' FROM pg_create_logical_replication_slot('failover_default_slot', 'test_decoding', false, false);
423+
?column?
424+
----------
425+
init
426+
(1 row)
427+
428+
SELECT 'init' FROM pg_create_physical_replication_slot('physical_slot');
429+
?column?
430+
----------
431+
init
432+
(1 row)
433+
434+
SELECT slot_name, slot_type, failover FROM pg_replication_slots;
435+
slot_name | slot_type | failover
436+
-----------------------+-----------+----------
437+
failover_true_slot | logical | t
438+
failover_false_slot | logical | f
439+
failover_default_slot | logical | f
440+
physical_slot | physical | f
441+
(4 rows)
442+
443+
SELECT pg_drop_replication_slot('failover_true_slot');
444+
pg_drop_replication_slot
445+
--------------------------
446+
447+
(1 row)
448+
449+
SELECT pg_drop_replication_slot('failover_false_slot');
450+
pg_drop_replication_slot
451+
--------------------------
452+
453+
(1 row)
454+
455+
SELECT pg_drop_replication_slot('failover_default_slot');
456+
pg_drop_replication_slot
457+
--------------------------
458+
459+
(1 row)
460+
461+
SELECT pg_drop_replication_slot('physical_slot');
462+
pg_drop_replication_slot
463+
--------------------------
464+
465+
(1 row)
466+

contrib/test_decoding/sql/slot.sql

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,3 +176,16 @@ ORDER BY o.slot_name, c.slot_name;
176176
SELECT pg_drop_replication_slot('orig_slot2');
177177
SELECT pg_drop_replication_slot('copied_slot2_no_change');
178178
SELECT pg_drop_replication_slot('copied_slot2_notemp');
179+
180+
-- Test failover option of slots.
181+
SELECT 'init' FROM pg_create_logical_replication_slot('failover_true_slot', 'test_decoding', false, false, true);
182+
SELECT 'init' FROM pg_create_logical_replication_slot('failover_false_slot', 'test_decoding', false, false, false);
183+
SELECT 'init' FROM pg_create_logical_replication_slot('failover_default_slot', 'test_decoding', false, false);
184+
SELECT 'init' FROM pg_create_physical_replication_slot('physical_slot');
185+
186+
SELECT slot_name, slot_type, failover FROM pg_replication_slots;
187+
188+
SELECT pg_drop_replication_slot('failover_true_slot');
189+
SELECT pg_drop_replication_slot('failover_false_slot');
190+
SELECT pg_drop_replication_slot('failover_default_slot');
191+
SELECT pg_drop_replication_slot('physical_slot');

doc/src/sgml/func.sgml

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27707,7 +27707,7 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset
2770727707
<indexterm>
2770827708
<primary>pg_create_logical_replication_slot</primary>
2770927709
</indexterm>
27710-
<function>pg_create_logical_replication_slot</function> ( <parameter>slot_name</parameter> <type>name</type>, <parameter>plugin</parameter> <type>name</type> <optional>, <parameter>temporary</parameter> <type>boolean</type>, <parameter>twophase</parameter> <type>boolean</type> </optional> )
27710+
<function>pg_create_logical_replication_slot</function> ( <parameter>slot_name</parameter> <type>name</type>, <parameter>plugin</parameter> <type>name</type> <optional>, <parameter>temporary</parameter> <type>boolean</type>, <parameter>twophase</parameter> <type>boolean</type>, <parameter>failover</parameter> <type>boolean</type> </optional> )
2771127711
<returnvalue>record</returnvalue>
2771227712
( <parameter>slot_name</parameter> <type>name</type>,
2771327713
<parameter>lsn</parameter> <type>pg_lsn</type> )
@@ -27722,8 +27722,13 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset
2772227722
released upon any error. The optional fourth parameter,
2772327723
<parameter>twophase</parameter>, when set to true, specifies
2772427724
that the decoding of prepared transactions is enabled for this
27725-
slot. A call to this function has the same effect as the replication
27726-
protocol command <literal>CREATE_REPLICATION_SLOT ... LOGICAL</literal>.
27725+
slot. The optional fifth parameter,
27726+
<parameter>failover</parameter>, when set to true,
27727+
specifies that this slot is enabled to be synced to the
27728+
standbys so that logical replication can be resumed after
27729+
failover. A call to this function has the same effect as
27730+
the replication protocol command
27731+
<literal>CREATE_REPLICATION_SLOT ... LOGICAL</literal>.
2772727732
</para></entry>
2772827733
</row>
2772927734

doc/src/sgml/system-views.sgml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2555,6 +2555,16 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx
25552555
</itemizedlist>
25562556
</para></entry>
25572557
</row>
2558+
2559+
<row>
2560+
<entry role="catalog_table_entry"><para role="column_definition">
2561+
<structfield>failover</structfield> <type>bool</type>
2562+
</para>
2563+
<para>
2564+
True if this is a logical slot enabled to be synced to the standbys.
2565+
Always false for physical slots.
2566+
</para></entry>
2567+
</row>
25582568
</tbody>
25592569
</tgroup>
25602570
</table>

src/backend/catalog/system_functions.sql

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -479,6 +479,7 @@ CREATE OR REPLACE FUNCTION pg_create_logical_replication_slot(
479479
IN slot_name name, IN plugin name,
480480
IN temporary boolean DEFAULT false,
481481
IN twophase boolean DEFAULT false,
482+
IN failover boolean DEFAULT false,
482483
OUT slot_name name, OUT lsn pg_lsn)
483484
RETURNS RECORD
484485
LANGUAGE INTERNAL

src/backend/catalog/system_views.sql

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1023,7 +1023,8 @@ CREATE VIEW pg_replication_slots AS
10231023
L.wal_status,
10241024
L.safe_wal_size,
10251025
L.two_phase,
1026-
L.conflict_reason
1026+
L.conflict_reason,
1027+
L.failover
10271028
FROM pg_get_replication_slots() AS L
10281029
LEFT JOIN pg_database D ON (L.datoid = D.oid);
10291030

src/backend/replication/slot.c

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ typedef struct ReplicationSlotOnDisk
9090
sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskConstantSize
9191

9292
#define SLOT_MAGIC 0x1051CA1 /* format identifier */
93-
#define SLOT_VERSION 3 /* version for new files */
93+
#define SLOT_VERSION 4 /* version for new files */
9494

9595
/* Control array for replication slot management */
9696
ReplicationSlotCtlData *ReplicationSlotCtl = NULL;
@@ -248,10 +248,13 @@ ReplicationSlotValidateName(const char *name, int elevel)
248248
* during getting changes, if the two_phase option is enabled it can skip
249249
* prepare because by that time start decoding point has been moved. So the
250250
* user will only get commit prepared.
251+
* failover: If enabled, allows the slot to be synced to standbys so
252+
* that logical replication can be resumed after failover.
251253
*/
252254
void
253255
ReplicationSlotCreate(const char *name, bool db_specific,
254-
ReplicationSlotPersistency persistency, bool two_phase)
256+
ReplicationSlotPersistency persistency,
257+
bool two_phase, bool failover)
255258
{
256259
ReplicationSlot *slot = NULL;
257260
int i;
@@ -311,6 +314,7 @@ ReplicationSlotCreate(const char *name, bool db_specific,
311314
slot->data.persistency = persistency;
312315
slot->data.two_phase = two_phase;
313316
slot->data.two_phase_at = InvalidXLogRecPtr;
317+
slot->data.failover = failover;
314318

315319
/* and then data only present in shared memory */
316320
slot->just_dirtied = false;

src/backend/replication/slotfuncs.c

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,8 @@ create_physical_replication_slot(char *name, bool immediately_reserve,
4242

4343
/* acquire replication slot, this will check for conflicting names */
4444
ReplicationSlotCreate(name, false,
45-
temporary ? RS_TEMPORARY : RS_PERSISTENT, false);
45+
temporary ? RS_TEMPORARY : RS_PERSISTENT, false,
46+
false);
4647

4748
if (immediately_reserve)
4849
{
@@ -117,6 +118,7 @@ pg_create_physical_replication_slot(PG_FUNCTION_ARGS)
117118
static void
118119
create_logical_replication_slot(char *name, char *plugin,
119120
bool temporary, bool two_phase,
121+
bool failover,
120122
XLogRecPtr restart_lsn,
121123
bool find_startpoint)
122124
{
@@ -133,7 +135,8 @@ create_logical_replication_slot(char *name, char *plugin,
133135
* error as well.
134136
*/
135137
ReplicationSlotCreate(name, true,
136-
temporary ? RS_TEMPORARY : RS_EPHEMERAL, two_phase);
138+
temporary ? RS_TEMPORARY : RS_EPHEMERAL, two_phase,
139+
failover);
137140

138141
/*
139142
* Create logical decoding context to find start point or, if we don't
@@ -171,6 +174,7 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
171174
Name plugin = PG_GETARG_NAME(1);
172175
bool temporary = PG_GETARG_BOOL(2);
173176
bool two_phase = PG_GETARG_BOOL(3);
177+
bool failover = PG_GETARG_BOOL(4);
174178
Datum result;
175179
TupleDesc tupdesc;
176180
HeapTuple tuple;
@@ -188,6 +192,7 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
188192
NameStr(*plugin),
189193
temporary,
190194
two_phase,
195+
failover,
191196
InvalidXLogRecPtr,
192197
true);
193198

@@ -232,7 +237,7 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS)
232237
Datum
233238
pg_get_replication_slots(PG_FUNCTION_ARGS)
234239
{
235-
#define PG_GET_REPLICATION_SLOTS_COLS 15
240+
#define PG_GET_REPLICATION_SLOTS_COLS 16
236241
ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
237242
XLogRecPtr currlsn;
238243
int slotno;
@@ -426,6 +431,8 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
426431
}
427432
}
428433

434+
values[i++] = BoolGetDatum(slot_contents.data.failover);
435+
429436
Assert(i == PG_GET_REPLICATION_SLOTS_COLS);
430437

431438
tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
@@ -693,6 +700,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
693700
XLogRecPtr src_restart_lsn;
694701
bool src_islogical;
695702
bool temporary;
703+
bool failover;
696704
char *plugin;
697705
Datum values[2];
698706
bool nulls[2];
@@ -748,6 +756,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
748756
src_islogical = SlotIsLogical(&first_slot_contents);
749757
src_restart_lsn = first_slot_contents.data.restart_lsn;
750758
temporary = (first_slot_contents.data.persistency == RS_TEMPORARY);
759+
failover = first_slot_contents.data.failover;
751760
plugin = logical_slot ? NameStr(first_slot_contents.data.plugin) : NULL;
752761

753762
/* Check type of replication slot */
@@ -787,6 +796,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
787796
plugin,
788797
temporary,
789798
false,
799+
failover,
790800
src_restart_lsn,
791801
false);
792802
}

src/backend/replication/walsender.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1212,7 +1212,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
12121212
{
12131213
ReplicationSlotCreate(cmd->slotname, false,
12141214
cmd->temporary ? RS_TEMPORARY : RS_PERSISTENT,
1215-
false);
1215+
false, false);
12161216

12171217
if (reserve_wal)
12181218
{
@@ -1243,7 +1243,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
12431243
*/
12441244
ReplicationSlotCreate(cmd->slotname, true,
12451245
cmd->temporary ? RS_TEMPORARY : RS_EPHEMERAL,
1246-
two_phase);
1246+
two_phase, false);
12471247

12481248
/*
12491249
* Do options check early so that we can bail before calling the

src/bin/pg_upgrade/info.c

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -666,7 +666,7 @@ get_old_cluster_logical_slot_infos(DbInfo *dbinfo, bool live_check)
666666
* started and stopped several times causing any temporary slots to be
667667
* removed.
668668
*/
669-
res = executeQueryOrDie(conn, "SELECT slot_name, plugin, two_phase, "
669+
res = executeQueryOrDie(conn, "SELECT slot_name, plugin, two_phase, failover, "
670670
"%s as caught_up, conflict_reason IS NOT NULL as invalid "
671671
"FROM pg_catalog.pg_replication_slots "
672672
"WHERE slot_type = 'logical' AND "
@@ -684,6 +684,7 @@ get_old_cluster_logical_slot_infos(DbInfo *dbinfo, bool live_check)
684684
int i_slotname;
685685
int i_plugin;
686686
int i_twophase;
687+
int i_failover;
687688
int i_caught_up;
688689
int i_invalid;
689690

@@ -692,6 +693,7 @@ get_old_cluster_logical_slot_infos(DbInfo *dbinfo, bool live_check)
692693
i_slotname = PQfnumber(res, "slot_name");
693694
i_plugin = PQfnumber(res, "plugin");
694695
i_twophase = PQfnumber(res, "two_phase");
696+
i_failover = PQfnumber(res, "failover");
695697
i_caught_up = PQfnumber(res, "caught_up");
696698
i_invalid = PQfnumber(res, "invalid");
697699

@@ -702,6 +704,7 @@ get_old_cluster_logical_slot_infos(DbInfo *dbinfo, bool live_check)
702704
curr->slotname = pg_strdup(PQgetvalue(res, slotnum, i_slotname));
703705
curr->plugin = pg_strdup(PQgetvalue(res, slotnum, i_plugin));
704706
curr->two_phase = (strcmp(PQgetvalue(res, slotnum, i_twophase), "t") == 0);
707+
curr->failover = (strcmp(PQgetvalue(res, slotnum, i_failover), "t") == 0);
705708
curr->caught_up = (strcmp(PQgetvalue(res, slotnum, i_caught_up), "t") == 0);
706709
curr->invalid = (strcmp(PQgetvalue(res, slotnum, i_invalid), "t") == 0);
707710
}

0 commit comments

Comments
 (0)
0