Skip to content

Commit b2236d8

Browse files
committed
pglogical_output v2
- add json protocol output support - fix encoding comparisons to use parsed encoding not string name - Report datum field encoding and database_encoding separately - import protocol documentation - README updates, move some info from protocol doc to README - send startup msg as List not buffer - significantly expand pg_regress tests - move pglogical_output_plhooks to a top-level contrib
1 parent 97dd089 commit b2236d8

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

42 files changed

+1775
-691
lines changed

contrib/Makefile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ SUBDIRS = \
3636
pg_trgm \
3737
pgcrypto \
3838
pglogical_output \
39+
pglogical_output_plhooks \
3940
pgrowlocks \
4041
pgstattuple \
4142
postgres_fdw \

contrib/pglogical_output/Makefile

Lines changed: 5 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,34 +1,13 @@
11
MODULE_big = pglogical_output
22
PGFILEDESC = "pglogical_output - logical replication output plugin"
33

4-
OBJS = pglogical_output.o pglogical_proto.o pglogical_config.o pglogical_hooks.o
4+
OBJS = pglogical_output.o pglogical_hooks.o pglogical_config.o \
5+
pglogical_proto.o pglogical_proto_native.o \
6+
pglogical_proto_json.o
57

6-
REGRESS = params basic hooks
8+
REGRESS = pre_clean params_native basic_native hooks_native basic_json hooks_json encoding_json
79

810

9-
ifdef USE_PGXS
10-
11-
# For regression checks
12-
# http://www.postgresql.org/message-id/CAB7nPqTsR5o3g-fBi6jbsVdhfPiLFWQ_0cGU5=94Rv_8W3qvFA@mail.gmail.com
13-
# this makes "make check" give a useful error
14-
abs_top_builddir = .
15-
NO_TEMP_INSTALL = yes
16-
# Usual recipe
17-
PG_CONFIG = pg_config
18-
PGXS := $(shell $(PG_CONFIG) --pgxs)
19-
include $(PGXS)
20-
21-
# These don't do anything yet, since temp install is disabled
22-
EXTRA_INSTALL += ./examples/hooks
23-
REGRESS_OPTS += --temp-config=regression.conf
24-
25-
plhooks:
26-
make -C examples/hooks USE_PGXS=1 clean install
27-
28-
installcheck: plhooks
29-
30-
else
31-
3211
subdir = contrib/pglogical_output
3312
top_builddir = ../..
3413
include $(top_builddir)/src/Makefile.global
@@ -40,12 +19,9 @@ include $(top_srcdir)/contrib/contrib-global.mk
4019
installcheck:
4120
;
4221

43-
EXTRA_INSTALL += $(subdir)/examples/hooks
22+
EXTRA_INSTALL += contrib/pglogical_output_plhooks
4423
EXTRA_REGRESS_OPTS += --temp-config=./regression.conf
4524

46-
endif
47-
4825
install: all
4926
$(MKDIR_P) '$(DESTDIR)$(includedir)'/pglogical_output
50-
$(INSTALL_DATA) pglogical_output/compat.h '$(DESTDIR)$(includedir)'/pglogical_output
5127
$(INSTALL_DATA) pglogical_output/hooks.h '$(DESTDIR)$(includedir)'/pglogical_output

contrib/pglogical_output/README.md

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -322,9 +322,9 @@ Each hook has its own C signature (defined below) and the pointers must be
322322
directly to the functions. Hooks that the client does not wish to set must be
323323
left null.
324324

325-
An example is provided in `examples/hooks` and the argument structs are defined
326-
in `pglogical_output/hooks.h`, which is installed into the PostgreSQL source
327-
tree when the extension is installed.
325+
An example is provided in `contrib/pglogical_output_plhooks` and the argument
326+
structs are defined in `pglogical_output/hooks.h`, which is installed into the
327+
PostgreSQL source tree when the extension is installed.
328328

329329
Each hook that is enabled results in a new startup parameter being emitted in
330330
the startup reply message. Clients must check for these and must not assume a
@@ -427,7 +427,7 @@ in the hook context will be automatically when the decoding session shuts down.
427427
## Writing hooks in procedural languages
428428

