Skip to content

Commit 75624cd

Browse files
committed
Merge RDMA branch
2 parents 440165f + c761fc3 commit 75624cd

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

58 files changed

+3660
-395
lines changed

configure

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -723,6 +723,7 @@ with_perl
723723
with_tcl
724724
enable_thread_safety
725725
INCLUDES
726+
with_rsocket
726727
autodepend
727728
TAS
728729
GCC
@@ -826,6 +827,7 @@ with_wal_segsize
826827
with_CC
827828
enable_depend
828829
enable_cassert
830+
with_rsocket
829831
enable_thread_safety
830832
with_tcl
831833
with_tclconfig
@@ -1518,6 +1520,7 @@ Optional Packages:
15181520
--with-wal-segsize=SEGSIZE
15191521
set WAL segment size in MB [16]
15201522
--with-CC=CMD set compiler (deprecated)
1523+
--with-rsocket replace socket with rsocket (RDMA socket API)
15211524
--with-tcl build Tcl modules (PL/Tcl)
15221525
--with-tclconfig=DIR tclConfig.sh is in DIR
15231526
--with-perl build Perl modules (PL/Perl)
@@ -5301,6 +5304,41 @@ fi
53015304

53025305

53035306

5307+
#
5308+
# Replace socket with rsocket
5309+
#
5310+
{ $as_echo "$as_me:${as_lineno-$LINENO}: checking whether to build with rsocket support" >&5
5311+
$as_echo_n "checking whether to build with rsocket support... " >&6; }
5312+
5313+
5314+
5315+
# Check whether --with-rsocket was given.
5316+
if test "${with_rsocket+set}" = set; then :
5317+
withval=$with_rsocket;
5318+
case $withval in
5319+
yes)
5320+
5321+
$as_echo "#define WITH_RSOCKET 1" >>confdefs.h
5322+
5323+
;;
5324+
no)
5325+
:
5326+
;;
5327+
*)
5328+
as_fn_error $? "no argument expected for --with-rsocket option" "$LINENO" 5
5329+
;;
5330+
esac
5331+
5332+
else
5333+
with_rsocket=no
5334+
5335+
fi
5336+
5337+
5338+
{ $as_echo "$as_me:${as_lineno-$LINENO}: result: $with_rsocket" >&5
5339+
$as_echo "$with_rsocket" >&6; }
5340+
5341+
53045342
#
53055343
# Include directories
53065344
#

configure.in

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -574,6 +574,16 @@ PGAC_ARG_BOOL(enable, cassert, no, [enable assertion checks (for debugging)],
574574
[Define to 1 to build with assertion checks. (--enable-cassert)])])
575575

576576

577+
#
578+
# Replace socket with rsocket
579+
#
580+
AC_MSG_CHECKING([whether to build with rsocket support])
581+
PGAC_ARG_BOOL(with, rsocket, no, [replace socket with rsocket (RDMA socket API)],
582+
[AC_DEFINE([WITH_RSOCKET], 1,
583+
[Define to 1 to build with rsocket instead of socket. (--with-rsocket)])])
584+
AC_MSG_RESULT([$with_rsocket])
585+
AC_SUBST(with_rsocket)
586+
577587
#
578588
# Include directories
579589
#

contrib/mmts/arbiter.c

Lines changed: 42 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
#include "postgres.h"
2222
#include "fmgr.h"
2323
#include "miscadmin.h"
24+
#include "pg_socket.h"
2425
#include "postmaster/postmaster.h"
2526
#include "postmaster/bgworker.h"
2627
#include "storage/s_lock.h"
@@ -58,6 +59,7 @@
5859
#include "tcop/utility.h"
5960
#include "libpq/ip.h"
6061

62+
6163
#ifndef USE_EPOLL
6264
#ifdef __linux__
6365
#define USE_EPOLL 0
@@ -185,7 +187,7 @@ static void MtmUnregisterSocket(int fd)
185187
static void MtmDisconnect(int node)
186188
{
187189
MtmUnregisterSocket(sockets[node]);
188-
close(sockets[node]);
190+
pg_closesocket(sockets[node], MtmUseRDMA);
189191
sockets[node] = -1;
190192
MtmOnNodeDisconnect(node+1);
191193
}
@@ -208,7 +210,7 @@ static int MtmWaitSocket(int sd, bool forWrite, timestamp_t timeoutMsec)
208210
FD_SET(sd, &set);
209211
tv.tv_sec = (deadline - now)/USECS_PER_SEC;
210212
tv.tv_usec = (deadline - now)%USECS_PER_SEC;
211-
} while ((rc = select(sd+1, forWrite ? NULL : &set, forWrite ? &set : NULL, NULL, &tv)) < 0 && errno == EINTR);
213+
} while ((rc = pg_select(sd+1, forWrite ? NULL : &set, forWrite ? &set : NULL, NULL, &tv, MtmUseRDMA)) < 0 && errno == EINTR);
212214

