@@ -168,9 +168,12 @@ static bool msg_size_is(raft_msg_t *m, int mlen) {
168
168
169
169
static void raft_send (raft_t * r , int dst , void * m , int mlen ) {
170
170
assert (msg_size_is ((raft_msg_t * )m , mlen ));
171
+ assert (((raft_msg_t * )m )-> msgtype >= 0 );
172
+ assert (((raft_msg_t * )m )-> msgtype < 4 );
171
173
assert (dst >= 0 );
172
174
assert (dst < r -> servernum );
173
175
assert (dst != r -> me );
176
+ assert (((raft_msg_t * )m )-> from == r -> me );
174
177
175
178
raft_server_t * server = r -> servers + dst ;
176
179
@@ -209,7 +212,12 @@ static void raft_beat(raft_t *r, int dst) {
209
212
m .msg .from = r -> me ;
210
213
211
214
if (s -> tosend < r -> log .first + r -> log .size ) {
212
- // TODO: implement snapshot sending
215
+ raft_entry_t * e = & RAFT_LOG (r , s -> tosend );
216
+ if (e -> snapshot ) {
217
+ // TODO: implement snapshot sending
218
+ shout ("tosend = %d, first = %d, size = %d\n" , s -> tosend , r -> log .first , r -> log .size );
219
+ assert (false); // snapshot sending not implemented
220
+ }
213
221
214
222
// the follower is a bit behind: send an update
215
223
m .previndex = s -> tosend - 1 ;
@@ -218,7 +226,7 @@ static void raft_beat(raft_t *r, int dst) {
218
226
} else {
219
227
m .prevterm = -1 ;
220
228
}
221
- m .entry = RAFT_LOG ( r , s -> tosend ) ;
229
+ m .entry = * e ;
222
230
m .empty = false;
223
231
} else {
224
232
// the follower is up to date: send a heartbeat
@@ -313,6 +321,7 @@ static int raft_log_compact(raft_log_t *l, int keep_applied) {
313
321
snap .minarg = min (snap .minarg , e -> argument );
314
322
snap .maxarg = max (snap .maxarg , e -> argument );
315
323
}
324
+ e -> snapshot = false; // FIXME: should not need this, find the code where it is not set on new entry insertion
316
325
compacted ++ ;
317
326
}
318
327
if (compacted ) {
@@ -331,7 +340,7 @@ bool raft_emit(raft_t *r, int action, int argument) {
331
340
if (r -> log .size == RAFT_LOGLEN ) {
332
341
int compacted = raft_log_compact (& r -> log , RAFT_KEEP_APPLIED );
333
342
if (compacted ) {
334
- debug ("compacted %d entries\n" , compacted );
343
+ shout ("compacted %d entries\n" , compacted );
335
344
} else {
336
345
shout (
337
346
"cannot emit new entries, the log is"
@@ -355,7 +364,9 @@ bool raft_emit(raft_t *r, int action, int argument) {
355
364
}
356
365
357
366
static bool log_append (raft_log_t * l , int previndex , int prevterm , raft_entry_t * e ) {
358
- assert (!e -> snapshot );
367
+ if (e -> snapshot ) {
368
+ assert (false);
369
+ }
359
370
debug (
360
371
"log_append(%p, previndex=%d, prevterm=%d,"
361
372
" term=%d, action=%d, argument=%d)\n" ,
@@ -470,11 +481,12 @@ static void raft_handle_update(raft_t *r, raft_msg_update_t *m) {
470
481
471
482
static void raft_refresh_acked (raft_t * r ) {
472
483
// pick each server's acked and see if it is acked on the majority
484
+ // TODO: count 'acked' inside the entry itself to remove the nested loop here
473
485
int i , j ;
474
486
for (i = 0 ; i < r -> servernum ; i ++ ) {
475
487
if (i == r -> me ) continue ;
476
488
int newacked = r -> servers [i ].acked ;
477
- if (newacked < r -> log .acked ) continue ;
489
+ if (newacked <= r -> log .acked ) continue ;
478
490
479
491
int replication = 1 ; // count self as yes
480
492
for (j = 0 ; j < r -> servernum ; j ++ ) {
@@ -486,7 +498,12 @@ static void raft_refresh_acked(raft_t *r) {
486
498
}
487
499
}
488
500
501
+ assert (replication <= r -> servernum );
502
+
489
503
if (replication * 2 > r -> servernum ) {
504
+ #ifdef MAJORITY_IS_NOT_ENOUGH
505
+ if (replication < r -> servernum ) continue ;
506
+ #endif
490
507
r -> log .acked = newacked ;
491
508
}
492
509
}
@@ -524,6 +541,7 @@ static void raft_handle_done(raft_t *r, raft_msg_done_t *m) {
524
541
// the client should have specified the last index it had gotten
525
542
server -> tosend = m -> index + 1 ;
526
543
}
544
+ assert (server -> tosend >= server -> acked ); // FIXME: remove this, because 'tosend' is actually allowed to be less than 'acked' if the follower has restarted
527
545
}
528
546
529
547
if (server -> tosend < r -> log .first + r -> log .size ) {
@@ -609,6 +627,8 @@ void raft_handle_message(raft_t *r, raft_msg_t *m) {
609
627
r -> role = ROLE_FOLLOWER ;
610
628
}
611
629
630
+ assert (m -> msgtype >= 0 );
631
+ assert (m -> msgtype < 4 );
612
632
switch (m -> msgtype ) {
613
633
case RAFT_MSG_UPDATE :
614
634
raft_handle_update (r , (raft_msg_update_t * )m );
0 commit comments