8000 WIP. Compiles :) · postgrespro/postgres_cluster@04372e7 · GitHub
[go: up one dir, main page]

Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 04372e7

Browse files
committed
WIP. Compiles :)
1 parent 4b9dedc commit 04372e7

File tree

Collapse file tree

9 files changed

+318
-247
lines changed

9 files changed

+318
-247
lines changed

contrib/raftable/Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
MODULE_big = raftable
2-
OBJS = raftable.o worker.o state.o blockmem.o timeout.o raft/obj/raft.o raft/obj/util.o
2+
OBJS = raftable.o worker.o client.o state.o blockmem.o timeout.o raft/obj/raft.o raft/obj/util.o
33
EXTENSION = raftable
44
DATA = raftable--1.0.sql
55

contrib/raftable/buffer.c

Lines changed: 0 additions & 82 deletions
This file was deleted.

contrib/raftable/buffer.h

Lines changed: 0 additions & 18 deletions
This file was deleted.

contrib/raftable/client.c

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
#include <stdbool.h>
2+
#include <sys/types.h>
3+
#include <sys/socket.h>
4+
#include <errno.h>
5+
6+
#include <postgres.h>
7+
8+
#include "client.h"
9+
10+
static bool continue_recv(int socket, void *dst, size_t len, size_t *done)
11+
{
12+
while (*done < len)
13+
{
14+
ssize_t recved = recv(socket, ((char *)dst) + *done, len - *done, MSG_DONTWAIT);
15+
if (recved == 0) return false;
16+
if (recved < 0)
17+
{
18+
switch (errno)
19+
{
20+
case EAGAIN:
21+
#if EAGAIN != EWOULDBLOCK
22+
case EWOULDBLOCK:
23+
#endif
24+
return true; /* try again later */
25+
case EINTR:
26+
continue; /* try again now */
27+
default:
28+
return false;
29+
}
30+
}
31+
*done += recved;
32+
}
33+
return true;
34+
}
35+
36+
static bool continue_send(int socket, void *src, size_t len, size_t *done)
37+
{
38+
while (*done < len)
39+
{
40+
ssize_t sent = send(socket, ((char *)src) + *done, len - *done, MSG_DONTWAIT);
41+
if (sent == 0) return false;
42+
if (sent < 0)
43+
{
44+
switch (errno)
45+
{
46+
case EAGAIN:
47+
#if EAGAIN != EWOULDBLOCK
48+
case EWOULDBLOCK:
49+
#endif
50+
return true; /* try again later */
51+
case EINTR:
52+
continue; /* try again now */
53+
default:
54+
return false;
55+
}
56+
}
57+
*done += sent;
58+
}
59+
return true;
60+
}
61+
62+
void client_recv(Client *client)
63+
{
64+
Assert(client->state == CLIENT_SENDING);
65+
66+
if (!client->msg)
67+
{
68+
if (client->cursor < sizeof(client->msglen))
69+
{
70+
if (!continue_recv(client->socket, &client->msglen, sizeof(client->msglen), &client->cursor))
71+
goto failure;
72+
}
73+
if (client->cursor < sizeof(client->msglen)) return; /* continue later */
74+
75+
client->msg = malloc(sizeof(Message) + client->msglen);
76+
client->msg->len = client->msglen;
77+
client->cursor = 0;
78+
}
79+
80+
if (client->cursor < client->msg->len)
81+
{
82+
if (!continue_recv(client->socket, client->msg->data, client->msg->len, &client->cursor))
83+
goto failure;
84+
}
85+
86+
return;
87+
failure:
88+
client->state = CLIENT_SICK;
89+
}
90+
91+
void client_send(Client *client)
92+
{
93+
size_t totallen;
94+
Assert(client->state == CLIENT_RECVING);
95+
Assert(client->msg != NULL);
96+
97+
totallen = client->msg->len + sizeof(client->msg->len);
98+
if (client->cursor < client->msg->len)
99+
{
100+
if (!continue_send(client->socket, client->msg, totallen, &client->cursor))
101+
goto failure;
102+
}
103+
104+
return;
105+
failure:
106+
client->state = CLIENT_SICK;
107+
}

contrib/raftable/client.h

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
#ifndef CLIENT_H
2+
#define CLIENT_H
3+
4+
#include <stdlib.h>
5+
6+
#define BUFLEN 1024
7+
8+
/* Client state machine:
9+
*
10+
* ┏━━━━━━━━━┓ ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
11+
* ──>┃ Dead ┃<──┨ Sick ┃
12+
* ┗━━━━┯━━━━┛ ┗━━━━┯━━━━━━━━━━━━━┯━━━━━━━━━━━━━┯━━━━┛
13+
* │conn ^ fail ^ fail ^ fail
14+
* └───────>┏━━━━┷━━━━┓fin┏━━━━┷━━━━┓fin┏━━━━┷━━━━┓fin
15+
* ┃ Sending ┠──>┃ Waiting ┠──>┃ Recving ┠─┐
16+
* ┌─>┗━━━━━━━━━┛ ┗━━━━━━━━━┛ ┗━━━━━━━━━┛ │
17+
* └──────────────────────────────────────────┘
18+
*/
19+
20+
typedef enum ClientState {
21+
CLIENT_SICK = -1,
22+
CLIENT_DEAD = 0,
23+
CLIENT_SENDING,
24+
CLIENT_WAITING,
25+
CLIENT_RECVING
26+
} ClientState;
27+
28+
typedef struct Message {
29+
size_t len;
30+
char data[1];
31+
} Message;
32+
33+
typedef struct Client {
34+
ClientState state;
35+
int socket;
36+
size_t msglen;
37+
size_t cursor;
38+
Message *msg; /* the message that is currently being sent or received */
39+
int expect;
40+
} Client;
41+
42+
void client_recv(Client *client);
43+
void client_send(Client *client);
44+
void client_switch(Client *client, ClientState state);
45+
46+
#endif

