Implement Raft (without snapshot sending) and a standalone program fo… · postgrespro/postgres_cluster@9e4385a · GitHub
[go: up one dir, main page]

Skip to content

Commit 9e4385a

Browse files
committed
Implement Raft (without snapshot sending) and a standalone program for debugging.
1 parent 030fdd8 commit 9e4385a

File tree

12 files changed

+940
-295
lines changed

12 files changed

+940
-295
lines changed

contrib/pg_dtm/dtmd/Makefile

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,9 @@ bin/dtmd: obj/server.o obj/main.o obj/clog.o obj/clogfile.o obj/util.o obj/trans
2222
obj/snapshot.o \
2323
$(SOCKHUB_LDFLAGS)
2424

25-
bin/heart: obj/heart.o obj/util.o | bindir objdir
25+
bin/heart: obj/heart.o obj/raft.o obj/util.o | bindir objdir
2626
$(CC) -o bin/heart $(CFLAGS) \
27-
obj/heart.o obj/util.o
27+
obj/heart.o obj/raft.o obj/util.o
2828

2929
obj/server.o: src/server.c | objdir
3030
$(CC) -c -o obj/server.o $(CFLAGS) $(SOCKHUB_CFLAGS) src/server.c
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
// Compile-time limits and timing constants for dtmd.
// raft.h verifies with #if/#error that RAFT_KEEP_APPLIED < RAFT_LOGLEN,
// HEARTBEAT_TIMEOUT_MS < ELECTION_TIMEOUT_MS_MIN and
// ELECTION_TIMEOUT_MS_MIN < ELECTION_TIMEOUT_MS_MAX, so keep those
// relations intact when tuning the values below.
#ifndef DTMD_LIMITS_H
2+
#define DTMD_LIMITS_H
3+
4+
#define MAX_TRANSACTIONS 4096
5+
6+
#define BUFFER_SIZE (64 * 1024)
7+
#define LISTEN_QUEUE_SIZE 100
8+
#define MAX_STREAMS 4096
9+
10+
#define MAX_SERVERS 16
11+
#define HEARTBEAT_TIMEOUT_MS 20
12+
#define ELECTION_TIMEOUT_MS_MIN 150
13+
#define ELECTION_TIMEOUT_MS_MAX 300
14+
#define RAFT_LOGLEN 128
15+
#define RAFT_KEEP_APPLIED 16 // how many applied entries to keep during compaction
16+
17+
#endif

contrib/pg_dtm/dtmd/include/limits.h

Lines changed: 0 additions & 10 deletions
This file was deleted.

contrib/pg_dtm/dtmd/include/raft.h

