Skip to content

Commit 45d0f5d

Browse files
committed
write GID in WAL on commit prepared. Decode it in logical mode. Support JSON output plugin
1 parent 687459b commit 45d0f5d

File tree

9 files changed

+86
-54
lines changed

9 files changed

+86
-54
lines changed

contrib/pglogical_output/pglogical_proto_json.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@ pglogical_json_write_commit(StringInfo out, PGLogicalOutputData *data, ReorderBu
9898
else if (txn->xact_action == XLOG_XACT_COMMIT_PREPARED)
9999
{
100100
appendStringInfoString(out, "\"action\":\"CP\"");
101+
appendStringInfo(out, ", \"gid\":\"%s\"", txn->gid); /* NB: Add \0 at the end, if we are using all 200 bytes in GID */
101102
}
102103
else
103104
{

reinit.sh

Lines changed: 37 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -29,13 +29,13 @@ reinit_master() {
2929
./install/bin/pg_ctl -sw -D ./install/data -l ./install/data/logfile start
3030
./install/bin/createdb stas
3131
./install/bin/psql -c "create table t(id int primary key, v int);"
32-
# ./install/bin/psql -c "SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'pglogical_output');"
33-
# ./install/bin/psql <<SQL
34-
# begin;
35-
# insert into t values (42);
36-
# prepare transaction 'x';
37-
# commit prepared 'x';
38-
# SQL
32+
./install/bin/psql -c "SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'pglogical_output');"
33+
./install/bin/psql <<SQL
34+
begin;
35+
insert into t values (42);
36+
prepare transaction 'hellyeah';
37+
commit prepared 'hellyeah';
38+
SQL
3939
}
4040

4141
reinit_master2() {
@@ -87,37 +87,40 @@ cd ../..
8787

8888
pkill -9 postgres
8989
reinit_master
90-
reinit_master2
90+
# reinit_master2
91+
92+
# ./install/bin/psql <<SQL
93+
# CREATE EXTENSION pglogical;
94+
# SELECT pglogical.create_node(
95+
# node_name := 'provider1',
96+
# dsn := 'port=5432 dbname=stas'
97+
# );
98+
# SELECT pglogical.replication_set_add_all_tables('default', ARRAY['public']);
99+
# SQL
91100

92-
./install/bin/psql <<SQL
93-
CREATE EXTENSION pglogical;
94-
SELECT pglogical.create_node(
95-
node_name := 'provider1',
96-
dsn := 'port=5432 dbname=stas'
97-
);
98-
SELECT pglogical.replication_set_add_all_tables('default', ARRAY['public']);
99-
SQL
101+
# ./install/bin/psql -p 5433 <<SQL
102+
# CREATE EXTENSION pglogical;
103+
# SELECT pglogical.create_node(
104+
# node_name := 'subscriber1',
105+
# dsn := 'port=5433 dbname=stas'
106+
# );
107+
# SELECT pglogical.create_subscription(
108+
# subscription_name := 'subscription1',
109+
# provider_dsn := 'port=5432 dbname=stas'
110+
# );
111+
# SQL
100112

101-
./install/bin/psql -p 5433 <<SQL
102-
CREATE EXTENSION pglogical;
103-
SELECT pglogical.create_node(
104-
node_name := 'subscriber1',
105-
dsn := 'port=5433 dbname=stas'
106-
);
107-
SELECT pglogical.create_subscription(
108-
subscription_name := 'subscription1',
109-
provider_dsn := 'port=5432 dbname=stas'
110-
);
113+
./install/bin/psql <<SQL
114+
SELECT * FROM pg_logical_slot_peek_changes('regression_slot',
115+
NULL, NULL,
116+
'expected_encoding', 'UTF8',
117+
'min_proto_version', '1',
118+
'max_proto_version', '1',
119+
'startup_params_format', '1',
120+
'proto_format', 'json',
121+
'no_txinfo', 't');
111122
SQL
112123

113124

114-
# SELECT * FROM pg_logical_slot_peek_changes('regression_slot',
115-
# NULL, NULL,
116-
# 'expected_encoding', 'UTF8',
117-
# 'min_proto_version', '1',
118-
# 'max_proto_version', '1',
119-
# 'startup_params_format', '1',
120-
# 'proto_format', 'json',
121-
# 'no_txinfo', 't');
122125

123126

src/backend/access/rmgrdesc/xactdesc.c

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,10 +97,16 @@ ParseCommitRecord(uint8 info, xl_xact_commit *xlrec, xl_xact_parsed_commit *pars
9797
if (parsed->xinfo & XACT_XINFO_HAS_TWOPHASE)
9898
{
9999
xl_xact_twophase *xl_twophase = (xl_xact_twophase *) data;
100+
uint8 gidlen = xl_twophase->gidlen;
100101

101102
parsed->twophase_xid = xl_twophase->xid;
102103

103-
data += sizeof(xl_xact_twophase);
104+
memcpy(parsed->twophase_gid, xl_twophase->gid, gidlen);
105+
/* Dirty! */
106+
memset(parsed->twophase_gid + gidlen, '\0', 200 - gidlen); // GIDSIZE
107+
108+
data += MinSizeOfXactTwophase;
109+
data += gidlen;
104110
}
105111

106112
if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN)

src/backend/access/transam/twophase.c

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -184,12 +184,14 @@ static void RecordTransactionCommitPrepared(TransactionId xid,
184184
RelFileNode *rels,
185185
int ninvalmsgs,
186186
SharedInvalidationMessage *invalmsgs,
187-
bool initfileinval);
187+
bool initfileinval,
188+
const char *gid);
188189
static void RecordTransactionAbortPrepared(TransactionId xid,
189190
int nchildren,
190191
TransactionId *children,
191192
int nrels,
192-
RelFileNode *rels);
193+
RelFileNode *rels,
194+
const char *gid);
193195
static void ProcessRecords(char *bufptr, TransactionId xid,
194196
const TwoPhaseCallback callbacks[]);
195197
static void RemoveGXact(GlobalTransaction gxact);
@@ -1340,11 +1342,12 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
13401342
hdr->nsubxacts, children,
13411343
hdr->ncommitrels, commitrels,
13421344
hdr->ninvalmsgs, invalmsgs,
1343-
hdr->initfileinval);
1345+
hdr->initfileinval, gid);
13441346
else
13451347
RecordTransactionAbortPrepared(xid,
13461348
hdr->nsubxacts, children,
1347-
hdr->nabortrels, abortrels);
1349+
hdr->nabortrels, abortrels,
1350+
gid);
13481351

