8000 Merge branch 'master' of github.com:postgrespro/postgres_cluster · postgrespro/postgres_cluster@6a36538 · GitHub
[go: up one dir, main page]

Skip to content

Commit 6a36538

Browse files
committed
Merge branch 'master' of github.com:postgrespro/postgres_cluster
2 parents c0ec3bf + 1a8c8c5 commit 6a36538

File tree

9 files changed

+184
-36
lines changed

9 files changed

+184
-36
lines changed

contrib/mmts/Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
MODULE_big = multimaster
2-
OBJS = multimaster.o raftable.o arbiter.o bytebuf.o bgwpool.o pglogical_output.o pglogical_proto.o pglogical_receiver.o pglogical_apply.o pglogical_hooks.o pglogical_config.o ddd.o bkb.o
2+
OBJS = multimaster.o raftable.o arbiter.o bytebuf.o bgwpool.o pglogical_output.o pglogical_proto.o pglogical_receiver.o pglogical_apply.o pglogical_hooks.o pglogical_config.o pglogical_relid_map.o ddd.o bkb.o
33

44
override CPPFLAGS += -I../raftable
55

contrib/mmts/arbiter.c

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -239,11 +239,15 @@ static int MtmReadSocket(int sd, void* buf, int buf_size)
239239
static void MtmSetSocketOptions(int sd)
240240
{
241241
#ifdef TCP_NODELAY
242-
int optval = 1;
243-
if (setsockopt(sd, IPPROTO_TCP, TCP_NODELAY, (char const*)&optval, sizeof(optval)) < 0) {
242+
int on = 1;
243+
if (setsockopt(sd, IPPROTO_TCP, TCP_NODELAY, (char const*)&on, sizeof(on)) < 0) {
244244
elog(WARNING, "Failed to set TCP_NODELAY: %m");
245245
}
246246
#endif
247+
if (setsockopt(sd, SOL_SOCKET, SO_KEEPALIVE, (char const*)&on, sizeof(on)) < 0) {
248+
elog(WARNING, "Failed to set SO_KEEPALIVE: %m");
249+
}
250+
247251
if (tcp_keepalives_idle) {
248252
#ifdef TCP_KEEPIDLE
249253
if (setsockopt(sd, IPPROTO_TCP, TCP_KEEPIDLE,

contrib/mmts/multimaster.c

Lines changed: 64 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -865,10 +865,13 @@ void MtmSendNotificationMessage(MtmTransState* ts, MtmMessageCode cmd)
865865

866866
void MtmJoinTransaction(GlobalTransactionId* gtid, csn_t globalSnapshot)
867867
{
868-
MtmLock(LW_EXCLUSIVE);
869-
MtmSyncClock(globalSnapshot);
870-
MtmUnlock();
871-
868+
if (globalSnapshot != INVALID_CSN) {
869+
MtmLock(LW_EXCLUSIVE);
870+
MtmSyncClock(globalSnapshot);
871+
MtmUnlock();
872+
} else {
873+
globalSnapshot = MtmTx.snapshot;
874+
}
872875
if (!TransactionIdIsValid(gtid->xid)) {
873876
/* In case of recovery InvalidTransactionId is passed */
874877
Assert(Mtm->status == MTM_RECOVERY);
@@ -1877,6 +1880,14 @@ void MtmDropNode(int nodeId, bool dropSlot)
18771880
}
18781881
}
18791882
}
1883+
static void
1884+
MtmOnProcExit(int code, Datum arg)
1885+
{
1886+
if (MtmReplicationNodeId >= 0) {
1887+
elog(WARNING, "WAL-sender to %d is terminated", MtmReplicationNodeId);
1888+
MtmOnNodeDisconnect(MtmReplicationNodeId);
1889+
}
1890+
}
18801891

18811892
static void
18821893
MtmReplicationStartupHook(struct PGLogicalStartupHookArgs* args)
@@ -1923,13 +1934,17 @@ MtmReplicationStartupHook(struct PGLogicalStartupHookArgs* args)
19231934
elog(NOTICE, "Node %d start logical replication to node %d in normal mode", MtmNodeId, MtmReplicationNodeId);
19241935
}
19251936
MtmUnlock();
1937+
on_proc_exit(MtmOnProcExit, 0);
19261938
}
19271939

19281940
static void
19291941
MtmReplicationShutdownHook(struct PGLogicalShutdownHookArgs* args)
19301942
{
1931-
elog(WARNING, "Logical replication to node %d is stopped", MtmReplicationNodeId);
1932-
MtmOnNodeDisconnect(MtmReplicationNodeId);
1943+
if (MtmReplicationNodeId >= 0) {
1944+
elog(WARNING, "Logical replication to node %d is stopped", MtmReplicationNodeId);
1945+
MtmOnNodeDisconnect(MtmReplicationNodeId);
1946+
MtmReplicationNodeId = -1; /* defuse on_proc_exit hook */
1947+
}
19331948
}
19341949

19351950
static bool
@@ -2159,14 +2174,34 @@ static bool MtmRunUtilityStmt(PGconn* conn, char const* sql, char **errmsg)
21592174

21602175
*errmsg = palloc0(errlen);
21612176

2162-
/* Strip "ERROR:\t" from beginning and "\n" from end of error string */
2177+
/* Strip "ERROR: " from beginning and "\n" from end of error string */
21632178
strncpy(*errmsg, errstr + 8, errlen - 1 - 8);
21642179
}
21652180

21662181
PQclear(result);
21672182
return ret;
21682183
}
21692184

