Skip to content

Commit 1552662

Browse files
committed
Fix timeouts and other fatal bugs in raftable :)
1 parent 3e6d19c commit 1552662

File tree

8 files changed

+177
-139
lines changed

8 files changed

+177
-139
lines changed

contrib/raftable/Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
MODULE_big = raftable
2-
OBJS = raftable.o worker.o state.o blockmem.o raft/obj/raft.o raft/obj/util.o
2+
OBJS = raftable.o worker.o state.o blockmem.o timeout.o raft/obj/raft.o raft/obj/util.o
33
EXTENSION = raftable
44
DATA = raftable--1.0.sql
55

contrib/raftable/blockmem.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ block_fill(void *origin, int id, void *src, size_t len)
5858
return len;
5959
}
6060

61-
void
61+
static void
6262
block_clear(void *origin, int id)
6363
{
6464
TAIL(origin, id) = 0;

contrib/raftable/raftable.c

Lines changed: 63 additions & 116 deletions
Original file line numberDiff line numberDiff line change
@@ -12,14 +12,14 @@
1212
#include "access/htup_details.h"
1313
#include "miscadmin.h"
1414
#include "funcapi.h"
15-
#include "utils/timestamp.h"
1615

1716
#include "raft.h"
1817
#include "util.h"
1918

2019
#include "raftable.h"
2120
#include "worker.h"
2221
#include "state.h"
22+
#include "timeout.h"
2323

2424
#include <poll.h>
2525
#include <sys/socket.h>
@@ -81,42 +81,33 @@ static void disconnect_leader(void)
8181
leadersock = -1;
8282
}
8383

84-
static bool poll_until_writable(int sock, int timeout_ms)
84+
85+
static bool poll_until_writable(int sock, timeout_t *timeout)
8586
{
8687
struct pollfd pfd = {sock, POLLOUT, 0};
87-
int r = poll(&pfd, 1, timeout_ms);
88+
int r = poll(&pfd, 1, timeout_remaining_ms(timeout));
8889
if (r != 1) return false;
8990
return (pfd.revents & POLLOUT) != 0;
9091
}
9192

92-
static bool poll_until_readable(int sock, int timeout_ms)
93+
static bool poll_until_readable(int sock, timeout_t *timeout)
9394
{
9495
struct pollfd pfd = {sock, POLLIN, 0};
95-
int r = poll(&pfd, 1, timeout_ms);
96+
int remain = timeout_remaining_ms(timeout);
97+
int r = poll(&pfd, 1, remain);
9698
if (r != 1) return false;
9799
return (pfd.revents & POLLIN) != 0;
98100
}
99101

