8000 Prepare to start raftable from multimaster · m99coder/postgres_cluster@cbcfff6 · GitHub
[go: up one dir, main page]

Skip to content

Commit cbcfff6

Browse files
committed
Prepare to start raftable from multimaster
1 parent 454fc31 commit cbcfff6

File tree

3 files changed

+57
-3
lines changed

3 files changed

+57
-3
lines changed

contrib/mmts/Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ ifndef RAFTABLE_PATH
55
RAFTABLE_PATH = ../raftable
66
endif
77

8-
override CPPFLAGS += -I$(RAFTABLE_PATH)
8+
override CPPFLAGS += -I$(RAFTABLE_PATH) -I$(RAFTABLE_PATH)/raft/include
99

1010
EXTRA_INSTALL = contrib/raftable contrib/mmts
1111

contrib/mmts/arbiter.c

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -488,7 +488,16 @@ static void MtmOpenConnections()
488488
}
489489
for (i = 0; i < nNodes; i++) {
490490
if (i+1 != MtmNodeId && i < Mtm->nAllNodes) {
491-
sockets[i] = MtmConnectSocket(Mtm->nodes[i].con.hostName, MtmArbiterPort + i + 1, MtmConnectTimeout);
491+
int arbiterPort;
492+
char const* arbiterPortStr = strstr(Mtm->nodes[i].con.connStr, "arbiterport=");
493+
if (arbiterPortStr != NULL) {
494+
if (sscanf(arbiterPortStr+12, "%d", &arbiterPort) != 1) {
495+
elog(ERROR, "Invalid arbiter port: %s", arbiterPortStr+12);
496+
}
497+
} else {
498+
arbiterPort = MtmArbiterPort + i + 1;
499+
}
500+
sockets[i] = MtmConnectSocket(Mtm->nodes[i].con.hostName, arbiterPort, MtmConnectTimeout);
492501
if (sockets[i] < 0) {
493502
MtmOnNodeDisconnect(i+1);
494503
}

contrib/mmts/multimaster.c

Lines changed: 46 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@
6161
#include "ddd.h"
6262
#include "raftable_wrapper.h"
6363
#include "raftable.h"
64+
#include "worker.h"
6465

6566
typedef struct {
6667
TransactionId xid; /* local transaction ID */
@@ -196,6 +197,7 @@ int MtmNodes;
196197
int MtmNodeId;
197198
int MtmReplicationNodeId;
198199
int MtmArbiterPort;
200+
int MtmRaftablePort;
199201
int MtmConnectTimeout;
200202
int MtmReconnectTimeout;
201203
int MtmNodeDisableDelay;
@@ -1796,6 +1798,33 @@ static void MtmSplitConnStrs(void)
17961798
pfree(copy);
17971799
}
17981800

1801+
static void MtmRaftableInitialize()
1802+
{
1803+
int i;
1804+
WorkerConfig wcfg;
1805+
1806+
for (i = 0; i < RAFTABLE_PEERS_MAX; i++)
1807+
{
1808+
wcfg.peers[i].up = false;
1809+
}
1810+
1811+
for (i = 0; i < MtmNodes; i++)
1812+
{
1813+
char const* raftport = strstr(MtmConnections[i].connStr, "raftport=");
1814+
if (raftport != NULL) {
1815+
if (sscanf(raftport+9, "%d", &wcfg.peers[i].port) != 1) {
1816+
elog(ERROR, "Invalid raftable port: %s", raftport+9);
1817+
}
1818+
} else {
1819+
wcfg.peers[i].port = MtmRaftablePort + i;
1820+
}
1821+
wcfg.peers[i].up = true;
1822+
strncpy(wcfg.peers[i].host, MtmConnections[i].hostName, sizeof(wcfg.peers[i].host));
1823+
}
1824+
wcfg.id = MtmNodeId-1;
1825+
worker_register(&wcfg);
1826+
}
1827+
17991828
void
18001829
_PG_init(void)
18011830
{
@@ -2052,7 +2081,22 @@ _PG_init(void)
20522081
"Base value for assigning arbiter ports",
20532082
NULL,
20542083
&MtmArbiterPort,
2055-
54321,
2084+
54320,
2085+
0,
2086+
INT_MAX,
2087+
PGC_BACKEND,
2088+
0,
2089+
NULL,
2090+
NULL,
2091+
NULL
2092+
);
2093+
2094+
DefineCustomIntVariable(
2095+
"multimaster.raftable_port",
2096+
"Base value for assigning raftable ports",
2097+
NULL,
2098+
&MtmRaftablePort,
2099+
6543,
20562100
0,
20572101
INT_MAX,
20582102
PGC_BACKEND,
@@ -2133,6 +2177,7 @@ _PG_init(void)
21332177

21342178
BgwPoolStart(MtmWorkers, MtmPoolConstructor);
21352179

2180+
//MtmRaftableInitialize();
21362181
MtmArbiterInitialize();
21372182

21382183
/*

0 commit comments

Comments
 (0)
0