Skip to content

Commit 603a97a

Browse files
committed
Merge branch 'xtm' of gitlab.postgrespro.ru:pgpro-dev/postgrespro into xtm
2 parents bf89fd4 + 862d0e0 commit 603a97a

28 files changed

+477
-1832
lines changed

contrib/pg_dtm/dtmd/Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
CC=gcc
2-
CFLAGS=-g -Wall -Iinclude -D_LARGEFILE64_SOURCE # -DDEBUG
2+
CFLAGS=-g -O2 -Wall -Iinclude -D_LARGEFILE64_SOURCE # -DDEBUG
33
LIBUV_PREFIX=$(HOME)/libuv-build
44
LIBUV_CFLAGS=-I"$(LIBUV_PREFIX)/include" -L"$(LIBUV_PREFIX)/lib"
55
LIBUV_LDFLAGS=-luv -pthread -lrt

contrib/pg_dtm/libdtm.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -301,7 +301,7 @@ void DtmInitSnapshot(Snapshot snapshot)
301301
* we are in recovery, see later comments.
302302
*/
303303
snapshot->xip = (TransactionId *)
304-
malloc(GetMaxSnapshotXidCount() * sizeof(TransactionId));
304+
malloc(GetMaxSnapshotSubxidCount() * sizeof(TransactionId));
305305
if (snapshot->xip == NULL)
306306
ereport(ERROR,
307307
(errcode(ERRCODE_OUT_OF_MEMORY),

contrib/pg_dtm/pg_dtm.c

Lines changed: 33 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -205,22 +205,39 @@ static void DtmMergeWithGlobalSnapshot(Snapshot dst)
205205
if (src->xmin < dst->xmin) {
206206
dst->xmin = src->xmin;
207207
}
208-
209-
n = dst->xcnt;
210-
Assert(src->xcnt + n <= GetMaxSnapshotXidCount());
211-
memcpy(dst->xip + n, src->xip, src->xcnt*sizeof(TransactionId));
212-
n += src->xcnt;
213-
214-
qsort(dst->xip, n, sizeof(TransactionId), xidComparator);
215-
xid = InvalidTransactionId;
216-
217-
for (i = 0, j = 0; i < n && dst->xip[i] < dst->xmax; i++) {
218-
if (dst->xip[i] != xid) {
219-
dst->xip[j++] = xid = dst->xip[i];
220-
}
221-
}
222-
dst->xcnt = j;
223-
208+
Assert(src->subxcnt == 0);
209+
210+
if (src->xcnt + dst->subxcnt + dst->xcnt <= GetMaxSnapshotXidCount()) {
211+
Assert(dst->subxcnt == 0);
212+
memcpy(dst->xip + dst->xcnt, src->xip, src->xcnt*sizeof(TransactionId));
213+
n = dst->xcnt + src->xcnt;
214+
215+
qsort(dst->xip, n, sizeof(TransactionId), xidComparator);
216+
xid = InvalidTransactionId;
217+
218+
for (i = 0, j = 0; i < n && dst->xip[i] < dst->xmax; i++) {
219+
if (dst->xip[i] != xid) {
220+
dst->xip[j++] = xid = dst->xip[i];
221+
}
222+
}
223+
dst->xcnt = j;
224+
} else {
225+
Assert(src->xcnt + dst->subxcnt + dst->xcnt <= GetMaxSnapshotSubxidCount());
226+
memcpy(dst->subxip + dst->subxcnt, dst->xip, dst->xcnt*sizeof(TransactionId));
227+
memcpy(dst->subxip + dst->subxcnt + dst->xcnt, src->xip, src->xcnt*sizeof(TransactionId));
228+
n = dst->xcnt + dst->subxcnt + src->xcnt;
229+
230+
qsort(dst->subxip, n, sizeof(TransactionId), xidComparator);
231+
xid = InvalidTransactionId;
232+
233+
for (i = 0, j = 0; i < n && dst->subxip[i] < dst->xmax; i++) {
234+
if (dst->subxip[i] != xid) {
235+
dst->subxip[j++] = xid = dst->subxip[i];
236+
}
237+
}
238+
dst->subxcnt = j;
239+
dst->xcnt = 0;
240+
}
224241
DumpSnapshot(dst, "merged");
225242
}
226243

contrib/pg_dtm/sockhub/sockhub.c

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,23 @@ static void reconnect(Shub* shub)
151151
}
152152
}
153153

