Skip to content

Commit a157e2c

Browse files
committed
apply twophase tx with remote GID
1 parent 45d0f5d commit a157e2c

File tree

4 files changed

+62
-33
lines changed

4 files changed

+62
-33
lines changed

contrib/pglogical/pglogical_apply.c

Lines changed: 38 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -141,22 +141,50 @@ handle_begin(StringInfo s)
141141
* Handle COMMIT message.
142142
*/
143143
static void
144-
handle_commit(StringInfo s)
144+
handle_commit(char action, StringInfo s)
145145
{
146146
XLogRecPtr commit_lsn;
147147
XLogRecPtr end_lsn;
148148
TimestampTz commit_time;
149+
const char *gid;
150+
PGLFlushPosition *flushpos;
149151

150-
pglogical_read_commit(s, &commit_lsn, &end_lsn, &commit_time);
152+
153+
if (action == 'C')
154+
pglogical_read_commit(s, &commit_lsn, &end_lsn, &commit_time);
155+
else if (action == 'P')
156+
pglogical_read_twophase(s, &commit_lsn, &end_lsn, &commit_time, &gid);
157+
else //if (action == 'F')
158+
{
159+
pglogical_read_twophase(s, &commit_lsn, &end_lsn, &commit_time, &gid);
160+
replorigin_session_origin_timestamp = commit_time;
161+
replorigin_session_origin_lsn = commit_lsn;
162+
}
151163

152164
Assert(commit_lsn == replorigin_session_origin_lsn);
153165
Assert(commit_time == replorigin_session_origin_timestamp);
154166

155-
if (IsTransactionState())
156-
{
157-
PGLFlushPosition *flushpos;
167+
// if (IsTransactionState())
168+
// {
169+
if (action == 'C')
170+
{
171+
CommitTransactionCommand();
172+
}
173+
else if (action == 'P')
174+
{
175+
BeginTransactionBlock();
176+
CommitTransactionCommand();
177+
StartTransactionCommand();
178+
PrepareTransactionBlock(gid);
179+
CommitTransactionCommand();
180+
}
181+
else if (action == 'F')
182+
{
183+
StartTransactionCommand();
184+
FinishPreparedTransaction(gid, true);
185+
CommitTransactionCommand();
186+
}
158187

159-
CommitTransactionCommand();
160188
MemoryContextSwitchTo(TopMemoryContext);
161189

162190
/* Track commit lsn */
@@ -166,7 +194,7 @@ handle_commit(StringInfo s)
166194

167195
dlist_push_tail(&lsn_mapping, &flushpos->node);
168196
MemoryContextSwitchTo(MessageContext);
169-
}
197+
// }
170198

171199
/*
172200
* If the row isn't from the immediate upstream; advance the slot of the
@@ -216,24 +244,6 @@ handle_commit(StringInfo s)
216244
pgstat_report_activity(STATE_IDLE, NULL);
217245
}
218246

219-
static void
220-
handle_prepare(StringInfo s)
221-
{
222-
BeginTransactionBlock();
223-
CommitTransactionCommand();
224-
StartTransactionCommand();
225-
PrepareTransactionBlock("dumb_gid");
226-
CommitTransactionCommand();
227-
}
228-
229-
static void
230-
handle_commit_prepared(StringInfo s)
231-
{
232-
StartTransactionCommand();
233-
FinishPreparedTransaction("dumb_gid", true);
234-
CommitTransactionCommand();
235-
}
236-
237247
/*
238248
* Handle ORIGIN message.
239249
*/
@@ -1042,20 +1052,16 @@ replication_handler(StringInfo s)
10421052
break;
10431053
/* COMMIT */
10441054
case 'C':
1045-
handle_commit(s);
1046-
break;
1055+
/* PREPARE */
1056+
case 'P':
10471057
/* COMMIT PREPARED (FINISH) */
10481058
case 'F':
1049-
handle_commit_prepared(s);
1059+
handle_commit(action, s);
10501060
break;
10511061
/* ORIGIN */
10521062
case 'O':
10531063
handle_origin(s);
10541064
break;
1055-
/* PREPARE */
1056-
case 'P':
1057-
handle_prepare(s);
1058-
break;
10591065
/* RELATION */
10601066
case 'R':
10611067
handle_relation(s);

contrib/pglogical/pglogical_proto.c

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,25 @@ pglogical_read_commit(StringInfo in, XLogRecPtr *commit_lsn,
9595
*committime = pq_getmsgint64(in);
9696
}
9797

98+
/*
99+
* Read transaction PREPARE or COMMIT PREPARED from the stream.
100+
*/
101+
void
102+
pglogical_read_twophase(StringInfo in, XLogRecPtr *commit_lsn,
103+
XLogRecPtr *end_lsn, TimestampTz *committime,
104+
const char **gid)
105+
{
106+
/* read flags */
107+
uint8 flags = pq_getmsgbyte(in);
108+
Assert(flags == 0);
109+
110+
/* read fields */
111+
*commit_lsn = pq_getmsgint64(in);
112+
*end_lsn = pq_getmsgint64(in);
113+
*committime = pq_getmsgint64(in);
114+
*gid = pq_getmsgstring(in);
115+
}
116+
98117
/*
99118
* Read ORIGIN from the output stream.
100119
*/

contrib/pglogical/pglogical_proto.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ extern void pglogical_read_begin(StringInfo in, XLogRecPtr *remote_lsn,
2828
TimestampTz *committime, TransactionId *remote_xid);
2929
extern void pglogical_read_commit(StringInfo in, XLogRecPtr *commit_lsn,
3030
XLogRecPtr *end_lsn, TimestampTz *committime);
31+
extern void pglogical_read_twophase(StringInfo in, XLogRecPtr *commit_lsn,
32+
XLogRecPtr *end_lsn, TimestampTz *committime, const char **gid);
3133
extern char *pglogical_read_origin(StringInfo in, XLogRecPtr *origin_lsn);
3234

3335
extern uint32 pglogical_read_rel(StringInfo in);

contrib/pglogical_output/pglogical_proto_native.c

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -212,7 +212,9 @@ pglogical_write_commit(StringInfo out, PGLogicalOutputData *data,
212212
pq_sendint64(out, txn->end_lsn);
213213
pq_sendint64(out, txn->commit_time);
214214

215-
if (txn->xact_action == XLOG_XACT_PREPARE)
215+
// send that as a cstring, instead of fixlen
216+
if (txn->xact_action == XLOG_XACT_PREPARE ||
217+
txn->xact_action == XLOG_XACT_COMMIT_PREPARED )
216218
pq_sendbytes(out, txn->gid, GIDSIZE);
217219
}
218220

0 commit comments

Comments
 (0)