8000 Add a crude support of multiple DTMDs into pg_dtm. Fix some bugs. · postgrespro/postgres_cluster@e7d1e2f · GitHub
[go: up one dir, main page]

Skip to content

Commit e7d1e2f

Browse files
committed
Add a crude support of multiple DTMDs into pg_dtm. Fix some bugs.
1 parent 3ce1ab5 commit e7d1e2f

File tree

6 files changed

+191
-126
lines changed

6 files changed

+191
-126
lines changed

contrib/pg_dtm/dtmd/src/main.c

Lines changed: 25 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -417,7 +417,13 @@ static void onvote(cl 8000 ient_t client, int argc, xid_t *argv, int vote) {
417417
}
418418
return;
419419
case DOUBT:
420-
if (!wait) {
420+
if (wait) {
421+
CHECK(
422+
queue_for_transaction_finish(client, xid, 's'),
423+
client,
424+
"VOTE: couldn't queue for transaction finish"
425+
);
426+
} else {
421427
client_message_shortcut(client, RES_TRANSACTION_INPROGRESS);
422428
}
423429
return;
@@ -783,25 +789,32 @@ int main(int argc, char **argv) {
783789
mstimer_reset(&t);
784790
while (true) {
785791
int ms = mstimer_reset(&t);
786-
raft_tick(&raft, ms);
787-
788-
// The client interaction is done in server_loop.
789792
raft_msg_t *m = NULL;
793+
794+
if (use_raft) {
795+
raft_tick(&raft, ms);
796+
}
797+
798+
// The client interaction is done in server_tick.
790799
if (server_tick(server, HEARTBEAT_TIMEOUT_MS)) {
791800
m = raft_recv_message(&raft);
792801
assert(m); // m should not be NULL, because the message should be ready to recv
793802
}
794803

795-
int applied = raft_apply(&raft, apply_clog_update);
796-
if (applied) {
797-
shout("applied %d updates\n", applied);
798-
}
804+
if (use_raft) {
805+
int applied = raft_apply(&raft, apply_clog_update);
806+
if (applied) {
807+
shout("applied %d updates\n", applied);
808+
}
799809

800-
if (m) {
801-
raft_handle_message(&raft, m);
802-
}
810+
if (m) {
811+
raft_handle_message(&raft, m);
812+
}
803813

804-
server_set_enabled(server, raft.role == ROLE_LEADER);
814+
server_set_enabled(server, raft.role == ROLE_LEADER);
815+
} else {
816+
server_set_enabled(server, true);
817+
}
805818
}
806819

807820
clog_close(clg);

contrib/pg_dtm/libdtm.c

Lines changed: 101 additions & 59 deletions
< 9E88 td data-grid-cell-id="diff-51a316648705cc6214db5217ef2de40483c035bbb06feac36e9e9e8ee262a5cf-7-7-0" data-selected="false" role="gridcell" style="background-color:var(--bgColor-default);text-align:center" tabindex="-1" valign="top" class="focusable-grid-cell diff-line-number position-relative diff-line-number-neutral left-side">7
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,11 @@
66
#include <unistd.h>
7
#include <stdlib.h>
88
#include <assert.h>
9+
#include <time.h>
910

1011
#include "libdtm.h"
1112
#include "dtmd/include/proto.h"
13+
#include "dtmd/include/dtmdlimits.h"
1214
#include "sockhub/sockhub.h"
1315

1416
#ifdef TEST
@@ -24,23 +26,36 @@ typedef struct DTMConnData *DTMConn;
2426

2527
typedef struct DTMConnData
2628
{
29+
char *host; // use unix socket if host is NULL
30+
int port;
2731
int sock;
2832
} DTMConnData;
2933

30-
static char *dtmhost = NULL;
31-
static int dtmport = 0;
32-
static char* dtm_unix_sock_dir;
34+
static bool connected = false;
35+
static int leader = 0;
36+
static int connum = 0;
37+
static DTMConnData conns[MAX_SERVERS];
38+
static char *dtm_unix_sock_dir;
3339

3440
typedef unsigned xid_t;
3541

42+
static void DiscardConnection()
43+
{
44+
connected = false;
45+
leader = (leader + 1) % connum;
46+
fprintf(stderr, "next candidate is %s:%d\n", conns[leader].host, conns[leader].port);
47+
}
48+
3649
// Connects to the specified DTM.
37-
static DTMConn DtmConnect(char *host, int port)
50+
static bool DtmConnect(DTMConn conn)
3851
{
39-
DTMConn dtm;
4052
int sd;
4153

42-
if (host == NULL)
54+
if (conn->host == NULL)
4355
{
56+
perror("unix socket not supported yet");
57+
*(int*)0 = 0;
58+
/*
4459
// use a UNIX socket
4560
struct sockaddr sock;
4661
int len = offsetof(struct sockaddr, sa_data) + snprintf(sock.sa_data, sizeof(sock.sa_data), "%s/p%u", dtm_unix_sock_dir, port);
@@ -49,17 +64,20 @@ static DTMConn DtmConnect(char *host, int port)
4964
sd = socket(AF_UNIX, SOCK_STREAM, 0);
5065
if (sd == -1)
5166
{
67+
DiscardConnection();
5268
perror("failed to create a unix socket");
69+
return false;
5370
}
5471
if (connect(sd, &sock, len) == -1)
5572
{
73+
DiscardConnection();
5674
perror("failed to connect to the address");
5775
close(sd);
58-
return NULL;
76+
return false;
5977
}
60-
dtm = malloc(sizeof(DTMConnData));
61-
dtm->sock = sd;
62-
return dtm;
78+
conn->sock = sd;
79+
return (connected = true);
80+
*/
6381
}
6482
else
6583
{
@@ -72,27 +90,16 @@ static DTMConn DtmConnect(char *host, int port)
7290
memset(&hint, 0, sizeof(hint));
7391
hint.ai_socktype = SOCK_STREAM;
7492
hint.ai_family = AF_INET;
75-
snprintf(portstr, 6, "%d", port);
93+
snprintf(portstr, 6, "%d", conn->port);
7694
hint.ai_protocol = getprotobyname("tcp")->p_proto;
7795

78-
while (1)
96+
while (true)
7997
{
80-
char* sep = strchr(host, ',');
81-
if (sep != NULL)
82-
{
83-
*sep = '\0';
84-
}
85-
if (getaddrinfo(host, portstr, &hint, &addrs))
98+
if (getaddrinfo(conn->host, portstr, &hint, &addrs))
8699
{
100+
DiscardConnection();
87101
perror("failed to resolve address");
88-
if (sep == NULL)
89-
{
90-
return NULL;
91-
}
92-
else
93-
{
94-
goto TryNextHost;
95-
}
102+
return false;
96103
}
97104

98105
for (a = addrs; a != NULL; a = a->ai_next)
@@ -102,39 +109,28 @@ static DTMConn DtmConnect(char *host, int port)
102109
if (sd == -1)
103110
{
104111
perror("failed to create a socket");
105-
goto TryNextHost;
112+
continue;
106113
}
107114
setsockopt(sd, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one));
108115

109116
if (connect(sd, a->ai_addr, a->ai_addrlen) == -1)
110117
{
111118
perror("failed to connect to an address");
112119
close(sd);
113-
goto TryNextHost;
120+
continue;
114121
}
115122

116123
// success
117124
freeaddrinfo(addrs);
118-
dtm = malloc(sizeof(DTMConnData));
119-
dtm->sock = sd;
120-
if (sep != NULL)
121-
{
122-
*sep = ',';
123-
}
124-
return dtm;
125+
conn->sock = sd;
126+
return (connected = true);
125127
}
126128
freeaddrinfo(addrs);
127-
TryNextHost:
128-
if (sep == NULL)
129-
{
130-
break;
131-
}
132-
*sep = ',';
133-
host = sep + 1;
134129
}
135130
}
131+
DiscardConnection();
136132
fprintf(stderr, "could not connect\n");
137-
return NULL;
133+
return false;
138134
}
139135

