Fix the corruption of the raft update messages. · postgrespro/postgres_cluster@d069365 · GitHub
[go: up one dir, main page]

Skip to content

Commit d069365

Browse files
committed
Fix the corruption of the raft update messages.
1 parent d29857c commit d069365

File tree

4 files changed

+38
-22
lines changed

4 files changed

+38
-22
lines changed

contrib/pg_dtm/dtmd/include/dtmdlimits.h

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -11,7 +11,7 @@
1111
#define HEARTBEAT_TIMEOUT_MS 20
1212
#define ELECTION_TIMEOUT_MS_MIN 150
1313
#define ELECTION_TIMEOUT_MS_MAX 300
14-
#define RAFT_LOGLEN 128
15-
#define RAFT_KEEP_APPLIED 16 // how many applied entries to keep during compaction
14+
#define RAFT_LOGLEN 10240
15+
#define RAFT_KEEP_APPLIED 4096 // how many applied entries to keep during compaction
1616

1717
#endif

contrib/pg_dtm/dtmd/include/raft.h

Lines changed: 3 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -7,6 +7,8 @@
77

88
#define NOBODY -1
99

10+
#define MAJORITY_IS_NOT_ENOUGH // wait for unanimous ack for applying a new entry
11+
1012
#define DEFAULT_LISTENHOST "0.0.0.0"
1113
#define DEFAULT_LISTENPORT 5431
1214

