8000 Support replication of DDL · postgrespro/postgres_cluster@1db67dc · GitHub
[go: up one dir, main page]

Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 1db67dc

Browse files
committed
Support replication of DDL
1 parent 29c7901 commit 1db67dc

File tree

4 files changed

+258
-87
lines changed

4 files changed

+258
-87
lines changed

contrib/multimaster/dtmd/src/server.c

Lines changed: 116 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -16,53 +16,61 @@
1616
#include <netinet/tcp.h>
1717
#include <netinet/in.h>
1818

19+
#ifdef USE_EPOLL
20+
#include <sys/epoll.h>
21+
#endif
22+
1923
#include "server.h"
2024
#include "limits.h"
2125
#include "util.h"
2226
#include "sockhub.h"
2327

2428
typedef struct buffer_t {
25-
int ready; // number of bytes that are ready to be sent/processed
29+
int ready; /* number of bytes that are ready to be sent/processed */
2630
ShubMessageHdr *curmessage;
27-
char *data; // dynamically allocated buffer
31+
char *data; /* dynamically allocated buffer */
2832
} buffer_t;
2933

3034
typedef struct stream_data_t *stream_t;
3135

3236
typedef struct client_data_t {
33-
stream_t stream; // NULL: client value is empty
37+
stream_t stream; /* NULL: client value is empty */
3438
void *userdata;
3539
unsigned int chan;
3640
} client_data_t;
3741

3842
typedef struct stream_data_t {
3943
int fd;
40-
bool good; // 'false': stop serving this stream and disconnect when possible
44+
bool good; /* 'false': stop serving this stream and disconnect when possible */
4145
buffer_t input;
4246
buffer_t output;
4347

44-
// a map: 'chan' -> client_data_t
45-
// 'chan' is expected to be < MAX_FDS which is pretty low
46-
client_data_t *clients; // dynamically allocated
48+
/* a map: 'chan' -> client_data_t */
49+
/* 'chan' is expected to be < MAX_FDS which is pretty low */
50+
client_data_t *clients; /* dynamically allocated */
51+
struct stream_data_t* next;
4752
} stream_data_t;
4853

4954
typedef struct server_data_t {
5055
char *host;
5156
int port;
5257

53-
int listener; // the listening socket
54-
fd_set all; // all sockets including the listener
58+
int listener; /* the listening socket */
59+
#ifdef USE_EPOLL
60+
int epollfd;
61+
#else
62+
fd_set all; /* all sockets including the listener */
5563
int maxfd;
56-
57-
int streamsnum;
58-
stream_data_t streams[MAX_STREAMS];
64+
#endif
65+
stream_t used_chain;
66+
stream_t free_chain;
5967

6068
onmessage_callback_t onmessage;
6169
onconnect_callback_t onconnect;
6270
ondisconnect_callback_t ondisconnect;
6371
} server_data_t;
6472