2185+
static void
2186+
MtmNoticeReceiver(void *i, const PGresult *res)
2187+
{
2188+
char *notice = PQresultErrorMessage(res);
2189+
char *stripped_notice;
2190+
int len = strlen(notice);
2191+
2192+
/* Skip notices from other nodes */
2193+
if ( (*(int *)i) != MtmNodeId - 1)
2194+
return;
2195+
2196+
stripped_notice = palloc0(len);
2197+
2198+
/* Strip "NOTICE: " from beginning and "\n" from end of error string */
2199+
strncpy(stripped_notice, notice + 9, len - 1 - 9);
2200+
2201+
elog(NOTICE, stripped_notice);
2202+
pfree(stripped_notice);
2203+
}
2204+
21702205
static void MtmBroadcastUtilityStmt(char const* sql, bool ignoreError)
21712206
{
21722207
int i = 0;
@@ -2195,6 +2230,7 @@ static void MtmBroadcastUtilityStmt(char const* sql, bool ignoreError)
21952230
elog(ERROR, "Failed to establish connection '%s' to node %d", Mtm->nodes[i].con.connStr, failedNode);
21962231
}
21972232
}
2233+
PQsetNoticeReceiver(conns[i], MtmNoticeReceiver, &i);
21982234
}
21992235
}
22002236
Assert(i == MtmNodes);
@@ -2211,9 +2247,10 @@ static void MtmBroadcastUtilityStmt(char const* sql, bool ignoreError)
22112247
}
22122248
if (!MtmRunUtilityStmt(conns[i], sql, &utility_errmsg) && !ignoreError)
22132249
{
2214-
// errorMsg = "Failed to run EF5E command at node %d";
2215-
// XXX: add check for our node
2216-
errorMsg = utility_errmsg;
2250+
if (i + 1 == MtmNodeId)
2251+
errorMsg = utility_errmsg;
2252+
else
2253+
errorMsg = "Failed to run command at node %d";
22172254

22182255
failedNode = i;
22192256
break;
@@ -2418,6 +2455,23 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
24182455
skipCommand = stmt->relation->relpersistence == RELPERSISTENCE_TEMP;
24192456
}
24202457
break;
2458+
case T_IndexStmt:
2459+
{
2460+
Oid relid;
2461+
Relation rel;
2462+
IndexStmt *stmt = (IndexStmt *) parsetree;
2463+
bool isTopLevel = (context == PROCESS_UTILITY_TOPLEVEL);
2464+
2465+
if (stmt->concurrent)
2466+
PreventTransactionChain(isTopLevel,
2467+
"CREATE INDEX CONCURRENTLY");
2468+
2469+
relid = RelnameGetRelid(stmt->relation->relname);
2470+
rel = heap_open(relid, ShareLock);
2471+
skipCommand = rel->rd_rel->relpersistence == RELPERSISTENCE_TEMP;
2472+
heap_close(rel, NoLock);
2473+
}
2474+
break;
24212475
default:
24222476
skipCommand = false;
24232477
break;

