12
12
#include "access/htup_details.h"
13
13
#include "miscadmin.h"
14
14
#include "funcapi.h"
15
- #include "utils/timestamp.h"
16
15
17
16
#include "raft.h"
18
17
#include "util.h"
19
18
20
19
#include "raftable.h"
21
20
#include "worker.h"
22
21
#include "state.h"
22
+ #include "timeout.h"
23
23
24
24
#include <poll.h>
25
25
#include <sys/socket.h>
@@ -81,42 +81,33 @@ static void disconnect_leader(void)
81
81
leadersock = -1 ;
82
82
}
83
83
84
- static bool poll_until_writable (int sock , int timeout_ms )
84
+
85
+ static bool poll_until_writable (int sock , timeout_t * timeout )
85
86
{
86
87
struct pollfd pfd = {sock , POLLOUT , 0 };
87
- int r = poll (& pfd , 1 , timeout_ms );
88
+ int r = poll (& pfd , 1 , timeout_remaining_ms ( timeout ) );
88
89
if (r != 1 ) return false;
89
90
return (pfd .revents & POLLOUT ) != 0 ;
90
91
}
91
92
92
- static bool poll_until_readable (int sock , int timeout_ms )
93
+ static bool poll_until_readable (int sock , timeout_t * timeout )
93
94
{
94
95
struct pollfd pfd = {sock , POLLIN , 0 };
95
- int r = poll (& pfd , 1 , timeout_ms );
96
+ int remain = timeout_remaining_ms (timeout );
97
+ int r = poll (& pfd , 1 , remain );
96
98
if (r != 1 ) return false;
97
99
return (pfd .revents & POLLIN ) != 0 ;
98
100
}
99
101
100
- static long msec (TimestampTz timer )
101
- {
102
- long sec ;
103
- int usec ;
104
- TimestampDifference (0 , timer , & sec , & usec );
105
- return sec * 1000 + usec / 1000 ;
106
- }
107
-
108
- static bool timed_write (int sock , void * data , size_t len , int timeout_ms )
102
+ static bool timed_write (int sock , void * data , size_t len , timeout_t * timeout )
109
103
{
110
- TimestampTz start , now ;
111
104
int sent = 0 ;
112
105
113
- now = start = GetCurrentTimestamp ();
114
-
115
106
while (sent < len )
116
107
{
117
108
int newbytes ;
118
- now = GetCurrentTimestamp ();
119
- if (( timeout_ms != -1 ) && ( msec ( now - start ) > timeout_ms )) {
109
+ if ( timeout_happened ( timeout ))
110
+ {
120
111
elog (WARNING , "write timed out" );
121
112
return false;
122
113
}
@@ -125,12 +116,11 @@ static bool timed_write(int sock, void *data, size_t len, int timeout_ms)
125
116
if (newbytes == -1 )
126
117
{
127
118
if (errno == EAGAIN ) {
128
- int remaining_ms = (timeout_ms == -1 ) ? -1 : timeout_ms - msec (now - start );
129
- if (poll_until_writable (sock , remaining_ms )) {
119
+ if (poll_until_writable (sock , timeout )) {
130
120
continue ;
131
121
}
132
122
}
133
- elog (WARNING , "failed to write: %s" , strerror (errno ));
123
+ elog (WARNING , "failed to write: error %d: %s" , errno , strerror (errno ));
134
124
return false;
135
125
}
136
126
sent += newbytes ;
@@ -139,17 +129,15 @@ static bool timed_write(int sock, void *data, size_t len, int timeout_ms)
139
129
return true;
140
130
}
141
131
142
- static bool timed_read (int sock , void * data , size_t len , int timeout_ms )
132
+ static bool timed_read (int sock , void * data , size_t len , timeout_t * timeout )
143
133
{
144
134
int recved = 0 ;
145
- TimestampTz start , now ;
146
- now = start = GetCurrentTimestamp ();
147
135
148
136
while (recved < len )
149
137
{
150
138
int newbytes ;
151
- now = GetCurrentTimestamp ();
152
- if (( timeout_ms != -1 ) && ( msec ( now - start ) > timeout_ms )) {
139
+ if ( timeout_happened ( timeout ))
140
+ {
153
141
elog (WARNING , "read timed out" );
154
142
return false;
155
143
}
@@ -158,12 +146,11 @@ static bool timed_read(int sock, void *data, size_t len, int timeout_ms)
158
146
if (newbytes == -1 )
159
147
{
160
148
if (errno == EAGAIN ) {
161
- int remaining_ms = (timeout_ms == -1 ) ? -1 : timeout_ms - msec (now - start );
162
- if (poll_until_readable (sock , remaining_ms )) {
149
+ if (poll_until_readable (sock , timeout )) {
163
150
continue ;
164
151
}
165
152
}
166
- elog (WARNING , "failed to read: %s" , strerror (errno ));
153
+ elog (WARNING , "failed to read: error %d: %s" , errno , strerror (errno ));
167
154
return false;
168
155
}
169
156
recved += newbytes ;
@@ -172,16 +159,14 @@ static bool timed_read(int sock, void *data, size_t len, int timeout_ms)
172
159
return true;
173
160
}
174
161
175
- static bool connect_leader (int timeout_ms )
162
+ static bool connect_leader (timeout_t * timeout )
176
163
{
177
164
struct addrinfo * addrs = NULL ;
178
165
struct addrinfo hint ;
179
166
char portstr [6 ];
180
167
struct addrinfo * a ;
181
168
int rc ;
182
-
183
- TimestampTz now ;
184
- int elapsed_ms ;
169
+ int sd ;
185
170
186
171
HostPort * leaderhp ;
187
172
@@ -198,23 +183,21 @@ static bool connect_leader(int timeout_ms)
198
183
if ((rc = getaddrinfo (leaderhp -> host , portstr , & hint , & addrs )))
199
184
{
200
185
disconnect_leader ();
201
- fprintf ( stderr , "failed to resolve address '%s:%d': %s" ,
202
- leaderhp -> host , leaderhp -> port ,
203
- gai_strerror (rc ));
186
+ elog ( WARNING , "failed to resolve address '%s:%d': %s" ,
187
+ leaderhp -> host , leaderhp -> port ,
188
+ gai_strerror (rc ));
204
189
return false;
205
190
}
206
191
207
- fprintf (stderr , "trying [%d] %s:%d\n" , * shared .leader , leaderhp -> host , leaderhp -> port );
208
- elapsed_ms = 0 ;
209
- now = GetCurrentTimestamp ();
192
+ elog (WARNING , "trying [%d] %s:%d" , * shared .leader , leaderhp -> host , leaderhp -> port );
210
193
for (a = addrs ; a != NULL ; a = a -> ai_next )
211
194
{
212
195
int one = 1 ;
213
196
214
- int sd = socket (a -> ai_family , SOCK_STREAM | SOCK_NONBLOCK , 0 );
197
+ sd = socket (a -> ai_family , SOCK_STREAM | SOCK_NONBLOCK , 0 );
215
198
if (sd == -1 )
216
199
{
217
- perror ( "failed to create a socket" );
200
+ elog ( WARNING , "failed to create a socket: %s" , strerror ( errno ) );
218
201
continue ;
219
202
}
220
203
setsockopt (sd , IPPROTO_TCP , TCP_NODELAY , & one , sizeof (one ));
@@ -223,54 +206,54 @@ static bool connect_leader(int timeout_ms)
223
206
{
224
207
if (errno == EINPROGRESS )
225
208
{
226
- while (( elapsed_ms <= timeout_ms ) || ( timeout_ms == -1 ))
209
+ TIMEOUT_LOOP_START ( timeout );
227
210
{
228
- TimestampTz past = now ;
229
- int remaining_ms = (timeout_ms == -1 ) ? -1 : timeout_ms - elapsed_ms ;
230
-
231
- if (poll_until_writable (sd , remaining_ms ))
211
+ if (poll_until_writable (sd , timeout ))
232
212
{
233
213
int err ;
234
214
socklen_t optlen = sizeof (err );
235
215
getsockopt (sd , SOL_SOCKET , SO_ERROR , & err , & optlen );
236
- if (err == 0 )
237
- {
238
- // success
239
- break ;
240
- }
216
+ if (err == 0 ) goto success ;
241
217
}
242
-
243
- now = GetCurrentTimestamp ();
244
- elapsed_ms += msec (now - past );
245
218
}
219
+ TIMEOUT_LOOP_END (timeout );
220
+ elog (WARNING , "connect timed out" );
221
+ goto failure ;
246
222
}
247
223
else
248
224
{
249
- perror ( "failed to connect to an address" );
225
+ elog ( WARNING , "failed to connect to an address: %s" , strerror ( errno ) );
250
226
close (sd );
251
227
continue ;
252
228
}
253
229
}
254
230
255
- /* success */
256
- freeaddrinfo (addrs );
257
- leadersock = sd ;
258
- return true;
231
+ goto success ;
259
232
}
233
+ failure :
260
234
freeaddrinfo (addrs );
261
235
disconnect_leader ();
262
- fprintf ( stderr , "could not connect\n " );
236
+ elog ( WARNING , "could not connect" );
263
237
return false;
238
+ success :
239
+ freeaddrinfo (addrs );
240
+ leadersock = sd ;
241
+ return true;
242
+ }
243
+
244
+ static void wait_ms (int ms )
245
+ {
246
+ struct timespec ts = {0 , ms * 1000000 };
247
+ nanosleep (& ts , NULL );
264
248
}
265
249
266
- static int get_connection (int timeout_ms )
250
+ static int get_connection (timeout_t * timeout )
267
251
{
268
252
if (leadersock < 0 )
269
253
{
270
- if (connect_leader (timeout_ms )) return leadersock ;
271
- // int timeout_ms = 100;
272
- // struct timespec timeout = {0, timeout_ms * 1000000};
273
- // nanosleep(&timeout, NULL);
254
+ if (connect_leader (timeout )) return leadersock ;
255
+ elog (WARNING , "update: connect_leader() failed" );
256
+ wait_ms (100 );
274
257
}
275
258
return leadersock ;
276
259
}
@@ -302,66 +285,37 @@ raftable_sql_get(PG_FUNCTION_ARGS)
302
285
PG_RETURN_NULL ();
303
286
}
304
287
305
- static bool try_sending_update (RaftableUpdate * ru , size_t size , int timeout_ms )
288
+ static bool try_sending_update (RaftableUpdate * ru , size_t size , timeout_t * timeout )
306
289
{
307
- int s , status , remaining_ms ;
308
- TimestampTz start , now ;
290
+ int s , status ;
309
291
310
- now = start = GetCurrentTimestamp ();
311
-
312
- s = get_connection (timeout_ms - (now - start ));
292
+ s = get_connection (timeout );
313
293
if (s < 0 ) return false;
314
294
315
- now = GetCurrentTimestamp ();
316
- remaining_ms = (timeout_ms == -1 ) ? -1 : timeout_ms - msec (now - start );
317
- if ((timeout_ms != -1 ) && (msec (now - start ) > timeout_ms ))
295
+ if (timeout_happened (timeout ))
318
296
{
319
- elog (WARNING , "update: connect () timed out" );
297
+ elog (WARNING , "update: get_connection () timed out" );
320
298
return false;
321
299
}
322
300
323
- if (!timed_write (s , & size , sizeof (size ), remaining_ms ))
301
+ if (!timed_write (s , & size , sizeof (size ), timeout ))
324
302
{
325
303
elog (WARNING , "failed to send the update size to the leader" );
326
304
return false;
327
305
}
328
306
329
- now = GetCurrentTimestamp ();
330
- remaining_ms = (timeout_ms == -1 ) ? -1 : timeout_ms - msec (now - start );
331
- if ((timeout_ms != -1 ) && (msec (now - start ) > timeout_ms ))
332
- {
333
- elog (WARNING , "update: send(size) timed out" );
334
- return false;
335
- }
336
-
337
- if (!timed_write (s , ru , size , remaining_ms ))
307
+ if (!timed_write (s , ru , size , timeout ))
338
308
{
339
309
elog (WARNING , "failed to send the update to the leader" );
340
310
return false;
341
311
}
342
312
343
- now = GetCurrentTimestamp ();
344
- remaining_ms = (timeout_ms == -1 ) ? -1 : timeout_ms - msec (now - start );
345
- if ((timeout_ms != -1 ) && (msec (now - start ) > timeout_ms ))
346
- {
347
- elog (WARNING , "update: send(body) timed out" );
348
- return false;
349
- }
350
-
351
- if (!timed_read (s , & status , sizeof (status ), remaining_ms ))
313
+ if (!timed_read (s , & status , sizeof (status ), timeout ))
352
314
{
353
315
elog (WARNING , "failed to recv the update status from the leader" );
354
316
return false;
355
317
}
356
318
357
- now = GetCurrentTimestamp ();
358
- remaining_ms = (timeout_ms == -1 ) ? -1 : timeout_ms - msec (now - start );
359
- if ((timeout_ms != -1 ) && (msec (now - start ) > timeout_ms ))
360
- {
361
- elog (WARNING , "update: recv(status) timed out" );
362
- return false;
363
- }
364
-
365
319
if (status != 1 )
366
320
{
367
321
elog (WARNING , "update: leader returned status = %d" , status );
@@ -377,8 +331,8 @@ bool raftable_set(const char *key, const char *value, size_t vallen, int timeout
377
331
RaftableUpdate * ru ;
378
332
size_t size = sizeof (RaftableUpdate );
379
333
size_t keylen = 0 ;
380
- TimestampTz now ;
381
- int elapsed_ms ;
334
+ timeout_t timeout ;
335
+ timeout_start ( & timeout , timeout_ms ) ;
382
336
383
337
Assert (wcfg .id >= 0 );
384
338
@@ -398,27 +352,20 @@ bool raftable_set(const char *key, const char *value, size_t vallen, int timeout
398
352
memcpy (f -> data , key , keylen );
399
353
memcpy (f -> data + keylen , value , vallen );
400
354
401
- elapsed_ms = 0 ;
402
- now = GetCurrentTimestamp ();
403
- while ((elapsed_ms <= timeout_ms ) || (timeout_ms == -1 ))
355
+ TIMEOUT_LOOP_START (& timeout );
404
356
{
405
- TimestampTz past = now ;
406
- int remaining_ms = (timeout_ms == -1 ) ? -1 : timeout_ms - elapsed_ms ;
407
- if (try_sending_update (ru , size , remaining_ms ))
357
+ if (try_sending_update (ru , size , & timeout ))
408
358
{
409
359
pfree (ru );
410
360
return true;
411
361
}
412
362
else
413
- {
414
363
disconnect_leader ();
415
- }
416
- now = GetCurrentTimestamp ();
417
- elapsed_ms += msec (now - past );
418
364
}
365
+ TIMEOUT_LOOP_END (& timeout );
419
366
420
367
pfree (ru );
421
- elog (WARNING , "failed to set raftable value after %d ms" , elapsed_ms );
368
+ elog (WARNING , "failed to set raftable value after %d ms" , timeout_elapsed_ms ( & timeout ) );
422
369
return false;
423
370
}
424
371
0 commit comments