65-
// Returns the created socket, or -1 if failed.
73+
/* Returns the created socket, or -1 if failed. */
6674
static int create_listening_socket(const char *host, int port) {
6775
int s = socket(AF_INET, SOCK_STREAM, 0);
6876
if (s == -1) {
@@ -113,32 +121,56 @@ server_t server_init(
113121
return server;
114122
}
115123

124+
bool register_socket(server_t server, int fd, stream_t stream)
125+
{
126+
#ifdef USE_EPOLL
127+
struct epoll_event ev;
128+
ev.events = EPOLLIN;
129+
ev.data.ptr = (void*)stream;
130+
if (epoll_ctl(server->epollfd, EPOLL_CTL_ADD, fd, &ev) < 0) {
131+
return false;
132+
}
133+
#else
134+
FD_SET(fd, &server->all);
135+
if (fd > server->maxfd) {
136+
server->maxfd = fd;
137+
}
138+
#endif
139+
return true;
140+
}
141+
116142
bool server_start(server_t server) {
117143
debug("starting the server\n");
118-
server->streamsnum = 0;
119-
144+
server->free_chain = NULL;
145+
server->used_chain = NULL;
146+
120147
server->listener = create_listening_socket(server->host, server->port);
121148
if (server->listener == -1) {
122149
return false;
123150
}
124151

152+
#ifdef USE_EPOLL
153+
server->epollfd = epoll_create(MAX_EVENTS);
154+
if (server->epollfd < 0) {
155+
return false;
156+
}
157+
#else
125158
FD_ZERO(&server->all);
126-
FD_SET(server->listener, &server->all);
127-
server->maxfd = server->listener;
128-
129-
return true;
159+
server->maxfd = 0;
160+
#endif
161+
return register_socket(server, server->listener, NULL);
130162
}
131163

132164
static bool stream_flush(stream_t stream) {
133165
int tosend = stream->output.ready;
134166
if (tosend == 0) {
135-
// nothing to do
167+
/* nothing to do */
136168
return true;
137169
}
138170

139171
char *cursor = stream->output.data;
140172
while (tosend > 0) {
141-
// repeat sending until we send everything
173+
/* repeat sending until we send everything */
142174
int sent = send(stream->fd, cursor, tosend, 0);
143175
if (sent == -1) {
144176
shout("failed to flush the stream\n");
@@ -153,7 +185,7 @@ static bool stream_flush(stream_t stream) {
153185
stream->output.ready = 0;
154186
ShubMessageHdr *msg = stream->output.curmessage;
155187
if (msg) {
156-
// move the unfinished message to the start of the buffer
188+
/* move the unfinished message to the start of the buffer */
157189
memmove(stream->output.data, msg, msg->size + sizeof(ShubMessageHdr));
158190
stream->output.curmessage = (ShubMessageHdr*)stream->output.data;
159191
}
@@ -163,10 +195,9 @@ static bool stream_flush(stream_t stream) {
163195

164196
static void server_flush(server_t server) {
165197
debug("flushing the streams\n");
166-
int i;
167-
for (i = 0; i < server->streamsnum; i++) {
168-
stream_t stream = server->streams + i;
169-
stream_flush(stream);
198+
stream_t s;
199+
for (s = server->used_chain; s != NULL; s = s->next) {
200+
stream_flush(s);
170201
}
171202
}
172203

@@ -187,7 +218,7 @@ static void stream_init(stream_t stream, int fd) {
187218

188219
stream->clients = malloc(MAX_TRANSACTIONS * sizeof(client_data_t));
189220
assert(stream->clients);
190-
// mark all clients as empty
221+
/* mark all clients as empty */
191222
for (i = 0; i < MAX_TRANSACTIONS; i++) {
192223
stream->clients[i].stream = NULL;
193224
}
@@ -207,36 +238,28 @@ static void server_stream_destroy(server_t server, stream_t stream) {
207238
}
208239
}
209240
}
210-
211-
FD_CLR(stream->fd, &server->all);
241+
#ifdef USE_EPOLL
242+
epoll_ctl(server->epollfd, EPOLL_CTL_DEL, stream->fd, NULL);
243+
#else
244+
FD_CLR(stream->fd, &server->all);
245+
#endif
212246
close(stream->fd);
213247
free(stream->clients);
214248
free(stream->input.data);
215249
free(stream->output.data);
216250
}
217251

218-
static void stream_move(stream_t dst, stream_t src) {
219-
int i;
220-
*dst = *src;
221-
for (i = 0; i < MAX_TRANSACTIONS; i++) {
222-
if (dst->clients[i].stream) {
223-
dst->clients[i].stream = dst;
224-
}
225-
}
226-
}
227-
228252
static void server_close_bad_streams(server_t server) {
229-
int i;
230-
for (i = server->streamsnum - 1; i >= 0; i--) {
231-
stream_t stream = server->streams + i;
232-
if (!stream->good) {
233-
server_stream_destroy(server, stream);
234-
if (i != server->streamsnum - 1) {
235-
// move the last one here
236-
*stream = server->streams[server->streamsnum - 1];
237-
stream_move(stream, server->streams + server->streamsnum - 1);
238-
}
239-
server->streamsnum--;
253+
stream_t s, next, *spp;
254+
for (spp = &server->used_chain; (s = *spp) != NULL; s = next) {
255+
next = s->next;
256+
if (!s->good) {
257+
server_stream_destroy(server, s);
258+
*spp = next;
259+
s->next = server->free_chain;
260+
server->free_chain = s;
261+
} else {
262+
spp = &s->next;
240263
}
241264
}
242265
}
@@ -279,7 +302,7 @@ static bool stream_message_append(stream_t stream, size_t len, void *data) {
279302

280303
int newsize = stream->output.curmessage->size + sizeof(ShubMessageHdr) + len;
281304
if (newsize > BUFFER_SIZE) {
282-
// the flushing will not help here
305+
/* the flushing will not help here */
283306
shout("the message cannot be bigger than the buffer size\n");
284307
stream->good = false;
285308
return false;
@@ -326,7 +349,8 @@ bool client_message_finish(client_t client) {
326349
return stream_message_finish(client->stream);
327350
}
328351

329-
bool client_message_shortcut(client_t client, xid_t arg) {
352+
bool client_message_shortcut(client_t client, xid_t arg)
353+
{
330354
if (!stream_message_start(client->stream, client->chan)) {
331355
return false;
332356
}
@@ -348,36 +372,33 @@ static bool server_accept(server_t server) {
348372
return false;
349373
}
350374
debug("a new connection accepted\n");
351-
352-
if (server->streamsnum >= MAX_STREAMS) {
353-
shout("streams limit hit, disconnecting the accepted connection\n");
354-
close(fd);
355-
return false;
375+
376+
stream_t s = server->free_chain;
377+
if (s == NULL) {
378+
s = malloc(sizeof(stream_data_t));
379+
} else {
380+
server->free_chain = s->next;
356381
}
382+
/* add new stream */
383+
s->next = server->used_chain;
384+
server->used_chain = s;
357385

358-
// add new stream
359-
stream_t s = server->streams + server->streamsnum++;
360386
stream_init(s, fd);
361387

362-
FD_SET(fd, &server->all);
363-
if (fd > server->maxfd) {
364-
server->maxfd = fd;
365-
}
366-
367-
return true;
388+
return register_socket(server, fd, s);
368389
}
369390

370391
static client_t stream_get_client(stream_t stream, unsigned int chan, bool *isnew) {
371392
assert(chan < MAX_TRANSACTIONS);
372393
client_t client = stream->clients + chan;
373394
if (client->stream == NULL) {
374-
// client is new
395+
/* client is new */
375396
client->stream = stream;
376397
client->chan = chan;
377398
*isnew = true;
378399
client->userdata = NULL;
379400
} else {
380-
// collisions should not happen
401+
/* collisions should not happen */
381402
assert(client->chan == chan);
382403
*isnew = false;
383404
}
@@ -412,7 +433,7 @@ static bool server_stream_handle(server_t server, stream_t stream) {
412433
ShubMessageHdr *msg = (ShubMessageHdr*)cursor;
413434
int header_and_data = sizeof(ShubMessageHdr) + msg->size;
414435
if (header_and_data <= toprocess) {
415-
// handle message
436+
/* handle message */
416437
bool isnew;
417438
client_t client = stream_get_client(stream, msg->chan, &isnew);
418439
if (isnew) {
@@ -457,9 +478,30 @@ static bool server_stream_handle(server_t server, stream_t stream) {
457478
void server_loop(server_t server) {
458479
while (1) {
459480
int i;
481+
int numready;
482+
#ifdef USE_EPOLL
483+
struct epoll_event events[MAX_EVENTS];
484+
numready = epoll_wait(server->epollfd, events, MAX_EVENTS, -1);
485+
if (numready < 0) {
486+
shout("failed to select: %s\n", strerror(errno));
487+
return;
488+
}
489+
for (i = 0; i < numready; i++) {
490+
stream_t stream = (stream_t)events[i].data.ptr;
491+
if (stream == NULL) {
492+
server_accept(server);
493+
} else {
494+
if (events[i].events & EPOLLERR) {
495+
stream->good = false;
496+
} else if (events[i].events & EPOLLIN) {
497+
server_stream_handle(server, stream);
498+
}
499+
}
500+
}
501+
#else
460502
fd_set readfds = server->all;
461-
debug("selecting\n");
462503
int numready = select(server->maxfd + 1, &readfds, NULL, NULL, NULL);
504+
stream_t s;
463505
if (numready == -1) {
464506
shout("failed to select: %s\n", strerror(errno));
465507
return;
@@ -470,14 +512,13 @@ void server_loop(server_t server) {
470512
server_accept(server);
471513
}
472514

473-
for (i = 0; (i < server->streamsnum) && (numready > 0); i++) {
474-
stream_t stream = server->streams + i;
475-
if (FD_ISSET(stream->fd, &readfds)) {
476-
server_stream_handle(server, stream);
515+
for (s = server_used_chain; s != NULL && numready > 0; s = s->next) {
516+
if (FD_ISSET(s->fd, &readfds)) {
517+
server_stream_handle(server, s);
477518
numready--;
478519
}
479520
}
480-
521+
#endif
481522
server_close_bad_streams(server);
482523
server_flush(server);
483524
}
@@ -501,7 +542,7 @@ unsigned client_get_ip_addr(client_t client)
501542
}
502543

503544
#if 0
504-
// usage example
545+
/* usage example */
505546

506547
void test_onconnect(client_t client) {
507548
char *name = "hello";

0 commit comments

Comments
 (0)
0