10000 Add 2PC support to pgoutput (based on some patch from -hackers). · postgrespro/postgres_cluster@5fcda94 · GitHub
[go: up one dir, main page]

Skip to content

Commit 5fcda94

Browse files
committed
Add 2PC support to pgoutput (based on some patch from -hackers).
1 parent ef5bb79 commit 5fcda94

File tree

5 files changed

+624
-11
lines changed

5 files changed

+624
-11
lines changed

src/backend/replication/logical/proto.c

Lines changed: 93 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -75,10 +75,11 @@ void
7575
logicalrep_write_commit(StringInfo out, ReorderBufferTXN *txn,
7676
XLogRecPtr commit_lsn)
7777
{
78-
uint8 flags = 0;
78+
uint8 flags = 0;
7979

8080
pq_sendbyte(out, 'C'); /* sending COMMIT */
8181

82+
flags |= LOGICALREP_IS_COMMIT;
8283
/* send the flags field (unused for now) */
8384
pq_sendbyte(out, flags);
8485

@@ -89,21 +90,106 @@ logicalrep_write_commit(StringInfo out, ReorderBufferTXN *txn,
8990
}
9091

9192
/*
92-
* Read transaction COMMIT from the stream.
93+
* Write ABORT to the output stream.
94+
*/
95+
void
96+
logicalrep_write_abort(StringInfo out, ReorderBufferTXN *txn,
97+
XLogRecPtr abort_lsn)
98+
{
99+
uint8 flags = 0;
100+
101+
pq_sendbyte(out, 'C'); /* sending ABORT flag below */
102+
103+
flags |= LOGICALREP_IS_ABORT;
104+
/* send the flags field */
105+
pq_sendbyte(out, flags);
106+
107+
/* send fields */
108+
pq_sendint64(out, abort_lsn);
109+
pq_sendint64(out, txn->end_lsn);
110+
pq_sendint64(out, txn->commit_time);
111+
}
112+
113+
/*
114+
* Read transaction COMMIT|ABORT from the stream.
93115
*/
94116
void
95-
logicalrep_read_commit(StringInfo in, LogicalRepCommitData *commit_data)
117+
logicalrep_read_commit(StringInfo in, LogicalRepCommitData *commit_data,
118+
uint8 *flags)
96119
{
97-
/* read flags (unused for now) */
98-
uint8 flags = pq_getmsgbyte(in);
120+
/* read flags */
121+
uint8 commit_flags = pq_getmsgbyte(in);
99122

100-
if (flags != 0)
101-
elog(ERROR, "unrecognized flags %u in commit message", flags);
123+
if (!(commit_flags & LOGICALREP_COMMIT_MASK))
124+
elog(ERROR, "unrecognized flags %u in commit|abort message",
125+
commit_flags);
102126

103127
/* read fields */
104128
commit_data->commit_lsn = pq_getmsgint64(in);
105129
commit_data->end_lsn = pq_getmsgint64(in);
106130
commit_data->committime = pq_getmsgint64(in);
131+
132+
/* set gid to empty */
133+
commit_data->gid[0] = '\0';
134+
135+
*flags = commit_flags;
136+
}
137+
138+
/*
139+
* Write PREPARE to the output stream.
140+
*/
141+
void
142+
logicalrep_write_prepare(StringInfo out, ReorderBufferTXN *txn,
143+
XLogRecPtr prepare_lsn)
144+
{
145+
uint8 flags = 0;
146+
147+
pq_sendbyte(out, 'P'); /* sending PREPARE protocol */
148+
149+
if (txn->txn_flags & RBTXN_COMMIT_PREPARED)
150+
flags |= LOGICALREP_IS_COMMIT_PREPARED;
151+
else if (txn->txn_flags & RBTXN_ROLLBACK_PREPARED)
152+
flags |= LOGICALREP_IS_ROLLBACK_PREPARED;
153+
else if (txn->txn_flags & RBTXN_PREPARE)
154+
flags |= LOGICALREP_IS_PREPARE;
155+
156+
if (flags == 0)
157+
elog(ERROR, "unrecognized flags %u in [commit|rollback] prepare message", flags);
158+
159+
/* send the flags field */
160+
pq_sendbyte(out, flags);
161+
162+
/* send fields */
163+
pq_sendint64(out, prepare_lsn);
164+
pq_sendint64(out, txn->end_lsn);
165+
pq_sendint64(out, txn->commit_time);
166+
167+
/* send gid */
168+
pq_sendstring(out, txn->gid);
169+
}
170+
171+
/*
172+
* Read transaction PREPARE from the stream.
173+
*/
174+
void
175+
logicalrep_read_prepare(StringInfo in, LogicalRepCommitData *commit_data, uint8 *flags)
176+
{
177+
/* read flags */
178+
uint8 prep_flags = pq_getmsgbyte(in);
179+
180+
if (!(prep_flags & LOGICALREP_PREPARE_MASK))
181+
elog(ERROR, "unrecognized flags %u in prepare message", prep_flags);
182+
183+
/* read fields */
184+
commit_data->commit_lsn = pq_getmsgint64(in);
185+
commit_data->end_lsn = pq_getmsgint64(in);
186+
commit_data->committime = pq_getmsgint64(in);
187+
188+
/* read gid */
189+
strcpy(commit_data->gid, pq_getmsgstring(in));
190+
191+
/* set flags */
192+
*flags = prep_flags;
107193
}
108194

109195
/*

src/backend/replication/logical/worker.c

Lines changed: 127 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -474,8 +474,9 @@ static void
474474
apply_handle_commit(StringInfo s)
475475
{
476476
LogicalRepCommitData commit_data;
477+
uint8 flags = 0;
477478

478-
logicalrep_read_commit(s, &commit_data);
479+
logicalrep_read_commit(s, &commit_data, &flags);
479480

480481
Assert(commit_data.commit_lsn == remote_final_lsn);
481482

@@ -489,7 +490,11 @@ apply_handle_commit(StringInfo s)
489490
replorigin_session_origin_lsn = commit_data.end_lsn;
490491
replorigin_session_origin_timestamp = commit_data.committime;
491492

492-
CommitTransactionCommand();
493+
if (flags & LOGICALREP_IS_COMMIT)
494+
CommitTransactionCommand();
495+
else if (flags & LOGICALREP_IS_ABORT)
496+
AbortCurrentTransaction();
497+
493498
pgstat_report_stat(false);
494499

495500
store_flush_position(commit_data.end_lsn);
@@ -509,6 +514,121 @@ apply_handle_commit(StringInfo s)
509514
pgstat_report_activity(STATE_IDLE, NULL);
510515
}
511516

517+
static void
518+
apply_handle_prepare_txn(LogicalRepCommitData *commit_data)
519+
{
520+
Assert(commit_data->commit_lsn == remote_final_lsn);
521+
/* The synchronization worker runs in single transaction. */
522+
if (IsTransactionState() && !am_tablesync_worker())
523+
{
524+
/* End the earlier transaction and start a new one */
525+
BeginTransactionBlock();
526+
CommitTransactionCommand();
527+
StartTransactionCommand();
528+
/*
529+
* Update origin state so we can restart streaming from correct
530+
* position in case of crash.
531+
*/
532+
replorigin_session_origin_lsn = commit_data->end_lsn;
533+
replorigin_session_origin_timestamp = commit_data->committime;
534+
535+
PrepareTransactionBlock(commit_data->gid);
536+
CommitTransactionCommand();
537+
pgstat_report_stat(false);
538+
539+
store_flush_position(commit_data->end_lsn);
540+
}
541+
else
542+
{
543+
/* Process any invalidation messages that might have accumulated. */
544+
AcceptInvalidationMessages();
545+
maybe_reread_subscription();
546+
}
547+
548+
in_remote_transaction = false;
549+
550+
/* Process any tables that are being synchronized in parallel. */
551+
process_syncing_tables(commit_data->end_lsn);
552+
553+
pgstat_report_activity(STATE_IDLE, NULL);
554+
}
555+
556+
static void
557+
apply_handle_commit_prepared_txn(LogicalRepCommitData *commit_data)
558+
{
559+
/* there is no transaction when COMMIT PREPARED is called */
560+
ensure_transaction();
561+
562+
/*
563+
* Update origin state so we can restart streaming from correct
564+
* position in case of crash.
565+
*/
566+
replorigin_session_origin_lsn = commit_data->end_lsn;
567+
replorigin_session_origin_timestamp = commit_data->committime;
568+
569+
FinishPreparedTransaction(commit_data->gid, true);
570+
CommitTransactionCommand();
571+
pgstat_report_stat(false);
572+
573+
store_flush_position(commit_data->end_lsn);
574+
in_remote_transaction = false;
575+
576+
/* Process any tables that are being synchronized in parallel. */
577+
process_syncing_tables(commit_data->end_lsn);
578+
579+
pgstat_report_activity(STATE_IDLE, NULL);
580+
}
581+
582+
static void
583+
apply_handle_rollback_prepared_txn(LogicalRepCommitData *commit_data)
584+
{
585+
/* there is no transaction when ABORT/ROLLBACK PREPARED is called */
586+
ensure_transaction();
587+
588+
/*
589+
* Update origin state so we can restart streaming from correct
590+
* position in case of crash.
591+
*/
592+
replorigin_session_origin_lsn = commit_data->end_lsn;
593+
replorigin_session_origin_timestamp = commit_data->committime;
594+
595+
/* FIXME: it is ok if xact is absent */
596+
FinishPreparedTransaction(commit_data->gid, false);
597+
CommitTransactionCommand();
598+
pgstat_report_stat(false);
599+
600+
store_flush_position(commit_data->end_lsn);
601+
in_remote_transaction = false;
602+
603+
/* Process any tables that are being synchronized in parallel. */
604+
process_syncing_tables(commit_data->end_lsn);
605+
606+
pgstat_report_activity(STATE_IDLE, NULL);
607+
}
608+
609+
/*
610+
* Handle PREPARE message.
611+
*/
612+
static void
613+
apply_handle_prepare(StringInfo s)
614+
{
615+
LogicalRepCommitData commit_data;
616+
uint8 flags = 0;
617+
618+
logicalrep_read_prepare(s, &commit_data, &flags);
619+
620+
if (flags & LOGICALREP_IS_PREPARE)
621+
apply_handle_prepare_txn(&commit_data);
622+
else if (flags & LOGICALREP_IS_COMMIT_PREPARED)
623+
apply_handle_commit_prepared_txn(&commit_data);
624+
else if (flags & LOGICALREP_IS_ROLLBACK_PREPARED)
625+
apply_handle_rollback_prepared_txn(&commit_data);
626+
else
627+
ereport(ERROR,
628+
(errcode(ERRCODE_PROTOCOL_VIOLATION),
629+
errmsg("wrong [commit|rollback] prepare message")));
630+
}
631+
512632
/*
513633
* Handle ORIGIN message.
514634
*
@@ -969,10 +1089,14 @@ apply_dispatch(StringInfo s)
9691089
case 'B':
9701090
apply_handle_begin(s);
9711091
break;
972-
/* COMMIT */
1092+
/* COMMIT|ABORT */
9731093
case 'C':
9741094
apply_handle_commit(s);
9751095
break;
1096+
/* [COMMIT|ROLLBACK] PREPARE */
1097+
case 'P':
1098+
apply_handle_prepare(s);
1099+
break;
9761100
/* INSERT */
9771101
case 'I':
9781102
apply_handle_insert(s);

0 commit comments

Comments
 (0)
0