154+
static void notify_disconnect(Shub* shub, int chan)
155+
{
156+
ShubMessageHdr* hdr;
157+
hdr = (ShubMessageHdr*)&shub->in_buffer[shub->in_buffer_used];
158+
hdr->size = 0;
159+
hdr->chan = chan;
160+
hdr->code = MSG_DISCONNECT;
161+
shub->in_buffer_used += sizeof(ShubMessageHdr);
162+
if (shub->in_buffer_used + sizeof(ShubMessageHdr) > shub->params->buffer_size) {
163+
while (!write_socket(shub->output, shub->in_buffer, shub->in_buffer_used)) {
164+
shub->params->error_handler("Failed to write to inet socket", SHUB_RECOVERABLE_ERROR);
165+
reconnect(shub);
166+
}
167+
shub->in_buffer_used = 0;
168+
}
169+
}
170+
154171
static void recovery(Shub* shub)
155172
{
156173
int i, max_fd;
@@ -162,6 +179,9 @@ static void recovery(Shub* shub)
162179
FD_ZERO(&tryset);
163180
FD_SET(i, &tryset);
164181
if (select(i+1, &tryset, NULL, NULL, &tm) < 0) {
182+
if (i != shub->input && i != shub->output) {
183+
notify_disconnect(shub, i);
184+
}
165185
close_socket(shub, i);
166186
}
167187
}
@@ -259,6 +279,7 @@ void ShubLoop(Shub* shub)
259279
if (!write_socket(chan, (char*)hdr, n)) {
260280
shub->params->error_handler("Failed to write to local socket", SHUB_RECOVERABLE_ERROR);
261281
close_socket(shub, chan);
282+
notify_disconnect(shub, chan);
262283
chan = -1;
263284
}
264285
if (n != hdr->size + sizeof(ShubMessageHdr)) {
@@ -274,6 +295,7 @@ void ShubLoop(Shub* shub)
274295
if (chan >= 0 && !write_socket(chan, shub->out_buffer, n)) {
275296
shub->params->error_handler("Failed to write to local socket", SHUB_RECOVERABLE_ERROR);
276297
close_socket(shub, chan);
298+
notify_disconnect(shub, chan);
277299
chan = -1;
278300
}
279301
tail -= n;
@@ -295,6 +317,7 @@ void ShubLoop(Shub* shub)
295317
if (available < sizeof(ShubMessageHdr)) {
296318
shub->params->error_handler("Failed to read local socket", SHUB_RECOVERABLE_ERROR);
297319
close_socket(shub, i);
320+
notify_disconnect(shub, i);
298321
} else {
299322
int pos = 0;
300323
/* loop through all fetched messages */
@@ -333,6 +356,7 @@ void ShubLoop(Shub* shub)
333356
if (hdr != NULL) { /* if message header is not yet sent to the server... */
334357
/* ... then skip this message */
335358
shub->in_buffer_used = (char*)hdr - shub->in_buffer;
359+
notify_disconnect(shub, chan);
336360
break;
337361
} else { /* if message was partly sent to the server, we can not skip it, so we have to send garbage to the server */
338362
chan = -1; /* do not try to read rest of body of this message */
@@ -351,6 +375,10 @@ void ShubLoop(Shub* shub)
351375
shub->in_buffer_used = 0;
352376
}
353377
} while (size != 0); /* repeat until all message body is received */
378+
379+
if (chan < 0) {
380+
notify_disconnect(shub, i);
381+
}
354382

355383
pos = available;
356384
break;

contrib/pg_dtm/sockhub/sockhub.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,12 @@ typedef struct {
99
unsigned int chan; /* local socket: set by SockHUB */
1010
} ShubMessageHdr;
1111

12+
enum ShubMessageCodes
13+
{
14+
MSG_DISCONNECT,
15+
MSG_FIRST_USER_CODE /* all codes >= 1 are user defined */
16+
};
17+
1218
typedef enum
1319
{
1420
SHUB_FATAL_ERROR,

contrib/pg_dtm/sockhub/start-clients.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
n_clients=100
1+
n_clients=200
22
n_iters=100000
33
./sockhub -h $1 -p 5001 -f /tmp/p5002 &
44
for ((i=0;i<n_clients;i++))
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
---
2+
- hosts: master-workers
3+
roles:
4+
- role: postgrespro
5+
deploy_dtm: true
6+
7+
- hosts: workers
8+
roles:
9+
- role: postgrespro
10+
pg_port: 15432
11+
deploy_postgres: true
12+
pg_dtm_enable: true
13+
pg_dtm_host: "{{groups['master-workers'][0]}}"
14+
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
---
2+
- hosts: master-workers
3+
roles:
4+
- role: postgrespro
5+
deploy_postgres: true
6+
ppg_usedtm: false
7+
8+
- hosts: workers
9+
roles:
10+
- role: postgrespro
11+
deploy_postgres: true
12+
ppg_usedtm: false
13+

contrib/pg_dtm/tests/deploy_layouts/cluster_pg_shard.yml

Whitespace-only changes.
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
---
2+
- hosts: dtm
3+
gather_facts: no
4+
roles:
5+
- role: postgrespro
6+
deploy_dtm: true
7+
8+
- hosts: master-workers
9+
gather_facts: no
10+
roles:
11+
12+
- role: postgrespro
13+
deploy_postgres: true
14+
deploy_pg_shard: true
15+
ppg_usedtm: true
16+
ppg_dtmhost: "blade8" #!!!
17+
ppg:
18+
version: xtm_pgshard
19+
src: /home/stas/postgrespro
20+
dst: /home/stas/postgrespro-build
21+
log: /home/stas/ppg.log
22+
datadir: /home/stas/postgrespro-data
23+
usedtm: true
24+
node_id: 1
25+
ppg_configfile:
26+
# we should so delete/reinit on playbook, after that drop rexeps here
27+
- line: "shared_buffers = 1024MB" # 1/4 RAM
28+
regexp: "^shared_buffers "
29+
- line: "wal_keep_segments = 128"
30+
regexp: "^wal_keep_segments "
31+
- line: "fsync = off"
32+
regexp: "^fsync "
33+
- line: "autovacuum = off"
34+
regexp: "^autovacuum "
35+
- line: "listen_addresses = '*'"
36+
regexp: "^listen_addresses "
37+
- line: "port = 5432"
38+
regexp: "^port "
39+
40+

0 commit comments

Comments
 (0)