140136
static int dtm_recv_results(DTMConn dtm, int maxlen, xid_t *results)
@@ -150,11 +146,13 @@ static int dtm_recv_results(DTMConn dtm, int maxlen, xid_t *results)
150146
int newbytes = read(dtm->sock, (char*)&msg + recved, needed - recved);
151147
if (newbytes == -1)
152148
{
149+
DiscardConnection();
153150
elog(ERROR, "Failed to recv results header from arbiter");
154151
return 0;
155152
}
156153
if (newbytes == 0)
157154
{
155+
DiscardConnection();
158156
elog(ERROR, "Arbiter closed connection during recv");
159157
return 0;
160158
}
@@ -174,11 +172,13 @@ static int dtm_recv_results(DTMConn dtm, int maxlen, xid_t *results)
174172
int newbytes = read(dtm->sock, (char*)results + recved, needed - recved);
175173
if (newbytes == -1)
176174
{
175+
DiscardConnection();
177176
elog(ERROR, "Failed to recv results body from arbiter");
178177
return 0;
179178
}
180179
if (newbytes == 0)
181180
{
181+
DiscardConnection();
182182
elog(ERROR, "Arbiter closed connection during recv");
183183
return 0;
184184
}
@@ -223,6 +223,7 @@ static bool dtm_send_command(DTMConn dtm, xid_t cmd, int argc, ...)
223223
int newbytes = write(dtm->sock, buf + sent, datasize - sent);
224224
if (newbytes == -1)
225225
{
226+
DiscardConnection();
226227
elog(ERROR, "Failed to send a command to arbiter");
227228
return false;
228229
}
@@ -231,39 +232,75 @@ static bool dtm_send_command(DTMConn dtm, xid_t cmd, int argc, ...)
231232
return true;
232233
}
233234

