Skip to content

Commit d8c0394

Browse files
committed
Add timeouts and vallen to raftable API.
1 parent a088277 commit d8c0394

File tree

7 files changed

+149
-108
lines changed

7 files changed

+149
-108
lines changed

contrib/raftable/README

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -40,23 +40,24 @@ The backend can also issue commands to itself through C API.
4040

4141
C API:
4242
/* Gets value by key. Returns the value or NULL if not found. */
43-
char *raftable_get(char *key);
43+
char *raftable_get(const char *key, size_t *vallen);
4444

4545
/*
46-
* Adds/updates value by key. Returns when the value gets replicated on
47-
* current machine. Storing NULL will delete the item from the table.
46+
* Adds/updates value by key. Returns when the value gets replicated.
47+
* Storing NULL will delete the item from the table. Gives up after
48+
* 'timeout_ms' milliseconds.
4849
*/
49-
void raftable_set(char *key, char *value);
50+
void raftable_set(const char *key, const char *value, size_t len, int timeout_ms);
5051

5152
/*
5253
* Iterates over all items in the table, calling func(key, value, arg)
5354
* for each of them.
5455
*/
55-
void raftable_every(void (*func)(char *, char *, void *), void *arg);
56+
void raftable_every(void (*func)(const char *, const char *, size_t, void *), void *arg);
5657

5758
SQL API:
5859
-- set
59-
raftable(key varchar(64), value text, tries int);
60+
raftable(key varchar(64), value text, timeout_ms int);
6061

6162
-- get
6263
raftable(key varchar(64)) returns text;

contrib/raftable/raftable--1.0.sql

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ AS 'MODULE_PATHNAME','raftable_sql_get'
88
LANGUAGE C;
99

1010
-- set
11-
CREATE FUNCTION raftable(key varchar(64), value text, tries int)
11+
CREATE FUNCTION raftable(key varchar(64), value text, timeout_ms int)
1212
RETURNS void
1313
AS 'MODULE_PATHNAME','raftable_sql_set'
1414
LANGUAGE C;

contrib/raftable/raftable.c

Lines changed: 110 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
#include "access/htup_details.h"
1313
#include "miscadmin.h"
1414
#include "funcapi.h"
15+
#include "utils/timestamp.h"
1516

1617
#include "raft.h"
1718
#include "util.h"
@@ -54,9 +55,19 @@ static void *get_shared_state(void)
5455

5556
static void select_next_peer(void)
5657
{
57-
do {
58-
*shared.leader = (*shared.leader + 1) % RAFTABLE_PEERS_MAX;
59-
} while (!wcfg.peers[*shared.leader].up);
58+
int orig_leader = *shared.leader;
59+
int i;
60+
for (i = 0; i < RAFTABLE_PEERS_MAX; i++)
61+
{
62+
int idx = (orig_leader + i + 1) % RAFTABLE_PEERS_MAX;
63+
HostPort *hp = wcfg.peers + idx;
64+
if (hp->up)
65+
{
66+
*shared.leader = idx;
67+
return;
68+
}
69+
}
70+
elog(WARNING, "all raftable peers down");
6071
}
6172

6273
static void disconnect_leader(void)
@@ -129,56 +140,116 @@ static bool connect_leader(void)
129140

130141
static int get_connection(void)
131142
{
132-
while (leadersock < 0)
143+
if (leadersock < 0)
133144
{
134-
if (connect_leader()) break;
145+
if (connect_leader()) return leadersock;
135146

136-
int timeout_ms = 1000;
147+
int timeout_ms = 100;
137148
struct timespec timeout = {0, timeout_ms * 1000000};
138149
nanosleep(&timeout, NULL);
139150
}
140151
return leadersock;
141152
}
142153

143-
char *raftable_get(char *key)
154+
char *raftable_get(const char *key, size_t *len)
144155
{
145-
return state_get(shared.state, key);
156+
return state_get(shared.state, key, len);
146157
}
147158

148159
Datum
149160
raftable_sql_get(PG_FUNCTION_ARGS)
150161
{
151162
RaftableEntry *e;
152163
RaftableKey key;
164+
size_t len;
153165
text_to_cstring_buffer(PG_GETARG_TEXT_P(0), key.data, sizeof(key.data));
154166

155167
Assert(shared.state);
156168

157-
char *s = state_get(shared.state, key.data);
169+
char *s = state_get(shared.state, key.data, &len);
158170
if (s)
159171
{
160-
text *t = cstring_to_text(s);
172+
text *t = cstring_to_text_with_len(s, len);
161173
pfree(s);
162174
PG_RETURN_TEXT_P(t);
163175
}
164176
else
165177
PG_RETURN_NULL();
166178
}
167179

