Skip to content

Commit ba0a323

Browse files
committed
pglogical_output v3 - json fixes, test improvements
- fix stray braces in commit json output - quote all json keys - validate all json in tests - decode json startup message to a table - filter out mutable keys in json startup message - troubleshooting section of README covers using json - documentation copy editing
1 parent 659daae commit ba0a323

17 files changed

+704
-160
lines changed

contrib/pglogical_output/Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ OBJS = pglogical_output.o pglogical_hooks.o pglogical_config.o \
55
pglogical_proto.o pglogical_proto_native.o \
66
pglogical_proto_json.o
77

8-
REGRESS = pre_clean params_native basic_native hooks_native basic_json hooks_json encoding_json
8+
REGRESS = prep params_native basic_native hooks_native basic_json hooks_json encoding_json cleanup
99

1010

1111
subdir = contrib/pglogical_output

contrib/pglogical_output/README.md

Lines changed: 111 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ Unlike block-level ("physical") streaming replication, the change stream from
2828
the `pglogical` output plugin is compatible across different PostgreSQL
2929
versions and can even be consumed by non-PostgreSQL clients.
3030

31-
Becuse logical decoding is used, only the changed rows are sent on the wire.
31+
Because logical decoding is used, only the changed rows are sent on the wire.
3232
There's no index change data, no vacuum activity, etc transmitted.
3333

3434
The use of a replication slot means that the change stream is reliable and
@@ -120,7 +120,7 @@ information:
120120
6153224364663410513 | 1 | 0/C429C48 | testd | 16385
121121
(1 row)
122122

123-
Details in the replication protocol docs.
123+
Details are in the replication protocol docs.
124124

125125
## Create the slot if required
126126

@@ -225,8 +225,8 @@ changes to the downstreams.
225225
There are three forwarding modes:
226226

227227
* Forward everything. Transactions are replicated whether they were made directly
228-
on the immediate upstream or some other node upstream of it. This is the only
229-
option when running on 9.4. All rows from transactions are sent.
228+
on the immediate upstream or some other node upstream of it. All rows from all
229+
transactions are sent.
230230

231231
Selected by setting `forward_changesets` to true (default) and not setting a
232232
row or transaction filter hook.
@@ -257,10 +257,7 @@ always from the immediate upstream that’s running the decoding plugin.
257257

258258
Note that changeset forwarding may be forced to on if not requested by some
259259
servers, so the client _should_ check the forward_changesets and
260-
`forward_changeset_origins` params in the startup reply message. In particular,
261-
9.4 servers force changeset forwarding on, but never forward replication
262-
origins. This means you cannot use 9.4 for mutual replication as it’ll create
263-
an infinite loop.
260+
`forward_changeset_origins` params in the startup reply message.
264261

265262
Clients may use this facility to form arbitrarily complex topologies when
266263
combined with hooks to determine which transactions are forwarded. An obvious
@@ -369,9 +366,6 @@ level logical decoding transaction filter hook).
369366

370367
The hook function must *not* free the argument struct or modify its contents.
371368

372-
The transaction filter hook is only called on PostgreSQL 9.5 and above. It
373-
is ignored on 9.4.
374-
375369
Note that individual changes within a transaction may have different origins to
376370
the transaction as a whole; see "Origin filtering" for more details. If a
377371
transaction is filtered out, all changes are filtered out even if their origins
@@ -475,7 +469,7 @@ clients, creating new slots, etc. This is a core PostgreSQL limitation.
475469

476470
Also, there's no built-in way to guarantee that the logical replication slot
477471
from the failed master hasn't replayed further than the physical streaming
478-
replica you failed over to. You could recieve changes on your logical decoding
472+
replica you failed over to. You could receive changes on your logical decoding
479473
stream from the old master that never made it to the physical streaming
480474
replica. This is true (albeit very unlikely) *even if the physical streaming
481475
replica is synchronous* because PostgreSQL sends the replication data anyway,
@@ -512,7 +506,7 @@ old key to send to allow the receiver to tell which tuple is being updated.
512506
## UNLOGGED tables aren't replicated
513507

514508
Because `UNLOGGED` tables aren't written to WAL, they aren't replicated by
515-
logical or physical repliation. You can only replicate `UNLOGGED` tables
509+
logical or physical replication. You can only replicate `UNLOGGED` tables
516510
with trigger-based solutions.
517511