13491352
ProcArrayRemove(proc, latestXid);
13501353

@@ -1981,7 +1984,8 @@ RecordTransactionCommitPrepared(TransactionId xid,
19811984
RelFileNode *rels,
19821985
int ninvalmsgs,
19831986
SharedInvalidationMessage *invalmsgs,
1984-
bool initfileinval)
1987+
bool initfileinval,
1988+
const char *gid)
19851989
{
19861990
XLogRecPtr recptr;
19871991
TimestampTz committs = GetCurrentTimestamp();
@@ -2004,7 +2008,7 @@ RecordTransactionCommitPrepared(TransactionId xid,
20042008
nchildren, children, nrels, rels,
20052009
ninvalmsgs, invalmsgs,
20062010
initfileinval, false,
2007-
xid);
2011+
xid, gid);
20082012

20092013

20102014
if (replorigin)
@@ -2066,7 +2070,8 @@ RecordTransactionAbortPrepared(TransactionId xid,
20662070
int nchildren,
20672071
TransactionId *children,
20682072
int nrels,
2069-
RelFileNode *rels)
2073+
RelFileNode *rels,
2074+
const char *gid)
20702075
{
20712076
XLogRecPtr recptr;
20722077

@@ -2084,7 +2089,7 @@ RecordTransactionAbortPrepared(TransactionId xid,
20842089
recptr = XactLogAbortRecord(GetCurrentTimestamp(),
20852090
nchildren, children,
20862091
nrels, rels,
2087-
xid);
2092+
xid, gid);
20882093

20892094
/* Always flush, since we're about to remove the 2PC state file */
20902095
XLogFlush(recptr);

src/backend/access/transam/xact.c

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1215,7 +1215,7 @@ RecordTransactionCommit(void)
12151215
nchildren, children, nrels, rels,
12161216
nmsgs, invalMessages,
12171217
RelcacheInitFileInval, forceSyncCommit,
1218-
InvalidTransactionId /* plain commit */ );
1218+
InvalidTransactionId, NULL /* plain commit */ );
12191219

