Skip to content

Commit 97662b2

Browse files
committed
decode aborts
1 parent a157e2c commit 97662b2

File tree

8 files changed

+97
-38
lines changed

8 files changed

+97
-38
lines changed

contrib/pglogical/pglogical_apply.c

Lines changed: 54 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -151,50 +151,70 @@ handle_commit(char action, StringInfo s)
151151

152152

153153
if (action == 'C')
154+
{
155+
// Can we really be there without tx?
156+
Assert(IsTransactionState());
157+
154158
pglogical_read_commit(s, &commit_lsn, &end_lsn, &commit_time);
159+
CommitTransactionCommand();
160+
}
155161
else if (action == 'P')
162+
{
163+
// Can we really be there without tx?
164+
Assert(IsTransactionState());
165+
156166
pglogical_read_twophase(s, &commit_lsn, &end_lsn, &commit_time, &gid);
157-
else //if (action == 'F')
167+
168+
/* prepare TBLOCK_INPROGRESS state for PrepareTransactionBlock() */
169+
BeginTransactionBlock();
170+
CommitTransactionCommand();
171+
StartTransactionCommand();
172+
173+
/* PREPARE itself */
174+
PrepareTransactionBlock(gid);
175+
CommitTransactionCommand();
176+
}
177+
else if (action == 'F')
158178
{
159179
pglogical_read_twophase(s, &commit_lsn, &end_lsn, &commit_time, &gid);
180+
181+
StartTransactionCommand();
182+
FinishPreparedTransaction(gid, true);
183+
CommitTransactionCommand();
184+
185+
/* There were no BEGIN stmt for COMMIT PREPARED */
160186
replorigin_session_origin_timestamp = commit_time;
161187
replorigin_session_origin_lsn = commit_lsn;
162188
}
189+
else if (action == 'X')
190+
{
191+
pglogical_read_twophase(s, &commit_lsn, &end_lsn, &commit_time, &gid);
163192

164-
Assert(commit_lsn == replorigin_session_origin_lsn);
165-
Assert(commit_time == replorigin_session_origin_timestamp);
193+
StartTransactionCommand();
194+
FinishPreparedTransaction(gid, false);
195+
CommitTransactionCommand();
166196

167-
// if (IsTransactionState())
168-
// {
169-
if (action == 'C')
170-
{
171-
CommitTransactionCommand();
172-
}
173-
else if (action == 'P')
174-
{
175-
BeginTransactionBlock();
176-
CommitTransactionCommand();
177-
StartTransactionCommand();
178-
PrepareTransactionBlock(gid);
179-
CommitTransactionCommand();
180-
}
181-
else if (action == 'F')
182-
{
183-
StartTransactionCommand();
184-
FinishPreparedTransaction(gid, true);
185-
CommitTransactionCommand();
186-
}
197+
/* There were no BEGIN stmt for ROLLBACK PREPARED */
198+
replorigin_session_origin_timestamp = commit_time;
199+
replorigin_session_origin_lsn = commit_lsn;
200+
}
201+
else
202+
{
203+
Assert(false);
204+
}
187205

188-
MemoryContextSwitchTo(TopMemoryContext);
206+
MemoryContextSwitchTo(TopMemoryContext);
189207

190-
/* Track commit lsn */
191-
flushpos = (PGLFlushPosition *) palloc(sizeof(PGLFlushPosition));
192-
flushpos->local_end = XactLastCommitEnd;
193-
flushpos->remote_end = end_lsn;
208+
/* Track commit lsn */
209+
flushpos = (PGLFlushPosition *) palloc(sizeof(PGLFlushPosition));
210+
flushpos->local_end = XactLastCommitEnd;
211+
flushpos->remote_end = end_lsn;
194212

195-
dlist_push_tail(&lsn_mapping, &flushpos->node);
196-
MemoryContextSwitchTo(MessageContext);
197-
// }
213+
dlist_push_tail(&lsn_mapping, &flushpos->node);
214+
MemoryContextSwitchTo(MessageContext);
215+
216+
Assert(commit_lsn == replorigin_session_origin_lsn);
217+
Assert(commit_time == replorigin_session_origin_timestamp);
198218

199219
/*
200220
* If the row isn't from the immediate upstream; advance the slot of the
@@ -1054,8 +1074,10 @@ replication_handler(StringInfo s)
10541074
case 'C':
10551075
/* PREPARE */
10561076
case 'P':
1057-
/* COMMIT PREPARED (FINISH) */
1077+
/* COMMIT PREPARED */
10581078
case 'F':
1079+
/* ROLLBACK PREPARED */
1080+
case 'X':
10591081
handle_commit(action, s);
10601082
break;
10611083
/* ORIGIN */

contrib/pglogical_output/pglogical_proto_json.c

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,10 +100,19 @@ pglogical_json_write_commit(StringInfo out, PGLogicalOutputData *data, ReorderBu
100100
appendStringInfoString(out, "\"action\":\"CP\"");
101101
appendStringInfo(out, ", \"gid\":\"%s\"", txn->gid); /* NB: Add \0 at the end, if we are using all 200 bytes in GID */
102102
}
103-
else
103+
else if (txn->xact_action == XLOG_XACT_ABORT_PREPARED)
104+
{
105+
appendStringInfoString(out, "\"action\":\"AP\"");
106+
appendStringInfo(out, ", \"gid\":\"%s\"", txn->gid); /* NB: Add \0 at the end, if we are using all 200 bytes in GID */
107+
}
108+
else if (txn->xact_action == XLOG_XACT_COMMIT)
104109
{
105110
appendStringInfoString(out, "\"action\":\"C\"");
106111
}
112+
else
113+
{
114+
Assert(false);
115+
}
107116

