8000 Add timeouts and vallen to raftable API. · postgrespro/postgres_cluster@d8c0394 · GitHub
[go: up one dir, main page]

Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit d8c0394

Browse files
committed
Add timeouts and vallen to raftable API.
1 parent a088277 commit d8c0394

File tree

7 files changed

+149
-108
lines changed

7 files changed

+149
-108
lines changed

contrib/raftable/README

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -40,23 +40,24 @@ The backend can also issue commands to itself through C API.
4040

4141
C API:
4242
/* Gets value by key. Returns the value or NULL if not found. */
43-
char *raftable_get(char *key);
43+
char *raftable_get(const char *key, size_t *vallen);
4444

4545
/*
46-
* Adds/updates value by key. Returns when the value gets replicated on
47-
* current machine. Storing NULL will delete the item from the table.
46+
* Adds/updates value by key. Returns when the value gets replicated.
47+
* Storing NULL will delete the item from the table. Gives up after
48+
* 'timeout_ms' milliseconds.
4849
*/
49-
void raftable_set(char *key, char *value);
50+
void raftable_set(const char *key, const char *value, size_t len, int timeout_ms);
5051

5152
/*
5253
* Iterates over all items in the table, calling func(key, value, arg)
5354
* for each of them.
5455
*/
55-
void raftable_every(void (*func)(char *, char *, void *), void *arg);
56+
void raftable_every(void (*func)(const char *, const char *, size_t, void *), void *arg);
5657

5758
SQL API:
5859
-- set
59-
raftable(key varchar(64), value text, tries int);
60+
raftable(key varchar(64), value text, timeout_ms int);
6061

6162
-- get
6263
raftable(key varchar(64)) returns text;

contrib/raftable/raftable--1.0.sql

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ AS 'MODULE_PATHNAME','raftable_sql_get'
88
LANGUAGE C;
99

1010
-- set
11-
CREATE FUNCTION raftable(key varchar(64), value text, tries int)
11+
CREATE FUNCTION raftable(key varchar(64), value text, timeout_ms int)
1212
RETURNS void
1313
AS 'MODULE_PATHNAME','raftable_sql_set'
1414
LANGUAGE C;

contrib/raftable/raftable.c

Lines changed: 110 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
#include "access/htup_details.h"
1313
#include "miscadmin.h"
1414
#include "funcapi.h"
15+
#include "utils/timestamp.h"
1516

1617
#include "raft.h"
1718
#include "util.h"
@@ -54,9 +55,19 @@ static void *get_shared_state(void)
5455

5556
static void select_next_peer(void)
5657
{
57-
do {
58-
*shared.leader = (*shared.leader + 1) % RAFTABLE_PEERS_MAX;
59-
} while (!wcfg.peers[*shared.leader].up);
58+
int orig_leader = *shared.leader;
59+
int i;
60+
for (i = 0; i < RAFTABLE_PEERS_MAX; i++)
61+
{
62+
int idx = (orig_leader + i + 1) % RAFTABLE_PEERS_MAX;
63+
HostPort *hp = wcfg.peers + idx;
64+
if (hp->up)
65+
{
66+
*shared.leader = idx;
67+
return;
68+
}
69+
}
70+
elog(WARNING, "all raftable peers down");
6071
}
6172

6273
static void disconnect_leader(void)
@@ -129,56 +140,116 @@ static bool connect_leader(void)
129140

130141
static int get_connection(void)
131142
{
132-
while (leadersock < 0)
143+
if (leadersock < 0)
133144
{
134-
if (connect_leader()) break;
145+
if (connect_leader()) return leadersock;
135146

136-
int timeout_ms = 1000;
147+
int timeout_ms = 100;
137148
struct timespec timeout = {0, timeout_ms * 1000000};
138149
nanosleep(&timeout, NULL);
139150
}
140151
return leadersock;
141152
}
142153

143-
char *raftable_get(char *key)
154+
char *raftable_get(const char *key, size_t *len)
144155
{
145-
return state_get(shared.state, key);
156+
return state_get(shared.state, key, len);
146157
}
147158

