Skip to content

Commit 1b28c01

Browse files
committed
Remove forward_changesets from pglogical_output
Use a hook if you want this functionality.
1 parent f19fc53 commit 1b28c01

File tree

9 files changed

+30
-92
lines changed

9 files changed

+30
-92
lines changed

contrib/pglogical_output/README.md

Lines changed: 15 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -222,42 +222,32 @@ It's possible to use `pglogical_output` to cascade replication between multiple
222222
PostgreSQL servers, in combination with an appropriate client to apply the
223223
changes to the downstreams.
224224

225-
There are three forwarding modes:
225+
There are two forwarding modes:
226226

227227
* Forward everything. Transactions are replicated whether they were made directly
228228
on the immediate upstream or some other node upstream of it. All rows from all
229229
transactions are sent.
230230

231-
Selected by setting `forward_changesets` to true (default) and not setting a
232-
row or transaction filter hook.
233-
234-
* No forwarding. Only transactions applied immediately on the upstream node are
235-
forwarded. Transactions with any non-local origin are skipped. All rows from
236-
locally originated transactions are sent.
237-
238-
Selected by setting `forward_changesets` to false. Remember to confirm by
239-
checking the startup reply message.
231+
Selected by not setting a row or transaction filter hook.
240232

241233
* Filtered forwarding. Transactions are replicated unless a client-supplied
242234
transaction filter hook says to skip this transaction. Row changes are
243235
replicated unless the client-supplied row filter hook (if provided) says to
244236
skip that row.
245237

246-
Selected by setting `forward_changesets` to `true` and installing a
247-
transaction and/or row filter hook (see "hooks").
248-
249-
If the upstream server is 9.5 or newer and `forward_changesets` is enabled, the
250-
server will enable changeset origin information. It will set
251-
`forward_changeset_origins` to true in the startup reply message to indicate
252-
this. It will then send changeset origin messages after the `BEGIN` for each
253-
transaction, per the protocol documentation. Origin messages are omitted for
254-
transactions originating directly on the immediate upstream to save bandwidth.
255-
If `forward_changeset_origins` is true then transactions without an origin are
256-
always from the immediate upstream that’s running the decoding plugin.
257-
258-
Note that changeset forwarding may be forced to on if not requested by some
259-
servers, so the client _should_ check the forward_changesets and
260-
`forward_changeset_origins` params in the startup reply message.
238+
Selected by installing a transaction and/or row filter hook (see "hooks").
239+
240+
If the upstream server is 9.5 or newer the server will enable changeset origin
241+
information. It will set `forward_changeset_origins` to true in the startup
242+
reply message to indicate this. It will then send changeset origin messages
243+
after the `BEGIN` for each transaction, per the protocol documentation. Origin
244+
messages are omitted for transactions originating directly on the immediate
245+
upstream to save bandwidth. If `forward_changeset_origins` is true then
246+
transactions without an origin are always from the immediate upstream that’s
247+
running the decoding plugin.
248+
249+
Note that 9.4 servers can't expose replication origin information so they pass
250+
zero to the row filter hook and don't call the transaction filter hook.
261251

262252
Clients may use this facility to form arbitrarily complex topologies when
263253
combined with hooks to determine which transactions are forwarded. An obvious

contrib/pglogical_output/doc/protocol.txt

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -393,8 +393,7 @@ Since all parameter values are sent as strings, the value types given below spec
393393
|binary|_set of parameters, specified separately_|See “_the __‘binary’__ parameters_” below, and “_Parameters relating to exchange of binary values_”
394394
|database_encoding|string|The native text encoding of the database the plugin is running in
395395
|encoding|string|Field values for textual data will be in this encoding in native protocol text, binary or internal representation. For the native protocol this is currently always the same as `database_encoding`. For text-mode json protocol this is always the same as `client_encoding`.
396-
|forward_changesets|bool|Specifies that all transactions, not just those originating on the upstream, will be forwarded. See “_Changeset forwarding_”.
397-
|forward_changeset_origins|bool|Tells the client that the server will send changeset origin information. Independent of forward_changesets. See “_Changeset forwarding_” for details.
396+
|forward_changeset_origins|bool|Tells the client that the server will send changeset origin information. See “_Changeset forwarding_” for details.
398397
|no_txinfo|bool|Requests that variable transaction info such as XIDs, LSNs, and timestamps be omitted from output. Mainly for tests. Currently ignored for protos other than json.
399398
|===
400399