168-
bool raftable_set(char *key, char *value, int tries)
180+
static void start_timer(TimestampTz *timer)
169181
{
170-
RaftableUpdate *ru;
171-
size_t size = sizeof(RaftableUpdate);
172-
int keylen, vallen = 0;
173-
bool ok = false;
182+
*timer -= GetCurrentTimestamp();
183+
}
184+
185+
static void stop_timer(TimestampTz *timer)
186+
{
187+
*timer += GetCurrentTimestamp();
188+
}
189+
190+
static long msec(TimestampTz timer)
191+
{
192+
long sec;
193+
int usec;
194+
TimestampDifference(0, timer, &sec, &usec);
195+
return sec * 1000 + usec / 1000;
196+
}
197+
198+
static bool try_sending_update(RaftableUpdate *ru, size_t size)
199+
{
200+
int s = get_connection();
174201

175-
if (tries <= 0)
202+
if (s < 0) return false;
203+
204+
int sent = 0, recved = 0;
205+
int status;
206+
207+
if (write(s, &size, sizeof(size)) != sizeof(size))
208+
{
209+
disconnect_leader();
210+
elog(WARNING, "failed to send the update size to the leader");
211+
return false;
212+
}
213+
214+
while (sent < size)
176215
{
177-
elog(ERROR, "raftable set should be called with 'tries' > 0");
216+
int newbytes = write(s, (char *)ru + sent, size - sent);
217+
if (newbytes == -1)
218+
{
219+
disconnect_leader();
220+
elog(WARNING, "failed to send the update to the leader");
221+
return false;
222+
}
223+
sent += newbytes;
178224
}
179225

226+
recved = read(s, &status, sizeof(status));
227+
if (recved != sizeof(status))
228+
{
229+
disconnect_leader();
230+
elog(WARNING, "failed to recv the update status from the leader");
231+
return false;
232+
}
233+
234+
if (status != 1)
235+
{
236+
disconnect_leader();
237+
elog(WARNING, "leader returned %d", status);
238+
return false;
239+
}
240+
241+
return true;
242+
}
243+
244+
bool raftable_set(const char *key, const char *value, size_t vallen, int timeout_ms)
245+
{
246+
RaftableUpdate *ru;
247+
size_t size = sizeof(RaftableUpdate);
248+
size_t keylen = 0;
249+
TimestampTz now;
250+
int elapsed_ms;
251+
180252
keylen = strlen(key) + 1;
181-
if (value) vallen = strlen(value) + 1;
182253

183254
size += sizeof(RaftableField) - 1;
184255
size += keylen;
@@ -194,84 +265,54 @@ bool raftable_set(char *key, char *value, int tries)
194265
memcpy(f->data, key, keylen);
195266
memcpy(f->data + keylen, value, vallen);
196267

197-
tryagain:
198-
if (tries--)
268+
elapsed_ms = 0;
269+
now = GetCurrentTimestamp();
270+
while ((elapsed_ms <= timeout_ms) || (timeout_ms == -1))
199271
{
200-
int s = get_connection();
201-
int sent = 0, recved = 0;
202-
int status;
203-
204-
if (write(s, &size, sizeof(size)) != sizeof(size))
272+
TimestampTz past = now;
273+
if (try_sending_update(ru, size))
205274
{
206-
disconnect_leader();
207-
elog(WARNING, "failed[%d] to send the update size to the leader", tries);
208-
goto tryagain;
209-
}
210-
211-
while (sent < size)
212-
{
213-
int newbytes = write(s, (char *)ru + sent, size - sent);
214-
if (newbytes == -1)
215-
{
216-
disconnect_leader();
217-
elog(WARNING, "failed[%d] to send the update to the leader", tries);
218-
goto tryagain;
219-
}
220-
sent += newbytes;
221-
}
222-
223-
recved = read(s, &status, sizeof(status));
224-
if (recved != sizeof(status))
225-
{
226-
disconnect_leader();
227-
elog(WARNING, "failed to recv the update status from the leader\n");
228-
goto tryagain;
275+
pfree(ru);
276+
return true;
229277
}
230-
goto success;
231-
}
232-
else
233-
{
234-
goto failure;
278+
now = GetCurrentTimestamp();
279+
elapsed_ms += msec(now - past);
235280
}
236281

237-
failure:
238-
elog(WARNING, "failed all tries to set raftable value\n");
239282
pfree(ru);
283+
elog(WARNING, "failed to set raftable value after %d ms", timeout_ms);
240284
return false;
241-
242-
success:
243-
pfree(ru);
244-
return true;
245285
}
246286