100-
static long msec(TimestampTz timer)
101-
{
102-
long sec;
103-
int usec;
104-
TimestampDifference(0, timer, &sec, &usec);
105-
return sec * 1000 + usec / 1000;
106-
}
107-
108-
static bool timed_write(int sock, void *data, size_t len, int timeout_ms)
102+
static bool timed_write(int sock, void *data, size_t len, timeout_t *timeout)
109103
{
110-
TimestampTz start, now;
111104
int sent = 0;
112105

113-
now = start = GetCurrentTimestamp();
114-
115106
while (sent < len)
116107
{
117108
int newbytes;
118-
now = GetCurrentTimestamp();
119-
if ((timeout_ms != -1) && (msec(now - start) > timeout_ms)) {
109+
if (timeout_happened(timeout))
110+
{
120111
elog(WARNING, "write timed out");
121112
return false;
122113
}
@@ -125,12 +116,11 @@ static bool timed_write(int sock, void *data, size_t len, int timeout_ms)
125116
if (newbytes == -1)
126117
{
127118
if (errno == EAGAIN) {
128-
int remaining_ms = (timeout_ms == -1) ? -1 : timeout_ms - msec(now - start);
129-
if (poll_until_writable(sock, remaining_ms)) {
119+
if (poll_until_writable(sock, timeout)) {
130120
continue;
131121
}
132122
}
133-
elog(WARNING, "failed to write: %s", strerror(errno));
123+
elog(WARNING, "failed to write: error %d: %s", errno, strerror(errno));
134124
return false;
135125
}
136126
sent += newbytes;
@@ -139,17 +129,15 @@ static bool timed_write(int sock, void *data, size_t len, int timeout_ms)
139129
return true;
140130
}
141131

142-
static bool timed_read(int sock, void *data, size_t len, int timeout_ms)
132+
static bool timed_read(int sock, void *data, size_t len, timeout_t *timeout)
143133
{
144134
int recved = 0;
145-
TimestampTz start, now;
146-
now = start = GetCurrentTimestamp();
147135

148136
while (recved < len)
149137
{
150138
int newbytes;
151-
now = GetCurrentTimestamp();
152-
if ((timeout_ms != -1) && (msec(now - start) > timeout_ms)) {
139+
if (timeout_happened(timeout))
140+
{
153141
elog(WARNING, "read timed out");
154142
return false;
155143
}
@@ -158,12 +146,11 @@ static bool timed_read(int sock, void *data, size_t len, int timeout_ms)
158146
if (newbytes == -1)
159147
{
160148
if (errno == EAGAIN) {
161-
int remaining_ms = (timeout_ms == -1) ? -1 : timeout_ms - msec(now - start);
162-
if (poll_until_readable(sock, remaining_ms)) {
149+
if (poll_until_readable(sock, timeout)) {
163150
continue;
164151
}
165152
}
166-
elog(WARNING, "failed to read: %s", strerror(errno));
153+
elog(WARNING, "failed to read: error %d: %s", errno, strerror(errno));
167154
return false;
168155
}
169156
recved += newbytes;
@@ -172,16 +159,14 @@ static bool timed_read(int sock, void *data, size_t len, int timeout_ms)
172159
return true;
173160
}
174161

175-
static bool connect_leader(int timeout_ms)
162+
static bool connect_leader(timeout_t *timeout)
176163
{
177164
struct addrinfo *addrs = NULL;
178165
struct addrinfo hint;
179166
char portstr[6];
180167
struct addrinfo *a;
181168
int rc;
182-
183-
TimestampTz now;
184-
int elapsed_ms;
169+
int sd;
185170

186171
HostPort *leaderhp;
187172

@@ -198,23 +183,21 @@ static bool connect_leader(int timeout_ms)
198183
if ((rc = getaddrinfo(leaderhp->host, portstr, &hint, &addrs)))
199184
{
200185
disconnect_leader();
201-
fprintf(stderr, "failed to resolve address '%s:%d': %s",
202-
leaderhp->host, leaderhp->port,
203-
gai_strerror(rc));
186+
elog(WARNING, "failed to resolve address '%s:%d': %s",
187+
leaderhp->host, leaderhp->port,
188+
gai_strerror(rc));
204189
return false;
205190
}
206191

207-
fprintf(stderr, "trying [%d] %s:%d\n", *shared.leader, leaderhp->host, leaderhp->port);
208-
elapsed_ms = 0;
209-
now = GetCurrentTimestamp();
192+
elog(WARNING, "trying [%d] %s:%d", *shared.leader, leaderhp->host, leaderhp->port);
210193
for (a = addrs; a != NULL; a = a->ai_next)
211194
{
212195
int one = 1;
213196

214-
int sd = socket(a->ai_family, SOCK_STREAM | SOCK_NONBLOCK, 0);
197+
sd = socket(a->ai_family, SOCK_STREAM | SOCK_NONBLOCK, 0);
215198
if (sd == -1)
216199
{
217-
perror("failed to create a socket");
200+
elog(WARNING, "failed to create a socket: %s", strerror(errno));
218201
continue;
219202
}
220203
setsockopt(sd, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one));
@@ -223,54 +206,54 @@ static bool connect_leader(int timeout_ms)
223206
{
224207
if (errno == EINPROGRESS)
225208
{
226-
while ((elapsed_ms <= timeout_ms) || (timeout_ms == -1))
209+
TIMEOUT_LOOP_START(timeout);
227210
{
228-
TimestampTz past = now;
229-
int remaining_ms = (timeout_ms == -1) ? -1 : timeout_ms - elapsed_ms;
230-
231-
if (poll_until_writable(sd, remaining_ms))
211+
if (poll_until_writable(sd, timeout))
232212
{
233213
int err;
234214
socklen_t optlen = sizeof(err);
235215
getsockopt(sd, SOL_SOCKET, SO_ERROR, &err, &optlen);
236-
if (err == 0)
237-
{
238-
// success
239-
break;
240-
}
216+
if (err == 0) goto success;
241217
}
242-
243-
now = GetCurrentTimestamp();
244-
elapsed_ms += msec(now - past);
245218
}
219+
TIMEOUT_LOOP_END(timeout);
220+
elog(WARNING, "connect timed out");
221+
goto failure;
246222
}
247223
else
248224
{
249-
perror("failed to connect to an address");
225+
elog(WARNING, "failed to connect to an address: %s", strerror(errno));
250226
close(sd);
251227
continue;
252228
}
253229
}
254230

255-
/* success */
256-
freeaddrinfo(addrs);
257-
leadersock = sd;
258-
return true;
231+
goto success;
259232
}
233+
failure:
260234
freeaddrinfo(addrs);
261235
disconnect_leader();
262-
fprintf(stderr, "could not connect\n");
236+
elog(WARNING, "could not connect");
263237
return false;
238+
success:
239+
freeaddrinfo(addrs);
240+
leadersock = sd;
241+
return true;
242+
}
243+
244+
static void wait_ms(int ms)
245+
{
246+
struct timespec ts = {0, ms * 1000000};
247+
nanosleep(&ts, NULL);
264248
}
265249

266-
static int get_connection(int timeout_ms)
250+
static int get_connection(timeout_t *timeout)
267251
{
268252
if (leadersock < 0)
269253
{
270-
if (connect_leader(timeout_ms)) return leadersock;
271-
// int timeout_ms = 100;
272-
// struct timespec timeout = {0, timeout_ms * 1000000};
273-
// nanosleep(&timeout, NULL);
254+
if (connect_leader(timeout)) return leadersock;
255+
elog(WARNING, "update: connect_leader() failed");
256+
wait_ms(100);
274257
}
275258
return leadersock;
276259
}
@@ -302,66 +285,37 @@ raftable_sql_get(PG_FUNCTION_ARGS)
302285
PG_RETURN_NULL();
303286
}
304287

305-
static bool try_sending_update(RaftableUpdate *ru, size_t size, int timeout_ms)
288+
static bool try_sending_update(RaftableUpdate *ru, size_t size, timeout_t *timeout)
306289
{
307-
int s, status, remaining_ms;
308-
TimestampTz start, now;
290+
int s, status;
309291

310-
now = start = GetCurrentTimestamp();
311-
312-
s = get_connection(timeout_ms - (now - start));
292+
s = get_connection(timeout);
313293
if (s < 0) return false;
314294

315-
now = GetCurrentTimestamp();
316-
remaining_ms = (timeout_ms == -1) ? -1 : timeout_ms - msec(now - start);
317-
if ((timeout_ms != -1) && (msec(now - start) > timeout_ms))
295+
if (timeout_happened(timeout))
318296
{
319-
elog(WARNING, "update: connect() timed out");
297+
elog(WARNING, "update: get_connection() timed out");
320298
return false;
321299
}
322300

323-
if (!timed_write(s, &size, sizeof(size), remaining_ms))
301+
if (!timed_write(s, &size, sizeof(size), timeout))
324302
{
325303
elog(WARNING, "failed to send the update size to the leader");
326304
return false;
327305
}
328306

329-
now = GetCurrentTimestamp();
330-
remaining_ms = (timeout_ms == -1) ? -1 : timeout_ms - msec(now - start);
331-
if ((timeout_ms != -1) && (msec(now - start) > timeout_ms))
332-
{
333-
elog(WARNING, "update: send(size) timed out");
334-
return false;
335-
}
336-
337-
if (!timed_write(s, ru, size, remaining_ms))
307+
if (!timed_write(s, ru, size, timeout))
338308
{
339309
elog(WARNING, "failed to send the update to the leader");
340310
return false;
341311
}
342312

343-
now = GetCurrentTimestamp();
344-
remaining_ms = (timeout_ms == -1) ? -1 : timeout_ms - msec(now - start);
345-
if ((timeout_ms != -1) && (msec(now - start) > timeout_ms))
346-
{
347-
elog(WARNING, "update: send(body) timed out");
348-
return false;
349-
}
350-
351-
if (!timed_read(s, &status, sizeof(status), remaining_ms))
313+
if (!timed_read(s, &status, sizeof(status), timeout))
352314
{
353315
elog(WARNING, "failed to recv the update status from the leader");
354316
return false;
355317
}
356318

357-
now = GetCurrentTimestamp();
358-
remaining_ms = (timeout_ms == -1) ? -1 : timeout_ms - msec(now - start);
359-
if ((timeout_ms != -1) && (msec(now - start) > timeout_ms))
360-
{
361-
elog(WARNING, "update: recv(status) timed out");
362-
return false;
363-
}
364-
365319
if (status != 1)
366320
{
367321
elog(WARNING, "update: leader returned status = %d", status);
@@ -377,8 +331,8 @@ bool raftable_set(const char *key, const char *value, size_t vallen, int timeout
377331
RaftableUpdate *ru;
378332
size_t size = sizeof(RaftableUpdate);
379333
size_t keylen = 0;
380-
TimestampTz now;
381-
int elapsed_ms;
334+
timeout_t timeout;
335+
timeout_start(&timeout, timeout_ms);
382336

383337
Assert(wcfg.id >= 0);
384338

@@ -398,27 +352,20 @@ bool raftable_set(const char *key, const char *value, size_t vallen, int timeout
398352
memcpy(f->data, key, keylen);
399353
memcpy(f->data + keylen, value, vallen);
400354

401-
elapsed_ms = 0;
402-
now = GetCurrentTimestamp();
403-
while ((elapsed_ms <= timeout_ms) || (timeout_ms == -1))
355+
TIMEOUT_LOOP_START(&timeout);
404356
{
405-
TimestampTz past = now;
406-
int remaining_ms = (timeout_ms == -1) ? -1 : timeout_ms - elapsed_ms;
407-
if (try_sending_update(ru, size, remaining_ms))
357+
if (try_sending_update(ru, size, &timeout))
408358
{
409359
pfree(ru);
410360
return true;
411361
}
412362
else
413-
{
414363
disconnect_leader();
415-
}
416-
now = GetCurrentTimestamp();
417-
elapsed_ms += msec(now - past);
418364
}
365+
TIMEOUT_LOOP_END(&timeout);
419366

420367
pfree(ru);
421-
elog(WARNING, "failed to set raftable value after %d ms", elapsed_ms);
368+
elog(WARNING, "failed to set raftable value after %d ms", timeout_elapsed_ms(&timeout));
422369
return false;
423370
}
424371

0 commit comments

Comments
 (0)