Skip to content

Commit 87af48e

Browse files
committed
Move the queued ddl replication do ddl_sql repset
Bump the version to 1.0.1
1 parent a46029c commit 87af48e

File tree

8 files changed

+241
-8
lines changed

8 files changed

+241
-8
lines changed

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ MODULE_big = pglogical
44
EXTENSION = pglogical
55
PGFILEDESC = "pglogical - logical replication"
66

7-
DATA = pglogical--1.0.0.sql
7+
DATA = pglogical--1.0.0.sql pglogical--1.0.1.sql pglogical--1.0.0--1.0.1.sql
88

99
OBJS = pglogical_apply.o pglogical_conflict.o pglogical_manager.o \
1010
pglogical_node.o pglogical_proto.o pglogical_relcache.o \

expected/init.out

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -81,9 +81,9 @@ SELECT sync_kind, sync_subid, sync_nspname, sync_relname, sync_status FROM pglog
8181
(1 row)
8282

8383
SELECT * FROM pglogical.show_subscription_status();
84-
subscription_name | status | provider_node | provider_dsn | slot_name | replication_sets | forward_origins
85-
-------------------+-------------+---------------+------------------------------+--------------------------------------------+-------------------------------+-----------------
86-
test_subscription | replicating | test_provider | dbname=regression user=super | pgl_postgres_test_provider_test_sube55bf37 | {default,default_insert_only} |
84+
subscription_name | status | provider_node | provider_dsn | slot_name | replication_sets | forward_origins
85+
-------------------+-------------+---------------+------------------------------+--------------------------------------------+---------------------------------------+-----------------
86+
test_subscription | replicating | test_provider | dbname=regression user=super | pgl_postgres_test_provider_test_sube55bf37 | {default,default_insert_only,ddl_sql} |
8787
(1 row)
8888

8989
-- Make sure we see the slot and active connection

expected/replication_set.out

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -215,5 +215,6 @@ SELECT * FROM pglogical.replication_set;
215215
------------+------------+---------------------+------------------+------------------+------------------+--------------------
216216
828867312 | 1755434425 | default | t | t | t | t
217217
3318003856 | 1755434425 | default_insert_only | t | f | f | t
218-
(2 rows)
218+
2796587818 | 1755434425 | ddl_sql | t | f | f | f
219+
(3 rows)
219220

pglogical--1.0.0--1.0.1.sql

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
CREATE OR REPLACE FUNCTION pglogical.create_subscription(subscription_name name, provider_dsn text,
2+
replication_sets text[] = '{default,default_insert_only,ddl_sql}', synchronize_structure boolean = true,
3+
synchronize_data boolean = true, forward_origins text[] = '{all}')
4+
RETURNS oid STRICT VOLATILE LANGUAGE c AS 'MODULE_PATHNAME', 'pglogical_create_subscription';
5+
6+
SELECT * FROM pglogical.create_replication_set('ddl_sql', true, false, false, false);
7+
8+
UPDATE pglogical.subscription SET sub_replication_sets = array_append(sub_replication_sets, 'ddl_sql');
9+
10+
BEGIN;
11+
WITH applys AS (
12+
SELECT sub_name FROM pglogical.subscription WHERE sub_enabled
13+
),
14+
WITH disable AS (
15+
SELECT pglogical.alter_subscription_disable(sub_name, true) FROM applys
16+
)
17+
SELECT pglogical.alter_subscription_enable(sub_name, true) FROM applys;
18+
COMMIT;

pglogical--1.0.1.sql

