12
12
#include "access/htup_details.h"
13
13
#include "miscadmin.h"
14
14
#include "funcapi.h"
15
+ #include "utils/timestamp.h"
15
16
16
17
#include "raft.h"
17
18
#include "util.h"
@@ -54,9 +55,19 @@ static void *get_shared_state(void)
54
55
55
56
static void select_next_peer (void )
56
57
{
57
- do {
58
- * shared .leader = (* shared .leader + 1 ) % RAFTABLE_PEERS_MAX ;
59
- } while (!wcfg .peers [* shared .leader ].up );
58
+ int orig_leader = * shared .leader ;
59
+ int i ;
60
+ for (i = 0 ; i < RAFTABLE_PEERS_MAX ; i ++ )
61
+ {
62
+ int idx = (orig_leader + i + 1 ) % RAFTABLE_PEERS_MAX ;
63
+ HostPort * hp = wcfg .peers + idx ;
64
+ if (hp -> up )
65
+ {
66
+ * shared .leader = idx ;
67
+ return ;
68
+ }
69
+ }
70
+ elog (WARNING , "all raftable peers down" );
60
71
}
61
72
62
73
static void disconnect_leader (void )
@@ -129,56 +140,116 @@ static bool connect_leader(void)
129
140
130
141
static int get_connection (void )
131
142
{
132
- while (leadersock < 0 )
143
+ if (leadersock < 0 )
133
144
{
134
- if (connect_leader ()) break ;
145
+ if (connect_leader ()) return leadersock ;
135
146
136
- int timeout_ms = 1000 ;
147
+ int timeout_ms = 100 ;
137
148
struct timespec timeout = {0 , timeout_ms * 1000000 };
138
149
nanosleep (& timeout , NULL );
139
150
}
140
151
return leadersock ;
141
152
}
142
153
143
- char * raftable_get (char * key )
154
+ char * raftable_get (const char * key , size_t * len )
144
155
{
145
- return state_get (shared .state , key );
156
+ return state_get (shared .state , key , len );
146
157
}
147
158
148
159
Datum
149
160
raftable_sql_get (PG_FUNCTION_ARGS )
150
161
{
151
162
RaftableEntry * e ;
152
163
RaftableKey key ;
164
+ size_t len ;
153
165
text_to_cstring_buffer (PG_GETARG_TEXT_P (0 ), key .data , sizeof (key .data ));
154
166
155
167
Assert (shared .state );
156
168
157
- char * s = state_get (shared .state , key .data );
169
+ char * s = state_get (shared .state , key .data , & len );
158
170
if (s )
159
171
{
160
- text * t = cstring_to_text ( s );
172
+ text * t = cstring_to_text_with_len ( s , len );
161
173
pfree (s );
162
174
PG_RETURN_TEXT_P (t );
163
175
}
164
176
else
165
177
PG_RETURN_NULL ();
166
178
}
167
179
168
- bool raftable_set ( char * key , char * value , int tries )
180
+ static void start_timer ( TimestampTz * timer )
169
181
{
170
- RaftableUpdate * ru ;
171
- size_t size = sizeof (RaftableUpdate );
172
- int keylen , vallen = 0 ;
173
- bool ok = false;
182
+ * timer -= GetCurrentTimestamp ();
183
+ }
184
+
185
+ static void stop_timer (TimestampTz * timer )
186
+ {
187
+ * timer += GetCurrentTimestamp ();
188
+ }
189
+
190
+ static long msec (TimestampTz timer )
191
+ {
192
+ long sec ;
193
+ int usec ;
194
+ TimestampDifference (0 , timer , & sec , & usec );
195
+ return sec * 1000 + usec / 1000 ;
196
+ }
197
+
198
+ static bool try_sending_update (RaftableUpdate * ru , size_t size )
199
+ {
200
+ int s = get_connection ();
174
201
175
- if (tries <= 0 )
202
+ if (s < 0 ) return false;
203
+
204
+ int sent = 0 , recved = 0 ;
205
+ int status ;
206
+
207
+ if (write (s , & size , sizeof (size )) != sizeof (size ))
208
+ {
209
+ disconnect_leader ();
210
+ elog (WARNING , "failed to send the update size to the leader" );
211
+ return false;
212
+ }
213
+
214
+ while (sent < size )
176
215
{
177
- elog (ERROR , "raftable set should be called with 'tries' > 0" );
216
+ int newbytes = write (s , (char * )ru + sent , size - sent );
217
+ if (newbytes == -1 )
218
+ {
219
+ disconnect_leader ();
220
+ elog (WARNING , "failed to send the update to the leader" );
221
+ return false;
222
+ }
223
+ sent += newbytes ;
178
224
}
179
225
226
+ recved = read (s , & status , sizeof (status ));
227
+ if (recved != sizeof (status ))
228
+ {
229
+ disconnect_leader ();
230
+ elog (WARNING , "failed to recv the update status from the leader" );
231
+ return false;
232
+ }
233
+
234
+ if (status != 1 )
235
+ {
236
+ disconnect_leader ();
237
+ elog (WARNING , "leader returned %d" , status );
238
+ return false;
239
+ }
240
+
241
+ return true;
242
+ }
243
+
244
+ bool raftable_set (const char * key , const char * value , size_t vallen , int timeout_ms )
245
+ {
246
+ RaftableUpdate * ru ;
247
+ size_t size = sizeof (RaftableUpdate );
248
+ size_t keylen = 0 ;
249
+ TimestampTz now ;
250
+ int elapsed_ms ;
251
+
180
252
keylen = strlen (key ) + 1 ;
181
- if (value ) vallen = strlen (value ) + 1 ;
182
253
183
254
size += sizeof (RaftableField ) - 1 ;
184
255
size += keylen ;
@@ -194,84 +265,54 @@ bool raftable_set(char *key, char *value, int tries)
194
265
memcpy (f -> data , key , keylen );
195
266
memcpy (f -> data + keylen , value , vallen );
196
267
197
- tryagain :
198
- if (tries -- )
268
+ elapsed_ms = 0 ;
269
+ now = GetCurrentTimestamp ();
270
+ while ((elapsed_ms <= timeout_ms ) || (timeout_ms == -1 ))
199
271
{
200
- int s = get_connection ();
201
- int sent = 0 , recved = 0 ;
202
- int status ;
203
-
204
- if (write (s , & size , sizeof (size )) != sizeof (size ))
272
+ TimestampTz past = now ;
273
+ if (try_sending_update (ru , size ))
205
274
{
206
- disconnect_leader ();
207
- elog (WARNING , "failed[%d] to send the update size to the leader" , tries );
208
- goto tryagain ;
209
- }
210
-
211
- while (sent < size )
212
- {
213
- int newbytes = write (s , (char * )ru + sent , size - sent );
214
- if (newbytes == -1 )
215
- {
216
- disconnect_leader ();
217
- elog (WARNING , "failed[%d] to send the update to the leader" , tries );
218
- goto tryagain ;
219
- }
220
- sent += newbytes ;
221
- }
222
-
223
- recved = read (s , & status , sizeof (status ));
224
- if (recved != sizeof (status ))
225
- {
226
- disconnect_leader ();
227
- elog (WARNING , "failed to recv the update status from the leader\n" );
228
- goto tryagain ;
275
+ pfree (ru );
276
+ return true;
229
277
}
230
- goto success ;
231
- }
232
- else
233
- {
234
- goto failure ;
278
+ now = GetCurrentTimestamp ();
279
+ elapsed_ms += msec (now - past );
235
280
}
236
281
237
- failure :
238
- elog (WARNING , "failed all tries to set raftable value\n" );
239
282
pfree (ru );
283
+ elog (WARNING , "failed to set raftable value after %d ms" , timeout_ms );
240
284
return false;
241
-
242
- success :
243
- pfree (ru );
244
- return true;
245
285
}
246
286
247
287
Datum
248
288
raftable_sql_set (PG_FUNCTION_ARGS )
249
289
{
250
290
char * key = text_to_cstring (PG_GETARG_TEXT_P (0 ));
251
- int tries = PG_GETARG_INT32 (2 );
291
+ int timeout_ms = PG_GETARG_INT32 (2 );
252
292
if (PG_ARGISNULL (1 ))
253
- raftable_set (key , NULL , tries );
293
+ raftable_set (key , NULL , 0 , timeout_ms );
254
294
else
255
295
{
256
296
char * value = text_to_cstring (PG_GETARG_TEXT_P (1 ));
257
- raftable_set (key , value , tries );
297
+ raftable_set (key , value , strlen ( value ), timeout_ms );
258
298
pfree (value );
259
299
}
260
300
pfree (key );
261
301
262
302
PG_RETURN_VOID ();
263
303
}
264
304
265
- void raftable_every (void (* func )(char * , char * , void * ), void * arg )
305
+ void raftable_every (void (* func )(const char * , const char * , size_t , void * ), void * arg )
266
306
{
267
307
void * scan ;
268
308
char * key , * value ;
309
+ size_t len ;
269
310
Assert (shared .state );
270
311
271
312
scan = state_scan (shared .state );
272
- while (state_next (shared .state , scan , & key , & value ))
313
+ while (state_next (shared .state , scan , & key , & value , & len ))
273
314
{
274
- func (key , value , arg );
315
+ func (key , value , len , arg );
275
316
pfree (key );
276
317
pfree (value );
277
318
}
@@ -281,6 +322,7 @@ Datum
281
322
raftable_sql_list (PG_FUNCTION_ARGS )
282
323
{
283
324
char * key , * value ;
325
+ size_t len ;
284
326
FuncCallContext * funcctx ;
285
327
MemoryContext oldcontext ;
286
328
@@ -309,14 +351,14 @@ raftable_sql_list(PG_FUNCTION_ARGS)
309
351
310
352
funcctx = SRF_PERCALL_SETUP ();
311
353
312
- if (state_next (shared .state , funcctx -> user_fctx , & key , & value ))
354
+ if (state_next (shared .state , funcctx -> user_fctx , & key , & value , & len ))
313
355
{
314
356
HeapTuple tuple ;
315
357
Datum vals [2 ];
316
358
bool isnull [2 ];
317
359
318
- vals [0 ] = CStringGetTextDatum ( key );
319
- vals [1 ] = CStringGetTextDatum ( value );
360
+ vals [0 ] = PointerGetDatum ( cstring_to_text ( key ) );
361
+ vals [1 ] = PointerGetDatum ( cstring_to_text_with_len ( value , len ) );
320
362
isnull [0 ] = isnull [1 ] = false;
321
363
322
364
tuple = heap_form_tuple (funcctx -> tuple_desc , vals , isnull );
0 commit comments