429429
You can write hooks in PL/PgSQL, etc, too, via the `pglogical_output_plhooks`
430-
adapter extension in `examples/hooks`. They won't perform very well though.
430+
adapter extension in `contrib`. They won't perform very well though.
431431

432432
# Limitations
433433

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
protocol.html

contrib/pglogical_output/doc/protocol.txt

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -385,14 +385,17 @@ Since all parameter values are sent as strings, the value types given below spec
385385

386386
|max_proto_version|integer|Newest version of the protocol supported by output plugin.
387387
|min_proto_version|integer|Oldest protocol version supported by server.
388+
|proto_format|text|Protocol format requested. native (documented here) or json. Default is native.
388389
|coltypes|boolean|Column types will be sent in table metadata.
389390
|pg_version_num|integer|PostgreSQL server_version_num of server, if it’s PostgreSQL. e.g. 090400
390391
|pg_version|string|PostgreSQL server_version of server, if it’s PostgreSQL.
391392
|pg_catversion|uint32|Version of the PostgreSQL system catalogs on the upstream server, if it’s PostgreSQL.
392393
|binary|_set of parameters, specified separately_|See “_the __‘binary’__ parameters_” below, and “_Parameters relating to exchange of binary values_”
393-
|encoding|string|The text encoding used in the upstream database. Used for text fields.
394+
|database_encoding|string|The native text encoding of the database the plugin is running in
395+
|encoding|string|Field values for textual data will be in this encoding in native protocol text, binary or internal representation. For the native protocol this is currently always the same as `database_encoding`. For text-mode json protocol this is always the same as `client_encoding`.
394396
|forward_changesets|bool|Specifies that all transactions, not just those originating on the upstream, will be forwarded. See “_Changeset forwarding_”.
395397
|forward_changeset_origins|bool|Tells the client that the server will send changeset origin information. Independent of forward_changesets. See “_Changeset forwarding_” for details.
398+
|no_txinfo|bool|Requests that variable transaction info such as XIDs, LSNs, and timestamps be omitted from output. Mainly for tests. Currently ignored for protos other than json.
396399
|===
397400

398401

@@ -468,7 +471,7 @@ Because these versions are expected to be incremented, to make it clear that the
468471
|===
469472
|*Key*|*Type*|*Default*|*Notes*
470473

