Skip to content

Commit 7c147f8

Browse files
committed
merge with current pglogical
2 parents 62d11b9 + dc9f33e commit 7c147f8

File tree

15 files changed

+521
-103
lines changed

15 files changed

+521
-103
lines changed

contrib/pglogical/pglogical_apply.c

Lines changed: 71 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -141,32 +141,81 @@ handle_begin(StringInfo s)
141141
* Handle COMMIT message.
142142
*/
143143
static void
144-
handle_commit(StringInfo s)
144+
handle_commit(char action, StringInfo s)
145145
{
146146
XLogRecPtr commit_lsn;
147147
XLogRecPtr end_lsn;
148148
TimestampTz commit_time;
149149

150-
pglogical_read_commit(s, &commit_lsn, &end_lsn, &commit_time);
150+
const char *gid;
151+
PGLFlushPosition *flushpos;
151152

152-
Assert(commit_lsn == replorigin_session_origin_lsn);
153-
Assert(commit_time == replorigin_session_origin_timestamp);
154153

155-
if (IsTransactionState())
154+
if (action == 'C')
156155
{
157-
PGLFlushPosition *flushpos;
156+
// Can we really be there without tx?
157+
Assert(IsTransactionState());
158158

159+
pglogical_read_commit(s, &commit_lsn, &end_lsn, &commit_time);
159160
CommitTransactionCommand();
160-
MemoryContextSwitchTo(TopMemoryContext);
161+
}
162+
else if (action == 'P')
163+
{
164+
// Can we really be there without tx?
165+
Assert(IsTransactionState());
161166

162-
/* Track commit lsn */
163-
flushpos = (PGLFlushPosition *) palloc(sizeof(PGLFlushPosition));
164-
flushpos->local_end = XactLastCommitEnd;
165-
flushpos->remote_end = end_lsn;
167+
pglogical_read_twophase(s, &commit_lsn, &end_lsn, &commit_time, &gid);
166168

167-
dlist_push_tail(&lsn_mapping, &flushpos->node);
168-
MemoryContextSwitchTo(MessageContext);
169+
/* prepare TBLOCK_INPROGRESS state for PrepareTransactionBlock() */
170+
BeginTransactionBlock();
171+
CommitTransactionCommand();
172+
StartTransactionCommand();
173+
174+
/* PREPARE itself */
175+
PrepareTransactionBlock(gid);
176+
CommitTransactionCommand();
177+
}
178+
else if (action == 'F')
179+
{
180+
pglogical_read_twophase(s, &commit_lsn, &end_lsn, &commit_time, &gid);
181+
182+
StartTransactionCommand();
183+
FinishPreparedTransaction(gid, true);
184+
CommitTransactionCommand();
185+
186+
/* There were no BEGIN stmt for COMMIT PREPARED */
187+
replorigin_session_origin_timestamp = commit_time;
188+
replorigin_session_origin_lsn = commit_lsn;
189+
}
190+
else if (action == 'X')
191+
{
192+
pglogical_read_twophase(s, &commit_lsn, &end_lsn, &commit_time, &gid);
193+
194+
StartTransactionCommand();
195+
FinishPreparedTransaction(gid, false);
196+
CommitTransactionCommand();
197+
198+
/* There were no BEGIN stmt for ROLLBACK PREPARED */
199+
replorigin_session_origin_timestamp = commit_time;
200+
replorigin_session_origin_lsn = commit_lsn;
169201
}
202+
else
203+
{
204+
Assert(false);
205+
}
206+
207+
MemoryContextSwitchTo(TopMemoryContext);
208+
209+
/* Track commit lsn */
210+
flushpos = (PGLFlushPosition *) palloc(sizeof(PGLFlushPosition));
211+
flushpos->local_end = XactLastCommitEnd;
212+
flushpos->remote_end = end_lsn;
213+
214+
dlist_push_tail(&lsn_mapping, &flushpos->node);
215+
MemoryContextSwitchTo(MessageContext);
216+
217+
Assert(commit_lsn == replorigin_session_origin_lsn);
218+
Assert(commit_time == replorigin_session_origin_timestamp);
170219

171220
/*
172221
* If the row isn't from the immediate upstream; advance the slot of the
@@ -296,6 +345,7 @@ UserTableUpdateOpenIndexes(EState *estate, TupleTableSlot *slot)
296345
&slot->tts_tuple->t_self,
297346
estate, false, NULL, NIL);
298347

348+
299349
/* FIXME: recheck the indexes */
300350
if (recheckIndexes != NIL)
301351
ereport(ERROR,
@@ -403,6 +453,7 @@ handle_insert(StringInfo s)
403453
estate = create_estate_for_relation(rel->rel);
404454
econtext = GetPerTupleExprContext(estate);
405455

456+
406457
PushActiveSnapshot(GetTransactionSnapshot());
407458

408459
MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
@@ -1019,7 +1070,13 @@ replication_handler(StringInfo s)
10191070
break;
10201071
/* COMMIT */
10211072
case 'C':
1022-
handle_commit(s);
1073+
/* PREPARE */
1074+
case 'P':
1075+
/* COMMIT PREPARED */
1076+
case 'F':
1077+
/* ROLLBACK PREPARED */
1078+
case 'X':
1079+
handle_commit(action, s);
10231080
break;
10241081
/* ORIGIN */
10251082
case 'O':

contrib/pglogical/pglogical_proto.c

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,25 @@ pglogical_read_commit(StringInfo in, XLogRecPtr *commit_lsn,
9595
*committime = pq_getmsgint64(in);
9696
}
9797

98+
/*
99+
* Read transaction PREPARE or COMMIT PREPARED from the stream.
100+
*/
101+
void
102+
pglogical_read_twophase(StringInfo in, XLogRecPtr *commit_lsn,
103+
XLogRecPtr *end_lsn, TimestampTz *committime,
104+
const char **gid)
105+
{
106+
/* read flags */
107+
uint8 flags = pq_getmsgbyte(in);
108+
Assert(flags == 0);
109+
110+
/* read fields */
111+
*commit_lsn = pq_getmsgint64(in);
112+
*end_lsn = pq_getmsgint64(in);
113+
*committime = pq_getmsgint64(in);
114+
*gid = pq_getmsgstring(in);
115+
}
116+
98117
/*
99118
* Read ORIGIN from the output stream.
100119
*/

contrib/pglogical/pglogical_proto.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,9 @@ extern void pglogical_read_begin(StringInfo in, XLogRecPtr *remote_lsn,
2828
TimestampTz *committime, TransactionId *remote_xid);
2929
extern void pglogical_read_commit(StringInfo in, XLogRecPtr *commit_lsn,
3030
XLogRecPtr *end_lsn, TimestampTz *committime);
31+
extern void pglogical_read_twophase(StringInfo in, XLogRecPtr *commit_lsn,
32+
XLogRecPtr *end_lsn, TimestampTz *committime,
33+
const char **gid);
3134
extern char *pglogical_read_origin(StringInfo in, XLogRecPtr *origin_lsn);
3235

3336
extern uint32 pglogical_read_rel(StringInfo in);

contrib/pglogical_output/pglogical_proto_json.c

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,31 @@ pglogical_json_write_commit(StringInfo out, PGLogicalOutputData *data, ReorderBu
9090
XLogRecPtr commit_lsn)
9191
{
9292
appendStringInfoChar(out, '{');
93-
appendStringInfoString(out, "\"action\":\"C\"");
93+
94+
if (txn->xact_action == XLOG_XACT_PREPARE)
95+
{
96+
appendStringInfoString(out, "\"action\":\"P\"");
97+
appendStringInfo(out, ", \"gid\":\"%s\"", txn->gid);
98+
}
99+
else if (txn->xact_action == XLOG_XACT_COMMIT_PREPARED)
100+
{
101+
appendStringInfoString(out, "\"action\":\"CP\"");
102+
appendStringInfo(out, ", \"gid\":\"%s\"", txn->gid);
103+
}
104+
else if (txn->xact_action == XLOG_XACT_ABORT_PREPARED)
105+
{
106+
appendStringInfoString(out, "\"action\":\"AP\"");
107+
appendStringInfo(out, ", \"gid\":\"%s\"", txn->gid);
108+
}
109+
else if (txn->xact_action == XLOG_XACT_COMMIT)
110+
{
111+
appendStringInfoString(out, "\"action\":\"C\"");
112+
}
113+
else
114+
{
115+
Assert(false);
116+
}
117+
94118
if (!data->client_no_txinfo)
95119
{
96120
appendStringInfo(out, ", \"final_lsn\":\"%X/%X\"",

contrib/pglogical_output/pglogical_proto_native.c

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
#include "access/sysattr.h"
2222
#include "access/tuptoaster.h"
2323
#include "access/xact.h"
24-
24+
#include "access/twophase.h"
2525
#include "catalog/catversion.h"
2626
#include "catalog/index.h"
2727

@@ -196,7 +196,17 @@ pglogical_write_commit(StringInfo out, PGLogicalOutputData *data,
196196
{
197197
uint8 flags = 0;
198198

199-
pq_sendbyte(out, 'C'); /* sending COMMIT */
199+
200+
if (txn->xact_action == XLOG_XACT_PREPARE)
201+
pq_sendbyte(out, 'P'); /* sending PREPARE */
202+
else if (txn->xact_action == XLOG_XACT_COMMIT_PREPARED)
203+
pq_sendbyte(out, 'F'); /* sending COMMIT_PREPARED (Finish 2PC) */
204+
else if (txn->xact_action == XLOG_XACT_COMMIT)
205+
pq_sendbyte(out, 'C'); /* sending COMMIT */
206+
else if (txn->xact_action == XLOG_XACT_ABORT_PREPARED)
207+
pq_sendbyte(out, 'X'); /* sending ABORT PREPARED */
208+
else
209+
Assert(false);
200210

201211
/* send the flags field */
202212
pq_sendbyte(out, flags);
@@ -205,6 +215,11 @@ pglogical_write_commit(StringInfo out, PGLogicalOutputData *data,
205215
pq_sendint64(out, commit_lsn);
206216
pq_sendint64(out, txn->end_lsn);
207217
pq_sendint64(out, txn->commit_time);
218+
219+
if (txn->xact_action == XLOG_XACT_PREPARE ||
220+
txn->xact_action == XLOG_XACT_COMMIT_PREPARED ||
221+
txn->xact_action == XLOG_XACT_ABORT_PREPARED)
222+
pq_sendstring(out, txn->gid);
208223
}
209224

210225
/*

reinit.sh

Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
#!/bin/sh
2+
3+
reinit_master() {
4+
rm -rf install/data
5+
6+
./install/bin/initdb -A trust -D ./install/data
7+
8+
echo "max_prepared_transactions = 100" >> ./install/data/postgresql.conf
9+
echo "shared_buffers = 512MB" >> ./install/data/postgresql.conf
10+
echo "fsync = off" >> ./install/data/postgresql.conf
11+
echo "log_checkpoints = on" >> ./install/data/postgresql.conf
12+
echo "max_wal_size = 48MB" >> ./install/data/postgresql.conf
13+
echo "min_wal_size = 32MB" >> ./install/data/postgresql.conf
14+
echo "wal_level = logical" >> ./install/data/postgresql.conf
15+
echo "wal_keep_segments = 64" >> ./install/data/postgresql.conf
16+
echo "max_wal_senders = 10" >> ./install/data/postgresql.conf
17+
echo "max_replication_slots = 10" >> ./install/data/postgresql.conf
18+
19+
echo "max_worker_processes = 10" >> ./install/data/postgresql.conf
20+
echo "shared_preload_libraries = 'pglogical'" >> ./install/data/postgresql.conf
21+
echo "track_commit_timestamp = on" >> ./install/data/postgresql.conf
22+
# echo "client_min_messages = debug3" >> ./install/data/postgresql.conf
23+
# echo "log_min_messages = debug3" >> ./install/data/postgresql.conf
24+
25+
echo '' > ./install/data/logfile
26+
27+
echo 'local replication stas trust' >> ./install/data/pg_hba.conf
28+
29+
./install/bin/pg_ctl -sw -D ./install/data -l ./install/data/logfile start
30+
./install/bin/createdb stas
31+
./install/bin/psql -c "create table t(id int primary key, v int);"
32+
}
33+
34+
reinit_master2() {
35+
rm -rf install/data2
36+
37+
./install/bin/initdb -A trust -D ./install/data2
38+
39+
echo "port = 5433" >> ./install/data2/postgresql.conf
40+
41+
echo "max_prepared_transactions = 100" >> ./install/data2/postgresql.conf
42+
echo "shared_buffers = 512MB" >> ./install/data2/postgresql.conf
43+
echo "fsync = off" >> ./install/data2/postgresql.conf
44+
echo "log_checkpoints = on" >> ./install/data2/postgresql.conf
45+
echo "max_wal_size = 48MB" >> ./install/data2/postgresql.conf
46+
echo "min_wal_size = 32MB" >> ./install/data2/postgresql.conf
47+
echo "wal_level = logical" >> ./install/data2/postgresql.conf
48+
echo "wal_keep_segments = 64" >> ./install/data2/postgresql.conf
49+
echo "max_wal_senders = 10" >> ./install/data2/postgresql.conf
50+
echo "max_replication_slots = 10" >> ./install/data2/postgresql.conf
51+
52+
echo "max_worker_processes = 10" >> ./install/data2/postgresql.conf
53+
echo "shared_preload_libraries = 'pglogical'" >> ./install/data2/postgresql.conf
54+
echo "track_commit_timestamp = on" >> ./install/data2/postgresql.conf
55+
56+
# echo "client_min_messages = debug3" >> ./install/data2/postgresql.conf
57+
# echo "log_min_messages = debug3" >> ./install/data2/postgresql.conf
58+
59+
echo '' > ./install/data2/logfile
60+
61+
echo 'local replication stas trust' >> ./install/data2/pg_hba.conf
62+
63+
./install/bin/pg_ctl -sw -D ./install/data2 -l ./install/data2/logfile start
64+
./install/bin/createdb stas -p5433
65+
}
66+
67+
make install > /dev/null
68+
69+
cd contrib/pglogical
70+
make clean && make install
71+
cd ../..
72+
cd contrib/pglogical_output
73+
make clean && make install
74+
cd ../..
75+
76+
pkill -9 postgres
77+
reinit_master
78+
reinit_master2
79+
80+
# ./install/bin/psql <<SQL
81+
# CREATE EXTENSION pglogical;
82+
# SELECT pglogical.create_node(
83+
# node_name := 'provider1',
84+
# dsn := 'port=5432 dbname=stas'
85+
# );
86+
# SELECT pglogical.replication_set_add_all_tables('default', ARRAY['public']);
87+
# SQL
88+
89+
# ./install/bin/psql -p 5433 <<SQL
90+
# CREATE EXTENSION pglogical;
91+
# SELECT pglogical.create_node(
92+
# node_name := 'subscriber1',
93+
# dsn := 'port=5433 dbname=stas'
94+
# );
95+
# SELECT pglogical.create_subscription(
96+
# subscription_name := 'subscription1',
97+
# provider_dsn := 'port=5432 dbname=stas'
98+
# );
99+
# SQL
100+
101+
./install/bin/psql -c "SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'pglogical_output');"
102+
103+
./install/bin/psql <<SQL
104+
begin;
105+
insert into t values (42);
106+
prepare transaction 'hellyeah';
107+
rollback prepared 'hellyeah';
108+
SQL
109+
110+
./install/bin/psql <<SQL
111+
SELECT * FROM pg_logical_slot_peek_changes('regression_slot',
112+
NULL, NULL,
113+
'expected_encoding', 'UTF8',
114+
'min_proto_version', '1',
115+
'max_proto_version', '1',
116+
'startup_params_format', '1',
117+
'proto_format', 'json',
118+
'no_txinfo', 't');
119+
SQL
120+
121+
122+
123+
124+
125+
126+

src/backend/access/rmgrdesc/xactdesc.c

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -97,10 +97,13 @@ ParseCommitRecord(uint8 info, xl_xact_commit *xlrec, xl_xact_parsed_commit *pars
9797
if (parsed->xinfo & XACT_XINFO_HAS_TWOPHASE)
9898
{
9999
xl_xact_twophase *xl_twophase = (xl_xact_twophase *) data;
100+
uint8 gidlen = xl_twophase->gidlen;
100101

101102
parsed->twophase_xid = xl_twophase->xid;
103+
data += MinSizeOfXactTwophase;
102104

103-
data += sizeof(xl_xact_twophase);
105+
strcpy(parsed->twophase_gid, data);
106+
data += gidlen;
104107
}
105108

106109
if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN)
@@ -163,10 +166,13 @@ ParseAbortRecord(uint8 info, xl_xact_abort *xlrec, xl_xact_parsed_abort *parsed)
163166
if (parsed->xinfo & XACT_XINFO_HAS_TWOPHASE)
164167
{
165168
xl_xact_twophase *xl_twophase = (xl_xact_twophase *) data;
169+
uint8 gidlen = xl_twophase->gidlen;
166170

167171
parsed->twophase_xid = xl_twophase->xid;
172+
data += MinSizeOfXactTwophase;
168173

169-
data += sizeof(xl_xact_twophase);
174+
strcpy(parsed->twophase_gid, data);
175+
data += gidlen;
170176
}
171177
}
172178

0 commit comments

Comments
 (0)