8000 Update multimaster · postgrespro/postgres_cluster@42ae5c4 · GitHub
[go: up one dir, main page]

Skip to content

Commit 42ae5c4

Browse files
committed
Update multimaster
1 parent 25fe3cc commit 42ae5c4

File tree

5 files changed

+143
-44
lines changed

5 files changed

+143
-44
lines changed

contrib/multimaster/multimaster.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
/*
2-
* pg_dtm.c
2+
* multimaster.c
33
*
4-
* Pluggable distributed transaction manager
4+
* Multimaster based on logical replication
55
*
66
*/
77

contrib/multimaster/receiver_raw.c

Lines changed: 45 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,13 @@
1414
*/
1515

1616
/* Some general headers for custom bgworker facility */
17+
#include <unistd.h>
1718
#include "postgres.h"
1819
#include "fmgr.h"
1920
#include "libpq-fe.h"
2021
#include "pqexpbuffer.h"
2122
#include "access/xact.h"
23+
#include "access/transam.h"
2224
#include "lib/stringinfo.h"
2325
#include "pgstat.h"
2426
#include "executor/spi.h"
@@ -50,7 +52,7 @@ static int receiver_idle_time = 1;
5052
static bool receiver_sync_mode = true;
5153

5254
/* Worker name */
53-
static char *worker_name = "multimaster";
55+
char worker_proc[16];
5456

5557
/* Lastly written positions */
5658
static XLogRecPtr output_written_lsn = InvalidXLogRecPtr;
@@ -93,7 +95,7 @@ sendFeedback(PGconn *conn, int64 now)
9395
ereport(LOG, (errmsg("%s: confirming write up to %X/%X, "
9496
"flush to %X/%X (slot custom_slot), "
9597
"applied to %X/%X",
96-
worker_name,
98+
worker_proc,
9799
(uint32) (output_written_lsn >> 32),
98100
(uint32) output_written_lsn,
99101
(uint32) (output_fsync_lsn >> 32),
@@ -119,7 +121,7 @@ sendFeedback(PGconn *conn, int64 now)
119121
if (PQputCopyData(conn, replybuf, len) <= 0 || PQflush(conn))
120122
{
121123
ereport(LOG, (errmsg("%s: could not send feedback packet: %s",
122-
worker_name, PQerrorMessage(conn))));
124+
worker_proc, PQerrorMessage(conn))));
123125
return false;
124126
}
125127

@@ -209,13 +211,15 @@ receiver_raw_main(Datum main_arg)
209211
PQExpBuffer query;
210212
PGconn *conn;
211213
PGresult *res;
214+
TransactionId xid = InvalidTransactionId;
212215
bool insideTrans = false;
213216
bool rollbackTransaction = false;
214217

215218
/* Register functions for SIGTERM/SIGHUP management */
216219
pqsignal(SIGHUP, receiver_raw_sighup);
217220
pqsignal(SIGTERM, receiver_raw_sigterm);
218221

222+
sprintf(worker_proc, "mm_recv_%d", getpid());
219223

220224
/* We're now ready to receive signals */
221225
BackgroundWorkerUnblockSignals();
@@ -229,13 +233,13 @@ receiver_raw_main(Datum main_arg)
229233
{
230234
PQfinish(conn);
231235
ereport(ERROR, (errmsg("%s: Could not establish connection to remote server",
232-
worker_name)));
236+
worker_proc)));
233237
proc_exit(1);
234238
}
235239

236240
query = createPQExpBuffer();
237241

238-
appendPQExpBuffer(query, "CREATE_REPLICATION_SLOT \"%s\" LOGICAL \"%s\"", args->receiver_slot, worker_name);
242+
appendPQExpBuffer(query, "CREATE_REPLICATION_SLOT \"%s\" LOGICAL \"%s\"", args->receiver_slot, worker_proc);
239243
res = PQexec(conn, query->data);
240244
if (PQresultStatus(res) != PGRES_TUPLES_OK)
241245
{
@@ -244,7 +248,7 @@ receiver_raw_main(Datum main_arg)
244248
{
245249
PQclear(res);
246250
ereport(ERROR, (errmsg("%s: Could not create logical slot",
247-
worker_name)));
251+
worker_proc)));
248252
proc_exit(1);
249253
}
250254
}
@@ -259,7 +263,7 @@ receiver_raw_main(Datum main_arg)
259263
{
260264
PQclear(res);
261265
ereport(LOG, (errmsg("%s: Could not start logical replication",
262-
worker_name)));
266+
worker_proc)));
263267
proc_exit(1);
264268
}
265269
PQclear(res);
@@ -284,13 +288,13 @@ receiver_raw_main(Datum main_arg)
284288
/* Process config file */
285289
ProcessConfigFile(PGC_SIGHUP);
286290
got_sighup = false;
287-
ereport(LOG, (errmsg("%s: processed SIGHUP", worker_name)));
291+
ereport(LOG, (errmsg("%s: processed SIGHUP", worker_proc)));
288292
}
289293