Lines changed: 142 additions & 0 deletions
ED4F
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
#ifndef RAFT_H
2+
#define RAFT_H
3+
4+
#include <arpa/inet.h>
5+
#include <stdbool.h>
6+
#include "dtmdlimits.h"
7+
8+
#define NOBODY (-1) // presumably the "no server" id; parenthesized so it expands safely in any expression
9+
10+
#define DEFAULT_LISTENHOST "0.0.0.0"
11+
#define DEFAULT_LISTENPORT 5431
12+
13+
#define ROLE_FOLLOWER 0
14+
#define ROLE_CANDIDATE 1
15+
#define ROLE_LEADER 2
16+
17+
#if RAFT_KEEP_APPLIED >= RAFT_LOGLEN
18+
#error please ensure RAFT_KEEP_APPLIED < RAFT_LOGLEN
19+
#endif
20+
21+
#if HEARTBEAT_TIMEOUT_MS >= ELECTION_TIMEOUT_MS_MIN
22+
#error please ensure HEARTBEAT_TIMEOUT_MS < ELECTION_TIMEOUT_MS_MIN (considerably)
23+
#endif
24+
25+
#if ELECTION_TIMEOUT_MS_MIN >= ELECTION_TIMEOUT_MS_MAX
26+
#error please ensure ELECTION_TIMEOUT_MS_MIN < ELECTION_TIMEOUT_MS_MAX
27+
#endif
28+
29+
// raft module does not care what you mean by action and argument
30+
// One log entry. It carries a term and one of two payloads selected by the
// `snapshot` flag: an (action, argument) pair for a regular entry, or a
// (minarg, maxarg) pair for a snapshot entry. Both payloads share storage
// through the anonymous union, so only the pair matching `snapshot` is valid.
// Anonymous structs/unions require C11 or a compiler extension.
typedef struct raft_entry_t {
31+
int term;
32+
bool snapshot; // true if this is a snapshot entry
33+
union {
34+
struct { // snapshot == false
35+
int action;
36+
int argument;
37+
};
38+
struct { // snapshot == true
39+
int minarg;
40+
int maxarg;
41+
};
42+
};
43+
} raft_entry_t;
44+
45+
// Callback type receiving an (action, argument) pair; raft_t stores one in its `applier` field and raft_apply() takes one as a parameter.
typedef void (*raft_applier_t)(int action, int argument);
46+
47+
// The entry log: a fixed array of RAFT_LOGLEN slots used as a ring (indices
// wrap around, see RAFT_LOG), plus counters describing how far the entries
// have been replicated and applied.
typedef struct raft_log_t {
48+
int first; // presumably the index of the oldest entry still stored — confirm in raft.c
49+
int size; // number of entries past first
50+
int acked; // number of entries replicated to the majority of servers
51+
int applied; // number of entries applied to the state machine
52+
raft_entry_t entries[RAFT_LOGLEN]; // wraps around
53+
} raft_log_t;
54+
55+
// Per-server record kept in raft_t.servers[]: replication progress counters
// followed by the server's network location.
typedef struct raft_server_t {
56+
int seqno; // the rpc sequence number
57+
int tosend; // index of the next entry to send
58+
int acked; // index of the highest entry known to be replicated
59+
60+
char *host; // NOTE(review): ownership of this string is not visible in this header
61+
int port;
62+
struct sockaddr_in addr; // presumably resolved from host/port — confirm in raft.c
63+
} raft_server_t;
64+
65+
// Whole state of one raft participant: term/vote/role bookkeeping, the entry
// log, a socket descriptor, the table of servers, a timer and the applier
// callback.
typedef struct raft_t {
66+
int term; // current term (latest term we have seen)
67+
int vote; // who received our vote in current term
68+
int role; // presumably one of the ROLE_* constants — confirm in raft.c
69+
int me; // my id
70+
int votes; // how many votes are for me (if candidate)
71+
int leader; // the id of the leader
72+
raft_log_t log;
73+
74+
int sock;
75+
76+
int servernum; // presumably the number of used slots in servers[] — confirm in raft.c
77+
raft_server_t servers[MAX_SERVERS];
78+
79+
int timer;
80+
81+
raft_applier_t applier;
82+
} raft_t;
83+
84+
// Expands to the log slot (an lvalue) for entry INDEX of the raft_t pointed to
// by RAFT; INDEX is reduced modulo RAFT_LOGLEN, so the array acts as a ring.
// NOTE(review): a negative INDEX yields a negative remainder in C and would
// index out of bounds — callers presumably never pass one; confirm.
#define RAFT_LOG(RAFT, INDEX) ((RAFT)->log.entries[(INDEX) % (RAFT_LOGLEN)])
85+
86+
#define RAFT_MSG_UPDATE 0 // append entry
87+
#define RAFT_MSG_DONE 1 // entry appended
88+
#define RAFT_MSG_CLAIM 2 // vote for me
89+
#define RAFT_MSG_VOTE 3 // my vote
90+
91+
// Common message header. Each concrete raft_msg_*_t in this header embeds it
// as its first member `msg`, and raft_handle_message() takes a pointer to it.
typedef struct raft_msg_t {
92+
int msgtype; // presumably one of the RAFT_MSG_* constants — confirm in raft.c
93+
int term;
94+
int from;
95+
int seqno;
96+
} raft_msg_t;
97+
98+
// Update message: carries at most one log entry together with the position
// it follows; with `empty` set it is only a heartbeat.
// Presumably sent with msgtype RAFT_MSG_UPDATE ("append entry") — confirm.
typedef struct raft_msg_update_t {
99+
raft_msg_t msg;
100+
int previndex; // the index of the preceding log entry
101+
int prevterm; // the term of the preceding log entry
102+
103+
bool empty; // the message is just a heartbeat if empty
104+
raft_entry_t entry;
105+
106+
int acked; // the leader's acked number
107+
} raft_msg_update_t;
108+
109+
// Done message: reports the index and term of an appended entry plus a
// success flag.
// Presumably sent with msgtype RAFT_MSG_DONE ("entry appended") — confirm.
// Note: `term` here is distinct from the header's `msg.term`.
typedef struct raft_msg_done_t {
110+
raft_msg_t msg;
111+
int index; // the index of the appended entry
112+
int term; // the term of the appended entry
113+
bool success;
114+
} raft_msg_done_t;
115+
116+
// Claim message: the sender advertises the index and term of its last log
// entry.
// Presumably sent with msgtype RAFT_MSG_CLAIM ("vote for me") — confirm.
// Note: `term` here is distinct from the header's `msg.term`.
typedef struct raft_msg_claim_t {
117+
raft_msg_t msg;
118+
int index; // the index of my last entry
119+
int term; // the term of my last entry
120+
} raft_msg_claim_t;
121+
122+
// Vote message: the header plus a flag telling whether the vote was granted.
// Presumably sent with msgtype RAFT_MSG_VOTE ("my vote") — confirm.
typedef struct raft_msg_vote_t {
123+
raft_msg_t msg;
124+
bool granted;
125+
} raft_msg_vote_t;
126+
127+
// configuration
128+
void raft_init(raft_t *r);
129+
bool raft_add_server(raft_t *r, char *host, int port);
130+
bool raft_set_myid(raft_t *r, int myid);
131+
132+
// log actions
133+
bool raft_emit(raft_t *r, int action, int argument);
134+
int raft_apply(raft_t *r, raft_applier_t applier);
135+
136+
// control
137+
void raft_tick(raft_t *r, int msec);
138+
void raft_handle_message(raft_t *r, raft_msg_t *m);
139+
raft_msg_t *raft_recv_message(raft_t *r);
140+
int raft_create_udp_socket(raft_t *r);
141+
142+
#endif