contrib/raftable/raftable.c

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -277,7 +277,7 @@ raftable_sql_get_local(PG_FUNCTION_ARGS)
277277

278278
Assert(shared.state);
279279

280-
s = raftable_get_local(shared.state, key.data, &len);
280+
s = raftable_get_local(key.data, &len);
281281
if (s)
282282
{
283283
text *t = cstring_to_text_with_len(s, len);
@@ -294,8 +294,9 @@ raftable_sql_get(PG_FUNCTION_ARGS)
294294
RaftableKey key;
295295
size_t len;
296296
char *s;
297+
int timeout_ms;
297298
text_to_cstring_buffer(PG_GETARG_TEXT_P(0), key.data, sizeof(key.data));
298-
int timeout_ms = PG_GETARG_INT32(1);
299+
timeout_ms = PG_GETARG_INT32(1);
299300

300301
s = raftable_get(key.data, &len, timeout_ms);
301302
if (s)
@@ -346,8 +347,8 @@ static RaftableMessage *raftable_try_query(RaftableMessage *msg, size_t size, si
346347
return NULL;
347348
}
348349

349-
answer = (RaftableMessage *)palloc(*size);
350-
if (!timed_read(s, answer, *size, timeout))
350+
answer = (RaftableMessage *)palloc(*rsize);
351+
if (!timed_read(s, answer, *rsize, timeout))
351352
{
352353
elog(WARNING, "query: failed to recv the answer from the leader");
353354
pfree(answer);
@@ -359,6 +360,7 @@ static RaftableMessage *raftable_try_query(RaftableMessage *msg, size_t size, si
359360

360361
static RaftableMessage *raftable_query(RaftableMessage *msg, size_t size, size_t *rsize, int timeout_ms)
361362
{
363+
RaftableMessage *answer;
362364
timeout_t timeout;
363365

364366
if (timeout_ms < 0)
@@ -404,8 +406,7 @@ char *raftable_get(const char *key, size_t *len, int timeout_ms)
404406
msg = make_single_value_message(key, NULL, 0, &size);
405407

406408
Assert(wcfg.id >= 0);
407-
msg->expector = wcfg.id;
408-
msg->action = ACTION_GET;
409+
msg->meaning = MEAN_GET;
409410

410411
answer = raftable_query(msg, size, &rsize, timeout_ms);
411412
pfree(msg);
@@ -425,7 +426,7 @@ char *raftable_get(const char *key, size_t *len, int timeout_ms)
425426
}
426427
}
427428
else
428-
assert(answer->meaning == MEAN_FAIL);
429+
Assert(answer->meaning == MEAN_FAIL);
429430
pfree(answer);
430431
}
431432
return value;
@@ -443,8 +444,7 @@ bool raftable_set(const char *key, const char *value, size_t vallen, int timeout
443444
msg = make_single_value_message(key, value, vallen, &size);
444445

445446
Assert(wcfg.id >= 0);
446-
msg->expector = wcfg.id;
447-
msg->action = ACTION_SET;
447+
msg->meaning = MEAN_SET;
448448

449449
answer = raftable_query(msg, size, &rsize, timeout_ms);
450450
pfree(msg);
@@ -454,7 +454,7 @@ bool raftable_set(const char *key, const char *value, size_t vallen, int timeout
454454
if (answer->meaning == MEAN_OK)
455455
ok = true;
456456
else
457-
assert(answer->meaning == MEAN_FAIL);
457+
Assert(answer->meaning == MEAN_FAIL);
458458
pfree(answer);
459459
}
460460
return ok;

contrib/raftable/state.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -117,14 +117,14 @@ void state_update(StateP state, RaftableMessage *msg, bool clear)
117117
{
118118
RaftableField *f;
119119
int i;
120-
char *cursor = message->data;
120+
char *cursor = msg->data;
121121

122122
Assert(state);
123123
LWLockAcquire(state->lock, LW_EXCLUSIVE);
124124

125125
if (clear) state_clear(state);
126126

127-
for (i = 0; i < message->fieldnum; i++) {
127+
for (i = 0; i < msg->fieldnum; i++) {
128128
char *key, *value;
129129
f = (RaftableField *)cursor;
130130
cursor = f->data;

contrib/raftable/state.h

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -14,23 +14,25 @@ typedef struct RaftableEntry
1414
int block;
1515
} RaftableEntry;
1616

17-
typedef struct RaftableField {
17+
typedef struct RaftableField
18+
{
1819
size_t keylen;
1920
size_t vallen;
2021
bool isnull;
2122
char data[1];
2223
} RaftableField;
2324

24-
#define MEAN_FAIL 0
25-
#define MEAN_OK 1
26-
#define MEAN_GET 2
27-
#define MEAN_SET 3
25+
#define MEAN_FAIL '!'
26+
#define MEAN_OK '.'
27+
#define MEAN_GET '?'
28+
#define MEAN_SET '='
2829

29-
typedef struct RaftableMessage {
30-
int meaning;
30+
typedef struct RaftableMessage
31+
{
32+
char meaning;
3133
int fieldnum;
3234
char data[1];
33-
} RaftableQuery;
35+
} RaftableMessage;
3436

3537
typedef struct State *StateP;
3638

0 commit comments

Comments
 (0)
0