Lines changed: 203 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,203 @@
1+
\echo Use "CREATE EXTENSION pglogical_origin" to load this file. \quit
2+
3+
CREATE TABLE pglogical.node (
4+
node_id oid NOT NULL PRIMARY KEY,
5+
node_name name NOT NULL UNIQUE
6+
) WITH (user_catalog_table=true);
7+
8+
CREATE TABLE pglogical.node_interface (
9+
if_id oid NOT NULL PRIMARY KEY,
10+
if_name name NOT NULL, -- default same as node name
11+
if_nodeid oid REFERENCES node(node_id),
12+
if_dsn text NOT NULL,
13+
UNIQUE (if_nodeid, if_name)
14+
);
15+
16+
CREATE TABLE pglogical.local_node (
17+
node_id oid PRIMARY KEY REFERENCES node(node_id),
18+
node_local_interface oid NOT NULL REFERENCES node_interface(if_id)
19+
);
20+
21+
-- Currently we allow only one node record per database
22+
CREATE UNIQUE INDEX local_node_onlyone ON pglogical.local_node ((true));
23+
24+
CREATE TABLE pglogical.subscription (
25+
sub_id oid NOT NULL PRIMARY KEY,
26+
sub_name name NOT NULL UNIQUE,
27+
sub_origin oid NOT NULL REFERENCES node(node_id),
28+
sub_target oid NOT NULL REFERENCES node(node_id),
29+
sub_origin_if oid NOT NULL REFERENCES node_interface(if_id),
30+
sub_target_if oid NOT NULL REFERENCES node_interface(if_id),
31+
sub_enabled boolean NOT NULL DEFAULT true,
32+
sub_slot_name name NOT NULL,
33+
sub_replication_sets text[],
34+
sub_forward_origins text[],
35+
UNIQUE (sub_origin, sub_target)
36+
);
37+
38+
CREATE TABLE pglogical.local_sync_status (
39+
sync_kind "char" NOT NULL CHECK (sync_kind IN ('i', 's', 'd', 'f')),
40+
sync_subid oid NOT NULL REFERENCES pglogical.subscription(sub_id),
41+
sync_nspname name,
42+
sync_relname name,
43+
sync_status "char" NOT NULL,
44+
UNIQUE (sync_subid, sync_nspname, sync_relname)
45+
);
46+
47+
48+
CREATE FUNCTION pglogical.create_node(node_name name, dsn text)
49+
RETURNS oid STRICT VOLATILE LANGUAGE c AS 'MODULE_PATHNAME', 'pglogical_create_node';
50+
CREATE FUNCTION pglogical.drop_node(node_name name, ifexists boolean DEFAULT false)
51+
RETURNS boolean STRICT VOLATILE LANGUAGE c AS 'MODULE_PATHNAME', 'pglogical_drop_node';
52+
53+
CREATE FUNCTION pglogical.create_subscription(subscription_name name, provider_dsn text,
54+
replication_sets text[] = '{default,default_insert_only,ddl_sql}', synchronize_structure boolean = true,
55+
synchronize_data boolean = true, forward_origins text[] = '{all}')
56+
RETURNS oid STRICT VOLATILE LANGUAGE c AS 'MODULE_PATHNAME', 'pglogical_create_subscription';
57+
CREATE FUNCTION pglogical.drop_subscription(subscription_name name, ifexists boolean DEFAULT false)
58+
RETURNS oid STRICT VOLATILE LANGUAGE c AS 'MODULE_PATHNAME', 'pglogical_drop_subscription';
59+
60+
CREATE FUNCTION pglogical.alter_subscription_disable(subscription_name name, immediate boolean DEFAULT false)
61+
RETURNS boolean STRICT VOLATILE LANGUAGE c AS 'MODULE_PATHNAME', 'pglogical_alter_subscription_disable';
62+
CREATE FUNCTION pglogical.alter_subscription_enable(subscription_name name, immediate boolean DEFAULT false)
63+
RETURNS boolean STRICT VOLATILE LANGUAGE c AS 'MODULE_PATHNAME', 'pglogical_alter_subscription_enable';
64+
65+
CREATE FUNCTION pglogical.alter_subscription_add_replication_set(subscription_name name, replication_set name)
66+
RETURNS boolean STRICT VOLATILE LANGUAGE c AS 'MODULE_PATHNAME', 'pglogical_alter_subscription_add_replication_set';
67+
CREATE FUNCTION pglogical.alter_subscription_remove_replication_set(subscription_name name, replication_set name)
68+
RETURNS boolean STRICT VOLATILE LANGUAGE c AS 'MODULE_PATHNAME', 'pglogical_alter_subscription_remove_replication_set';
69+
70+
CREATE FUNCTION pglogical.show_subscription_status(subscription_name name DEFAULT NULL,
71+
OUT subscription_name text, OUT status text, OUT provider_node text,
72+
OUT provider_dsn text, OUT slot_name text, OUT replication_sets text[],
73+
OUT forward_origins text[])
74+
RETURNS SETOF record STABLE LANGUAGE c AS 'MODULE_PATHNAME', 'pglogical_show_subscription_status';
75+
76+
CREATE TABLE pglogical.replication_set (
77+
set_id oid NOT NULL PRIMARY KEY,
78+
set_nodeid oid NOT NULL,
79+
set_name name NOT NULL,
80+
replicate_insert boolean NOT NULL DEFAULT true,
81+
replicate_update boolean NOT NULL DEFAULT true,
82+
replicate_delete boolean NOT NULL DEFAULT true,
83+
replicate_truncate boolean NOT NULL DEFAULT true,
84+
UNIQUE (set_nodeid, set_name)
85+
) WITH (user_catalog_table=true);
86+
87+
CREATE TABLE pglogical.replication_set_table (
88+
set_id integer NOT NULL,
89+
set_reloid regclass NOT NULL,
90+
PRIMARY KEY(set_id, set_reloid)
91+
) WITH (user_catalog_table=true);
92+
93+
CREATE VIEW pglogical.TABLES AS
94+
WITH set_tables AS (
95+
SELECT s.set_name, t.set_reloid
96+
FROM pglogical.replication_set_table t,
97+
pglogical.replication_set s,
98+
pglogical.local_node n
99+
WHERE s.set_nodeid = n.node_id
100+
AND s.set_id = t.set_id
101+
),
102+
user_tables AS (
103+
SELECT r.oid, n.nspname, r.relname, r.relreplident
104+
FROM pg_catalog.pg_class r,
105+
pg_catalog.pg_namespace n
106+
WHERE r.relkind = 'r'
107+
AND r.relpersistence = 'p'
108+
AND n.oid = r.relnamespace
109+
AND n.nspname !~ '^pg_'
110+
AND n.nspname != 'information_schema'
111+
AND n.nspname != 'pglogical'
112+
)
113+
SELECT n.nspname, r.relname, s.set_name
114+
FROM pg_catalog.pg_namespace n,
115+
pg_catalog.pg_class r,
116+
set_tables s
117+
WHERE r.relkind = 'r'
118+
AND n.oid = r.relnamespace
119+
AND r.oid = s.set_reloid
120+
UNION
121+
SELECT t.nspname, t.relname, NULL
122+
FROM user_tables t
123+
WHERE t.oid NOT IN (SELECT set_reloid FROM set_tables);
124+
125+
CREATE FUNCTION pglogical.create_replication_set(set_name name,
126+
replicate_insert boolean = true, replicate_update boolean = true,
127+
replicate_delete boolean = true, replicate_truncate boolean = true)
128+
RETURNS oid STRICT VOLATILE LANGUAGE c AS 'MODULE_PATHNAME', 'pglogical_create_replication_set';
129+
CREATE FUNCTION pglogical.alter_replication_set(set_name name,
130+
replicate_insert boolean DEFAULT NULL, replicate_update boolean DEFAULT NULL,
131+
replicate_delete boolean DEFAULT NULL, replicate_truncate boolean DEFAULT NULL)
132+
RETURNS oid CALLED ON NULL INPUT VOLATILE LANGUAGE c AS 'MODULE_PATHNAME', 'pglogical_alter_replication_set';
133+
CREATE FUNCTION pglogical.drop_replication_set(set_name name, ifexists boolean DEFAULT false)
134+
RETURNS boolean STRICT VOLATILE LANGUAGE c AS 'MODULE_PATHNAME', 'pglogical_drop_replication_set';
135+
136+
CREATE FUNCTION pglogical.replication_set_add_table(set_name name, relation regclass, synchronize boolean DEFAULT false)
137+
RETURNS boolean STRICT VOLATILE LANGUAGE c AS 'MODULE_PATHNAME', 'pglogical_replication_set_add_table';
138+
CREATE FUNCTION pglogical.replication_set_add_all_tables(set_name name, schema_names text[], synchronize boolean DEFAULT false)
139+
RETURNS boolean STRICT VOLATILE LANGUAGE c AS 'MODULE_PATHNAME', 'pglogical_replication_set_add_all_tables';
140+
CREATE FUNCTION pglogical.replication_set_remove_table(set_name name, relation regclass)
141+
RETURNS boolean STRICT VOLATILE LANGUAGE c AS 'MODULE_PATHNAME', 'pglogical_replication_set_remove_table';
142+
143+
CREATE FUNCTION pglogical.alter_subscription_synchronize(subscription_name name, truncate boolean DEFAULT false)
144+
RETURNS boolean STRICT VOLATILE LANGUAGE c AS 'MODULE_PATHNAME', 'pglogical_alter_subscription_synchronize';
145+
146+
CREATE FUNCTION pglogical.alter_subscription_resynchronize_table(subscription_name name, relation regclass)
147+
RETURNS boolean STRICT VOLATILE LANGUAGE c AS 'MODULE_PATHNAME', 'pglogical_alter_subscription_resynchronize_table';
148+
149+
CREATE FUNCTION pglogical.show_subscription_table(subscription_name name, relation regclass, OUT nspname text, OUT relname text, OUT status text)
150+
RETURNS record STRICT STABLE LANGUAGE c AS 'MODULE_PATHNAME', 'pglogical_show_subscription_table';
151+
152+
CREATE TABLE pglogical.queue (
153+
queued_at timestamp with time zone NOT NULL,
154+
role name NOT NULL,
155+
replication_sets text[],
156+
message_type "char" NOT NULL,
157+
message json NOT NULL
158+
);
159+
160+
CREATE FUNCTION pglogical.replicate_ddl_command(command text)
161+
RETURNS boolean STRICT VOLATILE LANGUAGE c AS 'MODULE_PATHNAME', 'pglogical_replicate_ddl_command';
162+
163+
CREATE OR REPLACE FUNCTION pglogical.queue_truncate()
164+
RETURNS trigger LANGUAGE c AS 'MODULE_PATHNAME', 'pglogical_queue_truncate';
165+
166+
CREATE OR REPLACE FUNCTION pglogical.truncate_trigger_add()
167+
RETURNS event_trigger LANGUAGE c AS 'MODULE_PATHNAME', 'pglogical_truncate_trigger_add';
168+
169+
CREATE EVENT TRIGGER pglogical_truncate_trigger_add
170+
ON ddl_command_end
171+
WHEN TAG IN ('CREATE TABLE', 'CREATE TABLE AS')
172+
EXECUTE PROCEDURE pglogical.truncate_trigger_add();
173+
174+
CREATE OR REPLACE FUNCTION pglogical.dependency_check_trigger()
175+
RETURNS event_trigger LANGUAGE c AS 'MODULE_PATHNAME', 'pglogical_dependency_check_trigger';
176+
177+
CREATE EVENT TRIGGER pglogical_dependency_check_trigger
178+
ON sql_drop
179+
EXECUTE PROCEDURE pglogical.dependency_check_trigger();
180+
181+
CREATE FUNCTION pglogical.pglogical_hooks_setup(internal)
182+
RETURNS void
183+
STABLE LANGUAGE c AS 'MODULE_PATHNAME';
184+
185+
CREATE FUNCTION pglogical.pglogical_node_info(OUT node_id oid, OUT node_name text, OUT sysid text, OUT dbname text, OUT replication_sets text)
186+
RETURNS record
187+
STABLE STRICT LANGUAGE c AS 'MODULE_PATHNAME';
188+
189+
CREATE FUNCTION pglogical.pglogical_gen_slot_name(name, name, name)
190+
RETURNS name
191+
IMMUTABLE STRICT LANGUAGE c AS 'MODULE_PATHNAME';
192+
193+
CREATE FUNCTION pglogical_version() RETURNS text
194+
LANGUAGE c AS 'MODULE_PATHNAME';
195+
196+
CREATE FUNCTION pglogical_version_num() RETURNS integer
197+
LANGUAGE c AS 'MODULE_PATHNAME';
198+
199+
CREATE FUNCTION pglogical_max_proto_version() RETURNS integer
200+
LANGUAGE c AS 'MODULE_PATHNAME';
201+
202+
CREATE FUNCTION pglogical_min_proto_version() RETURNS integer
203+
LANGUAGE c AS 'MODULE_PATHNAME';

