Implement getting raftable entries directly from the leader. · postgrespro/postgres_cluster@d20a473 · GitHub
[go: up one dir, main page]

Skip to content

Commit d20a473

Browse files
committed
Implement getting raftable entries directly from the leader.
1 parent 04372e7 commit d20a473

File tree

5 files changed

+83
-38
lines changed

5 files changed

+83
-38
lines changed

contrib/raftable/client.c

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ static bool continue_recv(int socket, void *dst, size_t len, size_t *done)
2929
}
3030
}
3131
*done += recved;
32+
Assert(*done <= len);
3233
}
3334
return true;
3435
}
@@ -55,6 +56,7 @@ static bool continue_send(int socket, void *src, size_t len, size_t *done)
5556
}
5657
}
5758
*done += sent;
59+
Assert(*done <= len);
5860
}
5961
return true;
6062
}
@@ -72,7 +74,7 @@ void client_recv(Client *client)
7274
}
7375
if (client->cursor < sizeof(client->msglen)) return; /* continue later */
7476

75-
client->msg = malloc(sizeof(Message) + client->msglen);
77+
client->msg = palloc(sizeof(Message) + client->msglen);
7678
client->msg->len = client->msglen;
7779
client->cursor = 0;
7880
}
@@ -95,7 +97,7 @@ void client_send(Client *client)
9597
Assert(client->msg != NULL);
9698

9799
totallen = client->msg->len + sizeof(client->msg->len);
98-
if (client->cursor < client->msg->len)
100+
if (client->cursor < totallen)
99101
{
100102
if (!continue_send(client->socket, client->msg, totallen, &client->cursor))
101103
goto failure;

contrib/raftable/raftable.c

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -422,7 +422,7 @@ char *raftable_get(const char *key, size_t *len, int timeout_ms)
422422
if (*len)
423423
{
424424
value = palloc(*len);
425-
memcpy(value, f->data, *len);
425+
memcpy(value, f->data + f->keylen, *len);
426426
}
427427
}
428428
else
@@ -463,19 +463,20 @@ bool raftable_set(const char *key, const char *value, size_t vallen, int timeout
463463
Datum
464464
raftable_sql_set(PG_FUNCTION_ARGS)
465465
{
466+
bool ok;
466467
char *key = text_to_cstring(PG_GETARG_TEXT_P(0));
467468
int timeout_ms = PG_GETARG_INT32(2);
468469
if (PG_ARGISNULL(1))
469-
raftable_set(key, NULL, 0, timeout_ms);
470+
ok = raftable_set(key, NULL, 0, timeout_ms);
470471
else
471472
{
472473
char *value = text_to_cstring(PG_GETARG_TEXT_P(1));
473-
raftable_set(key, value, strlen(value), timeout_ms);
474+
ok = raftable_set(key, value, strlen(value), timeout_ms);
474475
pfree(value);
475476
}
476477
pfree(key);
477478

478-
PG_RETURN_VOID();
479+
PG_RETURN_BOOL(ok);
479480
}
480481

481482
void raftable_every(void (*func)(const char *, const char *, size_t, void *), void *arg)

contrib/raftable/state.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -205,7 +205,7 @@ void *state_make_snapshot(StateP state, size_t *size)
205205
LWLockAcquire(state->lock, LW_SHARED);
206206

207207
*size = state_estimate_size(state);
208-
message = malloc(*size);
208+
message = malloc(*size); /* this is later freed by raft with a call to plain free() */
209209
cursor = (char *)message;
210210

211211
state_foreach_entry(state, agg_snapshot, &cursor);

contrib/raftable/t/000_basic.pl

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -81,20 +81,22 @@ sub start_nodes
8181
while (my ($key, $value) = each(%tests))
8282
{
8383
my ($rc, $stdout, $stderr) = $baker->psql('postgres', "select raftable('$key');");
84-
is($rc, 0, "Baker returns '$key'");
85-
is($stdout, $value, "Baker has the proper value for '$key'");
84+
is($stdout, $value, "Baker has the proper value for '$key' in the local state");
85+
86+
($rc, $stdout, $stderr) = $baker->psql('postgres', "select raftable('$key', $timeout_ms);");
87+
is($stdout, $value, "Baker gets the proper value for '$key' from the leader");
8688
}
8789

8890
my $ok = 1;
8991
my ($rc, $stdout, $stderr) = $baker->psql('postgres', "select raftable();");
90-
is($rc, 0, "Baker returns everything");
92+
is($rc, 0, "Baker has everything in the local state");
9193
while ($stdout =~ /\((\w+),(\w+)\)/g)
9294
{
9395
if (!exists $tests{$1}) { $ok = 0; last; }
9496
my $val = delete $tests{$1};
9597
if ($val ne $2) { $ok = 0; last; }
9698
}
9799
if (keys %tests > 0) { $ok = 0; }
98-
is($ok, 1, "Baker has the proper value for everything");
100+
is($ok, 1, "Baker has the proper value for everything in the local state");
99101

100102
exit(0);

contrib/raftable/worker.c

Lines changed: 67 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ typedef struct Server {
3030
Client clients[MAX_CLIENTS];
3131
} Server;
3232

33+
static StateP state;
3334
static Server server;
3435
static raft_t raft;
3536

@@ -211,17 +212,73 @@ static bool accept_client(void)
211212

212213
/*
 * Handle a complete request that has been received from client `c`.
 *
 * The request is dispatched on the `meaning` field of the RaftableMessage
 * stored in c->msg->data:
 *
 *   MEAN_SET - the message is handed to raft via raft_emit(). On success the
 *              returned index is remembered in c->expect and the client goes
 *              to CLIENT_WAITING; on failure the client is marked
 *              CLIENT_SICK.
 *   MEAN_GET - the single key in the message is looked up with state_get()
 *              and c->msg is replaced by a MEAN_OK answer; the client goes
 *              to CLIENT_RECVING with the cursor reset to 0.
 *   other    - the client is marked CLIENT_SICK.
 *
 * In the MEAN_SET branch c->msg is freed and set to NULL; in the MEAN_GET
 * branch it is freed and replaced with the answer.
 */
static void on_message_recv(Client *c)
{
	int index;
	raft_update_t u;
	RaftableMessage *rm;

	Assert(c->state == CLIENT_SENDING);
	Assert(c->msg != NULL);
	Assert(c->cursor == c->msg->len);
	Assert(c->expect == -1);

	rm = (RaftableMessage *)c->msg->data;
	if (rm->meaning == MEAN_SET)
	{
		u.len = c->msg->len;
		u.data = (char *)rm;
		index = raft_emit(raft, u); /* raft will copy the data */
		pfree(c->msg);
		c->msg = NULL;

		/*
		 * A non-negative index means the update was accepted. notify()
		 * asserts c->expect >= 0 for every CLIENT_WAITING client, so a
		 * negative index must never be stored as the expected one.
		 */
		if (index >= 0)
		{
			c->expect = index;
			c->state = CLIENT_WAITING;
		}
		else
		{
			fprintf(stderr, "failed to emit a raft update\n");
			c->state = CLIENT_SICK;
		}
	}
	else if (rm->meaning == MEAN_GET)
	{
		char *key;
		char *value;
		size_t vallen;
		size_t answersize;
		RaftableField *f;
		RaftableMessage *answer;

		Assert(rm->fieldnum == 1);
		f = (RaftableField *)rm->data;
		Assert(f->vallen == 0);
		/* NOTE(review): key is assumed to be NUL-terminated inside the message — confirm */
		key = f->data;

		value = state_get(state, key, &vallen);
		/* value may presumably be NULL for an absent key; never hand NULL to %s */
		fprintf(stderr, "query='%s' answer='%.*s'\n", key, (int)vallen, value ? value : "");
		answer = make_single_value_message(key, value, vallen, &answersize);
		answer->meaning = MEAN_OK;
		if (value != NULL)
			pfree(value); /* pfree() must not be called with NULL */

		/* key points into c->msg, so the request is freed only after its last use */
		pfree(c->msg);
		c->msg = palloc(sizeof(Message) + answersize);
		c->msg->len = answersize;
		memcpy(c->msg->data, answer, answersize);
		pfree(answer);
		c->cursor = 0;
		c->state = CLIENT_RECVING;
	}
	else
	{
		fprintf(stderr, "unknown meaning %d of the client's message\n", rm->meaning);
		c->state = CLIENT_SICK;
	}
}
219275

220276
static void on_message_send(Client *c)
221277
{
222278
Assert(c->state == CLIENT_RECVING);
223279
Assert(c->msg != NULL);
224-
Assert(c->cursor == c->msg->len);
280+
Assert(c->cursor == c->msg->len + sizeof(c->msg->len));
281+
fprintf(stderr, "freeing msg = %p\n", c->msg);
225282
pfree(c->msg);
226283
c->msg = NULL;
227284
c->state = CLIENT_SENDING;
@@ -251,44 +308,27 @@ static void attend(Client *c)
251308
default:
252309
Assert(false); // should not happen
253310
}
254-
255-
// while (get_new_message(c))
256-
// {
257-
// int index;
258-
// raft_update_t u;
259-
// RaftableUpdate *ru = (RaftableUpdate *)c->msg;
260-
//
261-
// Assert(c->expect == -1); /* client shouldn't send multiple updates at once */
262-
//
263-
// u.len = c->msglen;
264-
// u.data = c->msg;
265-
// index = raft_emit(raft, u);
266-
// if (index >= 0)
267-
// c->expect = index;
268-
// else
269-
// c->good = false;
270-
// pfree(c->msg);
271-
// c->msg = NULL;
272-
// }
273311
}
274312