contrib/mmts/multimaster.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,11 @@
88
#include "pglogical_output/hooks.h"
99

1010
#define MTM_TUPLE_TRACE(fmt, ...)
11-
#if 1
11+
#if 0
1212
#define MTM_INFO(fmt, ...) fprintf(stderr, fmt, ## __VA_ARGS__)
1313
#define MTM_TRACE(fmt, ...) fprintf(stderr, fmt, ## __VA_ARGS__)
1414
#else
15-
#define MTM_INFO(fmt, ...) fprintf(stderr, fmt, ## __VA_ARGS__)
15+
#define MTM_INFO(fmt, ...)
1616
#define MTM_TRACE(fmt, ...)
1717
#endif
1818

contrib/mmts/pglogical_apply.c

Lines changed: 23 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
#include "parser/parse_relation.h"
5050

5151
#include "multimaster.h"
52+
#include "pglogical_relid_map.h"
5253

5354
typedef struct TupleData
5455
{
@@ -451,19 +452,28 @@ read_rel(StringInfo s, LOCKMODE mode)
451452
int relnamelen;
452453
int nspnamelen;
453454
RangeVar* rv;
454-
Oid relid;
455-
456-
rv = makeNode(RangeVar);
457-
458-
nspnamelen = pq_getmsgbyte(s);
459-
rv->schemaname = (char *) pq_getmsgbytes(s, nspnamelen);
460-
461-
relnamelen = pq_getmsgbyte(s);
462-
rv->relname = (char *) pq_getmsgbytes(s, relnamelen);
463-
464-
relid = RangeVarGetRelidExtended(rv, mode, false, false, NULL, NULL);
465-
466-
return heap_open(relid, NoLock);
455+
Oid remote_relid = pq_getmsgint(s, 4);
456+
Oid local_relid;
457+
458+
local_relid = pglogical_relid_map_get(remote_relid);
459+
if (local_relid == InvalidOid) {
460+
rv = makeNode(RangeVar);
461+
462+
nspnamelen = pq_getmsgbyte(s);
463+
rv->schemaname = (char *) pq_getmsgbytes(s, nspnamelen);
464+
465+
relnamelen = pq_getmsgbyte(s);
466+
rv->relname = (char *) pq_getmsgbytes(s, relnamelen);
467+
468+
local_relid = RangeVarGetRelidExtended(rv, mode, false, false, NULL, NULL);
469+
pglogical_relid_map_put(remote_relid, local_relid);
470+
} else {
471+
nspnamelen = pq_getmsgbyte(s);
472+
s->cursor += nspnamelen;
473+
relnamelen = pq_getmsgbyte(s);
474+
s->cursor += relnamelen;
475+
}
476+
return heap_open(local_relid, NoLock);
467477
}
468478

469479
static void

contrib/mmts/pglogical_proto.c

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
#include "utils/typcache.h"
3737

3838
#include "multimaster.h"
39+
#include "pglogical_relid_map.h"
3940

4041
static bool MtmIsFilteredTxn;
4142

@@ -71,13 +72,15 @@ pglogical_write_rel(StringInfo out, PGLogicalOutputData *data, Relation rel)
7172
uint8 nspnamelen;
7273
const char *relname;
7374
uint8 relnamelen;
74-
75+
Oid relid;
7576
if (MtmIsFilteredTxn) {
7677
return;
7778
}
7879

79-
pq_sendbyte(out, 'R'); /* sending RELATION */
80-
80+
relid = RelationGetRelid(rel);
81+
pq_sendbyte(out, 'R'); /* sending RELATION */
82+
pq_sendint(out, relid, sizeof relid); /* use Oid as relation identifier */
83+
8184
nspname = get_namespace_name(rel->rd_rel->relnamespace);
8285
if (nspname == NULL)
8386
elog(ERROR, "cache lookup failed for namespace %u",
@@ -86,10 +89,10 @@ pglogical_write_rel(StringInfo out, PGLogicalOutputData *data, Relation rel)
8689

8790
relname = NameStr(rel->rd_rel->relname);
8891
relnamelen = strlen(relname) + 1;
89-
92+
9093
pq_sendbyte(out, nspnamelen); /* schema name length */
9194
pq_sendbytes(out, nspname, nspnamelen);
92-
95+
9396
pq_sendbyte(out, relnamelen); /* table name length */
9497
pq_sendbytes(out, relname, relnamelen);
9598
}