12201220
if (replorigin)
12211221
/* Move LSNs forward for this replication origin */
@@ -1567,7 +1567,7 @@ RecordTransactionAbort(bool isSubXact)
15671567
XactLogAbortRecord(xact_time,
15681568
nchildren, children,
15691569
nrels, rels,
1570-
InvalidTransactionId);
1570+
InvalidTransactionId, NULL);
15711571

15721572
/*
15731573
* Report the latest async abort LSN, so that the WAL writer knows to
@@ -5084,7 +5084,7 @@ XactLogCommitRecord(TimestampTz commit_time,
50845084
int nrels, RelFileNode *rels,
50855085
int nmsgs, SharedInvalidationMessage *msgs,
50865086
bool relcacheInval, bool forceSync,
5087-
TransactionId twophase_xid)
5087+
TransactionId twophase_xid, const char *twophase_gid)
50885088
{
50895089
xl_xact_commit xlrec;
50905090
xl_xact_xinfo xl_xinfo;
@@ -5149,6 +5149,8 @@ XactLogCommitRecord(TimestampTz commit_time,
51495149
{
51505150
xl_xinfo.xinfo |= XACT_XINFO_HAS_TWOPHASE;
51515151
xl_twophase.xid = twophase_xid;
5152+
xl_twophase.gidlen = strlen(twophase_gid);
5153+
memcpy(xl_twophase.gid, twophase_gid, xl_twophase.gidlen);
51525154
}
51535155

51545156
/* dump transaction origin information */
@@ -5199,7 +5201,11 @@ XactLogCommitRecord(TimestampTz commit_time,
51995201
}
52005202

52015203
if (xl_xinfo.xinfo & XACT_XINFO_HAS_TWOPHASE)
5202-
XLogRegisterData((char *) (&xl_twophase), sizeof(xl_xact_twophase));
5204+
{
5205+
// Write gid only in logical mode
5206+
XLogRegisterData((char *) (&xl_twophase), MinSizeOfXactTwophase);
5207+
XLogRegisterData((char *) twophase_gid, xl_twophase.gidlen);
5208+
}
52035209

52045210
if (xl_xinfo.xinfo & XACT_XINFO_HAS_ORIGIN)
52055211
XLogRegisterData((char *) (&xl_origin), sizeof(xl_xact_origin));
@@ -5220,7 +5226,7 @@ XLogRecPtr
52205226
XactLogAbortRecord(TimestampTz abort_time,
52215227
int nsubxacts, TransactionId *subxacts,
52225228
int nrels, RelFileNode *rels,
5223-
TransactionId twophase_xid)
5229+
TransactionId twophase_xid, const char *twophase_gid)
52245230
{
52255231
xl_xact_abort xlrec;
52265232
xl_xact_xinfo xl_xinfo;

src/backend/replication/logical/decode.c

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -459,7 +459,8 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
459459

460460
if (TransactionIdIsValid(parsed->twophase_xid)) {
461461
ReorderBufferCommitPrepared(ctx->reorder, xid, buf->origptr, buf->endptr,
462-
commit_time, origin_id, origin_lsn);
462+
commit_time, origin_id, origin_lsn,
463+
parsed->twophase_gid);
463464
return;
464465
}
465466

