Skip to content

Commit 6d69f15

Browse files
committed
decode gid
1 parent cff6d25 commit 6d69f15

File tree

5 files changed

+27
-2
lines changed

5 files changed

+27
-2
lines changed

contrib/pglogical_output/pglogical_proto_json.c

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,11 +91,18 @@ pglogical_json_write_commit(StringInfo out, PGLogicalOutputData *data, ReorderBu
9191
{
9292
appendStringInfoChar(out, '{');
9393
if (txn->xact_action == XLOG_XACT_PREPARE)
94+
{
9495
appendStringInfoString(out, "\"action\":\"P\"");
96+
appendStringInfo(out, ", \"gid\":\"%s\"", txn->gid);
97+
}
9598
else if (txn->xact_action == XLOG_XACT_COMMIT_PREPARED)
99+
{
96100
appendStringInfoString(out, "\"action\":\"CP\"");
101+
}
97102
else
103+
{
98104
appendStringInfoString(out, "\"action\":\"C\"");
105+
}
99106

100107
if (!data->client_no_txinfo)
101108
{

reinit.sh

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,3 +51,15 @@ make install > /dev/null
5151
pkill -9 postgres
5252
reinit_master >> /dev/null
5353

54+
55+
56+
# SELECT * FROM pg_logical_slot_peek_changes('regression_slot',
57+
# NULL, NULL,
58+
# 'expected_encoding', 'UTF8',
59+
# 'min_proto_version', '1',
60+
# 'max_proto_version', '1',
61+
# 'startup_params_format', '1',
62+
# 'proto_format', 'json',
63+
# 'no_txinfo', 't');
64+
65+

src/backend/replication/logical/decode.c

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -570,6 +570,8 @@ DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
570570
invalmsgs = (SharedInvalidationMessage *) twophase_bufptr;
571571
twophase_bufptr += MAXALIGN(hdr->ninvalmsgs * sizeof(SharedInvalidationMessage));
572572

573+
memcpy(ctx->reorder->gid, hdr->gid, GIDSIZE);
574+
573575
/*
574576
* Process invalidation messages, even if we're not interested in the
575577
* transaction's contents, since the various caches need to always be

src/backend/replication/logical/reorderbuffer.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1279,6 +1279,7 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
12791279
txn->origin_id = origin_id;
12801280
txn->origin_lsn = origin_lsn;
12811281
txn->xact_action = rb->xact_action;
1282+
memcpy(txn->gid, rb->gid, GIDSIZE);
12821283

12831284
/* serialize the last bunch of changes if we need start earlier anyway */
12841285
if (txn->nentries_mem != txn->nentries)

src/include/replication/reorderbuffer.h

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
#define REORDERBUFFER_H
1111

1212
#include "access/htup_details.h"
13+
#include "access/twophase.h"
1314
#include "lib/ilist.h"
1415
#include "storage/sinval.h"
1516
#include "utils/hsearch.h"
@@ -137,7 +138,8 @@ typedef struct ReorderBufferTXN
137138
* as well as abort for ROLLBACK and ROLLBACK PREPARED. Here
138139
* stored actual xact action allowing decoding plugin to distinguish them.
139140
*/
140-
uint8 xact_action;
141+
uint8 xact_action;
142+
char gid[GIDSIZE];
141143

142144
/* did the TX have catalog changes */
143145
bool has_catalog_changes;
@@ -286,7 +288,8 @@ struct ReorderBuffer
286288
HTAB *by_txn;
287289

288290
/* For twophase tx support we need to pass XACT action to ReorderBufferTXN */
289-
uint8 xact_action;
291+
uint8 xact_action;
292+
char gid[GIDSIZE];
290293

291294
/*
292295
* Transactions that could be a toplevel xact, ordered by LSN of the first

0 commit comments

Comments
 (0)