Skip to content

Commit 04e3530

Browse files
committed
handle prepare/commit_prepared in logical recvr
1 parent 69ca027 commit 04e3530

File tree

3 files changed

+37
-2
lines changed

3 files changed

+37
-2
lines changed

contrib/pglogical/pglogical_apply.c

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -216,6 +216,24 @@ handle_commit(StringInfo s)
216216
pgstat_report_activity(STATE_IDLE, NULL);
217217
}
218218

219+
static void
220+
handle_prepare(StringInfo s)
221+
{
222+
BeginTransactionBlock();
223+
CommitTransactionCommand();
224+
StartTransactionCommand();
225+
PrepareTransactionBlock("dumb_gid");
226+
CommitTransactionCommand();
227+
}
228+
229+
static void
230+
handle_commit_prepared(StringInfo s)
231+
{
232+
StartTransactionCommand();
233+
FinishPreparedTransaction("dumb_gid", true);
234+
CommitTransactionCommand();
235+
}
236+
219237
/*
220238
* Handle ORIGIN message.
221239
*/
@@ -1026,10 +1044,18 @@ replication_handler(StringInfo s)
10261044
case 'C':
10271045
handle_commit(s);
10281046
break;
1047+
/* COMMIT PREPARED (FINISH) */
1048+
case 'F':
1049+
handle_commit_prepared(s);
1050+
break;
10291051
/* ORIGIN */
10301052
case 'O':
10311053
handle_origin(s);
10321054
break;
1055+
/* PREPARE */
1056+
case 'P':
1057+
handle_prepare(s);
1058+
break;
10331059
/* RELATION */
10341060
case 'R':
10351061
handle_relation(s);

contrib/pglogical_output/pglogical_proto_json.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ pglogical_json_write_commit(StringInfo out, PGLogicalOutputData *data, ReorderBu
9393
if (txn->xact_action == XLOG_XACT_PREPARE)
9494
{
9595
appendStringInfoString(out, "\"action\":\"P\"");
96-
appendStringInfo(out, ", \"gid\":\"%s\"", txn->gid);
96+
appendStringInfo(out, ", \"gid\":\"%s\"", txn->gid); /* NB: Add \0 at the end, if we are using all 200 bytes in GID */
9797
}
9898
else if (txn->xact_action == XLOG_XACT_COMMIT_PREPARED)
9999
{

contrib/pglogical_output/pglogical_proto_native.c

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
#include "access/sysattr.h"
2222
#include "access/tuptoaster.h"
2323
#include "access/xact.h"
24+
#include "access/twophase.h"
2425

2526
#include "catalog/catversion.h"
2627
#include "catalog/index.h"
@@ -196,7 +197,12 @@ pglogical_write_commit(StringInfo out, PGLogicalOutputData *data,
196197
{
197198
uint8 flags = 0;
198199

199-
pq_sendbyte(out, 'C'); /* sending COMMIT */
200+
if (txn->xact_action == XLOG_XACT_PREPARE)
201+
pq_sendbyte(out, 'P'); /* sending PREPARE */
202+
else if (txn->xact_action == XLOG_XACT_COMMIT_PREPARED)
203+
pq_sendbyte(out, 'F'); /* sending COMMIT_PREPARED (Finish 2PC) */
204+
else
205+
pq_sendbyte(out, 'C'); /* sending COMMIT */
200206

201207
/* send the flags field */
202208
pq_sendbyte(out, flags);
@@ -205,6 +211,9 @@ pglogical_write_commit(StringInfo out, PGLogicalOutputData *data,
205211
pq_sendint64(out, commit_lsn);
206212
pq_sendint64(out, txn->end_lsn);
207213
pq_sendint64(out, txn->commit_time);
214+
215+
if (txn->xact_action == XLOG_XACT_PREPARE)
216+
pq_sendbytes(out, txn->gid, GIDSIZE);
208217
}
209218

210219
/*

0 commit comments

Comments
 (0)