src/backend/replication/logical/reorderbuffer.c

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1616,7 +1616,8 @@ void
16161616
ReorderBufferCommitPrepared(ReorderBuffer *rb, TransactionId xid,
16171617
XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
16181618
TimestampTz commit_time,
1619-
RepOriginId origin_id, XLogRecPtr origin_lsn)
1619+
RepOriginId origin_id, XLogRecPtr origin_lsn,
1620+
char *gid)
16201621
{
16211622
ReorderBufferTXN *txn;
16221623

@@ -1629,6 +1630,7 @@ ReorderBufferCommitPrepared(ReorderBuffer *rb, TransactionId xid,
16291630
txn->origin_id = origin_id;
16301631
txn->origin_lsn = origin_lsn;
16311632
txn->xact_action = XLOG_XACT_COMMIT_PREPARED;
1633+
memcpy(txn->gid, gid, GIDSIZE);
16321634

16331635
rb->commit(rb, txn, commit_lsn);
16341636
}

src/include/access/xact.h

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -219,8 +219,14 @@ typedef struct xl_xact_invals
219219
typedef struct xl_xact_twophase
220220
{
221221
TransactionId xid;
222+
/*
223+
* Gid and gidlen will be set only with wal_level=logical.
224+
* See details in XactLogCommitRecord().
225+
*/
226+
uint8 gidlen;
227+
char gid[200];
222228
} xl_xact_twophase;
223-
#define MinSizeOfXactInvals offsetof(xl_xact_invals, msgs)
229+
#define MinSizeOfXactTwophase offsetof(xl_xact_twophase, gid)
224230

225231
typedef struct xl_xact_origin
226232
{
@@ -279,6 +285,7 @@ typedef struct xl_xact_parsed_commit
279285
SharedInvalidationMessage *msgs;
280286

281287
TransactionId twophase_xid; /* only for 2PC */
288+
char twophase_gid[200]; // GIDSIZE
282289

283290
XLogRecPtr origin_lsn;
284291
TimestampTz origin_timestamp;
@@ -360,12 +367,12 @@ extern XLogRecPtr XactLogCommitRecord(TimestampTz commit_time,
360367
int nrels, RelFileNode *rels,
361368
int nmsgs, SharedInvalidationMessage *msgs,
362369
bool relcacheInval, bool forceSync,
363-
TransactionId twophase_xid);
370+
TransactionId twophase_xid, const char *twophase_gid);
364371

365372
extern XLogRecPtr XactLogAbortRecord(TimestampTz abort_time,
366373
int nsubxacts, TransactionId *subxacts,
367374
int nrels, RelFileNode *rels,
368-
TransactionId twophase_xid);
375+
TransactionId twophase_xid, const char *twophase_gid);
369376
extern void xact_redo(XLogReaderState *record);
370377

371378
/* xactdesc.c */

src/include/replication/reorderbuffer.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -366,7 +366,8 @@ void ReorderBufferCommit(ReorderBuffer *, TransactionId,
366366
void ReorderBufferCommitPrepared(ReorderBuffer *rb, TransactionId xid,
367367
XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
368368
TimestampTz commit_time,
369-
RepOriginId origin_id, XLogRecPtr origin_lsn);
369+
RepOriginId origin_id, XLogRecPtr origin_lsn,
370+
char *gid);
370371
void ReorderBufferAssignChild(ReorderBuffer *, TransactionId, TransactionId, XLogRecPtr commit_lsn);
371372
void ReorderBufferCommitChild(ReorderBuffer *, TransactionId, TransactionId,
372373
XLogRecPtr commit_lsn, XLogRecPtr end_lsn);

0 commit comments

Comments
 (0)