290294
if (got_sigterm)
291295
{
292296
/* Simply exit */
293-
ereport(LOG, (errmsg("%s: processed SIGTERM", worker_name)));
297+
ereport(LOG, (errmsg("%s: processed SIGTERM", worker_proc)));
294298
proc_exit(0);
295299
}
296300

@@ -342,15 +346,15 @@ receiver_raw_main(Datum main_arg)
342346
walEnd = fe_recvint64(&copybuf[pos]);
343347
ereport(LOG, (errmsg("%s: keepalive message from server, "
344348
"walEnd %X/%X, ",
345-
worker_name,
349+
worker_proc,
346350
(uint32) (walEnd >> 32),
347351
(uint32) walEnd)));
348352
pos += 8; /* read walEnd */
349353
pos += 8; /* skip sendTime */
350354
if (rc < pos + 1)
351355
{
352356
ereport(LOG, (errmsg("%s: streaming header too small: %d",
353-
worker_name, rc)));
357+
worker_proc, rc)));
354358
proc_exit(1);
355359
}
356360
replyRequested = copybuf[pos];
@@ -378,7 +382,7 @@ receiver_raw_main(Datum main_arg)
378382
else if (copybuf[0] != 'w')
379383
{
380384
ereport(LOG, (errmsg("%s: Incorrect streaming header",
381-
worker_name)));
385+
worker_proc)));
382386
proc_exit(1);
383387
}
384388

@@ -392,14 +396,14 @@ receiver_raw_main(Datum main_arg)
392396
if (rc < hdr_len + 1)
393397
{
394398
ereport(LOG, (errmsg("%s: Streaming header too small",
395-
worker_name)));
399+
worker_proc)));
396400
proc_exit(1);
397401
}
398402