@@ -67,7 +69,7 @@ typedef struct raft_t {
6769
int vote; // who received our vote in current term
6870
int role;
6971
int me; // my id
70-
int votes; // how votes are for me (if candidate)
72+
int votes; // how many votes are for me (if candidate)
7173
int leader; // the id of the leader
7274
raft_log_t log;
7375

contrib/pg_dtm/dtmd/src/heart.c

Lines changed: 8 additions & 14 deletions
Original file line number | Diff line number | Diff line change
@@ -63,18 +63,6 @@ static int mstimer_reset(mstimer_t *t) {
6363
return ms;
6464
}
6565

66-
void usr1(int signum) {
67-
static int arg = 0;
68-
if (raft.role == ROLE_LEADER) {
69-
int action = rand() % 9 + 1;
70-
shout("got an USR1, state[%d] := %d\n", arg, action);
71-
raft_emit(&raft, action, arg);
72-
arg++;
73-
} else {
74-
shout("got an USR1 while not a leader, ignoring\n");
75-
}
76-
}
77-
7866
static void main_loop() {
7967
mstimer_t t;
8068
mstimer_reset(&t);
@@ -85,7 +73,7 @@ static void main_loop() {
8573
die(EXIT_FAILURE);
8674
}
8775

88-
//keep listening for data
76+
int arg = 0;
8977
while (true) {
9078
int ms = mstimer_reset(&t);
9179
raft_tick(&raft, ms);
@@ -98,6 +86,13 @@ static void main_loop() {
9886
if (m) {
9987
raft_handle_message(&raft, m);
10088
}
89+
90+
if ((raft.role == ROLE_LEADER) && (rand() % 10 == 0)) {
91+
int action = rand() % 9 + 1;
92+
shout("set state[%d] = %d\n", arg, action);
93+
raft_emit(&raft, action, arg);
94+
arg++;
95+
}
10196
}
10297

10398
close(s);
@@ -172,7 +167,6 @@ int main(int argc, char **argv) {
172167

173168
signal(SIGTERM, die);
174169
signal(SIGINT, die);
175-
signal(SIGUSR1, usr1);
176170

177171
main_loop();
178172

contrib/pg_dtm/dtmd/src/raft.c

Lines changed: 25 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -168,9 +168,12 @@ static bool msg_size_is(raft_msg_t *m, int mlen) {
168168

169169
static void raft_send(raft_t *r, int dst, void *m, int mlen) {
170170
assert(msg_size_is((raft_msg_t*)m, mlen));
171+
assert(((raft_msg_t*)m)->msgtype >= 0);
172+
assert(((raft_msg_t*)m)->msgtype < 4);
171173
assert(dst >= 0);
172174
assert(dst < r->servernum);
173175
assert(dst != r->me);
176+
assert(((raft_msg_t*)m)->from == r->me);
174177

175178
raft_server_t *server = r->servers + dst;
176179

@@ -209,7 +212,12 @@ static void raft_beat(raft_t *r, int dst) {
209212
m.msg.from = r->me;
210213

211214
if (s->tosend < r->log.first + r->log.size) {
212-
// TODO: implement snapshot sending
215+
raft_entry_t *e = &RAFT_LOG(r, s->tosend);
216+
if (e->snapshot) {
217+
// TODO: implement snapshot sending
218+
shout("tosend = %d, first = %d, size = %d\n", s->tosend, r->log.first, r->log.size);
219+
assert(false); // snapshot sending not implemented
220+
}
213221

214222
// the follower is a bit behind: send an update
215223
m.previndex = s->tosend - 1;
@@ -218,7 +226,7 @@ static void raft_beat(raft_t *r, int dst) {
218226
} else {
219227
m.prevterm = -1;
220228
}
221-
m.entry = RAFT_LOG(r, s->tosend);
229+
m.entry = *e;
222230
m.empty = false;
223231
} else {
224232
// the follower is up to date: send a heartbeat
@@ -313,6 +321,7 @@ static int raft_log_compact(raft_log_t *l, int keep_applied) {
313321
snap.minarg = min(snap.minarg, e->argument);
314322
snap.maxarg = max(snap.maxarg, e->argument);
315323
}
324+
e->snapshot = false; // FIXME: should not need this, find the code where it is not set on new entry insertion
316325
compacted++;
317326
}
318327
if (compacted) {
@@ -331,7 +340,7 @@ bool raft_emit(raft_t *r, int action, int argument) {
331340
if (r->log.size == RAFT_LOGLEN) {
332341
int compacted = raft_log_compact(&r->log, RAFT_KEEP_APPLIED);
333342
if (compacted) {
334-
debug("compacted %d entries\n", compacted);
343+
shout("compacted %d entries\n", compacted);
335344
} else {
336345
shout(
337346
"cannot emit new entries, the log is"
@@ -355,7 +364,9 @@ bool raft_emit(raft_t *r, int action, int argument) {
355364
}
356365

357366
static bool log_append(raft_log_t *l, int previndex, int prevterm, raft_entry_t *e) {
358-
assert(!e->snapshot);
367+
if (e->snapshot) {
368+
assert(false);
369+
}
359370
debug(
360371
"log_append(%p, previndex=%d, prevterm=%d,"
361372
" term=%d, action=%d, argument=%d)\n",
@@ -470,11 +481,12 @@ static void raft_handle_update(raft_t *r, raft_msg_update_t *m) {
470481

471482
static void raft_refresh_acked(raft_t *r) {
472483
// pick each server's acked and see if it is acked on the majority
484+
// TODO: count 'acked' inside the entry itself to remove the nested loop here
473485
int i, j;
474486
for (i = 0; i < r->servernum; i++) {
475487
if (i == r->me) continue;
476488
int newacked = r->servers[i].acked;
477-
if (newacked < r->log.acked) continue;
489+
if (newacked <= r->log.acked) continue;
478490

479491
int replication = 1; // count self as yes
480492
for (j = 0; j < r->servernum; j++) {
@@ -486,7 +498,12 @@ static void raft_refresh_acked(raft_t *r) {
486498
}
487499
}
488500

501+
assert(replication <= r->servernum);
502+
489503
if (replication * 2 > r->servernum) {
504+
#ifdef MAJORITY_IS_NOT_ENOUGH
505+
if (replication < r->servernum) continue;
506+
#endif
490507
r->log.acked = newacked;
491508
}
492509
}
@@ -524,6 +541,7 @@ static void raft_handle_done(raft_t *r, raft_msg_done_t *m) {
524541
// the client should have specified the last index it had gotten
525542
server->tosend = m->index + 1;
526543
}
544+
assert(server->tosend >= server->acked); // FIXME: remove this, because 'tosend' is actually allowed to be less than 'acked' if the follower has restarted
527545
}
528546

529547
if (server->tosend < r->log.first + r->log.size) {
@@ -609,6 +627,8 @@ void raft_handle_message(raft_t *r, raft_msg_t *m) {
609627
r->role = ROLE_FOLLOWER;
610628
}
611629

630+
assert(m->msgtype >= 0);
631+
assert(m->msgtype < 4);
612632
switch (m->msgtype) {
613633
case RAFT_MSG_UPDATE:
614634
raft_handle_update(r, (raft_msg_update_t *)m);

0 commit comments

Comments
 (0)
0