Skip to content

Commit cff6d25

Browse files
committed
track xact_action in ReorderBufferTXN
1 parent 48bf023 commit cff6d25

File tree

4 files changed

+21
-27
lines changed

4 files changed

+21
-27
lines changed

contrib/pglogical_output/pglogical_proto_json.c

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,13 @@ pglogical_json_write_commit(StringInfo out, PGLogicalOutputData *data, ReorderBu
9090
XLogRecPtr commit_lsn)
9191
{
9292
appendStringInfoChar(out, '{');
93-
appendStringInfoString(out, "\"action\":\"C\"");
93+
if (txn->xact_action == XLOG_XACT_PREPARE)
94+
appendStringInfoString(out, "\"action\":\"P\"");
95+
else if (txn->xact_action == XLOG_XACT_COMMIT_PREPARED)
96+
appendStringInfoString(out, "\"action\":\"CP\"");
97+
else
98+
appendStringInfoString(out, "\"action\":\"C\"");
99+
94100
if (!data->client_no_txinfo)
95101
{
96102
appendStringInfo(out, ", \"final_lsn\":\"%X/%X\"",

src/backend/replication/logical/decode.c

Lines changed: 2 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,8 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
197197
if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
198198
return;
199199

200+
reorder->xact_action = info;
201+
200202
switch (info)
201203
{
202204
case XLOG_XACT_COMMIT:
@@ -583,32 +585,6 @@ DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
583585
SnapBuildCommitTxn(ctx->snapshot_builder, buf->origptr, xid,
584586
hdr->nsubxacts, children);
585587

586-
/* ----
587-
* Check whether we are interested in this specific transaction, and tell
588-
* the reorderbuffer to forget the content of the (sub-)transactions
589-
* if not.
590-
*
591-
* There can be several reasons we might not be interested in this
592-
* transaction:
593-
* 1) We might not be interested in decoding transactions up to this
594-
* LSN. This can happen because we previously decoded it and now just
595-
* are restarting or if we haven't assembled a consistent snapshot yet.
596-
* 2) The transaction happened in another database.
597-
* 3) The output plugin is not interested in the origin.
598-
*
599-
* We can't just use ReorderBufferAbort() here, because we need to execute
600-
* the transaction's invalidations. This currently won't be needed if
601-
* we're just skipping over the transaction because currently we only do
602-
* so during startup, to get to the first transaction the client needs. As
603-
* we have reset the catalog caches before starting to read WAL, and we
604-
* haven't yet touched any catalogs, there can't be anything to invalidate.
605-
* But if we're "forgetting" this commit because it's it happened in
606-
* another database, the invalidations might be important, because they
607-
* could be for shared catalogs and we might have loaded data into the
608-
* relevant syscaches.
609-
* ---
610-
*/
611-
612588
// Add db check here
613589
if (SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) ||
614590
FilterByOrigin(ctx, origin_id))

src/backend/replication/logical/reorderbuffer.c

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1278,6 +1278,7 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
12781278
txn->commit_time = commit_time;
12791279
txn->origin_id = origin_id;
12801280
txn->origin_lsn = origin_lsn;
1281+
txn->xact_action = rb->xact_action;
12811282

12821283
/* serialize the last bunch of changes if we need start earlier anyway */
12831284
if (txn->nentries_mem != txn->nentries)
@@ -1626,6 +1627,7 @@ ReorderBufferCommitPrepared(ReorderBuffer *rb, TransactionId xid,
16261627
txn->commit_time = commit_time;
16271628
txn->origin_id = origin_id;
16281629
txn->origin_lsn = origin_lsn;
1630+
txn->xact_action = XLOG_XACT_COMMIT_PREPARED;
16291631

16301632
rb->commit(rb, txn, commit_lsn);
16311633
}

src/include/replication/reorderbuffer.h

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,13 @@ typedef struct ReorderBufferTXN
132132
*/
133133
TransactionId xid;
134134

135+
/*
136+
* Commit callback is used for COMMIT/PREPARE/COMMMIT PREPARED,
137+
* as well as abort for ROLLBACK and ROLLBACK PREPARED. Here
138+
* stored actual xact action allowing decoding plugin to distinguish them.
139+
*/
140+
uint8 xact_action;
141+
135142
/* did the TX have catalog changes */
136143
bool has_catalog_changes;
137144

@@ -278,6 +285,9 @@ struct ReorderBuffer
278285
*/
279286
HTAB *by_txn;
280287

288+
/* For twophase tx support we need to pass XACT action to ReorderBufferTXN */
289+
uint8 xact_action;
290+
281291
/*
282292
* Transactions that could be a toplevel xact, ordered by LSN of the first
283293
* record bearing that xid..

0 commit comments

Comments
 (0)