6
6
#include <unistd.h>
7
7
#include <stdlib.h>
8
8
#include <assert.h>
9
+ #include <time.h>
9
10
10
11
#include "libdtm.h"
11
12
#include "dtmd/include/proto.h"
13
+ #include "dtmd/include/dtmdlimits.h"
12
14
#include "sockhub/sockhub.h"
13
15
14
16
#ifdef TEST
@@ -24,23 +26,36 @@ typedef struct DTMConnData *DTMConn;
24
26
25
27
/* One configured DTM server together with the socket opened to it. */
typedef struct DTMConnData
26
28
{
29
+ char * host ; // host name handed to getaddrinfo(); NULL means "use a unix socket"
30
+ int port ; // port number, formatted into the service string when connecting
27
31
int sock ; // socket descriptor stored by DtmConnect() on success
28
32
} DTMConnData ;
29
33
30
- static char * dtmhost = NULL ;
31
- static int dtmport = 0 ;
32
- static char * dtm_unix_sock_dir ;
34
+ static bool connected = false; // set by DtmConnect() on success, cleared by DiscardConnection()
35
+ static int leader = 0 ; // index into conns of the server currently being used or tried
36
+ static int connum = 0 ; // number of entries of conns filled in by DtmGlobalConfig()
37
+ static DTMConnData conns [MAX_SERVERS ]; // configured servers, in the order they were listed
38
+ static char * dtm_unix_sock_dir ; // sock_dir as last passed to DtmGlobalConfig()
33
39
34
40
typedef unsigned xid_t ;
35
41
42
+ static void DiscardConnection ()
43
+ {
44
+ connected = false;
45
+ leader = (leader + 1 ) % connum ;
46
+ fprintf (stderr , "next candidate is %s:%d\n" , conns [leader ].host , conns [leader ].port );
47
+ }
48
+
36
49
// Connects to the specified DTM.
37
- static DTMConn DtmConnect (char * host , int port )
50
+ static bool DtmConnect (DTMConn conn )
38
51
{
39
- DTMConn dtm ;
40
52
int sd ;
41
53
42
- if (host == NULL )
54
+ if (conn -> host == NULL )
43
55
{
56
+ perror ("unix socket not supported yet" );
57
+ * (int * )0 = 0 ;
58
+ /*
44
59
// use a UNIX socket
45
60
struct sockaddr sock;
46
61
int len = offsetof(struct sockaddr, sa_data) + snprintf(sock.sa_data, sizeof(sock.sa_data), "%s/p%u", dtm_unix_sock_dir, port);
@@ -49,17 +64,20 @@ static DTMConn DtmConnect(char *host, int port)
49
64
sd = socket(AF_UNIX, SOCK_STREAM, 0);
50
65
if (sd == -1)
51
66
{
67
+ DiscardConnection();
52
68
perror("failed to create a unix socket");
69
+ return false;
53
70
}
54
71
if (connect(sd, &sock, len) == -1)
55
72
{
73
+ DiscardConnection();
56
74
perror("failed to connect to the address");
57
75
close(sd);
58
- return NULL ;
76
+ return false ;
59
77
}
60
- dtm = malloc ( sizeof ( DTMConnData )) ;
61
- dtm -> sock = sd ;
62
- return dtm ;
78
+ conn->sock = sd ;
79
+ return (connected = true) ;
80
+ */
63
81
}
64
82
else
65
83
{
@@ -72,27 +90,16 @@ static DTMConn DtmConnect(char *host, int port)
72
90
memset (& hint , 0 , sizeof (hint ));
73
91
hint .ai_socktype = SOCK_STREAM ;
74
92
hint .ai_family = AF_INET ;
75
- snprintf (portstr , 6 , "%d" , port );
93
+ snprintf (portstr , 6 , "%d" , conn -> port );
76
94
hint .ai_protocol = getprotobyname ("tcp" )-> p_proto ;
77
95
78
- while (1 )
96
+ while (true )
79
97
{
80
- char * sep = strchr (host , ',' );
81
- if (sep != NULL )
82
- {
83
- * sep = '\0' ;
84
- }
85
- if (getaddrinfo (host , portstr , & hint , & addrs ))
98
+ if (getaddrinfo (conn -> host , portstr , & hint , & addrs ))
86
99
{
100
+ DiscardConnection ();
87
101
perror ("failed to resolve address" );
88
- if (sep == NULL )
89
- {
90
- return NULL ;
91
- }
92
- else
93
- {
94
- goto TryNextHost ;
95
- }
102
+ return false;
96
103
}
97
104
98
105
for (a = addrs ; a != NULL ; a = a -> ai_next )
@@ -102,39 +109,28 @@ static DTMConn DtmConnect(char *host, int port)
102
109
if (sd == -1 )
103
110
{
104
111
perror ("failed to create a socket" );
105
- goto TryNextHost ;
112
+ continue ;
106
113
}
107
114
setsockopt (sd , IPPROTO_TCP , TCP_NODELAY , & one , sizeof (one ));
108
115
109
116
if (connect (sd , a -> ai_addr , a -> ai_addrlen ) == -1 )
110
117
{
111
118
perror ("failed to connect to an address" );
112
119
close (sd );
113
- goto TryNextHost ;
120
+ continue ;
114
121
}
115
122
116
123
// success
117
124
freeaddrinfo (addrs );
118
- dtm = malloc (sizeof (DTMConnData ));
119
- dtm -> sock = sd ;
120
- if (sep != NULL )
121
- {
122
- * sep = ',' ;
123
- }
124
- return dtm ;
125
+ conn -> sock = sd ;
126
+ return (connected = true);
125
127
}
126
128
freeaddrinfo (addrs );
127
- TryNextHost :
128
- if (sep == NULL )
129
- {
130
- break ;
131
- }
132
- * sep = ',' ;
133
- host = sep + 1 ;
134
129
}
135
130
}
131
+ DiscardConnection ();
136
132
fprintf (stderr , "could not connect\n" );
137
- return NULL ;
133
+ return false ;
138
134
}
139
135
140
136
static int dtm_recv_results (DTMConn dtm , int maxlen , xid_t * results )
@@ -150,11 +146,13 @@ static int dtm_recv_results(DTMConn dtm, int maxlen, xid_t *results)
150
146
int newbytes = read (dtm -> sock , (char * )& msg + recved , needed - recved );
151
147
if (newbytes == -1 )
152
148
{
149
+ DiscardConnection ();
153
150
elog (ERROR , "Failed to recv results header from arbiter" );
154
151
return 0 ;
155
152
}
156
153
if (newbytes == 0 )
157
154
{
155
+ DiscardConnection ();
158
156
elog (ERROR , "Arbiter closed connection during recv" );
159
157
return 0 ;
160
158
}
@@ -174,11 +172,13 @@ static int dtm_recv_results(DTMConn dtm, int maxlen, xid_t *results)
174
172
int newbytes = read (dtm -> sock , (char * )results + recved , needed - recved );
175
173
if (newbytes == -1 )
176
174
{
175
+ DiscardConnection ();
177
176
elog (ERROR , "Failed to recv results body from arbiter" );
178
177
return 0 ;
179
178
}
180
179
if (newbytes == 0 )
181
180
{
181
+ DiscardConnection ();
182
182
elog (ERROR , "Arbiter closed connection during recv" );
183
183
return 0 ;
184
184
}
@@ -223,6 +223,7 @@ static bool dtm_send_command(DTMConn dtm, xid_t cmd, int argc, ...)
223
223
int newbytes = write (dtm -> sock , buf + sent , datasize - sent );
224
224
if (newbytes == -1 )
225
225
{
226
+ DiscardConnection ();
226
227
elog (ERROR , "Failed to send a command to arbiter" );
227
228
return false;
228
229
}
@@ -231,39 +232,75 @@ static bool dtm_send_command(DTMConn dtm, xid_t cmd, int argc, ...)
231
232
return true;
232
233
}
233
234
234
- void DtmGlobalConfig (char * host , int port , char * sock_dir ) {
235
- if (dtmhost )
236
- {
237
- free (dtmhost );
238
- dtmhost = NULL ;
235
+ void DtmGlobalConfig (char * servers , char * sock_dir )
236
+ {
237
+ char * hstate , * pstate ;
238
+ char * hostport , * host , * portstr ;
239
+ int port ;
240
+
241
+ while (connum -- > 0 ) {
242
+ if (conns [connum ].host )
243
+ free (conns [connum ].host );
239
244
}
240
- if (host )
245
+
246
+ hostport = strtok_r (servers , " " , & hstate );
247
+ while (hostport )
241
248
{
242
- dtmhost = strdup (host );
249
+ //fprintf(stderr, "hostport = '%s'\n", hostport); sleep(1);
250
+ host = strtok_r (hostport , ":" , & pstate );
251
+ //fprintf(stderr, "host = '%s'\n", hostport); sleep(1);
252
+ if (!host ) break ;
253
+
254
+ portstr = strtok_r (NULL , ":" , & pstate );
255
+ //fprintf(stderr, "portstr = '%s'\n", portstr); sleep(1);
256
+ if (portstr )
257
+ port = atoi (portstr );
258
+ else
259
+ port = 5431 ;
260
+ //fprintf(stderr, "host = %d\n", port); sleep(1);
261
+
262
+ if (!sock_dir ) {
263
+ conns [connum ].host = strdup (host );
264
+ } else {
265
+ conns [connum ].host = NULL ;
266
+ }
267
+ conns [connum ].port = port ;
268
+ connum ++ ;
269
+
270
+ hostport = strtok_r (NULL , " " , & hstate );
243
271
}
244
- dtmport = port ;
272
+
245
273
dtm_unix_sock_dir = sock_dir ;
246
274
}
247
275
248
276
static DTMConn GetConnection ()
249
277
{
250
- static DTMConn dtm = NULL ;
251
- if ( dtm == NULL )
278
+ int tries = 3 * connum ;
279
+ while (! connected && ( tries > 0 ) )
252
280
{
253
- dtm = DtmConnect ( dtmhost , dtmport ) ;
254
- if (dtm == NULL )
281
+ DTMConn c = conns + leader ;
282
+ if (! DtmConnect ( c ) )
255
283
{
256
- if (dtmhost )
284
+ int timeout_ms = 100 ;
285
+ struct timespec timeout = {0 , timeout_ms * 1000000 };
286
+ nanosleep (& timeout , NULL );
287
+
288
+ tries -- ;
289
+ if (c -> host )
257
290
{
258
- elog (ERROR , "Failed to connect to DTMD at tcp %s:%d" , dtmhost , dtmport );
291
+ elog (ERROR , "Failed to connect to DTMD at tcp %s:%d" , c -> host , c -> port );
259
292
}
260
293
else
261
294
{
262
- elog (ERROR , "Failed to connect to DTMD at unix %d" , dtmport );
295
+ elog (ERROR , "Failed to connect to DTMD at unix %d" , c -> port );
263
296
}
264
297
}
265
298
}
266
- return dtm ;
299
+ if (!tries )
300
+ {
301
+ return NULL ;
302
+ }
303
+ return conns + leader ;
267
304
}
268
305
269
306
void DtmInitSnapshot (Snapshot snapshot )
@@ -333,6 +370,7 @@ TransactionId DtmGlobalStartTransaction(Snapshot snapshot, TransactionId *gxmin)
333
370
334
371
return xid ;
335
372
failure :
373
+ DiscardConnection ();
336
374
fprintf (stderr , "DtmGlobalStartTransaction: transaction failed to start\n" );
337
375
return INVALID_XID ;
338
376
}
@@ -368,6 +406,7 @@ void DtmGlobalGetSnapshot(TransactionId xid, Snapshot snapshot, TransactionId *g
368
406
369
407
return ;
370
408
failure :
409
+ DiscardConnection ();
371
410
fprintf (
372
411
stderr ,
373
412
"DtmGlobalGetSnapshot: failed to"
@@ -414,6 +453,7 @@ XidStatus DtmGlobalSetTransStatus(TransactionId xid, XidStatus status, bool wait
414
453
goto failure ;
415
454
}
416
455
failure :
456
+ DiscardConnection ();
417
457
fprintf (
418
458
stderr ,
419
459
"DtmGlobalSetTransStatus: failed to vote"
@@ -453,6 +493,7 @@ XidStatus DtmGlobalGetTransStatus(TransactionId xid, bool wait)
453
493
goto failure ;
454
494
}
455
495
failure :
496
+ DiscardConnection ();
456
497
fprintf (
457
498
stderr ,
458
499
"DtmGlobalGetTransStatus: failed to get"
@@ -491,6 +532,7 @@ int DtmGlobalReserve(TransactionId xid, int nXids, TransactionId *first)
491
532
Assert (count >= nXids );
492
533
return count ;
493
534
failure :
535
+ DiscardConnection ();
494
536
fprintf (
495
537
stderr ,
496
538
"DtmGlobalReserve: failed to reserve"
0 commit comments