@@ -472,7 +471,6 @@ Because these versions are expected to be incremented, to make it clear that the
472471
|*Key*|*Type*|*Default*|*Notes*
473472

474473
|expected_encoding|string|null|The text encoding the downstream expects field values to be in. Applies to text, binary and internal representations of field values in native format. Has no effect on other protocol content. If specified, the upstream must honour it. For json protocol, must be unset or match `client_encoding`. (Current plugin versions ERROR if this is set for the native protocol and not equal to the upstream database's encoding).
475-
|forward_changesets|bool|false|Request that all transactions, not just those originating on the upstream, be forwarded. See “_Changeset forwarding_”.
476474
|want_coltypes|boolean|false|The client wants to receive data type information about columns.
477475
|===
478476

contrib/pglogical_output/expected/basic_json.out

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -83,16 +83,15 @@ SELECT * FROM get_startup_params();
8383
coltypes | "f"
8484
database_encoding | "UTF8"
8585
encoding | "UTF8"
86-
forward_changeset_origins | "f"
87-
forward_changesets | "f"
86+
forward_changeset_origins | "t"
8887
hooks.row_filter_enabled | "f"
8988
hooks.shutdown_hook_enabled | "f"
9089
hooks.startup_hook_enabled | "f"
9190
hooks.transaction_filter_enabled | "f"
9291
max_proto_version | "1"
9392
min_proto_version | "1"
9493
no_txinfo | "t"
95-
(19 rows)
94+
(18 rows)
9695

9796
SELECT * FROM get_queued_data();
9897
data

contrib/pglogical_output/expected/basic_json_1.out

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ FROM pg_logical_slot_peek_changes('regression_slot',
6868
'no_txinfo', 't');
6969
data
7070
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
71-
{"action":"S", "params": {"max_proto_version":"1","min_proto_version":"1","coltypes":"f","pg_version_num":"90405","pg_version":"9.4.5","pg_catversion":"201409291","database_encoding":"UTF8","encoding":"UTF8","forward_changesets":"t","forward_changeset_origins":"f","binary.internal_basetypes":"f","binary.binary_basetypes":"f","binary.basetypes_major_version":"904","binary.sizeof_int":"4","binary.sizeof_long":"8","binary.sizeof_datum":"8","binary.maxalign":"8","binary.bigendian":"f","binary.float4_byval":"t","binary.float8_byval":"t","binary.integer_datetimes":"t","binary.binary_pg_version":"904","no_txinfo":"t","hooks.startup_hook_enabled":"f","hooks.shutdown_hook_enabled":"f","hooks.row_filter_enabled":"f","hooks.transaction_filter_enabled":"f"}}
71+
{"action":"S", "params": {"max_proto_version":"1","min_proto_version":"1","coltypes":"f","pg_version_num":"90405","pg_version":"9.4.5","pg_catversion":"201409291","database_encoding":"UTF8","encoding":"UTF8","forward_changeset_origins":"t","binary.internal_basetypes":"f","binary.binary_basetypes":"f","binary.basetypes_major_version":"904","binary.sizeof_int":"4","binary.sizeof_long":"8","binary.sizeof_datum":"8","binary.maxalign":"8","binary.bigendian":"f","binary.float4_byval":"t","binary.float8_byval":"t","binary.integer_datetimes":"t","binary.binary_pg_version":"904","no_txinfo":"t","hooks.startup_hook_enabled":"f","hooks.shutdown_hook_enabled":"f","hooks.row_filter_enabled":"f","hooks.transaction_filter_enabled":"f"}}
7272
{"action":"B", "has_catalog_changes":"f"}
7373
{"action":"I","relation":["public","demo"],"newtuple":{"seq":1,"tx":"textval","ts":null,"jsb":null,"js":null,"ba":null}}
7474
{"action":"C"}

contrib/pglogical_output/expected/hooks_json.out

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -64,16 +64,15 @@ SELECT * FROM get_startup_params();
6464
coltypes | "f"
6565
database_encoding | "UTF8"
6666
encoding | "UTF8"
67-
forward_changeset_origins | "f"
68-
forward_changesets | "f"
67+
forward_changeset_origins | "t"
6968
hooks.row_filter_enabled | "t"
7069
hooks.shutdown_hook_enabled | "t"
7170
hooks.startup_hook_enabled | "t"
7271
hooks.transaction_filter_enabled | "t"
7372
max_proto_version | "1"
7473
min_proto_version | "1"
7574
no_txinfo | "t"
76-
(19 rows)
75+
(18 rows)
7776

7877
SELECT * FROM get_queued_data();
7978
data
@@ -134,16 +133,15 @@ SELECT * FROM get_startup_params();
134133
coltypes | "f"
135134
database_encoding | "UTF8"
136135
encoding | "UTF8"
137-
forward_changeset_origins | "f"
138-
forward_changesets | "f"
136+
forward_changeset_origins | "t"
139137
hooks.row_filter_enabled | "t"
140138
hooks.shutdown_hook_enabled | "t"
141139
hooks.startup_hook_enabled | "t"
142140
hooks.transaction_filter_enabled | "t"
143141
max_proto_version | "1"
144142
min_proto_version | "1"
145143
no_txinfo | "t"
146-
(19 rows)
144+
(18 rows)
147145

148146
SELECT * FROM get_queued_data();
149147
data

contrib/pglogical_output/expected/hooks_json_1.out

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ FROM pg_logical_slot_peek_changes('regression_slot',
4949
'no_txinfo', 't');
5050
data
5151
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
52-
{"action":"S", "params": {"max_proto_version":"1","min_proto_version":"1","coltypes":"f","pg_version_num":"90405","pg_version":"9.4.5","pg_catversion":"201409291","database_encoding":"UTF8","encoding":"UTF8","forward_changesets":"t","forward_changeset_origins":"f","binary.internal_basetypes":"f","binary.binary_basetypes":"f","binary.basetypes_major_version":"904","binary.sizeof_int":"4","binary.sizeof_long":"8","binary.sizeof_datum":"8","binary.maxalign":"8","binary.bigendian":"f","binary.float4_byval":"t","binary.float8_byval":"t","binary.integer_datetimes":"t","binary.binary_pg_version":"904","no_txinfo":"t","hooks.startup_hook_enabled":"t","hooks.shutdown_hook_enabled":"t","hooks.row_filter_enabled":"t","hooks.transaction_filter_enabled":"t"}}
52+
{"action":"S", "params": {"max_proto_version":"1","min_proto_version":"1","coltypes":"f","pg_version_num":"90405","pg_version":"9.4.5","pg_catversion":"201409291","database_encoding":"UTF8","encoding":"UTF8","forward_changeset_origins":"t","binary.internal_basetypes":"f","binary.binary_basetypes":"f","binary.basetypes_major_version":"904","binary.sizeof_int":"4","binary.sizeof_long":"8","binary.sizeof_datum":"8","binary.maxalign":"8","binary.bigendian":"f","binary.float4_byval":"t","binary.float8_byval":"t","binary.integer_datetimes":"t","binary.binary_pg_version":"904","no_txinfo":"t","hooks.startup_hook_enabled":"t","hooks.shutdown_hook_enabled":"t","hooks.row_filter_enabled":"t","hooks.transaction_filter_enabled":"t"}}
5353
{"action":"B", "has_catalog_changes":"f"}
5454
{"action":"C"}
5555
{"action":"B", "has_catalog_changes":"f"}
@@ -89,7 +89,7 @@ FROM pg_logical_slot_peek_changes('regression_slot',
8989
'no_txinfo', 't');
9090
data
9191
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
92-
{"action":"S", "params": {"max_proto_version":"1","min_proto_version":"1","coltypes":"f","pg_version_num":"90405","pg_version":"9.4.5","pg_catversion":"201409291","database_encoding":"UTF8","encoding":"UTF8","forward_changesets":"t","forward_changeset_origins":"f","binary.internal_basetypes":"f","binary.binary_basetypes":"f","binary.basetypes_major_version":"904","binary.sizeof_int":"4","binary.sizeof_long":"8","binary.sizeof_datum":"8","binary.maxalign":"8","binary.bigendian":"f","binary.float4_byval":"t","binary.float8_byval":"t","binary.integer_datetimes":"t","binary.binary_pg_version":"904","no_txinfo":"t","hooks.startup_hook_enabled":"t","hooks.shutdown_hook_enabled":"t","hooks.row_filter_enabled":"t","hooks.transaction_filter_enabled":"t"}}
92+
{"action":"S", "params": {"max_proto_version":"1","min_proto_version":"1","coltypes":"f","pg_version_num":"90405","pg_version":"9.4.5","pg_catversion":"201409291","database_encoding":"UTF8","encoding":"UTF8","forward_changeset_origins":"t","binary.internal_basetypes":"f","binary.binary_basetypes":"f","binary.basetypes_major_version":"904","binary.sizeof_int":"4","binary.sizeof_long":"8","binary.sizeof_datum":"8","binary.maxalign":"8","binary.bigendian":"f","binary.float4_byval":"t","binary.float8_byval":"t","binary.integer_datetimes":"t","binary.binary_pg_version":"904","no_txinfo":"t","hooks.startup_hook_enabled":"t","hooks.shutdown_hook_enabled":"t","hooks.row_filter_enabled":"t","hooks.transaction_filter_enabled":"t"}}
9393
{"action":"B", "has_catalog_changes":"f"}
9494
{"action":"I","relation":["public","test_filter"],"newtuple":{"id":1}}
9595
{"action":"I","relation":["public","test_filter"],"newtuple":{"id":2}}

contrib/pglogical_output/pglogical_config.c

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,6 @@ enum {
7070
PARAM_BINARY_WANT_BINARY_BASETYPES,
7171
PARAM_BINARY_BASETYPES_MAJOR_VERSION,
7272
PARAM_PG_VERSION,
73-
PARAM_FORWARD_CHANGESETS,
7473
PARAM_HOOKS_SETUP_FUNCTION,
7574
PARAM_NO_TXINFO
7675
} OutputPluginParamKey;
@@ -97,7 +96,6 @@ static OutputPluginParam param_lookup[] = {
9796
{"binary.want_binary_basetypes", PARAM_BINARY_WANT_BINARY_BASETYPES},
9897
{"binary.basetypes_major_version", PARAM_BINARY_BASETYPES_MAJOR_VERSION},
9998
{"pg_version", PARAM_PG_VERSION},
100-
{"forward_changesets", PARAM_FORWARD_CHANGESETS},
10199
{"hooks.setup_function", PARAM_HOOKS_SETUP_FUNCTION},
102100
{"no_txinfo", PARAM_NO_TXINFO},
103101
{NULL, PARAM_UNRECOGNISED}
@@ -210,18 +208,6 @@ process_parameters_v1(List *options, PGLogicalOutputData *data)
210208
data->client_pg_version = DatumGetUInt32(val);
211209
break;
212210

213-
case PARAM_FORWARD_CHANGESETS:
214-
/*
215-
* Check to see if the client asked for changeset forwarding
216-
*
217-
* Note that we cannot support this on 9.4. We'll tell the client
218-
* in the startup reply message.
219-
*/
220-
val = get_param_value(elem, false, OUTPUT_PARAM_TYPE_BOOL);
221-
data->client_forward_changesets_set = true;
222-
data->client_forward_changesets = DatumGetBool(val);
223-
break;
224-
225211
case PARAM_BINARY_WANT_INTERNAL_BASETYPES:
226212
/* check if we want to use internal data representation */
227213
val = get_param_value(elem, false, OUTPUT_PARAM_TYPE_BOOL);
@@ -444,8 +430,6 @@ prepare_startup_message(PGLogicalOutputData *data)
444430

445431
l = add_startup_msg_s(l, "encoding", (char*)pg_encoding_to_char(data->field_datum_encoding));
446432

447-
l = add_startup_msg_b(l, "forward_changesets",
448-
data->forward_changesets);
449433
l = add_startup_msg_b(l, "forward_changeset_origins",
450434
data->forward_changeset_origins);
451435

0 commit comments

Comments
 (0)