247287
Datum
248288
raftable_sql_set(PG_FUNCTION_ARGS)
249289
{
250290
char *key = text_to_cstring(PG_GETARG_TEXT_P(0));
251-
int tries = PG_GETARG_INT32(2);
291+
int timeout_ms = PG_GETARG_INT32(2);
252292
if (PG_ARGISNULL(1))
253-
raftable_set(key, NULL, tries);
293+
raftable_set(key, NULL, 0, timeout_ms);
254294
else
255295
{
256296
char *value = text_to_cstring(PG_GETARG_TEXT_P(1));
257-
raftable_set(key, value, tries);
297+
raftable_set(key, value, strlen(value), timeout_ms);
258298
pfree(value);
259299
}
260300
pfree(key);
261301

262302
PG_RETURN_VOID();
263303
}
264304

265-
void raftable_every(void (*func)(char *, char *, void *), void *arg)
305+
void raftable_every(void (*func)(const char *, const char *, size_t, void *), void *arg)
266306
{
267307
void *scan;
268308
char *key, *value;
309+
size_t len;
269310
Assert(shared.state);
270311

271312
scan = state_scan(shared.state);
272-
while (state_next(shared.state, scan, &key, &value))
313+
while (state_next(shared.state, scan, &key, &value, &len))
273314
{
274-
func(key, value, arg);
315+
func(key, value, len, arg);
275316
pfree(key);
276317
pfree(value);
277318
}
@@ -281,6 +322,7 @@ Datum
281322
raftable_sql_list(PG_FUNCTION_ARGS)
282323
{
283324
char *key, *value;
325+
size_t len;
284326
FuncCallContext *funcctx;
285327
MemoryContext oldcontext;
286328

@@ -309,14 +351,14 @@ raftable_sql_list(PG_FUNCTION_ARGS)
309351

310352
funcctx = SRF_PERCALL_SETUP();
311353

312-
if (state_next(shared.state, funcctx->user_fctx, &key, &value))
354+
if (state_next(shared.state, funcctx->user_fctx, &key, &value, &len))
313355
{
314356
HeapTuple tuple;
315357
Datum vals[2];
316358
bool isnull[2];
317359

318-
vals[0] = CStringGetTextDatum(key);
319-
vals[1] = CStringGetTextDatum(value);
360+
vals[0] = PointerGetDatum(cstring_to_text(key));
361+
vals[1] = PointerGetDatum(cstring_to_text_with_len(value, len));
320362
isnull[0] = isnull[1] = false;
321363

322364
tuple = heap_form_tuple(funcctx->tuple_desc, vals, isnull);

contrib/raftable/raftable.h

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,19 +2,19 @@
22
#define __RAFTABLE_H__
33

44
/* Gets value by key. Returns the value or NULL if not found. */
5-
char *raftable_get(char *key);
5+
char *raftable_get(const char *key, size_t *vallen);
66

77
/*
88
* Adds/updates value by key. Returns when the value gets replicated.
9-
* Storing NULL will delete the item from the table. Give up after 'tries'
10-
* tries have failed.
9+
* Storing NULL will delete the item from the table. Gives up after 'timeout_ms'
10+
* milliseconds.
1111
*/
12-
bool raftable_set(char *key, char *value, int tries);
12+
bool raftable_set(const char *key, const char *value, size_t vallen, int timeout_ms);
1313

1414
/*
1515
* Iterates over all items in the table, calling func(key, value, arg)
1616
* for each of them.
1717
*/
18-
void raftable_every(void (*func)(char *, char *, void *), void *arg);
18+
void raftable_every(void (*func)(const char *, const char *, size_t, void *), void *arg);
1919

2020
#endif

0 commit comments

Comments
 (0)