10000 WIP on binary protocol DTMD with sockhub. · postgrespro/postgres_cluster@bf89fd4 · GitHub
[go: up one dir, main page]

Skip to content

Commit bf89fd4

Browse files
committed
WIP on binary protocol DTMD with sockhub.
1 parent cbd2783 commit bf89fd4

File tree

7 files changed

+206
-337
lines changed

7 files changed

+206
-337
lines changed

contrib/pg_dtm/dtmd/include/limits.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,8 @@
44
#define MAX_TRANSACTIONS_PER_CLIENT 1024
55
#define MAX_TRANSACTIONS 1024
66

7-
#define BITS_PER_NODE 4
8-
#define MAX_NODES (1 << BITS_PER_NODE)
7+
#define BUFFER_SIZE (64 * 1024)
8+
#define LISTEN_QUEUE_SIZE 100
9+
#define MAX_STREAMS 128
910

1011
#endif

contrib/pg_dtm/dtmd/include/parser.h

Lines changed: 0 additions & 47 deletions
This file was deleted.

contrib/pg_dtm/dtmd/include/server.h

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
#ifndef SERVER_H
2+
#define SERVER_H
3+
4+
/*
5+
* You should not want to know what is inside those structures.
6+
*/
7+
typedef struct server_data_t *server_t;
8+
typedef struct client_data_t *client_t;
9+
10+
/*
11+
* The server will call this function whenever it gets a message ('len' bytes
12+
* of 'data') from the 'client'.
13+
*/
14+
typedef void (*onmessage_callback_t)(client_t client, size_t len, char *data);
15+
16+
/*
17+
* The server will call this function whenever a new 'client' send the first
18+
* message. This callback gets called before the 'onmessage'.
19+
*/
20+
typedef void (*onconnect_callback_t)(client_t client);
21+
22+
/*
23+
* The server will call this function whenever it considers the 'client'
24+
* disconnected.
25+
*/
26+
typedef void (*ondisconnect_callback_t)(client_t client);
27+
28+
/*
29+
* Creates a new server that will listen on 'host:port' and call the specified
30+
* callbacks. Returns the server handle to use in other methods.
31+
*/
32+
server_t server_init(
33+
char *host,
34+
int port,
35+
onmessage_callback_t onmessage,
36+
onconnect_callback_t onconnect,
37+
ondisconnect_callback_t ondisconnect,
38+
);
39+
40+
/*
41+
* Starts the server. Returns 'true' on success, 'false' otherwise.
42+
*/
43+
bool server_start(server_t server);
44+
45+
/*
46+
* The main server loop. Does not return, so use the callbacks and signal
47+
* handlers to add more logic.
48+
*/
49+
void server_loop(server_t server);
50+
51+
/*
52+
* These two methods allow you to set and get your custom 'userdata' for the
53+
* 'client'. The server does not care about this data and will not free it on
54+
* client disconnection.
55+
*/
56+
void client_set_data(client_t client, void *userdata);
57+
void *client_get_data(client_t client);
58+
59+
/*
60+
* Puts an empty message header into the output buffer of the corresponding
61+
* socket. The message will not be sent until you call the _finish() method.
62+
* A call to this function may lead to a send() call if there is not enough
63+
* space in the buffer.
64+
*
65+
* Returns 'true' on success, 'false' otherwise.
66+
*
67+
* NOTE: Be careful not to call the _message_ methods for other clients until
68+
* you _finish() this message. This limitation is due to the fact that multiple
69+
* clients share the same socket.
70+
*/
71+
bool client_message_start(client_t client);
72+
73+
/*
74+
* Appends 'len' bytes of 'data' to the buffer of the corresponding socket.
75+
* A call to this function may lead to a send() call if there is not enough
76+
* space in the buffer.
77+
*
78+
* Returns 'true' on success, 'false' otherwise.
79+
*/
80+
bool client_message_append(client_t client, size_t len, void *data);
81+
82+
/*
83+
* Finalizes the message. After finalizing the message becomes ready to be sent
84+
* over the corresponding socket, and you may _start() another message.
85+
*
86+
* Returns 'true' on success, 'false' otherwise.
87+
*/
88+
bool client_message_finish(client_t client);
89+
90+
#endif

contrib/pg_dtm/dtmd/src/main.c

Lines changed: 0 additions & 21 deletions
B41A
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
#include <sys/wait.h>
1010

1111
#include "clog.h"
12-
#include "parser.h"
1312
#include "eventwrap.h"
1413
#include "util.h"
1514
#include "intset.h"
@@ -28,29 +27,25 @@ xid_t prev_gxid, next_gxid;
2827

2928
typedef struct client_data_t {
3029
int id;
31-
parser_t parser;
3230
int snapshots_sent;
3331
xid_t xid;
3432
} client_data_t;
3533

3634
clog_t clg;
3735

3836
#define CLIENT_ID(X) (((client_data_t*)(X))->id)
39-
#define CLIENT_PARSER(X) (((client_data_t*)(X))->parser)
4037
#define CLIENT_SNAPSENT(X) (((client_data_t*)(X))->snapshots_sent)
4138
#define CLIENT_XID(X) (((client_data_t*)(X))->xid)
4239

4340
static client_data_t *create_client_data(int id) {
4441
client_data_t *cd = malloc(sizeof(client_data_t));
4542
cd->id = id;
46-
cd->parser = parser_create();
4743
cd->snapshots_sent = 0;
4844
cd->xid = INVALID_XID;
4945
return cd;
5046
}
5147

5248
static void free_client_data(client_data_t *cd) {
53-
parser_destroy(cd->parser);
5449
free(cd);
5550
}
5651

@@ -547,30 +542,14 @@ static char *oncmd(void *stream, void *clientdata, cmd_t *cmd) {
547542

548543
static char *ondata(void *stream, void *clientdata, size_t len, char *data) {
549544
int i;
550-
parser_t parser = CLIENT_PARSER(clientdata);
551545
char *response = NULL;
552546

553-
// The idea is to feed each character through
554-
// the parser, which will return a cmd from
555-
// time to time.
556547
for (i = 0; i < len; i++) {
557548
if (data[i] == '\n') {
558549
// ignore newlines (TODO: should we ignore them?)
559550
continue;
560551
}
561552

562-
cmd_t *cmd = parser_feed(parser, data[i]);
563-
if (parser_failed(parser)) {
564-
shout(
565-
"[%d] parser failed on character '%c' (%d): %s\n",
566-
CLIENT_ID(clientdata),
567-
data[i], data[i],
568-
parser_errormsg(parser)
569-
);
570-
parser_init(parser);
571-
response = strdup("-");
572-
break;
573-
}
574553
if (cmd) {
575554
char *newresponse = oncmd(stream, clientdata, cmd);
576555
response = destructive_concat(response, newresponse);

contrib/pg_dtm/dtmd/src/parser-test.c

Lines changed: 0 additions & 71 deletions
This file was deleted.

0 commit comments

Comments
 (0)
0