234-
void DtmGlobalConfig(char *host, int port, char* sock_dir) {
235-
if (dtmhost)
236-
{
237-
free(dtmhost);
238-
dtmhost = NULL;
235+
void DtmGlobalConfig(char *servers, char *sock_dir)
236+
{
237+
char *hstate, *pstate;
238+
char *hostport, *host, *portstr;
239+
int port;
240+
241+
while (connum-- > 0) {
242+
if (conns[connum].host)
243+
free(conns[connum].host);
239244
}
240-
if (host)
245+
246+
hostport = strtok_r(servers, " ", &hstate);
247+
while (hostport)
241248
{
242-
dtmhost = strdup(host);
249+
//fprintf(stderr, "hostport = '%s'\n", hostport); sleep(1);
250+
host = strtok_r(hostport, ":", &pstate);
251+
//fprintf(stderr, "host = '%s'\n", hostport); sleep(1);
252+
if (!host) break;
253+
254+
portstr = strtok_r(NULL, ":", &pstate);
255+
//fprintf(stderr, "portstr = '%s'\n", portstr); sleep(1);
256+
if (portstr)
257+
port = atoi(portstr);
258+
else
259+
port = 5431;
260+
//fprintf(stderr, "host = %d\n", port); sleep(1);
261+
262+
if (!sock_dir) {
263+
conns[connum].host = strdup(host);
264+
} else {
265+
conns[connum].host = NULL;
266+
}
267+
conns[connum].port = port;
268+
connum++;
269+
270+
hostport = strtok_r(NULL, " ", &hstate);
243271
}
244-
dtmport = port;
272+
245273
dtm_unix_sock_dir = sock_dir;
246274
}
247275

248276
static DTMConn GetConnection()
249277
{
250-
static DTMConn dtm = NULL;
251-
if (dtm == NULL)
278+
int tries = 3 * connum;
279+
while (!connected && (tries > 0))
252280
{
253-
dtm = DtmConnect(dtmhost, dtmport);
254-
if (dtm == NULL)
281+
DTMConn c = conns + leader;
282+
if (!DtmConnect(c))
255283
{
256-
if (dtmhost)
284+
int timeout_ms = 100;
285+
struct timespec timeout = {0, timeout_ms * 1000000};
286+
nanosleep(&timeout, NULL);
287+
288+
tries--;
289+
if (c->host)
257290
{
258-
elog(ERROR, "Failed to connect to DTMD at tcp %s:%d", dtmhost, dtmport);
291+
elog(ERROR, "Failed to connect to DTMD at tcp %s:%d", c->host, c->port);
259292
}
260293
else
261294
{
262-
elog(ERROR, "Failed to connect to DTMD at unix %d", dtmport);
295+
elog(ERROR, "Failed to connect to DTMD at unix %d", c->port);
263296
}
264297
}
265298
}
266-
return dtm;
299+
if (!tries)
300+
{
301+
return NULL;
302+
}
303+
return conns + leader;
267304
}
268305

269306
void DtmInitSnapshot(Snapshot snapshot)
@@ -333,6 +370,7 @@ TransactionId DtmGlobalStartTransaction(Snapshot snapshot, TransactionId *gxmin)
333370

334371
return xid;
335372
failure:
373+
DiscardConnection();
336374
fprintf(stderr, "DtmGlobalStartTransaction: transaction failed to start\n");
337375
return INVALID_XID;
338376
}
@@ -368,6 +406,7 @@ void DtmGlobalGetSnapshot(TransactionId xid, Snapshot snapshot, TransactionId *g
368406

369407
return;
370408
failure:
409+
DiscardConnection();
371410
fprintf(
372411
stderr,
373412
"DtmGlobalGetSnapshot: failed to"
@@ -414,6 +453,7 @@ XidStatus DtmGlobalSetTransStatus(TransactionId xid, XidStatus status, bool wait
414453
goto failure;
415454
}
416455
failure:
456+
DiscardConnection();
417457
fprintf(
418458
stderr,
419459
"DtmGlobalSetTransStatus: failed to vote"
@@ -453,6 +493,7 @@ XidStatus DtmGlobalGetTransStatus(TransactionId xid, bool wait)
453493
goto failure;
454494
}
455495
failure:
496+
DiscardConnection();
456497
fprintf(
457498
stderr,
458499
"DtmGlobalGetTransStatus: failed to get"
@@ -491,6 +532,7 @@ int DtmGlobalReserve(TransactionId xid, int nXids, TransactionId *first)
491532
Assert(count >= nXids);
492533
return count;
493534
failure:
535+
DiscardConnection();
494536
fprintf(
495537
stderr,
496538
"DtmGlobalReserve: failed to reserve"

contrib/pg_dtm/libdtm.h

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,8 @@
88

99
#define INVALID_XID 0
1010

11-
// Sets up the host and port for DTM connection.
12-
// The defaults are "127.0.0.1" and 5431.
13-
void DtmGlobalConfig(char *host, int port, char* sock_dir);
11+
// Sets up the servers and the unix sockdir for DTM connections.
12+
void DtmGlobalConfig(char *servers, char *sock_dir);
1413

1514
void DtmInitSnapshot(Snapshot snapshot);
1615

0 commit comments

Comments
 (0)
0