148159
Datum
149160
raftable_sql_get(PG_FUNCTION_ARGS)
150161
{
151162
RaftableEntry *e;
152163
RaftableKey key;
164+
size_t len;
153165
text_to_cstring_buffer(PG_GETARG_TEXT_P(0), key.data, sizeof(key.data));
154166

155167
Assert(shared.state);
156168

157-
char *s = state_get(shared.state, key.data);
169+
char *s = state_get(shared.state, key.data, &len);
158170
if (s)
159171
{
160-
text *t = cstring_to_text(s);
172+
text *t = cstring_to_text_with_len(s, len);
161173
pfree(s);
162174
PG_RETURN_TEXT_P(t);
163175
}
164176
else
165177
PG_RETURN_NULL();
166178
}
167179

168-
bool raftable_set(char *key, char *value, int tries)
180+
static void start_timer(TimestampTz *timer)
169181
{
170-
RaftableUpdate *ru;
171-
size_t size = sizeof(RaftableUpdate);
172-
int keylen, vallen = 0;
173-
bool ok = false;
182+
*timer -= GetCurrentTimestamp();
183+
}
184+
185+
static void stop_timer(TimestampTz *timer)
186+
{
187+
*timer += GetCurrentTimestamp();
188+
}
189+
190+
static long msec(TimestampTz timer)
191+
{
192+
long sec;
193+
int usec;
194+
TimestampDifference(0, timer, &sec, &usec);
195+
return sec * 1000 + usec / 1000;
196+
}
197+
198+
static bool try_sending_update(RaftableUpdate *ru, size_t size)
199+
{
200+
int s = get_connection();
174201

175-
if (tries <= 0)
202+
if (s < 0) return false;
203+
204+
int sent = 0, recved = 0;
205+
int status;
206+
207+
if (write(s, &size, sizeof(size)) != sizeof(size))
208+
{
209+
disconnect_leader();
210+
elog(WARNING, "failed to send the update size to the leader");
211+
return false;
212+
}
213+
214+
while (sent < size)
176215
{
177-
elog(ERROR, "raftable set should be called with 'tries' > 0");
216+
int newbytes = write(s, (char *)ru + sent, size - sent);
217+
if (newbytes == -1)
218+
{
219+
disconnect_leader();
220+
elog(WARNING, "failed to send the update to the leader");
221+
return false;
222+
}
223+
sent += newbytes;
178224
}
179225

226+
recved = read(s, &status, sizeof(status));
227+
if (recved != sizeof(status))
228+
{
229+
disconnect_leader();
230+
elog(WARNING, "failed to recv the update status from the leader");
231+
return false;
232+
}
233+
234+
if (status != 1)
235+
{
236+
disconnect_leader();
237+
elog(WARNING, "leader returned %d", status);
238+
return false;
239+
}
240+
241+
return true;
242+
}
243+
244+
bool raftable_set(const char *key, const char *value, size_t vallen, int timeout_ms)
245+
{
246+
RaftableUpdate *ru;
247+
size_t size = sizeof(RaftableUpdate);
248+
size_t keylen = 0;
249+
TimestampTz now;
250+
int elapsed_ms;
251+
180252
keylen = strlen(key) + 1;
181-
if (value) vallen = strlen(value) + 1;
182253

183254
size += sizeof(RaftableField) - 1;
184255
size += keylen;
@@ -194,84 +265,54 @@ bool raftable_set(char *key, char *value, int tries)
194265
memcpy(f->data, key, keylen);
195266
memcpy(f->data + keylen, value, vallen);
196267

197-
tryagain:
198-
if (tries--)
268+
elapsed_ms = 0;
269+
now = GetCurrentTimestamp();
270+
while ((elapsed_ms <= timeout_ms) || (timeout_ms == -1))
199271
{
200-
int s = get_connection();
201-
int sent = 0, recved = 0;
202-
int status;
203-
204-
if (write(s, &size, sizeof(size)) != sizeof(size))
272+
TimestampTz past = now;
273+
if (try_sending_update(ru, size))
205274
{
206-
disconnect_leader();
207-
elog(WARNING, "failed[%d] to send the update size to the leader", tries);
208-
goto tryagain;
209-
}
210-
211-
while (sent < size)
212-
{
213-
int newbytes = write(s, (char *)ru + sent, size - sent);
214-
if (newbytes == -1)
215-
{
216-
disconnect_leader();
217-
elog(WARNING, "failed[%d] to send the update to the leader", tries);
218-
goto tryagain;
219-
}
220-
sent += newbytes;
221-
}
222-
223-
recved = read(s, &status, sizeof(status));
224-
if (recved != sizeof(status))
225-
{
226-
disconnect_leader();
227-
elog(WARNING, "failed to recv the update status from the leader\n");
228-
goto tryagain;
275+
pfree(ru);
276+
return true;
229277
}
230-
goto success;
231-
}
232-
else
233-
{
234-
goto failure;
278+
now = GetCurrentTimestamp();
279+
elapsed_ms += msec(now - past);
235280
}
236281

237-
failure:
238-
elog(WARNING, "failed all tries to set raftable value\n");
239282
pfree(ru);
283+
elog(WARNING, "failed to set raftable value after %d ms", timeout_ms);
240284
return false;
241-
242-
success:
243-
pfree(ru);
244-
return true;
245285
}
246286

