8000 2PC in pgoutput for shardman: append gid to avoid collisions. · postgrespro/postgres_cluster@576d589 · GitHub
[go: up one dir, main page]

Skip to content

Commit 576d589

Browse files
committed
2PC in pgoutput for shardman: append gid to avoid collisions.
For all non-shardman xacts 2PC decoding is disabled. Also, add GUC enabling 2PC decoding always in pgoutput to test it (011_twophase.pl).
1 parent f46f9d8 commit 576d589

File tree

4 files changed

+61
-9
lines changed

4 files changed

+61
-9
lines changed

src/backend/replication/logical/proto.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,7 @@ logicalrep_read_commit(StringInfo in, LogicalRepCommitData *commit_data,
140140
*/
141141
void
142142
logicalrep_write_prepare(StringInfo out, ReorderBufferTXN *txn,
143-
XLogRecPtr prepare_lsn)
143+
XLogRecPtr prepare_lsn, const char *gid)
144144
{
145145
uint8 flags = 0;
146146

@@ -165,7 +165,7 @@ logicalrep_write_prepare(StringInfo out, ReorderBufferTXN *txn,
165165
pq_sendint64(out, txn->commit_time);
166166

167167
/* send gid */
168-
pq_sendstring(out, txn->gid);
168+
pq_sendstring(out, gid);
169169
}
170170

171171
/*

src/backend/replication/pgoutput/pgoutput.c

Lines changed: 57 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,18 @@
1212
*/
1313
#include "postgres.h"
1414

15+
#include "access/genam.h"
16+
#include "access/heapam.h"
1517
#include "catalog/pg_publication.h"
1618

19+
#include "nodes/makefuncs.h"
20+
1721
#include "replication/logical.h"
1822
#include "replication/logicalproto.h"
1923
#include "replication/origin.h"
2024
#include "replication/pgoutput.h"
2125

26+
#include "utils/guc.h"
2227
#include "utils/inval.h"
2328
#include "utils/int8.h"
2429
#include "utils/memutils.h"
@@ -27,6 +32,8 @@
2732

2833
PG_MODULE_MAGIC;
2934

35+
extern void _PG_init(void);
36+
3037
extern void _PG_output_plugin_init(OutputPluginCallbacks *cb);
3138

3239
static void pgoutput_startup(LogicalDecodingContext *ctx,
@@ -62,6 +69,7 @@ static bool publications_valid;
6269
static List *LoadPublications(List *pubnames);
6370
static void publication_invalidation_cb(Datum arg, int cacheid,
6471
uint32 hashvalue);
72+
static char *append_shardman_node_id(const char *gid);
6573

6674
/* Entry in the map used to remember which relation schemas we sent. */
6775
typedef struct RelationSyncEntry
@@ -75,12 +83,29 @@ typedef struct RelationSyncEntry
7583
/* Map used to remember which relation schemas we sent. */
7684
static HTAB *RelationSyncCache = NULL;
7785

86+
/* GUC just for tests */
87+
static bool use_twophase;
88+
7889
static void init_rel_sync_cache(MemoryContext decoding_context);
7990
static RelationSyncEntry *get_rel_sync_entry(PGOutputData *data, Oid relid);
8091
static void rel_sync_cache_relation_cb(Datum arg, Oid relid);
8192
static void rel_sync_cache_publication_cb(Datum arg, int cacheid,
8293
uint32 hashvalue);
8394

95+
void
96+
_PG_init(void)
97+
{
98+
DefineCustomBoolVariable(
99+
"pgoutput.use_twophase",
100+
"Toggle 2PC",
101+
NULL,
102+
&use_twophase,
103+
false,
104+
PGC_SUSET,
105+
0,
106+
NULL, NULL, NULL);
107+
}
108+
84109
/*
85110
* Specify output plugin callbacks
86111
*/
@@ -337,10 +362,17 @@ static void
337362
pgoutput_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
338363
XLogRecPtr prepare_lsn)
339364
{
365+
char *gid = txn->gid;
366+
340367
OutputPluginUpdateProgress(ctx);
341368

342369
OutputPluginPrepareWrite(ctx, true);
343-
logicalrep_write_prepare(ctx->out, txn, prepare_lsn);
370+
/* Append :sysid to gid to avoid collision */
371+
if (strstr(gid, "pgfdw:") != NULL)
372+
gid = psprintf("%s:%lx", txn->gid, GetSystemIdentifier());
373+
logicalrep_write_prepare(ctx->out, txn, prepare_lsn, gid);
374+
if (strstr(gid, "pgfdw:") != NULL)
375+
pfree(gid);
344376
OutputPluginWrite(ctx, true);
345377
}
346378

@@ -351,23 +383,38 @@ static void
351383
pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
352384
XLogRecPtr prepare_lsn)
353385
{
386+
char *gid = txn->gid;
387+
354388
OutputPluginUpdateProgress(ctx);
355389

356390
OutputPluginPrepareWrite(ctx, true);
357-
logicalrep_write_prepare(ctx->out, txn, prepare_lsn);
391+
/* Append :sysid to gid to avoid collision */
392+
if (strstr(gid, "pgfdw:") != NULL)
393+
gid = psprintf("%s:%lx", txn->gid, GetSystemIdentifier());
394+
logicalrep_write_prepare(ctx->out, txn, prepare_lsn, gid);
395+
if (strstr(gid, "pgfdw:") != NULL)
396+
pfree(gid);
358397
OutputPluginWrite(ctx, true);
359398
}
399+
360400
/*
361401
* PREPARE callback
362402
*/
363403
static void
364404
pgoutput_abort_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
365405
XLogRecPtr prepare_lsn)
366406
{
407+
char *gid = txn->gid;
408+
367409
OutputPluginUpdateProgress(ctx);
368410

369411
OutputPluginPrepareWrite(ctx, true);
370-
logicalrep_write_prepare(ctx->out, txn, prepare_lsn);
412+
/* Append :sysid to gid to avoid collision */
413+
if (strstr(gid, "pgfdw:") != NULL)
414+
gid = psprintf("%s:%lx", txn->gid, GetSystemIdentifier());
415+
logicalrep_write_prepare(ctx->out, txn, prepare_lsn, gid);
416+
if (strstr(gid, "pgfdw:") != NULL)
417+
pfree(gid);
371418
OutputPluginWrite(ctx, true);
372419
}
373420