471-
|expected_encoding|string|null|The text encoding the downstream expects results to be in. If specified, the upstream must honour it.
474+
|expected_encoding|string|null|The text encoding the downstream expects field values to be in. Applies to text, binary and internal representations of field values in native format. Has no effect on other protocol content. If specified, the upstream must honour it. For json protocol, must be unset or match `client_encoding`. (Current plugin versions ERROR if this is set for the native protocol and not equal to the upstream database's encoding).
472475
|forward_changesets|bool|false|Request that all transactions, not just those originating on the upstream, be forwarded. See “_Changeset forwarding_”.
473476
|want_coltypes|boolean|false|The client wants to receive data type information about columns.
474477
|===
@@ -533,3 +536,11 @@ with a json object value to carry their extension information. Additions to the
533536
startup message should follow the same pattern.
534537

535538
Hooks and plugins can be used to add functionality specific to a client.
539+
540+
== JSON protocol
541+
542+
If `proto_format` is set to `json` then the output plugin will emit JSON
543+
instead of the custom binary protocol. JSON support is intended mainly for
544+
debugging and diagnostics.
545+
546+
The JSON format supports all the same hooks.
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
\i sql/basic_setup.sql
2+
SET synchronous_commit = on;
3+
-- Schema setup
4+
CREATE TABLE demo (
5+
seq serial primary key,
6+
tx text,
7+
ts timestamp,
8+
jsb jsonb,
9+
js json,
10+
ba bytea
11+
);
12+
SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'pglogical_output');
13+
?column?
14+
----------
15+
init
16+
(1 row)
17+
18+
-- Queue up some work to decode with a variety of types
19+
INSERT INTO demo(tx) VALUES ('textval');
20+
INSERT INTO demo(ba) VALUES (BYTEA '\xDEADBEEF0001');
21+
INSERT INTO demo(ts, tx) VALUES (TIMESTAMP '2045-09-12 12:34:56.00', 'blah');
22+
INSERT INTO demo(js, jsb) VALUES ('{"key":"value"}', '{"key":"value"}');
23+
-- Rolled back txn
24+
BEGIN;
25+
DELETE FROM demo;
26+
INSERT INTO demo(tx) VALUES ('blahblah');
27+
ROLLBACK;
28+
-- Multi-statement transaction with subxacts
29+
BEGIN;
30+
SAVEPOINT sp1;
31+
INSERT INTO demo(tx) VALUES ('row1');
32+
RELEASE SAVEPOINT sp1;
33+
SAVEPOINT sp2;
34+
UPDATE demo SET tx = 'update-rollback' WHERE tx = 'row1';
35+
ROLLBACK TO SAVEPOINT sp2;
36+
SAVEPOINT sp3;
37+
INSERT INTO demo(tx) VALUES ('row2');
38+
INSERT INTO demo(tx) VALUES ('row3');
39+
RELEASE SAVEPOINT sp3;
40+
SAVEPOINT sp4;
41+
DELETE FROM demo WHERE tx = 'row2';
42+
RELEASE SAVEPOINT sp4;
43+
SAVEPOINT sp5;
44+
UPDATE demo SET tx = 'updated' WHERE tx = 'row1';
45+
COMMIT;
46+
-- txn with catalog changes
47+
BEGIN;
48+
CREATE TABLE cat_test(id integer);
49+
INSERT INTO cat_test(id) VALUES (42);
50+
COMMIT;
51+
-- Aborted subxact with catalog changes
52+
BEGIN;
53+
INSERT INTO demo(tx) VALUES ('1');
54+
SAVEPOINT sp1;
55+
ALTER TABLE demo DROP COLUMN tx;
56+
ROLLBACK TO SAVEPOINT sp1;
57+
INSERT INTO demo(tx) VALUES ('2');
58+
COMMIT;
59+
-- Simple decode with text-format tuples
60+
SELECT data
61+
FROM pg_logical_slot_peek_changes('regression_slot',
62+
NULL, NULL,
63+
'expected_encoding', 'UTF8',
64+
'min_proto_version', '1',
65+
'max_proto_version', '1',
66+
'startup_params_format', '1',
67+
'proto_format', 'json',
68+
'no_txinfo', 't');
69+
data
70+
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
71+
{"action":"S", "params": {"max_proto_version":"1","min_proto_version":"1","coltypes":"f","pg_version_num":"90600","pg_version":"9.6devel","pg_catversion":"201510161","database_encoding":"UTF8","encoding":"UTF8","forward_changesets":"f","forward_changeset_origins":"f","binary.internal_basetypes":"f","binary.binary_basetypes":"f","binary.basetypes_major_version":"906","binary.sizeof_int":"4","binary.sizeof_long":"8","binary.sizeof_datum":"8","binary.maxalign":"8","binary.bigendian":"f","binary.float4_byval":"t","binary.float8_byval":"t","binary.integer_datetimes":"t","binary.binary_pg_version":"906","no_txinfo":"t","hooks.startup_hook_enabled":"f","hooks.shutdown_hook_enabled":"f","hooks.row_filter_enabled":"f","hooks.transaction_filter_enabled":"f"}}
72+
{"action":"B", has_catalog_changes:"f"}
73+
{"action":"I","relation":["public","demo"],"newtuple":{"seq":1,"tx":"textval","ts":null,"jsb":null,"js":null,"ba":null}}
74+
{{"action":"C"}}
75+
{"action":"B", has_catalog_changes:"f"}
76+
{"action":"I","relation":["public","demo"],"newtuple":{"seq":2,"tx":null,"ts":null,"jsb":null,"js":null,"ba":"\\xdeadbeef0001"}}
77+
{{"action":"C"}}
78+
{"action":"B", has_catalog_changes:"f"}
79+
{"action":"I","relation":["public","demo"],"newtuple":{"seq":3,"tx":"blah","ts":"2045-09-12T12:34:56","jsb":null,"js":null,"ba":null}}
80+
{{"action":"C"}}
81+
{"action":"B", has_catalog_changes:"f"}
82+
{"action":"I","relation":["public","demo"],"newtuple":{"seq":4,"tx":null,"ts":null,"jsb":{"key": "value"},"js":{"key":"value"},"ba":null}}
83+
{{"action":"C"}}
84+
{"action":"B", has_catalog_changes:"f"}
85+
{"action":"I","relation":["public","demo"],"newtuple":{"seq":6,"tx":"row1","ts":null,"jsb":null,"js":null,"ba":null}}
86+
{"action":"I","relation":["public","demo"],"newtuple":{"seq":7,"tx":"row2","ts":null,"jsb":null,"js":null,"ba":null}}
87+
{"action":"I","relation":["public","demo"],"newtuple":{"seq":8,"tx":"row3","ts":null,"jsb":null,"js":null,"ba":null}}
88+
{"action":"D","relation":["public","demo"],"oldtuple":{"seq":7,"tx":null,"ts":null,"jsb":null,"js":null,"ba":null}}
89+
{"action":"U","relation":["public","demo"],"newtuple":{"seq":6,"tx":"updated","ts":null,"jsb":null,"js":null,"ba":null}}
90+
{{"action":"C"}}
91+
{"action":"B", has_catalog_changes:"t"}
92+
{"action":"I","relation":["public","cat_test"],"newtuple":{"id":42}}
93+
{{"action":"C"}}
94+
{"action":"B", has_catalog_changes:"f"}
95+
{"action":"I","relation":["public","demo"],"newtuple":{"seq":9,"tx":"1","ts":null,"jsb":null,"js":null,"ba":null}}
96+
{"action":"I","relation":["public","demo"],"newtuple":{"seq":10,"tx":"2","ts":null,"jsb":null,"js":null,"ba":null}}
97+
{{"action":"C"}}
98+
(27 rows)
99+
100+
\i sql/basic_teardown.sql
101+
SELECT 'drop' FROM pg_drop_replication_slot('regression_slot');
102+
?column?
103+
----------
104+
drop
105+
(1 row)
106+
107+
DROP TABLE demo;
108+
DROP TABLE cat_test;