247287
Datum
248288
raftable_sql_set(PG_FUNCTION_ARGS)
249289
{
250290
char *key = text_to_cstring(PG_GETARG_TEXT_P(0));
251-
int tries = PG_GETARG_INT32(2);
291+
int timeout_ms = PG_GETARG_INT32(2);
252292
if (PG_ARGISNULL(1))
253-
raftable_set(key, NULL, tries);
293+
raftable_set(key, NULL, 0, timeout_ms);
254294
else
255295
{
256296
char *value = text_to_cstring(PG_GETARG_TEXT_P(1));
257-
raftable_set(key, value, tries);
297+
raftable_set(key, value, strlen(value), timeout_ms);
258298
pfree(value);
259299
}
260300
pfree(key);
261301

262302
PG_RETURN_VOID();
263303
}
264304

265-
void raftable_every(void (*func)(char *, char *, void *), void *arg)
305+
void raftable_every(void (*func)(const char *, const char *, size_t, void *), void *arg)
266306
{
267307
void *scan;
268308
char *key, *value;
309+
size_t len;
269310
Assert(shared.state);
270311

271312
scan = state_scan(shared.state);
272-
while (state_next(shared.state, scan, &key, &value))
313+
while (state_next(shared.state, scan, &key, &value, &len))
273314
{
274-
func(key, value, arg);
315+
func(key, value, len, arg);
275316
pfree(key);
276317
pfree(value);
277318
}
@@ -281,6 +322,7 @@ Datum
281322
raftable_sql_list(PG_FUNCTION_ARGS)
282323
{
283324
char *key, *value;
325+
size_t len;
284326
FuncCallContext *funcctx;
285327
MemoryContext oldcontext;
286328

@@ -309,14 +351,14 @@ raftable_sql_list(PG_FUNCTION_ARGS)
309351

310352
funcctx = SRF_PERCALL_SETUP();
311353

312-
if (state_next(shared.state, funcctx->user_fctx, &key, &value))
354+
if (state_next(shared.state, funcctx->user_fctx, &key, &value, &len))
313355
{
314356
HeapTuple tuple;
315357
Datum vals[2];
316358
bool isnull[2];
317359

318-
vals[0] = CStringGetTextDatum(key);
319-
vals[1] = CStringGetTextDatum(value);
360+
vals[0] = PointerGetDatum(cstring_to_text(key));
361+
vals[1] = PointerGetDatum(cstring_to_text_with_len(value, len));
320362
isnull[0] = isnull[1] = false;
321363

322364
tuple = heap_form_tuple(funcctx->tuple_desc, vals, isnull);

contrib/raftable/raftable.h

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,19 +2,19 @@
22
#define __RAFTABLE_H__
33

44
/* Gets value by key. Returns the value or NULL if not found. */
5-
char *raftable_get(char *key);
5+
char *raftable_get(const char *key, size_t *vallen);
66

77
/*
88
* Adds/updates value by key. Returns when the value gets replicated.
9-
* Storing NULL will delete the item from the table. Give up after 'tries'
10-
* tries have failed.
9+
* Storing NULL will delete the item from the table. Gives up after 'timeout_ms'
10+
* milliseconds.
1111
*/
12-
bool raftable_set(char *key, char *value, int tries);
12+
bool raftable_set(const char *key, const char *value, size_t vallen, int timeout_ms);
1313

1414
/*
1515
* Iterates over all items in the table, calling func(key, value, arg)
1616
* for each of them.
1717
*/
18-
void raftable_every(void (*func)(char *, char *, void *), void *arg);
18+
void raftable_every(void (*func)(const char *, const char *, size_t, void *), void *arg);
1919

2020
#endif

0 commit comments

Comments
 (0)
0