108117
if (!data->client_no_txinfo)
109118
{

contrib/pglogical_output/pglogical_proto_native.c

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -201,8 +201,12 @@ pglogical_write_commit(StringInfo out, PGLogicalOutputData *data,
201201
pq_sendbyte(out, 'P'); /* sending PREPARE */
202202
else if (txn->xact_action == XLOG_XACT_COMMIT_PREPARED)
203203
pq_sendbyte(out, 'F'); /* sending COMMIT_PREPARED (Finish 2PC) */
204-
else
204+
else if (txn->xact_action == XLOG_XACT_COMMIT)
205205
pq_sendbyte(out, 'C'); /* sending COMMIT */
206+
else if (txn->xact_action == XLOG_XACT_ABORT_PREPARED)
207+
pq_sendbyte(out, 'X'); /* sending ABORT PREPARED */
208+
else
209+
Assert(false);
206210

207211
/* send the flags field */
208212
pq_sendbyte(out, flags);
@@ -214,7 +218,8 @@ pglogical_write_commit(StringInfo out, PGLogicalOutputData *data,
214218

215219
// send that as a cstring, instead of fixlen
216220
if (txn->xact_action == XLOG_XACT_PREPARE ||
217-
txn->xact_action == XLOG_XACT_COMMIT_PREPARED )
221+
txn->xact_action == XLOG_XACT_COMMIT_PREPARED ||
222+
txn->xact_action == XLOG_XACT_ABORT_PREPARED)
218223
pq_sendbytes(out, txn->gid, GIDSIZE);
219224
}
220225

src/backend/access/rmgrdesc/xactdesc.c

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -169,10 +169,16 @@ ParseAbortRecord(uint8 info, xl_xact_abort *xlrec, xl_xact_parsed_abort *parsed)
169169
if (parsed->xinfo & XACT_XINFO_HAS_TWOPHASE)
170170
{
171171
xl_xact_twophase *xl_twophase = (xl_xact_twophase *) data;
172+
uint8 gidlen = xl_twophase->gidlen;
172173

173174
parsed->twophase_xid = xl_twophase->xid;
174175

175-
data += sizeof(xl_xact_twophase);
176+
memcpy(parsed->twophase_gid, xl_twophase->gid, gidlen);
177+
/* Dirty! */
178+
memset(parsed->twophase_gid + gidlen, '\0', 200 - gidlen); // GIDSIZE
179+
180+
data += MinSizeOfXactTwophase;
181+
data += gidlen;
176182
}
177183
}
178184

src/backend/access/transam/xact.c

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5267,6 +5267,8 @@ XactLogAbortRecord(TimestampTz abort_time,
52675267
{
52685268
xl_xinfo.xinfo |= XACT_XINFO_HAS_TWOPHASE;
52695269
xl_twophase.xid = twophase_xid;
5270+
xl_twophase.gidlen = strlen(twophase_gid);
5271+
memcpy(xl_twophase.gid, twophase_gid, xl_twophase.gidlen);
52705272
}
52715273

52725274
if (xl_xinfo.xinfo != 0)
@@ -5298,7 +5300,11 @@ XactLogAbortRecord(TimestampTz abort_time,
52985300
}
52995301

53005302
if (xl_xinfo.xinfo & XACT_XINFO_HAS_TWOPHASE)
5301-
XLogRegisterData((char *) (&xl_twophase), sizeof(xl_xact_twophase));
5303+
{
5304+
// Write gid only in logical mode
5305+
XLogRegisterData((char *) (&xl_twophase), MinSizeOfXactTwophase);
5306+
XLogRegisterData((char *) twophase_gid, xl_twophase.gidlen);
5307+
}
53025308

53035309
return XLogInsert(RM_XACT_ID, info);
53045310
}

src/backend/replication/logical/decode.c

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -622,6 +622,16 @@ DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
622622
xl_xact_parsed_abort *parsed, TransactionId xid)
623623
{
624624
int i;
625+
XLogRecPtr origin_lsn = InvalidXLogRecPtr;
626+
XLogRecPtr commit_time = InvalidXLogRecPtr;
627+
XLogRecPtr origin_id = XLogRecGetOrigin(buf->record);
628+
629+
if (TransactionIdIsValid(parsed->twophase_xid)) {
630+
ReorderBufferCommitPrepared(ctx->reorder, xid, buf->origptr, buf->endptr,
631+
commit_time, origin_id, origin_lsn,
632+
parsed->twophase_gid);
633+
return;
634+
}
625635

626636
SnapBuildAbortTxn(ctx->snapshot_builder, buf->record->EndRecPtr, xid,
627637
parsed->nsubxacts, parsed->subxacts);

src/backend/replication/logical/reorderbuffer.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1629,7 +1629,7 @@ ReorderBufferCommitPrepared(ReorderBuffer *rb, TransactionId xid,
16291629
txn->commit_time = commit_time;
16301630
txn->origin_id = origin_id;
16311631
txn->origin_lsn = origin_lsn;
1632-
txn->xact_action = XLOG_XACT_COMMIT_PREPARED;
1632+
txn->xact_action = rb->xact_action;
16331633
memcpy(txn->gid, gid, GIDSIZE);
16341634

16351635
rb->commit(rb, txn, commit_lsn);

src/include/access/xact.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -303,6 +303,7 @@ typedef struct xl_xact_parsed_abort
303303
RelFileNode *xnodes;
304304

305305
TransactionId twophase_xid; /* only for 2PC */
306+
char twophase_gid[200]; // GIDSIZE
306307
} xl_xact_parsed_abort;
307308

308309

0 commit comments

Comments
 (0)