contrib/pg_dtm/dtmd/include/snapshot.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
#define SNAPSHOT_H
33

44
#include "int.h"
5-
#include "limits.h"
5+
#include "dtmdlimits.h"
66

77
typedef struct Snapshot {
88
xid_t xmin;

contrib/pg_dtm/dtmd/include/transaction.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
#include "int.h"
66
#include "clog.h"
77
#include "snapshot.h"
8-
#include "limits.h"
8+
#include "dtmdlimits.h"
99

1010
#define MAX_SNAPSHOTS_PER_TRANS 8
1111

contrib/pg_dtm/dtmd/include/util.h

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
#include <sys/stat.h>
1010
#include <fcntl.h>
1111
#include <stdio.h>
12+
#include <stdlib.h>
1213

1314
#include "int.h"
1415

@@ -17,6 +18,18 @@ bool inrange(xid_t min, xid_t x, xid_t max);
1718
int falloc(int fd, off64_t size);
1819
char *destructive_concat(char *a, char *b);
1920

21+
// Returns the smaller of the two ints a and b.
static inline int min(int a, int b) {
22+
return a < b ? a : b;
23+
}
24+
25+
// Returns the larger of the two ints a and b.
static inline int max(int a, int b) {
26+
return a > b ? a : b;
27+
}
28+
29+
// Returns a pseudo-random integer in the inclusive range [min, max], drawn
// from rand() (call srand() beforehand for a different sequence per run).
// A reversed range (max < min) returns min: without this guard,
// max == min - 1 makes the modulus zero (division by zero, undefined
// behavior) and any other reversed range yields values outside the bounds.
// The result has the usual slight modulo bias, and the width
// max - min + 1 must fit in an int.
static inline int rand_between(int min, int max) {
    if (max < min) {
        return min;
    }
    return rand() % (max - min + 1) + min;
}
32+
2033
#ifndef DEBUG
2134
#define debug(...)
2235
#else

0 commit comments

Comments
 (0)
0