contrib/pglogical_output/expected/basic.out renamed to contrib/pglogical_output/expected/basic_native.out

Lines changed: 41 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
\i sql/basic_setup.sql
12
SET synchronous_commit = on;
23
-- Schema setup
34
CREATE TABLE demo (
@@ -19,6 +20,42 @@ INSERT INTO demo(tx) VALUES ('textval');
1920
INSERT INTO demo(ba) VALUES (BYTEA '\xDEADBEEF0001');
2021
INSERT INTO demo(ts, tx) VALUES (TIMESTAMP '2045-09-12 12:34:56.00', 'blah');
2122
INSERT INTO demo(js, jsb) VALUES ('{"key":"value"}', '{"key":"value"}');
23+
-- Rolled back txn
24+
BEGIN;
25+
DELETE FROM demo;
26+
INSERT INTO demo(tx) VALUES ('blahblah');
27+
ROLLBACK;
28+
-- Multi-statement transaction with subxacts
29+
BEGIN;
30+
SAVEPOINT sp1;
31+
INSERT INTO demo(tx) VALUES ('row1');
32+
RELEASE SAVEPOINT sp1;
33+
SAVEPOINT sp2;
34+
UPDATE demo SET tx = 'update-rollback' WHERE tx = 'row1';
35+
ROLLBACK TO SAVEPOINT sp2;
36+
SAVEPOINT sp3;
37+
INSERT INTO demo(tx) VALUES ('row2');
38+
INSERT INTO demo(tx) VALUES ('row3');
39+
RELEASE SAVEPOINT sp3;
40+
SAVEPOINT sp4;
41+
DELETE FROM demo WHERE tx = 'row2';
42+
RELEASE SAVEPOINT sp4;
43+
SAVEPOINT sp5;
44+
UPDATE demo SET tx = 'updated' WHERE tx = 'row1';
45+
COMMIT;
46+
-- txn with catalog changes
47+
BEGIN;
48+
CREATE TABLE cat_test(id integer);
49+
INSERT INTO cat_test(id) VALUES (42);
50+
COMMIT;
51+
-- Aborted subxact with catalog changes
52+
BEGIN;
53+
INSERT INTO demo(tx) VALUES ('1');
54+
SAVEPOINT sp1;
55+
ALTER TABLE demo DROP COLUMN tx;
56+
ROLLBACK TO SAVEPOINT sp1;
57+
INSERT INTO demo(tx) VALUES ('2');
58+
COMMIT;
2259
-- Simple decode with text-format tuples
2360
--
2461
-- It's still the logical decoding binary protocol and as such it has
@@ -33,7 +70,7 @@ SELECT count(data) FROM pg_logical_slot_peek_binary_changes('regression_slot',
3370
'startup_params_format', '1');
3471
count
3572
-------
36-
17
73+
39
3774
(1 row)
3875

3976
-- ... and send/recv binary format
@@ -48,13 +85,15 @@ SELECT count(data) FROM pg_logical_slot_peek_binary_changes('regression_slot',
4885
'binary.basetypes_major_version', (current_setting('server_version_num')::integer / 100)::text);
4986
count
5087
-------
51-
17
88+
39
5289
(1 row)
5390

91+
\i sql/basic_teardown.sql
5492
SELECT 'drop' FROM pg_drop_replication_slot('regression_slot');
5593
?column?
5694
----------
5795
drop
5896
(1 row)
5997

6098
DROP TABLE demo;
99+
DROP TABLE cat_test;
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
SET synchronous_commit = on;
2+
-- This file doesn't share common setup with the native tests,
3+
-- since it's specific to how the text protocol handles encodings.
4+
CREATE TABLE enctest(blah text);
5+
SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'pglogical_output');
6+
?column?
7+
----------
8+
init
9+
(1 row)
10+
11+
SET client_encoding = 'UTF-8';
12+
INSERT INTO enctest(blah)
13+
VALUES
14+
('áàä'),('fl'), ('½⅓'), ('カンジ');
15+
RESET client_encoding;
16+
SET client_encoding = 'LATIN-1';
17+
-- Will ERROR, explicit encoding request doesn't match client_encoding
18+
SELECT data
19+
FROM pg_logical_slot_peek_changes('regression_slot',
20+
NULL, NULL,
21+
'expected_encoding', 'UTF8',
22+
'min_proto_version', '1',
23+
'max_proto_version', '1',
24+
'startup_params_format', '1',
25+
'proto_format', 'json',
26+
'no_txinfo', 't');
27+
ERROR: expected_encoding must be unset or match client_encoding in text protocols
28+
CONTEXT: slot "regression_slot", output plugin "pglogical_output", in the startup callback
29+
-- Will succeed since we don't request any encoding
30+
-- then ERROR because it can't turn the kanjii into latin-1
31+
SELECT data
32+
FROM pg_logical_slot_peek_changes('regression_slot',
33+
NULL, NULL,
34+
'min_proto_version', '1',
35+
'max_proto_version', '1',
36+
'startup_params_format', '1',
37+
'proto_format', 'json',
38+
'no_txinfo', 't');
39+
ERROR: character with byte sequence 0xef 0xac 0x82 in encoding "UTF8" has no equivalent in encoding "LATIN1"
40+
-- Will succeed since it matches the current encoding
41+
-- then ERROR because it can't turn the kanjii into latin-1
42+
SELECT data
43+
FROM pg_logical_slot_peek_changes('regression_slot',
44+
NULL, NULL,
45+
'expected_encoding', 'LATIN-1',
46+
'min_proto_version', '1',
47+
'max_proto_version', '1',
48+
'startup_params_format', '1',
49+
'proto_format', 'json',
50+
'no_txinfo', 't');
51+
ERROR: character with byte sequence 0xef 0xac 0x82 in encoding "UTF8" has no equivalent in encoding "LATIN1"
52+
RESET client_encoding;
53+
SELECT 'drop' FROM pg_drop_replication_slot('regression_slot');
54+
?column?
55+
----------
56+
drop
57+
(1 row)
58+
59+
DROP TABLE enctest;

0 commit comments

Comments
 (0)