518512
## Unchanged fields are often sent in `UPDATE`
@@ -522,3 +516,107 @@ logical decoding can't tell if a given field was changed by an update.
522516
Unchanged fields can only by identified and omitted if they're a variable
523517
length TOASTable type and are big enough to get stored out-of-line in
524518
a TOAST table.
519+
520+
# Troubleshooting and debugging
521+
522+
## Non-destructively previewing pending data on a slot
523+
524+
Using the json mode of `pglogical_output` you can examine pending transactions
525+
on a slot without consuming them, so they are still delivered to the usual
526+
client application that created/owns this slot. This is best done using the SQL
527+
interface to logical decoding, since it gives you finer control than using
528+
`pg_recvlogical`.
529+
530+
You can only peek at a slot while there is no other client connected to that
531+
slot.
532+
533+
Use `pg_logical_slot_peek_changes` to examine the change stream without
534+
destructively consuming changes. This is extremely helpful when trying to
535+
determine why an error occurs in a downstream, since you can examine a
536+
json-ified representation of the xact. It's necessary to supply a minimal
537+
set of required parameters to the output plugin.
538+
539+
e.g. given setup:
540+
541+
CREATE TABLE discard_test(blah text);
542+
SELECT 'init' FROM pg_create_logical_replication_slot('demo_slot', 'pglogical_output');
543+
INSERT INTO discard_test(blah) VALUES('one');
544+
INSERT INTO discard_test(blah) VALUES('two1'),('two2'),('two3');
545+
INSERT INTO discard_test(blah) VALUES('three1'),('three2');
546+
547+
you can peek at the change stream with:
548+
549+
SELECT location, xid, data
550+
FROM pg_logical_slot_peek_changes('demo_slot', NULL, NULL,
551+
'min_proto_version', '1', 'max_proto_version', '1',
552+
'startup_params_format', '1', 'proto_format', 'json');
553+
554+
The two `NULL`s mean you don't want to stop decoding after any particular
555+
LSN or any particular number of changes. Decoding will stop when there's nothing
556+
left to decode or you cancel the query.
557+
558+
This will emit a key/value startup message then change data rows like:
559+
560+
location | xid | data
561+
0/4E8AAF0 | 5562 | {"action":"B", has_catalog_changes:"f", xid:"5562", first_lsn:"0/4E8AAF0", commit_time:"2015-11-13 14:26:21.404425+08"}
562+
0/4E8AAF0 | 5562 | {"action":"I","relation":["public","discard_test"],"newtuple":{"blah":"one"}}
563+
0/4E8AB70 | 5562 | {"action":"C", final_lsn:"0/4E8AB30", end_lsn:"0/4E8AB70"}
564+
0/4E8ABA8 | 5563 | {"action":"B", has_catalog_changes:"f", xid:"5563", first_lsn:"0/4E8ABA8", commit_time:"2015-11-13 14:26:32.015611+08"}
565+
0/4E8ABA8 | 5563 | {"action":"I","relation":["public","discard_test"],"newtuple":{"blah":"two1"}}
566+
0/4E8ABE8 | 5563 | {"action":"I","relation":["public","discard_test"],"newtuple":{"blah":"two2"}}
567+
0/4E8AC28 | 5563 | {"action":"I","relation":["public","discard_test"],"newtuple":{"blah":"two3"}}
568+
0/4E8ACA8 | 5563 | {"action":"C", final_lsn:"0/4E8AC68", end_lsn:"0/4E8ACA8"}
569+
....
570+
571+
The output is the LSN (log sequence number) associated with a change, the top
572+
level transaction ID that performed the change, and the change data as json.
573+
574+
You can see the transaction boundaries by xid changes and by the "B"egin and
575+
"C"ommit messages, and you can see the individual row "I"nserts. Replication
576+
origins, commit timestamps, etc will be shown if known.
577+
578+
See http://www.postgresql.org/docs/current/static/functions-admin.html for
579+
information on the peek functions.
580+
581+
If you want the binary format you can get that with
582+
`pg_logical_slot_peek_binary_changes` and the `native` protocol, but that's
583+
generally much less useful.
584+
585+
# Manually discarding a change from a slot
586+
587+
Sometimes it's desirable to manually purge one or more changes from a
588+
replication slot. This is usually an error recovery step when problems arise
589+
with the downstream code that's replaying from the slot.
590+
591+
You can use the peek functions to determine the point in the stream you want to
592+
discard up to, as identifed by LSN (log sequence number). See
593+
"non-destructively previewing pending data on a slot" above for details.
594+
595+
You can't control the point you start discarding from, it's always from the
596+
current stream position up to a point you specify. If the peek shows that
597+
there's data you still want to retain you must make sure that the downstream
598+
replays up to the point you want to keep changes and sends replay confirmation.
599+
In other words there's no way to cut a sequence of changes out of the middle of
600+
the pending change stream.
601+
602+
Once you've peeked the stream and know the LSN you want to discard up to, you
603+
can use `pg_logical_slot_peek_changes`, specifying an `upto_lsn`, to consume
604+
changes from the slot up to but not including that point, i.e. that will be the
605+
point at which replay resumes.
606+
607+
For example, if you wanted to discard the first transaction in the example
608+
from the section above, i.e. discard xact 5562 and start decoding at xact
609+
5563 from its' BEGIN lsn `0/4E8ABA8`, you'd run:
610+
611+
SELECT location, xid, data
612+
FROM pg_logical_slot_get_changes('demo_slot', '0/4E8ABA8', NULL,
613+
'min_proto_version', '1', 'max_proto_version', '1',
614+
'startup_params_format', '1', 'proto_format', 'json');
615+
616+
Note that `_get_changes` is used instead of `_peek_changes` and that
617+
the `upto_lsn` is `'0/4E8ABA8'` instead of `NULL`.
618+
619+
620+
621+
622+

