8000 2PC decoding and tablesync. · postgrespro/postgres_cluster@37862be · GitHub
[go: up one dir, main page]

Skip to content

Commit 37862be

Browse files
committed
2PC decoding and tablesync.
Attempt to cover subtle issues with tablesync and 2PC decoding. Before going to SYNCDONE, now sync worker finishes all xacts prepared beforehand; see tablesync.c header comments. To make this work, 2PC decoding is now controlled by pgoutput option which is set by subscriber depending on value of logical_replication_2pc GUC. Tablesync worker always disables 2PC. Add option to return num of unfinished prepares in CREATE_REPLICATION_SLOT. To preserve backward compatibility, e.g. with basebackup which expects 4 atts.
1 parent 31495de commit 37862be

File tree

23 files changed

+948
-364
lines changed

23 files changed

+948
-364
lines changed

src/backend/replication/libpqwalreceiver/libpqwalreceiver.c

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
#include "mb/pg_wchar.h"
2828
#include "miscadmin.h"
2929
#include "pgstat.h"
30+
#include "replication/logicalworker.h"
3031
#include "replication/walreceiver.h"
3132
#include "utils/builtins.h"
3233
#include "utils/memutils.h"
@@ -73,7 +74,8 @@ static char *libpqrcv_create_slot(WalReceiverConn *conn,
7374
const char *slotname,
7475
bool temporary,
7576
CRSSnapshotAction snapshot_action,
76-
XLogRecPtr *lsn);
77+
XLogRecPtr *lsn,
78+
int *num_unfinished_prepares);
7779
static WalRcvExecResult *libpqrcv_exec(WalReceiverConn *conn,
7880
const char *query,
7981
const int nRetTypes,
@@ -419,6 +421,22 @@ libpqrcv_startstreaming(WalReceiverConn *conn,
419421
PQfreemem(pubnames_literal);
420422
pfree(pubnames_str);
421423

424+
/*
425+
* If logical_replication_2pc is off, don't include it at all --
426+
* probably we are talking to vanilla server.
427+
*/
428+
if (options->proto.logical.twophase != LOGICAL_REPLICATION_2PC_OFF)
429+
{
430+
appendStringInfo(&cmd, ", twophase '%u'",
431+
options->proto.logical.twophase);
432+
}
433+
434+
/* Same with prepare_notifies */
435+
if (options->proto.logical.prepare_notifies)
436+
{
437+
appendStringInfo(&cmd, ", prepare_notifies 'true'");
438+
}
439+
422440
appendStringInfoChar(&cmd, ')');
423441
}
424442
else
@@ -790,7 +808,7 @@ libpqrcv_send(WalReceiverConn *conn, const char *buffer, int nbytes)
790808
static char *
791809
libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname,
792810
bool temporary, CRSSnapshotAction snapshot_action,
793-
XLogRecPtr *lsn)
811+
XLogRecPtr *lsn, int *num_unfinished_prepares)
794812
{
795813
PGresult *res;
796814
StringInfoData cmd;
@@ -818,6 +836,10 @@ libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname,
818836
appendStringInfoString(&cmd, " USE_SNAPSHOT");
819837
break;
820838
}
839+
if (num_unfinished_prepares != NULL)
840+
{
841+
appendStringInfoString(&cmd, " COUNT_PREPARES");
842+
}
821843
}
822844

823845
res = libpqrcv_PQexec(conn->streamConn, cmd.data);
@@ -838,6 +860,9 @@ libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname,
838860
else
839861
snapshot = NULL;
840862

863+
if (num_unfinished_prepares != NULL && (PQnfields(res) == 5))
864+
*num_unfinished_prepares = pg_atoi(PQgetvalue(res, 0, 4), sizeof(int32), 0);
865+
841866
PQclear(res);
842867

843868
return snapshot;

0 commit comments

Comments
 (0)
0