Skip to content

Commit b07de08

Browse files
committed
fix xlog gid writes
1 parent 3591844 commit b07de08

File tree

6 files changed

+57
-65
lines changed

6 files changed

+57
-65
lines changed

reinit.sh

Lines changed: 36 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -77,47 +77,47 @@ pkill -9 postgres
7777
reinit_master
7878
reinit_master2
7979

80-
./install/bin/psql <<SQL
81-
CREATE EXTENSION pglogical;
82-
SELECT pglogical.create_node(
83-
node_name := 'provider1',
84-
dsn := 'port=5432 dbname=stas'
85-
);
86-
SELECT pglogical.replication_set_add_all_tables('default', ARRAY['public']);
87-
SQL
88-
89-
./install/bin/psql -p 5433 <<SQL
90-
CREATE EXTENSION pglogical;
91-
SELECT pglogical.create_node(
92-
node_name := 'subscriber1',
93-
dsn := 'port=5433 dbname=stas'
94-
);
95-
SELECT pglogical.create_subscription(
96-
subscription_name := 'subscription1',
97-
provider_dsn := 'port=5432 dbname=stas'
98-
);
99-
SQL
100-
101-
# ./install/bin/psql -c "SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'pglogical_output');"
102-
10380
# ./install/bin/psql <<SQL
104-
# begin;
105-
# insert into t values (42);
106-
# prepare transaction 'hellyeah';
107-
# rollback prepared 'hellyeah';
81+
# CREATE EXTENSION pglogical;
82+
# SELECT pglogical.create_node(
83+
# node_name := 'provider1',
84+
# dsn := 'port=5432 dbname=stas'
85+
# );
86+
# SELECT pglogical.replication_set_add_all_tables('default', ARRAY['public']);
10887
# SQL
10988

110-
# ./install/bin/psql <<SQL
111-
# SELECT * FROM pg_logical_slot_peek_changes('regression_slot',
112-
# NULL, NULL,
113-
# 'expected_encoding', 'UTF8',
114-
# 'min_proto_version', '1',
115-
# 'max_proto_version', '1',
116-
# 'startup_params_format', '1',
117-
# 'proto_format', 'json',
118-
# 'no_txinfo', 't');
89+
# ./install/bin/psql -p 5433 <<SQL
90+
# CREATE EXTENSION pglogical;
91+
# SELECT pglogical.create_node(
92+
# node_name := 'subscriber1',
93+
# dsn := 'port=5433 dbname=stas'
94+
# );
95+
# SELECT pglogical.create_subscription(
96+
# subscription_name := 'subscription1',
97+
# provider_dsn := 'port=5432 dbname=stas'
98+
# );
11999
# SQL
120100

101+
./install/bin/psql -c "SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'pglogical_output');"
102+
103+
./install/bin/psql <<SQL
104+
begin;
105+
insert into t values (42);
106+
prepare transaction 'hellyeah';
107+
rollback prepared 'hellyeah';
108+
SQL
109+
110+
./install/bin/psql <<SQL
111+
SELECT * FROM pg_logical_slot_peek_changes('regression_slot',
112+
NULL, NULL,
113+
'expected_encoding', 'UTF8',
114+
'min_proto_version', '1',
115+
'max_proto_version', '1',
116+
'startup_params_format', '1',
117+
'proto_format', 'json',
118+
'no_txinfo', 't');
119+
SQL
120+
121121

122122

123123

src/backend/access/rmgrdesc/xactdesc.c

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -100,12 +100,9 @@ ParseCommitRecord(uint8 info, xl_xact_commit *xlrec, xl_xact_parsed_commit *pars
100100
uint8 gidlen = xl_twophase->gidlen;
101101

102102
parsed->twophase_xid = xl_twophase->xid;
103-
104-
memcpy(parsed->twophase_gid, xl_twophase->gid, gidlen);
105-
/* Dirty! */
106-
memset(parsed->twophase_gid + gidlen, '\0', 200 - gidlen); // GIDSIZE
107-
108103
data += MinSizeOfXactTwophase;
104+
105+
strcpy(parsed->twophase_gid, data);
109106
data += gidlen;
110107
}
111108

@@ -172,12 +169,9 @@ ParseAbortRecord(uint8 info, xl_xact_abort *xlrec, xl_xact_parsed_abort *parsed)
172169
uint8 gidlen = xl_twophase->gidlen;
173170

174171
parsed->twophase_xid = xl_twophase->xid;
175-
176-
memcpy(parsed->twophase_gid, xl_twophase->gid, gidlen);
177-
/* Dirty! */
178-
memset(parsed->twophase_gid + gidlen, '\0', 200 - gidlen); // GIDSIZE
179-
180172
data += MinSizeOfXactTwophase;
173+
174+
strcpy(parsed->twophase_gid, data);
181175
data += gidlen;
182176
}
183177
}