pglogical.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,8 @@
2626
#include "pglogical_compat.h"
2727
#endif
2828

29-
#define PGLOGICAL_VERSION "1.0.0"
30-
#define PGLOGICAL_VERSION_NUM 10000
29+
#define PGLOGICAL_VERSION "1.0.1"
30+
#define PGLOGICAL_VERSION_NUM 10001
3131

3232
#define PGLOGICAL_MIN_PROTO_VERSION_NUM 1
3333
#define PGLOGICAL_MAX_PROTO_VERSION_NUM 1

pglogical_functions.c

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,15 @@ pglogical_create_node(PG_FUNCTION_ARGS)
153153
repset.replicate_truncate = true;
154154
create_replication_set(&repset);
155155

156+
repset.id = InvalidOid;
157+
repset.nodeid = node.id;
158+
repset.name = DDL_SQL_REPSET_NAME;
159+
repset.replicate_insert = true;
160+
repset.replicate_update = false;
161+
repset.replicate_delete = false;
162+
repset.replicate_truncate = false;
163+
create_replication_set(&repset);
164+
156165
create_local_node(node.id, nodeif.id);
157166

158167
PG_RETURN_OID(node.id);
@@ -1199,7 +1208,8 @@ pglogical_replicate_ddl_command(PG_FUNCTION_ARGS)
11991208
* Note, we keep "DDL" message type for the future when we have deparsing
12001209
* support.
12011210
*/
1202-
queue_message(NULL, GetUserId(), QUEUE_COMMAND_TYPE_SQL, cmd.data);
1211+
queue_message(list_make1(DDL_SQL_REPSET_NAME), GetUserId(),
1212+
QUEUE_COMMAND_TYPE_SQL, cmd.data);
12031213

12041214
/* Execute the query locally. */
12051215
pglogical_execute_sql_command(query, GetUserNameFromId(GetUserId()

pglogical_repset.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ typedef struct PGLogicalRepSet
2828

2929
#define DEFAULT_REPSET_NAME "default"
3030
#define DEFAULT_INSONLY_REPSET_NAME "default_insert_only"
31+
#define DDL_SQL_REPSET_NAME "ddl_sql"
3132

3233
/* This is only valid within one output plugin instance/walsender. */
3334
typedef struct PGLogicalRepSetRelation

0 commit comments

Comments
 (0)