275313
static void notify(void)
276314
{
277315
int i = 0;
278316
for (i = 0; i < MAX_CLIENTS; i++)
279317
{
280-
size_t size;
318+
size_t answersize;
281319
RaftableMessage *answer;
282320
Client *c = server.clients + i;
283321
if (c->state != CLIENT_WAITING) continue;
284322
Assert(c->expect >= 0);
285323
if (!raft_applied(raft, server.id, c->expect)) continue;
286324

287-
answer = make_single_value_message("", NULL, 0, &size);
325+
answer = make_single_value_message("", NULL, 0, &answersize);
288326
answer->meaning = MEAN_OK;
289-
c->msg = malloc(sizeof(Message) + size);
290-
c->msg->len = size;
291-
memcpy(c->msg->data, answer, size);
327+
c->msg = palloc(sizeof(Message) + answersize);
328+
fprintf(stderr, "allocated msg = %p\n", c->msg);
329+
c->msg->len = answersize;
330+
memcpy(c->msg->data, answer, answersize);
331+
c->cursor = 0;
292332
pfree(answer);
293333
c->state = CLIENT_RECVING;
294334
c->expect = -1;
@@ -412,7 +452,7 @@ static void worker_main(Datum arg)
412452
sigset_t sset;
413453
mstimer_t t;
414454
WorkerConfig *cfg = (WorkerConfig *)(arg);
415-
StateP state = (StateP)cfg->getter();
455+
state = (StateP)cfg->getter();
416456

417457
cfg->raft_config.userdata = state;
418458
cfg->raft_config.applier = applier;

0 commit comments

Comments
 (0)
0