Commit 75f4d3d

pglogical v4
* Implement relation metadata caching
* Add the relmeta_cache_size parameter for cache control
* Add an extension to get version information
* Create the pglogical_output header directory on install
* 9.4 compatibility
1 parent 0c31414 commit 75f4d3d

29 files changed: +927 −157 lines changed

contrib/pglogical_output/Makefile

Lines changed: 48 additions & 3 deletions
@@ -3,10 +3,38 @@ PGFILEDESC = "pglogical_output - logical replication output plugin"
 
 OBJS = pglogical_output.o pglogical_hooks.o pglogical_config.o \
 	pglogical_proto.o pglogical_proto_native.o \
-	pglogical_proto_json.o
+	pglogical_proto_json.o pglogical_relmetacache.o \
+	pglogical_infofuncs.o
 
-REGRESS = prep params_native basic_native hooks_native basic_json hooks_json encoding_json cleanup
+REGRESS = prep params_native basic_native hooks_native basic_json hooks_json encoding_json extension cleanup
 
+EXTENSION = pglogical_output
+DATA = pglogical_output--1.0.0.sql
+EXTRA_CLEAN += pglogical_output.control
+
+
+ifdef USE_PGXS
+
+# For regression checks
+# http://www.postgresql.org/message-id/CAB7nPqTsR5o3g-fBi6jbsVdhfPiLFWQ_0cGU5=94Rv_8W3qvFA@mail.gmail.com
+# this makes "make check" give a useful error
+abs_top_builddir = .
+NO_TEMP_INSTALL = yes
+# Usual recipe
+PG_CONFIG = pg_config
+PGXS := $(shell $(PG_CONFIG) --pgxs)
+include $(PGXS)
+
+# These don't do anything yet, since temp install is disabled
+EXTRA_INSTALL += ./examples/hooks
+REGRESS_OPTS += --temp-config=regression.conf
+
+plhooks:
+	make -C examples/hooks USE_PGXS=1 clean install
+
+installcheck: plhooks
+
+else
 
 subdir = contrib/pglogical_output
 top_builddir = ../..
@@ -16,11 +44,28 @@ include $(top_srcdir)/contrib/contrib-global.mk
 # 'make installcheck' disabled when building in-tree because these tests
 # require "wal_level=logical", which typical installcheck users do not have
 # (e.g. buildfarm clients).
-installcheck:;
+installcheck:
+	;
 
 EXTRA_INSTALL += contrib/pglogical_output_plhooks
 EXTRA_REGRESS_OPTS += --temp-config=./regression.conf
 
 install: all
+
+endif
+
+# The # in #define is taken as a comment, per https://bugs.debian.org/cgi-bin/bugreport.cgi?bug=142043
+# so it must be escaped. The $ placeholders in awk must be doubled too.
+pglogical_output_version=$(shell awk '/\#define PGLOGICAL_OUTPUT_VERSION[ \t]+\".*\"/ { print substr($$3,2,length($$3)-2) }' pglogical_output.h )
+
+all: pglogical_output.control
+
+pglogical_output.control: pglogical_output.control.in pglogical_output.h
+	sed 's/__PGLOGICAL_OUTPUT_VERSION__/$(pglogical_output_version)/' pglogical_output.control.in > pglogical_output.control
+
+install: header_install
+
+header_install: pglogical_output/compat.h pglogical_output/hooks.h
 	$(MKDIR_P) '$(DESTDIR)$(includedir)'/pglogical_output
+	$(INSTALL_DATA) pglogical_output/compat.h '$(DESTDIR)$(includedir)'/pglogical_output
 	$(INSTALL_DATA) pglogical_output/hooks.h '$(DESTDIR)$(includedir)'/pglogical_output
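The control file is generated at build time rather than checked in: the awk expression above pulls the quoted version string out of the PGLOGICAL_OUTPUT_VERSION define in pglogical_output.h, and the sed rule splices it into a template. Neither the template nor the header appears in this diff, so the following is only a plausible sketch of the two files, assuming the version string is 1.0.0 to match the pglogical_output--1.0.0.sql script named in DATA:

    # pglogical_output.control.in (hypothetical template, not in this diff)
    comment = 'pglogical_output - logical replication output plugin'
    default_version = '__PGLOGICAL_OUTPUT_VERSION__'
    relocatable = true

    # pglogical_output.control as generated by `make all` (hypothetical)
    comment = 'pglogical_output - logical replication output plugin'
    default_version = '1.0.0'
    relocatable = true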

contrib/pglogical_output/doc/DESIGN.md

Lines changed: 114 additions & 23 deletions
@@ -30,10 +30,8 @@ not the priority as tools like wal2json already exist for that.
 
 ## Column metadata
 
-The output plugin sends metadata for columsn - at minimum, the column names -
-before each row. It will soon be changed to send the data before each row from
-a new, different table, so that streams of inserts from COPY etc don't repeat
-the metadata each time. That's just a pending feature.
+The output plugin sends metadata for columns - at minimum, the column names -
+before each row that first refers to that relation.
 
 The reason metadata must be sent is that the upstream and downstream table's
 attnos don't necessarily correspond. The column names might, and their ordering
@@ -53,16 +51,67 @@ maintained by DDL replication, but:
 
 So despite the bandwidth cost, we need to send metadata.
 
-In future a client-negotiated cache is planned, so that clients can announce
-to the output plugin that they can cache metadata across change series, and
-metadata can only be sent when invalidated by relation changes or when a new
-relation is seen.
-
 Support for type metadata is penciled in to the protocol so that clients that
 don't have table definitions at all - like queueing engines - can decode the
 data. That'll also permit type validation sanity checking on the apply side
 with logical replication.
 
+The upstream expects the client to cache this metadata and re-use it when data
+is sent for the relation again. Cache size controls, an LRU, and purge
+notifications will be added.
+
+## Relation metadata cache size controls
+
+The relation metadata cache will have downstream size control added. The
+downstream will send a parameter indicating that it supports caching, and the
+maximum cache size desired. The option will have settings for "no cache",
+"cache unlimited" and "fixed size LRU [size specified]".
+
+Since there is no downstream-to-upstream communication after the startup
+params are exchanged, there's no easy way for the downstream to tell the
+upstream when it purges cache entries. So the downstream cache is a slave
+cache that must depend strictly on the upstream cache. The downstream tells
+the upstream how to manage its cache and after that it just follows orders.
+
+To keep the caches in sync - so the upstream never sends a row without knowing
+the downstream has metadata for it cached - the downstream must always cache
+relation metadata when it receives it, and may not purge it from its cache
+until it receives a purge message for that relation from the upstream. If a
+new metadata message for the same relation arrives it *must* replace the old
+entry in the cache.
+
+The downstream does *not* have to promptly purge or invalidate cache entries
+when it gets purge messages from the upstream. They are just notifications that
+the upstream no longer expects the downstream to retain that cache entry and
+will re-send it if it is required again later.
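The caching contract just described is easy to get wrong, so here is a minimal sketch of the downstream side in C. None of this is code from the commit; the message handlers, names, and toy fixed-slot table are invented purely to illustrate the three rules (always cache, replace on re-send, evict lazily after a purge):

    /* Sketch only: a toy downstream relation metadata cache obeying the
     * rules above. Names and structure are invented, not pglogical code. */
    #include <stdio.h>
    #include <stdlib.h>
    #include <string.h>
    #include <stdbool.h>
    #include <stdint.h>

    #define CACHE_SLOTS 64

    typedef struct RelMetaEntry {
        uint32_t relid;       /* upstream relation id */
        char    *metadata;    /* opaque column metadata from the upstream */
        bool     used;
        bool     purgeable;   /* set once the upstream sends a purge message */
    } RelMetaEntry;

    static RelMetaEntry cache[CACHE_SLOTS];

    static RelMetaEntry *relmeta_lookup(uint32_t relid)
    {
        for (int i = 0; i < CACHE_SLOTS; i++)
            if (cache[i].used && cache[i].relid == relid)
                return &cache[i];
        return NULL;
    }

    /* Metadata message: must always be cached, and a newer entry for the
     * same relation must replace the older one. */
    static void on_relmeta_message(uint32_t relid, const char *metadata)
    {
        RelMetaEntry *e = relmeta_lookup(relid);
        if (e == NULL)
            for (int i = 0; i < CACHE_SLOTS && e == NULL; i++)
                if (!cache[i].used)
                    e = &cache[i];
        if (e == NULL) {
            fprintf(stderr, "toy cache full\n");  /* a real cache would evict */
            exit(1);
        }
        free(e->metadata);
        e->relid = relid;
        e->metadata = strdup(metadata);
        e->used = true;
        e->purgeable = false;
    }

    /* Purge message: the entry MAY be dropped from now on, lazily; it must
     * not be evicted before this arrives. */
    static void on_purge_message(uint32_t relid)
    {
        RelMetaEntry *e = relmeta_lookup(relid);
        if (e != NULL)
            e->purgeable = true;
    }

    /* Row message: metadata for the relation must already be cached,
     * otherwise the stream has violated the protocol. */
    static const char *on_row_message(uint32_t relid)
    {
        RelMetaEntry *e = relmeta_lookup(relid);
        if (e == NULL) {
            fprintf(stderr, "protocol violation: no metadata for rel %u\n",
                    (unsigned) relid);
            exit(1);
        }
        return e->metadata;
    }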
+
+## Not an extension
+
+There's no extension script for pglogical_output. That's by design. We've tried
+really hard to avoid needing one, allowing applications using pglogical_output
+to entirely define any SQL-level catalogs they need and interact with them
+using the hooks.
+
+That way applications don't have to deal with some of their catalog data being
+in pglogical_output extension catalogs and some being in their own.
+
+There's no issue with dump and restore that way either. The app controls it
+entirely and pglogical_output doesn't need any policy or tools for it.
+
+pglogical_output is meant to be a re-usable component of other solutions. Users
+shouldn't need to care about it directly.
+
+## Hooks
+
+Quite a bit of functionality that could be done directly in the output
+plugin is instead delegated to pluggable hooks - replication origin
+filtering, for example.
+
+That's because pglogical_output tries hard not to know anything about the
+topology of the replication cluster, leaving that to applications using the
+plugin.
+
+
 ## Hook entry point as a SQL function
 
 The hooks entry point is a SQL function that populates a passed `internal`
@@ -123,17 +172,59 @@ disconnect and report an error to the user if the server didn't do what it
 asked. This can be important, e.g. when a security-significant hook is
 specified.
 
-## XIDs only in begin/commit
-
-There's no need to send transaction IDs in each protocol message because
-logical decoding accumulates transactions' changes on the upstream in
-reorder buffers. It sends them only when they are committed, strictly
-in the order that they are committed.
-
-If support for streaming transactions through logical decoding before
-commit is added to PostgreSQL in future, the pglogical output plugin
-will continue to rely on xact reordering by default. Interleaving will
-only be enabled if the client sends a startup parameter indicating it
-expects/supports interleaving. So we can add xid fields to individual
-messages in that case, if and when support is added, without breaking
-other clients.
+## Support for transaction streaming
+
+Presently logical decoding requires that a transaction has committed before it
+can *begin* sending it to the client. This means long-running xacts can take 2x
+as long, since we can't start apply on the replica until the xact is committed
+on the master.
+
+Additionally, a big xact will cause large delays in apply of smaller
+transactions because logical decoding reorders transactions into strict commit
+order and replays them in that sequence. Small transactions that committed
+after the big transaction cannot be replayed to the replica until the big
+transaction is transferred over the wire, and we can't get a head start on
+that while it's still running.
+
+Finally, the accumulation of a big transaction in the reorder buffer means that
+storage on the upstream must be sufficient to hold the entire transaction until
+it can be streamed to the replica and discarded. That is in addition to the
+copy in retained WAL, which cannot be purged until replay is confirmed past
+commit for that xact. The temporary copy serves no data safety purpose; it can
+be regenerated from retained WAL and is just a spool file.
+
+There are big upsides to waiting until commit. Rolled-back transactions and
+subtransactions are never sent at all. The apply/downstream side is greatly
+simplified by not needing to do transaction ordering or worry about
+interdependencies and conflicts during apply. The commit timestamp is known
+from the beginning of replay, allowing for smarter conflict resolution
+behaviour in multi-master scenarios. Nonetheless sometimes we want to be able
+to stream changes in advance of commit.
+
+So we need the ability to start streaming a transaction from the upstream as
+its changes are seen in WAL, either applying it immediately on the downstream
+or spooling it on the downstream until it's committed. This requires changes
+to the logical decoding facilities themselves; it isn't something
+pglogical_output can do alone. However, we've left room in pglogical_output to
+support this when support is added to logical decoding:
+
+* Flags in most message types let us add fields if we need to, like a
+  HAS_XID flag and an extra field for the transaction ID so we can
+  differentiate between concurrent transactions when streaming. The space
+  isn't wasted the rest of the time.
+
+* The upstream isn't allowed to send new message types, etc, without a
+  capability flag being set by the client. So for interleaved xacts we won't
+  enable them in logical decoding unless the client tells us it is prepared
+  to cope with them by sending additional startup parameters.
+
+Note that for consistency reasons we still have to commit things in the same
+order on the downstream. The purpose of transaction streaming is to reduce the
+latency between the commit of the last xact before a big one and the first xact
+after the big one, minimising the duration of the stall in the flow of smaller
+xacts perceptible on the downstream.
+
+Transaction streaming also makes parallel apply on the downstream possible,
+though it is not necessary to have parallel apply to benefit from transaction
+streaming. Parallel apply has further complexities that are outside the scope
+of the output plugin design.
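As a concrete illustration of the flags mechanism in the first bullet above: a reader consumes an xid field only when the corresponding flag bit is set, so the field costs nothing when streaming is off. This is a sketch with invented names (MSGFLAG_HAS_XID, the reader struct), not the plugin's actual wire-format code:

    #include <stdbool.h>
    #include <stddef.h>
    #include <stdint.h>

    #define MSGFLAG_HAS_XID 0x01    /* hypothetical flag bit */

    typedef struct MsgReader {
        const uint8_t *buf;
        size_t         pos;
    } MsgReader;

    static uint8_t read_u8(MsgReader *r)
    {
        return r->buf[r->pos++];
    }

    static uint32_t read_u32(MsgReader *r)  /* big-endian wire order assumed */
    {
        uint32_t v = ((uint32_t) r->buf[r->pos]     << 24) |
                     ((uint32_t) r->buf[r->pos + 1] << 16) |
                     ((uint32_t) r->buf[r->pos + 2] << 8)  |
                      (uint32_t) r->buf[r->pos + 3];
        r->pos += 4;
        return v;
    }

    /* Reads the per-message flags byte and, only if the upstream set
     * HAS_XID (which it may only do when the client advertised streaming
     * support at startup), the transaction id that follows it. */
    static bool read_optional_xid(MsgReader *r, uint32_t *xid)
    {
        uint8_t flags = read_u8(r);
        if (flags & MSGFLAG_HAS_XID) {
            *xid = read_u32(r);
            return true;
        }
        return false;   /* no xid field present; space isn't wasted */
    }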

contrib/pglogical_output/expected/basic_json.out

Lines changed: 5 additions & 3 deletions
@@ -71,8 +71,8 @@ FROM pg_logical_slot_peek_changes('regression_slot',
 	'proto_format', 'json',
 	'no_txinfo', 't');
 SELECT * FROM get_startup_params();
-               key                | value
-----------------------------------+--------
+               key                |  value
+----------------------------------+---------
  binary.binary_basetypes          | "f"
  binary.float4_byval              | "t"
  binary.float8_byval              | "t"
@@ -91,7 +91,9 @@ SELECT * FROM get_startup_params();
  max_proto_version                | "1"
  min_proto_version                | "1"
  no_txinfo                        | "t"
-(18 rows)
+ pglogical_output_version         | "10000"
+ relmeta_cache_size               | "0"
+(20 rows)
 
 SELECT * FROM get_queued_data();
                                   data
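The new relmeta_cache_size option is passed at slot read time like the parameters already shown above, and the startup ('S') message echoes it back so a client can verify what the server actually agreed to. The '0' in the expected output above means these tests run with the cache disabled; the other values in this sketch are assumptions drawn from the DESIGN.md notes in this commit ("no cache", "cache unlimited", "fixed size LRU"), not verified syntax:

    -- Sketch: requesting decoding with the new relmeta_cache_size parameter.
    -- '0' disables the cache (as in the expected output above); '-1' for an
    -- unlimited cache and a positive entry count for a fixed-size LRU are
    -- assumed from DESIGN.md, not confirmed by this diff.
    SELECT data
    FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL,
        'min_proto_version', '1',
        'max_proto_version', '1',
        'startup_params_format', '1',
        'proto_format', 'json',
        'relmeta_cache_size', '-1');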

contrib/pglogical_output/expected/basic_json_1.out

Lines changed: 62 additions & 31 deletions
@@ -57,7 +57,11 @@ ROLLBACK TO SAVEPOINT sp1;
 INSERT INTO demo(tx) VALUES ('2');
 COMMIT;
 -- Simple decode with text-format tuples
-SELECT data::json
+TRUNCATE TABLE json_decoding_output;
+INSERT INTO json_decoding_output(ch, rn)
+SELECT
+    data::jsonb,
+    row_number() OVER ()
 FROM pg_logical_slot_peek_changes('regression_slot',
 	NULL, NULL,
 	'expected_encoding', 'UTF8',
@@ -66,37 +70,64 @@ FROM pg_logical_slot_peek_changes('regression_slot',
 	'startup_params_format', '1',
 	'proto_format', 'json',
 	'no_txinfo', 't');
-                 data
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
-{"action":"S", "params": {"max_proto_version":"1","min_proto_version":"1","coltypes":"f","pg_version_num":"90405","pg_version":"9.4.5","pg_catversion":"201409291","database_encoding":"UTF8","encoding":"UTF8","forward_changeset_origins":"t","binary.internal_basetypes":"f","binary.binary_basetypes":"f","binary.basetypes_major_version":"904","binary.sizeof_int":"4","binary.sizeof_long":"8","binary.sizeof_datum":"8","binary.maxalign":"8","binary.bigendian":"f","binary.float4_byval":"t","binary.float8_byval":"t","binary.integer_datetimes":"t","binary.binary_pg_version":"904","no_txinfo":"t","hooks.startup_hook_enabled":"f","hooks.shutdown_hook_enabled":"f","hooks.row_filter_enabled":"f","hooks.transaction_filter_enabled":"f"}}
-{"action":"B", "has_catalog_changes":"f"}
-{"action":"I","relation":["public","demo"],"newtuple":{"seq":1,"tx":"textval","ts":null,"jsb":null,"js":null,"ba":null}}
-{"action":"C"}
-{"action":"B", "has_catalog_changes":"f"}
-{"action":"I","relation":["public","demo"],"newtuple":{"seq":2,"tx":null,"ts":null,"jsb":null,"js":null,"ba":"\\xdeadbeef0001"}}
-{"action":"C"}
-{"action":"B", "has_catalog_changes":"f"}
-{"action":"I","relation":["public","demo"],"newtuple":{"seq":3,"tx":"blah","ts":"2045-09-12T12:34:56","jsb":null,"js":null,"ba":null}}
-{"action":"C"}
-{"action":"B", "has_catalog_changes":"f"}
-{"action":"I","relation":["public","demo"],"newtuple":{"seq":4,"tx":null,"ts":null,"jsb":{"key": "value"},"js":{"key":"value"},"ba":null}}
-{"action":"C"}
-{"action":"B", "has_catalog_changes":"f"}
-{"action":"I","relation":["public","demo"],"newtuple":{"seq":6,"tx":"row1","ts":null,"jsb":null,"js":null,"ba":null}}
-{"action":"I","relation":["public","demo"],"newtuple":{"seq":7,"tx":"row2","ts":null,"jsb":null,"js":null,"ba":null}}
-{"action":"I","relation":["public","demo"],"newtuple":{"seq":8,"tx":"row3","ts":null,"jsb":null,"js":null,"ba":null}}
-{"action":"D","relation":["public","demo"],"oldtuple":{"seq":7,"tx":null,"ts":null,"jsb":null,"js":null,"ba":null}}
-{"action":"U","relation":["public","demo"],"newtuple":{"seq":6,"tx":"updated","ts":null,"jsb":null,"js":null,"ba":null}}
-{"action":"C"}
-{"action":"B", "has_catalog_changes":"t"}
-{"action":"I","relation":["public","cat_test"],"newtuple":{"id":42}}
-{"action":"C"}
-{"action":"B", "has_catalog_changes":"f"}
-{"action":"I","relation":["public","demo"],"newtuple":{"seq":9,"tx":"1","ts":null,"jsb":null,"js":null,"ba":null}}
-{"action":"I","relation":["public","demo"],"newtuple":{"seq":10,"tx":"2","ts":null,"jsb":null,"js":null,"ba":null}}
-{"action":"C"}
-(27 rows)
+SELECT * FROM get_startup_params();
+               key                | value
+----------------------------------+--------
+ binary.binary_basetypes          | "f"
+ binary.float4_byval              | "t"
+ binary.float8_byval              | "t"
+ binary.internal_basetypes        | "f"
+ binary.sizeof_datum              | "8"
+ binary.sizeof_int                | "4"
+ binary.sizeof_long               | "8"
+ coltypes                         | "f"
+ database_encoding                | "UTF8"
+ encoding                         | "UTF8"
+ forward_changeset_origins        | "f"
+ hooks.row_filter_enabled         | "f"
+ hooks.shutdown_hook_enabled      | "f"
+ hooks.startup_hook_enabled       | "f"
+ hooks.transaction_filter_enabled | "f"
+ max_proto_version                | "1"
+ min_proto_version                | "1"
+ no_txinfo                        | "t"
+ relmeta_cache_size               | "0"
+(19 rows)
 
+SELECT * FROM get_queued_data();
+                                                                      data
+--------------------------------------------------------------------------------------------------------------------------------------------------
+ {"action": "B", "has_catalog_changes": "f"}
+ {"action": "I", "newtuple": {"ba": null, "js": null, "ts": null, "tx": "textval", "jsb": null, "seq": 1}, "relation": ["public", "demo"]}
+ {"action": "C"}
+ {"action": "B", "has_catalog_changes": "f"}
+ {"action": "I", "newtuple": {"ba": "\\xdeadbeef0001", "js": null, "ts": null, "tx": null, "jsb": null, "seq": 2}, "relation": ["public", "demo"]}
+ {"action": "C"}
+ {"action": "B", "has_catalog_changes": "f"}
+ {"action": "I", "newtuple": {"ba": null, "js": null, "ts": "2045-09-12T12:34:56", "tx": "blah", "jsb": null, "seq": 3}, "relation": ["public", "demo"]}
+ {"action": "C"}
+ {"action": "B", "has_catalog_changes": "f"}
+ {"action": "I", "newtuple": {"ba": null, "js": {"key": "value"}, "ts": null, "tx": null, "jsb": {"key": "value"}, "seq": 4}, "relation": ["public", "demo"]}
+ {"action": "C"}
+ {"action": "B", "has_catalog_changes": "f"}
+ {"action": "I", "newtuple": {"ba": null, "js": null, "ts": null, "tx": "row1", "jsb": null, "seq": 6}, "relation": ["public", "demo"]}
+ {"action": "I", "newtuple": {"ba": null, "js": null, "ts": null, "tx": "row2", "jsb": null, "seq": 7}, "relation": ["public", "demo"]}
+ {"action": "I", "newtuple": {"ba": null, "js": null, "ts": null, "tx": "row3", "jsb": null, "seq": 8}, "relation": ["public", "demo"]}
+ {"action": "D", "oldtuple": {"ba": null, "js": null, "ts": null, "tx": null, "jsb": null, "seq": 7}, "relation": ["public", "demo"]}
+ {"action": "U", "newtuple": {"ba": null, "js": null, "ts": null, "tx": "updated", "jsb": null, "seq": 6}, "relation": ["public", "demo"]}
+ {"action": "C"}
+ {"action": "B", "has_catalog_changes": "t"}
+ {"action": "I", "newtuple": {"id": 42}, "relation": ["public", "cat_test"]}
+ {"action": "C"}
+ {"action": "B", "has_catalog_changes": "f"}
+ {"action": "I", "newtuple": {"ba": null, "js": null, "ts": null, "tx": "1", "jsb": null, "seq": 9}, "relation": ["public", "demo"]}
+ {"action": "I", "newtuple": {"ba": null, "js": null, "ts": null, "tx": "2", "jsb": null, "seq": 10}, "relation": ["public", "demo"]}
+ {"action": "C"}
+ {"action": "B", "has_catalog_changes": "t"}
+ {"action": "C"}
+(28 rows)
+
+TRUNCATE TABLE json_decoding_output;
 \i sql/basic_teardown.sql
 SELECT 'drop' FROM pg_drop_replication_slot('regression_slot');
 ?column?