src/backend/access/transam/xact.c

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5149,8 +5149,7 @@ XactLogCommitRecord(TimestampTz commit_time,
51495149
{
51505150
xl_xinfo.xinfo |= XACT_XINFO_HAS_TWOPHASE;
51515151
xl_twophase.xid = twophase_xid;
5152-
xl_twophase.gidlen = strlen(twophase_gid);
5153-
memcpy(xl_twophase.gid, twophase_gid, xl_twophase.gidlen);
5152+
xl_twophase.gidlen = strlen(twophase_gid) + 1; /* Include '\0' */
51545153
}
51555154

51565155
/* dump transaction origin information */
@@ -5202,7 +5201,6 @@ XactLogCommitRecord(TimestampTz commit_time,
52025201

52035202
if (xl_xinfo.xinfo & XACT_XINFO_HAS_TWOPHASE)
52045203
{
5205-
// Write gid only in logical mode
52065204
XLogRegisterData((char *) (&xl_twophase), MinSizeOfXactTwophase);
52075205
XLogRegisterData((char *) twophase_gid, xl_twophase.gidlen);
52085206
}
@@ -5267,8 +5265,7 @@ XactLogAbortRecord(TimestampTz abort_time,
52675265
{
52685266
xl_xinfo.xinfo |= XACT_XINFO_HAS_TWOPHASE;
52695267
xl_twophase.xid = twophase_xid;
5270-
xl_twophase.gidlen = strlen(twophase_gid);
5271-
memcpy(xl_twophase.gid, twophase_gid, xl_twophase.gidlen);
5268+
xl_twophase.gidlen = strlen(twophase_gid) + 1; /* Include '\0' */
52725269
}
52735270

52745271
if (xl_xinfo.xinfo != 0)
@@ -5301,7 +5298,6 @@ XactLogAbortRecord(TimestampTz abort_time,
53015298

53025299
if (xl_xinfo.xinfo & XACT_XINFO_HAS_TWOPHASE)
53035300
{
5304-
// Write gid only in logical mode
53055301
XLogRegisterData((char *) (&xl_twophase), MinSizeOfXactTwophase);
53065302
XLogRegisterData((char *) twophase_gid, xl_twophase.gidlen);
53075303
}

src/backend/replication/logical/decode.c

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -458,9 +458,9 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
458458
}
459459

460460
if (TransactionIdIsValid(parsed->twophase_xid)) {
461-
ReorderBufferCommitPrepared(ctx->reorder, xid, buf->origptr, buf->endptr,
462-
commit_time, origin_id, origin_lsn,
463-
parsed->twophase_gid);
461+
strcpy(ctx->reorder->gid, parsed->twophase_gid);
462+
ReorderBufferCommitBareXact(ctx->reorder, xid, buf->origptr, buf->endptr,
463+
commit_time, origin_id, origin_lsn);
464464
return;
465465
}
466466

@@ -627,9 +627,9 @@ DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
627627
XLogRecPtr origin_id = XLogRecGetOrigin(buf->record);
628628

629629
if (TransactionIdIsValid(parsed->twophase_xid)) {
630-
ReorderBufferCommitPrepared(ctx->reorder, xid, buf->origptr, buf->endptr,
631-
commit_time, origin_id, origin_lsn,
632-
parsed->twophase_gid);
630+
strcpy(ctx->reorder->gid, parsed->twophase_gid);
631+
ReorderBufferCommitBareXact(ctx->reorder, xid, buf->origptr, buf->endptr,
632+
commit_time, origin_id, origin_lsn);
633633
return;
634634
}
635635

src/backend/replication/logical/reorderbuffer.c

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1612,12 +1612,15 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
16121612
PG_END_TRY();
16131613
}
16141614

1615+
1616+
/*
1617+
* Send standalone xact event. This is used to handle COMMIT/ABORT PREPARED.
1618+
*/
16151619
void
1616-
ReorderBufferCommitPrepared(ReorderBuffer *rb, TransactionId xid,
1620+
ReorderBufferCommitBareXact(ReorderBuffer *rb, TransactionId xid,
16171621
XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
16181622
TimestampTz commit_time,
1619-
RepOriginId origin_id, XLogRecPtr origin_lsn,
1620-
char *gid)
1623+
RepOriginId origin_id, XLogRecPtr origin_lsn)
16211624
{
16221625
ReorderBufferTXN *txn;
16231626

@@ -1630,7 +1633,7 @@ ReorderBufferCommitPrepared(ReorderBuffer *rb, TransactionId xid,
16301633
txn->origin_id = origin_id;
16311634
txn->origin_lsn = origin_lsn;
16321635
txn->xact_action = rb->xact_action;
1633-
memcpy(txn->gid, gid, GIDSIZE);
1636+
strcpy(txn->gid, rb->gid);
16341637

16351638
rb->commit(rb, txn, commit_lsn);
16361639
}

src/include/replication/reorderbuffer.h

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -363,11 +363,10 @@ void ReorderBufferQueueChange(ReorderBuffer *, TransactionId, XLogRecPtr lsn, R
363363
void ReorderBufferCommit(ReorderBuffer *, TransactionId,
364364
XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
365365
TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn);
366-
void ReorderBufferCommitPrepared(ReorderBuffer *rb, TransactionId xid,
366+
void ReorderBufferCommitBareXact(ReorderBuffer *rb, TransactionId xid,
367367
XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
368368
TimestampTz commit_time,
369-
RepOriginId origin_id, XLogRecPtr origin_lsn,
370-
char *gid);
369+
RepOriginId origin_id, XLogRecPtr origin_lsn);
371370
void ReorderBufferAssignChild(ReorderBuffer *, TransactionId, TransactionId, XLogRecPtr commit_lsn);
372371
void ReorderBufferCommitChild(ReorderBuffer *, TransactionId, TransactionId,
373372
XLogRecPtr commit_lsn, XLogRecPtr end_lsn);

0 commit comments

Comments
 (0)