@@ -502,13 +549,17 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
502549
/*
503550
* Filter out unnecessary two-phase transactions.
504551
*
505-
* Currently, we forward all two-phase transactions
552+
* Make 2PC on shardman's xacts.
506553
*/
507554
static bool
508555
pgoutput_filter_prepare(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
509-
TransactionId xid, const char *gid)
556+
TransactionId xid, const char *gid)
510557
{
511-
return false;
558+
if (strstr(gid, "pgfdw:") != NULL) /* shardman */
559+
{
560+
return false;
561+
}
562+
return !use_twophase;
512563
}
513564

514565
/*

src/include/replication/logicalproto.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ extern void logicalrep_write_commit(StringInfo out, ReorderBufferTXN *txn,
9292
extern void logicalrep_write_abort(StringInfo out, ReorderBufferTXN *txn,
9393
XLogRecPtr abort_lsn);
9494
extern void logicalrep_write_prepare(StringInfo out, ReorderBufferTXN *txn,
95-
XLogRecPtr prepare_lsn);
95+
XLogRecPtr prepare_lsn, const char *gid);
9696
extern void logicalrep_read_commit(StringInfo in,
9797
LogicalRepCommitData *commit_data, uint8 *flags);
9898
extern void logicalrep_read_prepare(StringInfo in,

src/test/subscription/t/011_twophase.pl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
$node_publisher->append_conf(
1212
'postgresql.conf', qq(
1313
max_prepared_transactions = 10
14+
pgoutput.use_twophase = true
1415
));
1516
$node_publisher->start;
1617

0 commit comments

Comments
 (0)
0