399403
/* Log some useful information */
400404
ereport(LOG, (errmsg("%s: received from server, walStart %X/%X, "
401405
"and walEnd %X/%X",
402-
worker_name,
406+
worker_proc,
403407
(uint32) (walStart >> 32),
404408
(uint32) walStart,
405409
(uint32) (walEnd >> 32),
@@ -411,17 +415,13 @@ receiver_raw_main(Datum main_arg)
411415
SetCurrentStatementStartTimestamp();
412416

413417
if (strncmp(stmt, "BEGIN ", 6) == 0) {
414-
TransactionId xid;
415418
int rc = sscanf(stmt + 6, "%u", &xid);
416419
Assert(rc == 1);
417420
Assert(!insideTrans);
418421
SetCurrentStatementStartTimestamp();
419422
MMJoinTransaction(xid);
420423

421424
StartTransactionCommand();
422-
BeginTransactionBlock();
423-
CommitTransactionCommand();
424-
425425
SPI_connect();
426426
PushActiveSnapshot(GetTransactionSnapshot());
427427
insideTrans = true;
@@ -431,19 +431,20 @@ receiver_raw_main(Datum main_arg)
431431
insideTrans = false;
432432
SPI_finish();
433433
PopActiveSnapshot();
434-
StartTransactionCommand();
435434
if (rollbackTransaction) {
436-
UserAbortTransactionBlock();
437-
}
438-
PG_TRY();
439-
{
440-
CommitTransactionCommand();
441-
}
442-
PG_CATCH();
443-
{
444-
elog(WARNING, "%s: Current transaction is aborted at receiver", worker_name);
435+
elog(WARNING, "%s: Rollback transaction %u", worker_proc, xid);
436+
AbortCurrentTransaction();
437+
} else {
438+
PG_TRY();
439+
{
440+
CommitTransactionCommand();
441+
}
442+
PG_CATCH();
443+
{
444+
elog(WARNING, "%s: Commit of transaction %u is failed", worker_proc, xid);
445+
}
446+
PG_END_TRY();
445447
}
446-
PG_END_TRY();
447448
} else if (!rollbackTransaction) {
448449
Assert(insideTrans);
449450
/* Execute query */
@@ -452,20 +453,20 @@ receiver_raw_main(Datum main_arg)
452453
rc = SPI_execute(stmt, false, 0);
453454
if (rc == SPI_OK_INSERT)
454455
ereport(LOG, (errmsg("%s: INSERT received correctly: %s",
455-
worker_name, stmt)));
456+
worker_proc, stmt)));
456457
else if (rc == SPI_OK_UPDATE)
457458
ereport(LOG, (errmsg("%s: UPDATE received correctly: %s",
458-
worker_name, stmt)));
459+
worker_proc, stmt)));
459460
else if (rc == SPI_OK_DELETE)
460461
ereport(LOG, (errmsg("%s: DELETE received correctly: %s",
461-
worker_name, stmt)));
462+
worker_proc, stmt)));
462463
else
463464
ereport(WARNING, (errmsg("%s: Error when applying change: %s",
464-
worker_name, stmt)));
465+
worker_proc, stmt)));
465466
}
466467
PG_CATCH();
467468
{
468-
elog(WARNING, "%s: %s failed at receiver", worker_name, stmt);
469+
elog(WARNING, "%s: %s failed in transaction %u", worker_proc, stmt, xid);
469470
rollbackTransaction = true;
470471
}
471472
PG_END_TRY();
@@ -531,15 +532,15 @@ receiver_raw_main(Datum main_arg)
531532
else if (r < 0)
532533
{
533534
ereport(LOG, (errmsg("%s: Incorrect status received... Leaving.",
534-
worker_name)));
535+
worker_proc)));
535536
proc_exit(1);
536537
}
537538

538539
/* Else there is actually data on the socket */
539540
if (PQconsumeInput(conn) == 0)
540541
{
541542
ereport(LOG, (errmsg("%s: Data remaining on the socket... Leaving.",
542-
worker_name)));
543+
worker_proc)));
543544
proc_exit(1);
544545
}
545546
continue;
@@ -549,15 +550,15 @@ receiver_raw_main(Datum main_arg)
549550
if (rc == -1)
550551
{
551552
ereport(LOG, (errmsg("%s: COPY Stream has abruptly ended...",
552-
worker_name)));
553+
worker_proc)));
553554
break;
554555
}
555556

556557
/* Failure when reading copy stream, leave */
557558
if (rc == -2)
558559
{
559560
ereport(LOG, (errmsg("%s: Failure while receiving changes...",
560-
worker_name)));
561+
worker_proc)));
561562
proc_exit(1);
562563
}
563564
}
@@ -608,4 +609,9 @@ int MMStartReceivers(char* conns, int node_id)
608609
worker.bgw_main_arg = (Datum)ctx;
609610
RegisterBackgroundWorker(&worker);
610611
}
611-
con
612+
conn_str = p + 1;
613+
}
614+
615+
return i;
616+
}
617+

contrib/multimaster/tests/dtmbench

-49.8 KB
Binary file not shown.

