Skip to content

Commit 86f6183

Browse files
committed
do not break ddl replication; move P/CP/AP to commit flags
1 parent 7c147f8 commit 86f6183

File tree

6 files changed

+104
-113
lines changed

6 files changed

+104
-113
lines changed

contrib/pglogical/pglogical_apply.c

Lines changed: 56 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -141,79 +141,79 @@ handle_begin(StringInfo s)
141141
* Handle COMMIT message.
142142
*/
143143
static void
144-
handle_commit(char action, StringInfo s)
144+
handle_commit(StringInfo s)
145145
{
146146
XLogRecPtr commit_lsn;
147147
XLogRecPtr end_lsn;
148148
TimestampTz commit_time;
149-
150-
const char *gid;
149+
uint8 flags;
150+
const char *gid;
151151
PGLFlushPosition *flushpos;
152+
bool flush = true;
152153

154+
pglogical_read_commit(s, &commit_lsn, &end_lsn, &commit_time, &flags, &gid);
153155

154-
if (action == 'C')
155-
{
156-
// Can we really be there without tx?
157-
Assert(IsTransactionState());
158-
159-
pglogical_read_commit(s, &commit_lsn, &end_lsn, &commit_time);
160-
CommitTransactionCommand();
161-
}
162-
else if (action == 'P')
156+
switch(PGLOGICAL_XACT_EVENT(flags))
163157
{
164-
// Can we really be there without tx?
165-
Assert(IsTransactionState());
158+
case PGLOGICAL_COMMIT:
159+
{
160+
if (IsTransactionState())
161+
CommitTransactionCommand();
162+
else
163+
flush = false;
164+
break;
165+
}
166+
case PGLOGICAL_PREPARE:
167+
{
168+
/* prepare TBLOCK_INPROGRESS state for PrepareTransactionBlock() */
169+
BeginTransactionBlock();
170+
CommitTransactionCommand();
171+
StartTransactionCommand();
166172

167-
pglogical_read_twophase(s, &commit_lsn, &end_lsn, &commit_time, &gid);
173+
/* PREPARE itself */
174+
PrepareTransactionBlock(gid);
175+
CommitTransactionCommand();
176+
break;
177+
}
178+
case PGLOGICAL_COMMIT_PREPARED:
179+
{
180+
StartTransactionCommand();
181+
FinishPreparedTransaction(gid, true);
182+
CommitTransactionCommand();
168183

169-
/* prepare TBLOCK_INPROGRESS state for PrepareTransactionBlock() */
170-
BeginTransactionBlock();
171-
CommitTransactionCommand();
172-
StartTransactionCommand();
184+
/* There were no BEGIN stmt for COMMIT PREPARED */
185+
replorigin_session_origin_timestamp = commit_time;
186+
replorigin_session_origin_lsn = commit_lsn;
187+
break;
188+
}
189+
case PGLOGICAL_ABORT_PREPARED:
190+
{
191+
StartTransactionCommand();
192+
FinishPreparedTransaction(gid, false);
193+
CommitTransactionCommand();
173194

174-
/* PREPARE itself */
175-
PrepareTransactionBlock(gid);
176-
CommitTransactionCommand();
195+
/* There were no BEGIN stmt for ROLLBACK PREPARED */
196+
replorigin_session_origin_timestamp = commit_time;
197+
replorigin_session_origin_lsn = commit_lsn;
198+
break;
199+
}
200+
default:
201+
Assert(false);
177202
}
178-
else if (action == 'F')
179-
{
180-
pglogical_read_twophase(s, &commit_lsn, &end_lsn, &commit_time, &gid);
181-
182-
StartTransactionCommand();
183-
FinishPreparedTransaction(gid, true);
184-
CommitTransactionCommand();
185203

186-
/* There were no BEGIN stmt for COMMIT PREPARED */
187-
replorigin_session_origin_timestamp = commit_time;
188-
replorigin_session_origin_lsn = commit_lsn;
189-
}
190-
else if (action == 'X')
204+
if (flush)
191205
{
192-
pglogical_read_twophase(s, &commit_lsn, &end_lsn, &commit_time, &gid);
206+
MemoryContextSwitchTo(TopMemoryContext);
193207

194-
StartTransactionCommand();
195-
FinishPreparedTransaction(gid, false);
196-
CommitTransactionCommand();
208+
/* Track commit lsn */
209+
flushpos = (PGLFlushPosition *) palloc(sizeof(PGLFlushPosition));
210+
flushpos->local_end = XactLastCommitEnd;
211+
flushpos->remote_end = end_lsn;
197212

198-
/* There were no BEGIN stmt for ROLLBACK PREPARED */
199-
replorigin_session_origin_timestamp = commit_time;
200-
replorigin_session_origin_lsn = commit_lsn;
201-
}
202-
else
203-
{
204-
Assert(false);
213+
dlist_push_tail(&lsn_mapping, &flushpos->node);
214+
MemoryContextSwitchTo(MessageContext);
205215
}
206216

207-
MemoryContextSwitchTo(TopMemoryContext);
208-
209-
/* Track commit lsn */
210-
flushpos = (PGLFlushPosition *) palloc(sizeof(PGLFlushPosition));
211-
flushpos->local_end = XactLastCommitEnd;
212-
flushpos->remote_end = end_lsn;
213-
214-
dlist_push_tail(&lsn_mapping, &flushpos->node);
215-
MemoryContextSwitchTo(MessageContext);
216-
217217
Assert(commit_lsn == replorigin_session_origin_lsn);
218218
Assert(commit_time == replorigin_session_origin_timestamp);
219219

@@ -1070,13 +1070,7 @@ replication_handler(StringInfo s)
10701070
break;
10711071
/* COMMIT */
10721072
case 'C':
1073-
/* PREPARE */
1074-
case 'P':
1075-
/* COMMIT PREPARED */
1076-
case 'F':
1077-
/* ROLLBACK PREPARED */
1078-
case 'X':
1079-
handle_commit(action, s);
1073+
handle_commit(s);
10801074
break;
10811075
/* ORIGIN */
10821076
case 'O':

contrib/pglogical/pglogical_proto.c

Lines changed: 5 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -83,35 +83,19 @@ pglogical_read_begin(StringInfo in, XLogRecPtr *remote_lsn,
8383
*/
8484
void
8585
pglogical_read_commit(StringInfo in, XLogRecPtr *commit_lsn,
86-
XLogRecPtr *end_lsn, TimestampTz *committime)
86+
XLogRecPtr *end_lsn, TimestampTz *committime,
87+
uint8 *flags, const char **gid)
8788
{
8889
/* read flags */
89-
uint8 flags = pq_getmsgbyte(in);
90-
Assert(flags == 0);
90+
*flags = pq_getmsgbyte(in);
9191

9292
/* read fields */
9393
*commit_lsn = pq_getmsgint64(in);
9494
*end_lsn = pq_getmsgint64(in);
9595
*committime = pq_getmsgint64(in);
96-
}
9796

98-
/*
99-
* Read transaction PREPARE or COMMIT PREPARED from the stream.
100-
*/
101-
void
102-
pglogical_read_twophase(StringInfo in, XLogRecPtr *commit_lsn,
103-
XLogRecPtr *end_lsn, TimestampTz *committime,
104-
const char **gid)
105-
{
106-
/* read flags */
107-
uint8 flags = pq_getmsgbyte(in);
108-
Assert(flags == 0);
109-
110-
/* read fields */
111-
*commit_lsn = pq_getmsgint64(in);
112-
*end_lsn = pq_getmsgint64(in);
113-
*committime = pq_getmsgint64(in);
114-
*gid = pq_getmsgstring(in);
97+
if (PGLOGICAL_XACT_EVENT(*flags) != PGLOGICAL_COMMIT)
98+
*gid = pq_getmsgstring(in);
11599
}
116100

117101
/*

contrib/pglogical/pglogical_proto.h

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,13 +24,18 @@ typedef struct PGLogicalTupleData
2424
bool changed[MaxTupleAttributeNumber];
2525
} PGLogicalTupleData;
2626

27+
#define PGLOGICAL_COMMIT 0x00
28+
#define PGLOGICAL_PREPARE 0x01
29+
#define PGLOGICAL_COMMIT_PREPARED 0x02
30+
#define PGLOGICAL_ABORT_PREPARED 0x03
31+
32+
#define PGLOGICAL_XACT_EVENT(flags) (flags & 0x3)
33+
2734
extern void pglogical_read_begin(StringInfo in, XLogRecPtr *remote_lsn,
2835
TimestampTz *committime, TransactionId *remote_xid);
2936
extern void pglogical_read_commit(StringInfo in, XLogRecPtr *commit_lsn,
30-
XLogRecPtr *end_lsn, TimestampTz *committime);
31-
extern void pglogical_read_twophase(StringInfo in, XLogRecPtr *commit_lsn,
32-
XLogRecPtr *end_lsn, TimestampTz *committime,
33-
const char **gid);
37+
XLogRecPtr *end_lsn, TimestampTz *committime,
38+
uint8 *flags, const char **gid);
3439
extern char *pglogical_read_origin(StringInfo in, XLogRecPtr *origin_lsn);
3540

3641
extern uint32 pglogical_read_rel(StringInfo in);

contrib/pglogical_output/pglogical_proto_native.c

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -196,15 +196,16 @@ pglogical_write_commit(StringInfo out, PGLogicalOutputData *data,
196196
{
197197
uint8 flags = 0;
198198

199+
pq_sendbyte(out, 'C'); /* sending COMMIT */
199200

200-
if (txn->xact_action == XLOG_XACT_PREPARE)
201-
pq_sendbyte(out, 'P'); /* sending PREPARE */
201+
if (txn->xact_action == XLOG_XACT_COMMIT)
202+
flags = PGLOGICAL_COMMIT;
203+
else if (txn->xact_action == XLOG_XACT_PREPARE)
204+
flags = PGLOGICAL_PREPARE;
202205
else if (txn->xact_action == XLOG_XACT_COMMIT_PREPARED)
203-
pq_sendbyte(out, 'F'); /* sending COMMIT_PREPARED (Finish 2PC) */
204-
else if (txn->xact_action == XLOG_XACT_COMMIT)
205-
pq_sendbyte(out, 'C'); /* sending COMMIT */
206+
flags = PGLOGICAL_COMMIT_PREPARED;
206207
else if (txn->xact_action == XLOG_XACT_ABORT_PREPARED)
207-
pq_sendbyte(out, 'X'); /* sending ABORT PREPARED */
208+
flags = PGLOGICAL_ABORT_PREPARED;
208209
else
209210
Assert(false);
210211

contrib/pglogical_output/pglogical_proto_native.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,13 @@
2323
*/
2424
#define PGLOGICAL_STARTUP_MSG_FORMAT_FLAT 1
2525

26+
#define PGLOGICAL_COMMIT 0x00
27+
#define PGLOGICAL_PREPARE 0x01
28+
#define PGLOGICAL_COMMIT_PREPARED 0x02
29+
#define PGLOGICAL_ABORT_PREPARED 0x03
30+
31+
#define PGLOGICAL_XACT_EVENT(flags) (flags & 0x3)
32+
2633
extern void pglogical_write_rel(StringInfo out, PGLogicalOutputData *data, Relation rel,
2734
struct PGLRelMetaCacheEntry *cache_entry);
2835

reinit.sh

Lines changed: 20 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ cd ../..
7575

7676
pkill -9 postgres
7777
reinit_master
78-
reinit_master2
78+
# reinit_master2
7979

8080
# ./install/bin/psql <<SQL
8181
# CREATE EXTENSION pglogical;
@@ -98,25 +98,25 @@ reinit_master2
9898
# );
9999
# SQL
100100

101-
./install/bin/psql -c "SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'pglogical_output');"
102-
103-
./install/bin/psql <<SQL
104-
begin;
105-
insert into t values (42);
106-
prepare transaction 'hellyeah';
107-
rollback prepared 'hellyeah';
108-
SQL
109-
110-
./install/bin/psql <<SQL
111-
SELECT * FROM pg_logical_slot_peek_changes('regression_slot',
112-
NULL, NULL,
113-
'expected_encoding', 'UTF8',
114-
'min_proto_version', '1',
115-
'max_proto_version', '1',
116-
'startup_params_format', '1',
117-
'proto_format', 'json',
118-
'no_txinfo', 't');
119-
SQL
101+
# ./install/bin/psql -c "SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'pglogical_output');"
102+
103+
# ./install/bin/psql <<SQL
104+
# begin;
105+
# insert into t values (42);
106+
# prepare transaction 'hellyeah';
107+
# rollback prepared 'hellyeah';
108+
# SQL
109+
110+
# ./install/bin/psql <<SQL
111+
# SELECT * FROM pg_logical_slot_peek_changes('regression_slot',
112+
# NULL, NULL,
113+
# 'expected_encoding', 'UTF8',
114+
# 'min_proto_version', '1',
115+
# 'max_proto_version', '1',
116+
# 'startup_params_format', '1',
117+
# 'proto_format', 'json',
118+
# 'no_txinfo', 't');
119+
# SQL
120120

121121

122122

0 commit comments

Comments
 (0)