contrib/pglogical_output/expected/basic_json.out

Lines changed: 62 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,11 @@ ROLLBACK TO SAVEPOINT sp1;
5757
INSERT INTO demo(tx) VALUES ('2');
5858
COMMIT;
5959
-- Simple decode with text-format tuples
60-
SELECT data
60+
TRUNCATE TABLE json_decoding_output;
61+
INSERT INTO json_decoding_output(ch, rn)
62+
SELECT
63+
data::jsonb,
64+
row_number() OVER ()
6165
FROM pg_logical_slot_peek_changes('regression_slot',
6266
NULL, NULL,
6367
'expected_encoding', 'UTF8',
@@ -66,37 +70,64 @@ FROM pg_logical_slot_peek_changes('regression_slot',
6670
'startup_params_format', '1',
6771
'proto_format', 'json',
6872
'no_txinfo', 't');
69-
data
70-
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
71-
{"action":"S", "params": {"max_proto_version":"1","min_proto_version":"1","coltypes":"f","pg_version_num":"90600","pg_version":"9.6devel","pg_catversion":"201510161","database_encoding":"UTF8","encoding":"UTF8","forward_changesets":"f","forward_changeset_origins":"f","binary.internal_basetypes":"f","binary.binary_basetypes":"f","binary.basetypes_major_version":"906","binary.sizeof_int":"4","binary.sizeof_long":"8","binary.sizeof_datum":"8","binary.maxalign":"8","binary.bigendian":"f","binary.float4_byval":"t","binary.float8_byval":"t","binary.integer_datetimes":"t","binary.binary_pg_version":"906","no_txinfo":"t","hooks.startup_hook_enabled":"f","hooks.shutdown_hook_enabled":"f","hooks.row_filter_enabled":"f","hooks.transaction_filter_enabled":"f"}}
72-
{"action":"B", has_catalog_changes:"f"}
73-
{"action":"I","relation":["public","demo"],"newtuple":{"seq":1,"tx":"textval","ts":null,"jsb":null,"js":null,"ba":null}}
74-
{{"action":"C"}}
75-
{"action":"B", has_catalog_changes:"f"}
76-
{"action":"I","relation":["public","demo"],"newtuple":{"seq":2,"tx":null,"ts":null,"jsb":null,"js":null,"ba":"\\xdeadbeef0001"}}
77-
{{"action":"C"}}
78-
{"action":"B", has_catalog_changes:"f"}
79-
{"action":"I","relation":["public","demo"],"newtuple":{"seq":3,"tx":"blah","ts":"2045-09-12T12:34:56","jsb":null,"js":null,"ba":null}}
80-
{{"action":"C"}}
81-
{"action":"B", has_catalog_changes:"f"}
82-
{"action":"I","relation":["public","demo"],"newtuple":{"seq":4,"tx":null,"ts":null,"jsb":{"key": "value"},"js":{"key":"value"},"ba":null}}
83-
{{"action":"C"}}
84-
{"action":"B", has_catalog_changes:"f"}
85-
{"action":"I","relation":["public","demo"],"newtuple":{"seq":6,"tx":"row1","ts":null,"jsb":null,"js":null,"ba":null}}
86-
{"action":"I","relation":["public","demo"],"newtuple":{"seq":7,"tx":"row2","ts":null,"jsb":null,"js":null,"ba":null}}
87-
{"action":"I","relation":["public","demo"],"newtuple":{"seq":8,"tx":"row3","ts":null,"jsb":null,"js":null,"ba":null}}
88-
{"action":"D","relation":["public","demo"],"oldtuple":{"seq":7,"tx":null,"ts":null,"jsb":null,"js":null,"ba":null}}
89-
{"action":"U","relation":["public","demo"],"newtuple":{"seq":6,"tx":"updated","ts":null,"jsb":null,"js":null,"ba":null}}
90-
{{"action":"C"}}
91-
{"action":"B", has_catalog_changes:"t"}
92-
{"action":"I","relation":["public","cat_test"],"newtuple":{"id":42}}
93-
{{"action":"C"}}
94-
{"action":"B", has_catalog_changes:"f"}
95-
{"action":"I","relation":["public","demo"],"newtuple":{"seq":9,"tx":"1","ts":null,"jsb":null,"js":null,"ba":null}}
96-
{"action":"I","relation":["public","demo"],"newtuple":{"seq":10,"tx":"2","ts":null,"jsb":null,"js":null,"ba":null}}
97-
{{"action":"C"}}
98-
(27 rows)
73+
SELECT * FROM get_startup_params();
74+
key | value
75+
----------------------------------+--------
76+
binary.binary_basetypes | "f"
77+
binary.float4_byval | "t"
78+
binary.float8_byval | "t"
79+
binary.internal_basetypes | "f"
80+
binary.sizeof_datum | "8"
81+
binary.sizeof_int | "4"
82+
binary.sizeof_long | "8"
83+
coltypes | "f"
84+
database_encoding | "UTF8"
85+
encoding | "UTF8"
86+
forward_changeset_origins | "f"
87+
forward_changesets | "f"
88+
hooks.row_filter_enabled | "f"
89+
hooks.shutdown_hook_enabled | "f"
90+
hooks.startup_hook_enabled | "f"
91+
hooks.transaction_filter_enabled | "f"
92+
max_proto_version | "1"
93+
min_proto_version | "1"
94+
no_txinfo | "t"
95+
(19 rows)
9996

97+
SELECT * FROM get_queued_data();
98+
data
99+
--------------------------------------------------------------------------------------------------------------------------------------------------------------
100+
{"action": "B", "has_catalog_changes": "f"}
101+
{"action": "I", "newtuple": {"ba": null, "js": null, "ts": null, "tx": "textval", "jsb": null, "seq": 1}, "relation": ["public", "demo"]}
102+
{"action": "C"}
103+
{"action": "B", "has_catalog_changes": "f"}
104+
{"action": "I", "newtuple": {"ba": "\\xdeadbeef0001", "js": null, "ts": null, "tx": null, "jsb": null, "seq": 2}, "relation": ["public", "demo"]}
105+
{"action": "C"}
106+
{"action": "B", "has_catalog_changes": "f"}
107+
{"action": "I", "newtuple": {"ba": null, "js": null, "ts": "2045-09-12T12:34:56", "tx": "blah", "jsb": null, "seq": 3}, "relation": ["public", "demo"]}
108+
{"action": "C"}
109+
{"action": "B", "has_catalog_changes": "f"}
110+
{"action": "I", "newtuple": {"ba": null, "js": {"key": "value"}, "ts": null, "tx": null, "jsb": {"key": "value"}, "seq": 4}, "relation": ["public", "demo"]}
111+
{"action": "C"}
112+
{"action": "B", "has_catalog_changes": "f"}
113+
{"action": "I", "newtuple": {"ba": null, "js": null, "ts": null, "tx": "row1", "jsb": null, "seq": 6}, "relation": ["public", "demo"]}
114+
{"action": "I", "newtuple": {"ba": null, "js": null, "ts": null, "tx": "row2", "jsb": null, "seq": 7}, "relation": ["public", "demo"]}
115+
{"action": "I", "newtuple": {"ba": null, "js": null, "ts": null, "tx": "row3", "jsb": null, "seq": 8}, "relation": ["public", "demo"]}
116+
{"action": "D", "oldtuple": {"ba": null, "js": null, "ts": null, "tx": null, "jsb": null, "seq": 7}, "relation": ["public", "demo"]}
117+
{"action": "U", "newtuple": {"ba": null, "js": null, "ts": null, "tx": "updated", "jsb": null, "seq": 6}, "relation": ["public", "demo"]}
118+
{"action": "C"}
119+
{"action": "B", "has_catalog_changes": "t"}
120+
{"action": "I", "newtuple": {"id": 42}, "relation": ["public", "cat_test"]}
121+
{"action": "C"}
122+
{"action": "B", "has_catalog_changes": "f"}
123+
{"action": "I", "newtuple": {"ba": null, "js": null, "ts": null, "tx": "1", "jsb": null, "seq": 9}, "relation": ["public", "demo"]}
124+
{"action": "I", "newtuple": {"ba": null, "js": null, "ts": null, "tx": "2", "jsb": null, "seq": 10}, "relation": ["public", "demo"]}
125+
{"action": "C"}
126+
{"action": "B", "has_catalog_changes": "t"}
127+
{"action": "C"}
128+
(28 rows)
129+
130+
TRUNCATE TABLE json_decoding_output;
100131
\i sql/basic_teardown.sql
101132
SELECT 'drop' FROM pg_drop_replication_slot('regression_slot');
102133
?column?

0 commit comments

Comments
 (0)