contrib/multimaster/tests/pg_hba.conf

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
# PostgreSQL Client Authentication Configuration File
2+
# ===================================================
3+
#
4+
# Refer to the "Client Authentication" section in the PostgreSQL
5+
# documentation for a complete description of this file. A short
6+
# synopsis follows.
7+
#
8+
# This file controls: which hosts are allowed to connect, how clients
9+
# are authenticated, which PostgreSQL user names they can use, which
10+
# databases they can access. Records take one of these forms:
11+
#
12+
# local DATABASE USER METHOD [OPTIONS]
13+
# host DATABASE USER ADDRESS METHOD [OPTIONS]
14+
# hostssl DATABASE USER ADDRESS METHOD [OPTIONS]
15+
# hostnossl DATABASE USER ADDRESS METHOD [OPTIONS]
16+
#
17+
# (The uppercase items must be replaced by actual values.)
18+
#
19+
# The first field is the connection type: "local" is a Unix-domain
20+
# socket, "host" is either a plain or SSL-encrypted TCP/IP socket,
21+
# "hostssl" is an SSL-encrypted TCP/IP socket, and "hostnossl" is a
22+
# plain TCP/IP socket.
23+
#
24+
# DATABASE can be "all", "sameuser", "samerole", "replication", a
25+
# database name, or a comma-separated list thereof. The "all"
26+
# keyword does not match "replication". Access to replication
27+
# must be enabled in a separate record (see example below).
28+
#
29+
# USER can be "all", a user name, a group name prefixed with "+", or a
30+
# comma-separated list thereof. In both the DATABASE and USER fields
31+
# you can also write a file name prefixed with "@" to include names
32+
# from a separate file.
33+
#
34+
# ADDRESS specifies the set of hosts the record matches. It can be a
35+
# host name, or it is made up of an IP address and a CIDR mask that is
36+
# an integer (between 0 and 32 (IPv4) or 128 (IPv6) inclusive) that
37+
# specifies the number of significant bits in the mask. A host name
38+
# that starts with a dot (.) matches a suffix of the actual host name.
39+
# Alternatively, you can write an IP address and netmask in separate
40+
# columns to specify the set of hosts. Instead of a CIDR-address, you
41+
# can write "samehost" to match any of the server's own IP addresses,
42+
# or "samenet" to match any address in any subnet that the server is
43+
# directly connected to.
44+
#
45+
# METHOD can be "trust", "reject", "md5", "password", "gss", "sspi",
46+
# "ident", "peer", "pam", "ldap", "radius" or "cert". Note that
47+
# "password" sends passwords in clear text; "md5" is preferred since
48+
# it sends encrypted passwords.
49+
#
50+
# OPTIONS are a set of options for the authentication in the format
51+
# NAME=VALUE. The available options depend on the different
52+
# authentication methods -- refer to the "Client Authentication"
53+
# section in the documentation for a list of which options are
54+
# available for which authentication methods.
55+
#
56+
# Database and user names containing spaces, commas, quotes and other
57+
# special characters must be quoted. Quoting one of the keywords
58+
# "all", "sameuser", "samerole" or "replication" makes the name lose
59+
# its special character, and just match a database or username with
60+
# that name.
61+
#
62+
# This file is read on server startup and when the postmaster receives
63+
# a SIGHUP signal. If you edit the file on a running system, you have
64+
# to SIGHUP the postmaster for the changes to take effect. You can
65+
# use "pg_ctl reload" to do that.
66+
67+
# Put your actual configuration here
68+
# ----------------------------------
69+
#
70+
# If you want to allow non-local connections, you need to add more
71+
# "host" records. In that case you will also need to make PostgreSQL
72+
# listen on a non-local interface via the listen_addresses
73+
# configuration parameter, or via the -i or -h command line switches.
74+
75+
# CAUTION: Configuring the system for local "trust" authentication
76+
# allows any local user to connect as any PostgreSQL user, including
77+
# the database superuser. If you do not trust all your local users,
78+
# use another authentication method.
79+
80+
81+
# TYPE DATABASE USER ADDRESS METHOD
82+
83+
# "local" is for Unix domain socket connections only
84+
local all all trust
85+
# IPv4 local connections:
86+
host all all 127.0.0.1/32 trust
87+
# IPv6 local connections:
88+
host all all ::1/128 trust
89+
# Allow replication connections from localhost, by a user with the
90+
# replication privilege.
91+
local replication knizhnik trust
92+
host replication knizhnik 127.0.0.1/32 trust
93+
#host replication knizhnik ::1/128 trust

contrib/multimaster/tests/postgresql.conf.mm

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -547,9 +547,9 @@
547547
# These settings are initialized by initdb, but they can be changed.
548548
lc_messages = 'en_US.UTF-8' # locale for system error message
549549
# strings
550-
lc_monetary = 'ru_RU.UTF-8' # locale for monetary formatting
551-
lc_numeric = 'ru_RU.UTF-8' # locale for number formatting
552-
lc_time = 'ru_RU.UTF-8' # locale for time formatting
550+
lc_monetary = 'en_US.UTF-8' # locale for monetary formatting
551+
lc_numeric = 'en_US.UTF-8' # locale for number formatting
552+
lc_time = 'en_US.UTF-8' # locale for time formatting
553553

554554
# default configuration for text search
555555
default_text_search_config = 'pg_catalog.english'

0 commit comments

Comments
 (0)
0