Skip to content

Commit f7cf59c

Browse files
committed
2 parents edfb332 + af2424d commit f7cf59c

File tree

17 files changed

+116
-89
lines changed

17 files changed

+116
-89
lines changed

contrib/pg_dtm/dtmd/include/limits.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,6 @@
66

77
#define BUFFER_SIZE (64 * 1024)
88
#define LISTEN_QUEUE_SIZE 100
9-
#define MAX_STREAMS 128
9+
#define MAX_STREAMS 1024
1010

1111
#endif

contrib/pg_dtm/dtmd/include/server.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
#define SERVER_H
33

44
#include <stdbool.h>
5+
#include "int.h"
56

67
/*
78
* You should not want to know what is inside those structures.
@@ -95,6 +96,6 @@ bool client_message_finish(client_t client);
9596
*
9697
* Returns 'true' on success, 'false' otherwise.
9798
*/
98-
bool client_message_shortcut(client_t client, long long arg);
99+
bool client_message_shortcut(client_t client, xid_t arg);
99100

100101
#endif

contrib/pg_dtm/dtmd/src/clog.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@ int clog_read(clog_t clog, xid_t xid) {
133133
bool clog_write(clog_t clog, xid_t xid, int status) {
134134
clogfile_t *file = clog_xid_to_file(clog, xid);
135135
if (!file) {
136-
debug("xid %016llx out of range, creating the file\n", xid);
136+
debug("xid %u out of range, creating the file\n", xid);
137137
clogfile_t newfile;
138138
if (!clogfile_open_by_id(&newfile, clog->datadir, XID_TO_FILEID(xid), true)) {
139139
shout(

contrib/pg_dtm/dtmd/src/server.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -325,7 +325,7 @@ bool client_message_finish(client_t client) {
325325
return stream_message_finish(client->stream);
326326
}
327327

328-
bool client_message_shortcut(client_t client, long long arg) {
328+
bool client_message_shortcut(client_t client, xid_t arg) {
329329
if (!stream_message_start(client->stream, client->chan)) {
330330
return false;
331331
}

contrib/pg_dtm/libdtm.c

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,9 @@ static DTMConn DtmConnect(char *host, int port)
3939
DTMConn dtm;
4040
int sd;
4141

42-
if (strcmp(host, "localhost") == 0)
42+
if (host == NULL)
4343
{
44+
// use a UNIX socket
4445
struct sockaddr sock;
4546
int len = offsetof(struct sockaddr, sa_data) + snprintf(sock.sa_data, sizeof(sock.sa_data), "%s/p%u", dtm_unix_sock_dir, port);
4647
sock.sa_family = AF_UNIX;
@@ -62,6 +63,7 @@ static DTMConn DtmConnect(char *host, int port)
6263
}
6364
else
6465
{
66+
// use an IP socket
6567
struct addrinfo *addrs = NULL;
6668
struct addrinfo hint;
6769
char portstr[6];
@@ -227,7 +229,9 @@ void DtmGlobalConfig(char *host, int port, char* sock_dir) {
227229
free(dtmhost);
228230
dtmhost = NULL;
229231
}
230-
dtmhost = strdup(host);
232+
if (host) {
233+
dtmhost = strdup(host);
234+
}
231235
dtmport = port;
232236
dtm_unix_sock_dir = sock_dir;
233237
}
@@ -237,14 +241,14 @@ static DTMConn GetConnection()
237241
static DTMConn dtm = NULL;
238242
if (dtm == NULL)
239243
{
240-
if (dtmhost) {
241-
dtm = DtmConnect(dtmhost, dtmport);
242-
if (dtm == NULL)
243-
{
244-
elog(ERROR, "Failed to connect to DTMD %s:%d", dtmhost, dtmport);
244+
dtm = DtmConnect(dtmhost, dtmport);
245+
if (dtm == NULL)
246+
{
247+
if (dtmhost) {
248+
elog(ERROR, "Failed to connect to DTMD at tcp %s:%d", dtmhost, dtmport);
249+
} else {
250+
elog(ERROR, "Failed to connect to DTMD at unix %d", dtmport);
245251
}
246-
} else {
247-
/* elog(ERROR, "DTMD address not specified"); */
248252
}
249253
}
250254
return dtm;

contrib/pg_dtm/pg_dtm.c

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -654,9 +654,6 @@ static void DtmInitialize()
654654
dtm->nReservedXids = 0;
655655
dtm->minXid = InvalidTransactionId;
656656
RegisterXactCallback(DtmXactCallback, NULL);
657-
if (DtmBufferSize != 0) {
658-
RegisterBackgroundWorker(&DtmWorker);
659-
}
660657
}
661658
LWLockRelease(AddinShmemInitLock);
662659

@@ -793,7 +790,13 @@ _PG_init(void)
793790
NULL
794791
);
795792

796-
DtmGlobalConfig(DtmHost, DtmPort, Unix_socket_directories);
793+
794+
if (DtmBufferSize != 0) {
795+
DtmGlobalConfig(NULL, DtmPort, Unix_socket_directories);
796+
RegisterBackgroundWorker(&DtmWorker);
797+
} else {
798+
DtmGlobalConfig(DtmHost, DtmPort, Unix_socket_directories);
799+
}
797800

798801
/*
799802
* Install hooks.
@@ -899,9 +902,6 @@ void DtmBackgroundWorker(Datum arg)
899902
params.file = unix_sock_path;
900903
params.buffer_size = DtmBufferSize;
901904

902-
DtmGlobalConfig("localhost", DtmPort, Unix_socket_directories);
903-
904905
ShubInitialize(&shub, &params);
905-
906906
ShubLoop(&shub);
907907
}

contrib/pg_dtm/tests/daemons.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ func postgres(bin string, datadir string, port int, nodeid int, wg *sync.WaitGro
9191
bin,
9292
"-D", datadir,
9393
"-p", strconv.Itoa(port),
94+
"-c", "dtm.buffer_size=65536",
9495
"-c", "dtm.host=127.0.0.1",
9596
"-c", "dtm.port=" + strconv.Itoa(5431),
9697
"-c", "autovacuum=off",

contrib/pg_dtm/tests/deploy_layouts/cluster.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
---
2-
- hosts: master-workers
2+
- hosts: dtm
33
roles:
44
- role: postgrespro
55
deploy_dtm: true
@@ -10,5 +10,5 @@
1010
pg_port: 15432
1111
deploy_postgres: true
1212
pg_dtm_enable: true
13-
pg_dtm_host: "{{groups['master-workers'][0]}}"
13+
pg_dtm_host: "{{ groups['dtm'][0] }}"
1414

contrib/pg_dtm/tests/deploy_layouts/cluster_nodtm.yml

Lines changed: 0 additions & 13 deletions
This file was deleted.
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
---
2+
- hosts: master-workers
3+
roles:
4+
- role: postgrespro
5+
deploy_dtm: true
6+
7+
- hosts: master-workers:workers
8+
roles:
9+
- role: postgrespro
10+
pg_port: 15432
11+
deploy_postgres: true
12+
deploy_pg_shard: true
13+
pg_dtm_enable: true
14+
pg_dtm_host: "{{ groups['master-workers'][0] }}"
15+

0 commit comments

Comments
 (0)