contrib/mmts/pglogical_relid_map.c

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/*-------------------------------------------------------------------------
2+
*
3+
* pglogical_relid_map.c
4+
* Logical Replication map of local Oids to to remote
5+
*
6+
* Copyright (c) 2012-2015, PostgreSQL Global Development Group
7+
*
8+
* IDENTIFICATION
9+
* pglogical_relid_map.c
10+
*
11+
*-------------------------------------------------------------------------
12+
*/
13+
#include "postgres.h"
14+
#include "utils/hsearch.h"
15+
#include "pglogical_relid_map.h"
16+
17+
static HTAB *relid_map;
18+
19+
static void
20+
pglogical_relid_map_init(void)
21+
{
22+
HASHCTL ctl;
23+
int hash_flags = HASH_ELEM;
24+
25+
Assert(relid_map == NULL);
26+
27+
MemSet(&ctl, 0, sizeof(ctl));
28+
ctl.keysize = sizeof(Oid);
29+
ctl.entrysize = sizeof(PGLRelidMapEntry);
30+
31+
#if PG_VERSION_NUM >= 90500
32+
hash_flags |= HASH_BLOBS;
33+
#else
34+
ctl.hash = tag_hash;
35+
hash_flags |= HASH_FUNCTION;
36+
#endif
37+
38+
relid_map = hash_create("pglogical_relid_map", PGL_INIT_RELID_MAP_SIZE, &ctl, hash_flags);
39+
40+
Assert(relid_map != NULL);
41+
}
42+
43+
Oid pglogical_relid_map_get(Oid relid)
44+
{
45+
if (relid_map != NULL) {
46+
PGLRelidMapEntry* entry = (PGLRelidMapEntry*)hash_search(relid_map, &relid, HASH_FIND, NULL);
47+
return entry ? entry->local_relid : InvalidOid;
48+
}
49+
return InvalidOid;
50+
}
51+
52+
bool pglogical_relid_map_put(Oid remote_relid, Oid local_relid)
53+
{
54+
bool found;
55+
PGLRelidMapEntry* entry;
56+
if (relid_map == NULL) {
57+
pglogical_relid_map_init();
58+
}
59+
entry = hash_search(relid_map, &remote_relid, HASH_ENTER, &found);
60+
if (found) {
61+
Assert(entry->local_relid == local_relid);
62+
return false;
63+
}
64+
entry->local_relid = local_relid;
65+
return true;
66+
}

contrib/mmts/pglogical_relid_map.h

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
#ifndef PGLOGICAL_RELID_MAP
2+
#define PGLOGICAL_RELID_MAP
3+
4+
#define PGL_INIT_RELID_MAP_SIZE 256
5+
6+
typedef struct PGLRelidMapEntry {
7+
Oid remote_relid;
8+
Oid local_relid;
9+
} PGLRelidMapEntry;
10+
11+
extern Oid pglogical_relid_map_get(Oid relid);
12+
extern bool pglogical_relid_map_put(Oid remote_relid, Oid local_relid);
13+
14+
#endif

src/backend/replication/logical/decode.c

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -541,9 +541,6 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
541541
(parsed->dbId != InvalidOid && parsed->dbId != ctx->slot->data.database) ||
542542
FilterByOrigin(ctx, origin_id))
543543
{
544-
elog(WARNING, "%d: WAL-SENDER ignore record %lx with origin %d: SnapBuildXactNeedsSkip=%d, FilterByOrigin=%d",
545-
getpid(), buf->origptr, origin_id,
546-
SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr), FilterByOrigin(ctx, origin_id));
547544
for (i = 0; i < parsed->nsubxacts; i++)
548545
{
549546
ReorderBufferForget(ctx->reorder, parsed->subxacts[i], buf->origptr);

0 commit comments

Comments
 (0)
0