213215
return rc;
214216
}
@@ -219,7 +221,7 @@ static bool MtmWriteSocket(int sd, void const* buf, int size)
219221
while (size != 0) {
220222
int rc = MtmWaitSocket(sd, true, MtmHeartbeatSendTimeout);
221223
if (rc == 1) {
222-
while ((rc = send(sd, src, size, 0)) < 0 && errno == EINTR);
224+
while ((rc = pg_send(sd, src, size, 0, MtmUseRDMA)) < 0 && errno == EINTR);
223225
if (rc < 0) {
224226
if (errno == EINPROGRESS) {
225227
continue;
@@ -238,11 +240,11 @@ static bool MtmWriteSocket(int sd, void const* buf, int size)
238240
static int MtmReadSocket(int sd, void* buf, int buf_size)
239241
{
240242
int rc;
241-
while ((rc = recv(sd, buf, buf_size, 0)) < 0 && errno == EINTR);
243+
while ((rc = pg_recv(sd, buf, buf_size, 0, MtmUseRDMA)) < 0 && errno == EINTR);
242244
if (rc <= 0 && (errno == EAGAIN || errno == EINPROGRESS)) {
243245
rc = MtmWaitSocket(sd, false, MtmHeartbeatSendTimeout);
244246
if (rc == 1) {
245-
while ((rc = recv(sd, buf, buf_size, 0)) < 0 && errno == EINTR);
247+
while ((rc = pg_recv(sd, buf, buf_size, 0, MtmUseRDMA)) < 0 && errno == EINTR);
246248
}
247249
}
248250
return rc;
@@ -254,25 +256,25 @@ static void MtmSetSocketOptions(int sd)
254256
{
255257
#ifdef TCP_NODELAY
256258
int on = 1;
257-
if (setsockopt(sd, IPPROTO_TCP, TCP_NODELAY, (char const*)&on, sizeof(on)) < 0) {
259+
if (pg_setsockopt(sd, IPPROTO_TCP, TCP_NODELAY, (char const*)&on, sizeof(on), MtmUseRDMA) < 0) {
258260
MTM_ELOG(WARNING, "Failed to set TCP_NODELAY: %m");
259261
}
260262
#endif
261-
if (setsockopt(sd, SOL_SOCKET, SO_KEEPALIVE, (char const*)&on, sizeof(on)) < 0) {
263+
if (pg_setsockopt(sd, SOL_SOCKET, SO_KEEPALIVE, (char const*)&on, sizeof(on), MtmUseRDMA) < 0) {
262264
MTM_ELOG(WARNING, "Failed to set SO_KEEPALIVE: %m");
263265
}
264266

265267
if (tcp_keepalives_idle) {
266268
#ifdef TCP_KEEPIDLE
267-
if (setsockopt(sd, IPPROTO_TCP, TCP_KEEPIDLE,
268-
(char *) &tcp_keepalives_idle, sizeof(tcp_keepalives_idle)) < 0)
269+
if (pg_setsockopt(sd, IPPROTO_TCP, TCP_KEEPIDLE,
270+
(char *) &tcp_keepalives_idle, sizeof(tcp_keepalives_idle), MtmUseRDMA) < 0)
269271
{
270272
MTM_ELOG(WARNING, "Failed to set TCP_KEEPIDLE: %m");
271273
}
272274
#else
273275
#ifdef TCP_KEEPALIVE
274-
if (setsockopt(sd, IPPROTO_TCP, TCP_KEEPALIVE,
275-
(char *) &tcp_keepalives_idle, sizeof(tcp_keepalives_idle)) < 0)
276+
if (pg_setsockopt(sd, IPPROTO_TCP, TCP_KEEPALIVE,
277+
(char *) &tcp_keepalives_idle, sizeof(tcp_keepalives_idle), MtmUseRDMA) < 0)
276278
{
277279
MTM_ELOG(WARNING, "Failed to set TCP_KEEPALIVE: %m");
278280
}
@@ -281,17 +283,17 @@ static void MtmSetSocketOptions(int sd)
281283
}
282284
#ifdef TCP_KEEPINTVL
283285
if (tcp_keepalives_interval) {
284-
if (setsockopt(sd, IPPROTO_TCP, TCP_KEEPINTVL,
285-
(char *) &tcp_keepalives_interval, sizeof(tcp_keepalives_interval)) < 0)
286+
if (pg_setsockopt(sd, IPPROTO_TCP, TCP_KEEPINTVL,
287+
(char *) &tcp_keepalives_interval, sizeof(tcp_keepalives_interval), MtmUseRDMA) < 0)
286288
{
287289
MTM_ELOG(WARNING, "Failed to set TCP_KEEPINTVL: %m");
288290
}
289291
}
290292
#endif
291293
#ifdef TCP_KEEPCNT
292294
if (tcp_keepalives_count) {
293-
if (setsockopt(sd, IPPROTO_TCP, TCP_KEEPCNT,
294-
(char *) &tcp_keepalives_count, sizeof(tcp_keepalives_count)) < 0)
295+
if (pg_setsockopt(sd, IPPROTO_TCP, TCP_KEEPCNT,
296+
(char *) &tcp_keepalives_count, sizeof(tcp_keepalives_count), MtmUseRDMA) < 0)
295297
{
296298
MTM_ELOG(WARNING, "Failed to set TCP_KEEPCNT: %m");
297299
}
@@ -376,7 +378,7 @@ static void MtmSendHeartbeat()
376378
/* Connectivity mask can be cleared by MtmWatchdog: in this case sockets[i] >= 0 */
377379
if (BIT_CHECK(SELF_CONNECTIVITY_MASK, i)) {
378380
MTM_LOG1("Force reconnect to node %d", i+1);
379-
close(sockets[i]);
381+
pg_closesocket(sockets[i], MtmUseRDMA);
380382
sockets[i] = -1;
381383
MtmReconnectNode(i+1); /* set reconnect mask to force node reconnect */
382384
}
@@ -431,22 +433,22 @@ static int MtmConnectSocket(int node, int port)
431433
}
432434
BIT_SET(busy_mask, node);
433435

434-
Retry:
436+
Retry:
435437

436-
sd = socket(AF_INET, SOCK_STREAM, 0);
438+
sd = pg_socket(AF_INET, SOCK_STREAM, 0, MtmUseRDMA);
437439
if (sd < 0) {
438440
MTM_ELOG(LOG, "Arbiter failed to create socket: %s", strerror(errno));
439441
goto Error;
440442
}
441-
rc = fcntl(sd, F_SETFL, O_NONBLOCK);
443+
rc = pg_fcntl(sd, F_SETFL, O_NONBLOCK, MtmUseRDMA);
442444
if (rc < 0) {
443445
MTM_ELOG(LOG, "Arbiter failed to switch socket to non-blocking mode: %s", strerror(errno));
444446
goto Error;
445447
}
446448
for (addr = addrs; addr != NULL; addr = addr->ai_next)
447449
{
448450
do {
449-
rc = connect(sd, addr->ai_addr, addr->ai_addrlen);
451+
rc = pg_connect(sd, addr->ai_addr, addr->ai_addrlen, MtmUseRDMA);
450452
} while (rc < 0 && errno == EINTR);
451453

452454
if (rc >= 0 || errno == EINPROGRESS) {
@@ -460,7 +462,7 @@ static int MtmConnectSocket(int node, int port)
460462
socklen_t optlen = sizeof(int);
461463
int errcode;
462464

463-
if (getsockopt(sd, SOL_SOCKET, SO_ERROR, (void*)&errcode, &optlen) < 0) {
465+
if (pg_getsockopt(sd, SOL_SOCKET, SO_ERROR, (void*)&errcode, &optlen, MtmUseRDMA) < 0) {
464466
MTM_ELOG(WARNING, "Arbiter failed to getsockopt for %s:%d: %s", host, port, strerror(errcode));
465467
goto Error;
466468
}
@@ -490,17 +492,17 @@ static int MtmConnectSocket(int node, int port)
490492
strcpy(req.connStr, Mtm->nodes[MtmNodeId-1].con.connStr);
491493
if (!MtmWriteSocket(sd, &req, sizeof req)) {
492494
MTM_ELOG(WARNING, "Arbiter failed to send handshake message to %s:%d: %s", host, port, strerror(errno));
493-
close(sd);
495+
pg_closesocket(sd, MtmUseRDMA);
494496
goto Retry;
495497
}
496498
if (MtmReadSocket(sd, &resp, sizeof resp) != sizeof(resp)) {
497499
MTM_ELOG(WARNING, "Arbiter failed to receive response for handshake message from %s:%d: %s", host, port, strerror(errno));
498-
close(sd);
500+
pg_closesocket(sd, MtmUseRDMA);
499501
goto Retry;
500502
}
501503
if (resp.code != MSG_STATUS || resp.dxid != HANDSHAKE_MAGIC) {
502504
MTM_ELOG(WARNING, "Arbiter get unexpected response %d for handshake message from %s:%d", resp.code, host, port);
503-
close(sd);
505+
pg_closesocket(sd, MtmUseRDMA);
504506
goto Retry;
505507
}
506508
if (addrs)
@@ -519,7 +521,7 @@ static int MtmConnectSocket(int node, int port)
519521
Error:
520522
busy_mask = save_mask;
521523
if (sd >= 0) {
522-
close(sd);
524+
pg_closesocket(sd, MtmUseRDMA);
523525
}
524526
if (addrs) {
525527
pg_freeaddrinfo_all(hint.ai_family, addrs);
@@ -567,7 +569,7 @@ static bool MtmSendToNode(int node, void const* buf, int size)
567569
*/
568570
if (sockets[node] >= 0 && BIT_CHECK(Mtm->reconnectMask, node)) {
569571
MTM_ELOG(WARNING, "Arbiter is forced to reconnect to node %d", node+1);
570-
close(sockets[node]);
572+
pg_closesocket(sockets[node], MtmUseRDMA);
571573
sockets[node] = -1;
572574
}
573575
#endif
@@ -579,7 +581,7 @@ static bool MtmSendToNode(int node, void const* buf, int size)
579581
if (sockets[node] < 0 || !MtmWriteSocket(sockets[node], buf, size)) {
580582
if (sockets[node] >= 0) {
581583
MTM_ELOG(WARNING, "Arbiter fail to write to node %d: %s", node+1, strerror(errno));
582-
close(sockets[node]);
584+
pg_closesocket(sockets[node], MtmUseRDMA);
583585
sockets[node] = -1;
584586
}
585587
sockets[node] = MtmConnectSocket(node, Mtm->nodes[node].con.arbiterPort);
@@ -610,23 +612,23 @@ static int MtmReadFromNode(int node, void* buf, int buf_size)
610612

611613
static void MtmAcceptOneConnection()
612614
{
613-
int fd = accept(gateway, NULL, NULL);
615+
int fd = pg_accept(gateway, NULL, NULL, MtmUseRDMA);
614616
if (fd < 0) {
615617
MTM_ELOG(WARNING, "Arbiter failed to accept socket: %s", strerror(errno));
616618
} else {
617619
MtmHandshakeMessage req;
618620
MtmArbiterMessage resp;
619-
int rc = fcntl(fd, F_SETFL, O_NONBLOCK);
621+
int rc = pg_fcntl(fd, F_SETFL, O_NONBLOCK, MtmUseRDMA);
620622
if (rc < 0) {
621623
MTM_ELOG(ERROR, "Arbiter failed to switch socket to non-blocking mode: %s", strerror(errno));
622624
}
623625
rc = MtmReadSocket(fd, &req, sizeof req);
624626
if (rc < sizeof(req)) {
625-
MTM_ELOG(WARNING, "Arbiter failed to handshake socket: %s", strerror(errno));
626-
close(fd);
627+
MTM_ELOG(WARNING, "Arbiter failed to handshake socket: %d, errno=%d", rc, errno);
628+
pg_closesocket(fd, MtmUseRDMA);
627629
} else if (req.hdr.code != MSG_HANDSHAKE && req.hdr.dxid != HANDSHAKE_MAGIC) {
628-
MTM_ELOG(WARNING, "Arbiter get unexpected handshake message %d", req.hdr.code);
629-
close(fd);
630+
MTM_ELOG(WARNING, "Arbiter failed to handshake socket: %s", strerror(errno));
631+
pg_closesocket(fd, MtmUseRDMA);
630632
} else {
631633
int node = req.hdr.node-1;
632634
Assert(node >= 0 && node < Mtm->nAllNodes && node+1 != MtmNodeId);
@@ -643,7 +645,7 @@ static void MtmAcceptOneConnection()
643645
MtmUpdateNodeConnectionInfo(&Mtm->nodes[node].con, req.connStr);
644646
if (!MtmWriteSocket(fd, &resp, sizeof resp)) {
645647
MTM_ELOG(WARNING, "Arbiter failed to write response for handshake message to node %d", node+1);
646-
close(fd);
648+
pg_closesocket(fd, MtmUseRDMA);
647649
} else {
648650
MTM_LOG1("Arbiter established connection with node %d", node+1);
649651
if (sockets[node] >= 0) {
@@ -673,18 +675,18 @@ static void MtmAcceptIncomingConnections()
673675
sock_inet.sin_addr.s_addr = htonl(INADDR_ANY);
674676
sock_inet.sin_port = htons(MtmArbiterPort);
675677

676-
gateway = socket(sock_inet.sin_family, SOCK_STREAM, 0);
678+
gateway = pg_socket(sock_inet.sin_family, SOCK_STREAM, 0, MtmUseRDMA);
677679
if (gateway < 0) {
678680
MTM_ELOG(ERROR, "Arbiter failed to create socket: %s", strerror(errno));
679681
}
680-
if (setsockopt(gateway, SOL_SOCKET, SO_REUSEADDR, (char*)&on, sizeof on) < 0) {
682+
if (pg_setsockopt(gateway, SOL_SOCKET, SO_REUSEADDR, (char*)&on, sizeof on, MtmUseRDMA) < 0) {
681683
MTM_ELOG(ERROR, "Arbiter failed to set options for socket: %s", strerror(errno));
682684
}
683685

684-
if (bind(gateway, (struct sockaddr*)&sock_inet, sizeof(sock_inet)) < 0) {
686+
if (pg_bind(gateway, (struct sockaddr*)&sock_inet, sizeof(sock_inet), MtmUseRDMA) < 0) {
685687
MTM_ELOG(ERROR, "Arbiter failed to bind socket: %s", strerror(errno));
686688
}
687-
if (listen(gateway, nNodes) < 0) {
689+
if (pg_listen(gateway, nNodes, MtmUseRDMA) < 0) {
688690
MTM_ELOG(ERROR, "Arbiter failed to listen socket: %s", strerror(errno));
689691
}
690692

@@ -787,7 +789,7 @@ static bool MtmRecovery()
787789
fd_set tryset;
788790
FD_ZERO(&tryset);
789791
FD_SET(sd, &tryset);
790-
if (select(sd+1, &tryset, NULL, NULL, &tm) < 0) {
792+
if (pg_select(sd+1, &tryset, NULL, NULL, &tm, MtmUseRDMA) < 0) {
791793
MTM_ELOG(WARNING, "Arbiter lost connection with node %d", i+1);
792794
MtmDisconnect(i);
793795
recovered = true;
@@ -884,7 +886,7 @@ static void MtmReceiver(Datum arg)
884886
tv.tv_sec = selectTimeout/1000;
885887
tv.tv_usec = selectTimeout%1000*1000;
886888
do {
887-
n = select(max_fd+1, &events, NULL, NULL, &tv);
889+
n = pg_select(max_fd+1, &events, NULL, NULL, &tv, MtmUseRDMA);
888890
} while (n < 0 && errno == EINTR);
889891
} while (n < 0 && MtmRecovery());
890892

contrib/mmts/multimaster.c

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -252,6 +252,7 @@ int MtmHeartbeatRecvTimeout;
252252
int MtmMin2PCTimeout;
253253
int MtmMax2PCRatio;
254254
bool MtmUseDtm;
255+
bool MtmUseRDMA;
255256
bool MtmPreserveCommitOrder;
256257
bool MtmVolksWagenMode; /* Pretend to be normal postgres. This means skip some NOTICE's and use local sequences */
257258

@@ -3295,6 +3296,19 @@ _PG_init(void)
32953296
NULL
32963297
);
32973298

3299+
DefineCustomBoolVariable(
3300+
"multimaster.use_rdma",
3301+
"Use RDMA sockets",
3302+
NULL,
3303+
&MtmUseRDMA,
3304+
false,
3305+
PGC_POSTMASTER,
3306+
0,
3307+
NULL,
3308+
NULL,
3309+
NULL
3310+
);
3311+
32983312
DefineCustomBoolVariable(
32993313
"multimaster.preserve_commit_order",
33003314
"Transactions from one node will be committed in same order al all nodes",

0 commit comments

Comments
 (0)