Skip to content

Commit 97e362e

Browse files
committed
Save slot name in subscription catalog
1 parent f5c01d9 commit 97e362e

File tree

6 files changed

+78
-59
lines changed

6 files changed

+78
-59
lines changed

expected/init.out

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -60,9 +60,9 @@ SELECT sync_kind, sync_subid, sync_nspname, sync_relname, sync_status FROM pglog
6060
(1 row)
6161

6262
SELECT * FROM pglogical.show_subscription_status();
63-
subscription_name | status | provider_node | provider_dsn | replication_sets | forward_origins
64-
-------------------+-------------+---------------+------------------------------+------------------+-----------------
65-
test_subscription | replicating | test_provider | dbname=regression user=super | {default} |
63+
subscription_name | status | provider_node | provider_dsn | slot_name | replication_sets | forward_origins
64+
-------------------+-------------+---------------+------------------------------+--------------------------------------------+------------------+-----------------
65+
test_subscription | replicating | test_provider | dbname=regression user=super | pgl_postgres_test_provider_test_sube55bf37 | {default} |
6666
(1 row)
6767

6868
-- Make sure we see the slot and active connection

pglogical--1.0.sql

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,7 @@ CREATE TABLE pglogical.subscription (
2727
sub_origin_if oid NOT NULL REFERENCES node_interface(if_id),
2828
sub_target_if oid NOT NULL REFERENCES node_interface(if_id),
2929
sub_enabled boolean NOT NULL DEFAULT true,
30-
sub_sync_structure boolean DEFAULT true,
31-
sub_sync_data boolean DEFAULT true,
30+
sub_slot_name name NOT NULL,
3231
sub_replication_sets text[],
3332
sub_forward_origins text[],
3433
UNIQUE (sub_origin, sub_target)
@@ -46,7 +45,7 @@ CREATE TABLE pglogical.local_sync_status (
4645

4746
CREATE FUNCTION pglogical.create_node(node_name name, dsn text)
4847
RETURNS oid STRICT VOLATILE LANGUAGE c AS 'MODULE_PATHNAME', 'pglogical_create_node';
49-
CREATE FUNCTION pglogical.drop_node(mode_name name, ifexists boolean DEFAULT false)
48+
CREATE FUNCTION pglogical.drop_node(node_name name, ifexists boolean DEFAULT false)
5049
RETURNS boolean STRICT VOLATILE LANGUAGE c AS 'MODULE_PATHNAME', 'pglogical_drop_node';
5150

5251
CREATE FUNCTION pglogical.create_subscription(subscription_name name, provider_dsn text,
@@ -68,7 +67,7 @@ RETURNS boolean STRICT VOLATILE LANGUAGE c AS 'MODULE_PATHNAME', 'pglogical_alte
6867

6968
CREATE FUNCTION pglogical.show_subscription_status(subscription_name name DEFAULT NULL,
7069
OUT subscription_name text, OUT status text, OUT provider_node text,
71-
OUT provider_dsn text, OUT replication_sets text[],
70+
OUT provider_dsn text, OUT slot_name text, OUT replication_sets text[],
7271
OUT forward_origins text[])
7372
RETURNS SETOF record STABLE LANGUAGE c AS 'MODULE_PATHNAME', 'pglogical_show_subscription_status';
7473

@@ -190,6 +189,6 @@ CREATE FUNCTION pglogical.pglogical_node_info(OUT node_id oid, OUT node_name tex
190189
RETURNS record
191190
STABLE STRICT LANGUAGE c AS 'MODULE_PATHNAME';
192191

193-
CREATE FUNCTION pglogical.pglogical_gen_slot_name(name)
192+
CREATE FUNCTION pglogical.pglogical_gen_slot_name(name, name, name)
194193
RETURNS name
195194
IMMUTABLE STRICT LANGUAGE c AS 'MODULE_PATHNAME';

pglogical_create_subscriber.c

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -80,8 +80,9 @@ static char *validate_replication_set_input(char *replication_sets);
8080
static void remove_unwanted_data(PGconn *conn);
8181
static void initialize_replication_origin(PGconn *conn, char *origin_name, char *remote_lsn);
8282
static char *create_restore_point(PGconn *conn, char *restore_point_name);
83-
static char *initialize_replication_slot(PGconn *conn, char *subscriber_name,
84-
bool drop_slot_if_exists);
83+
static char *initialize_replication_slot(PGconn *conn, char *dbname,
84+
char *provider_node_name, char *subscription_name,
85+
bool drop_slot_if_exists);
8586
static void pglogical_subscribe(PGconn *conn, char *subscriber_name,
8687
char *subscriber_dsn,
8788
char *provider_connstr,
@@ -307,6 +308,8 @@ main(int argc, char **argv)
307308
print_msg(VERBOSITY_NORMAL,
308309
_("Creating replication slot ...\n"));
309310
slot_name = initialize_replication_slot(provider_conn,
311+
remote_info->dbname,
312+
remote_info->node_name,
310313
subscriber_name,
311314
drop_slot_if_exists);
312315

@@ -629,7 +632,8 @@ check_data_dir(char *data_dir, RemoteInfo *remoteinfo)
629632
* Initialize replication slots
630633
*/
631634
static char *
632-
initialize_replication_slot(PGconn *conn, char *subscriber_name,
635+
initialize_replication_slot(PGconn *conn, char *dbname,
636+
char *provider_node_name, char *subscription_name,
633637
bool drop_slot_if_exists)
634638
{
635639
PQExpBufferData query;
@@ -639,8 +643,12 @@ initialize_replication_slot(PGconn *conn, char *subscriber_name,
639643
/* Generate the slot name. */
640644
initPQExpBuffer(&query);
641645
printfPQExpBuffer(&query,
642-
"SELECT pglogical.pglogical_gen_slot_name(%s)",
643-
PQescapeLiteral(conn, subscriber_name, strlen(subscriber_name)));
646+
"SELECT pglogical.pglogical_gen_slot_name(%s, %s, %s)",
647+
PQescapeLiteral(conn, dbname, strlen(dbname)),
648+
PQescapeLiteral(conn, provider_node_name,
649+
strlen(provider_node_name)),
650+
PQescapeLiteral(conn, subscription_name,
651+
strlen(subscription_name)));
644652

645653
res = PQexec(conn, query.data);
646654
if (PQresultStatus(res) != PGRES_TUPLES_OK)

pglogical_functions.c

Lines changed: 46 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,10 @@ PG_FUNCTION_INFO_V1(pglogical_dependency_check_trigger);
9999
PG_FUNCTION_INFO_V1(pglogical_gen_slot_name);
100100
PG_FUNCTION_INFO_V1(pglogical_node_info);
101101

102+
static void gen_slot_name(Name slot_name, char *dbname,
103+
const char *provider_name,
104+
const char *subscriber_name);
105+
102106
/*
103107
* Create new node
104108
*/
@@ -197,6 +201,7 @@ pglogical_create_subscription(PG_FUNCTION_ARGS)
197201
PGlogicalInterface originif;
198202
PGLogicalLocalNode *localnode;
199203
PGlogicalInterface targetif;
204+
NameData slot_name;
200205

201206
/* Check that this is actually a node. */
202207
localnode = get_local_node(true);
@@ -230,6 +235,10 @@ pglogical_create_subscription(PG_FUNCTION_ARGS)
230235
sub.replication_sets = textarray_to_list(rep_set_names);
231236
sub.forward_origins = textarray_to_list(forward_origin_names);
232237
sub.enabled = true;
238+
gen_slot_name(&slot_name, get_database_name(MyDatabaseId),
239+
origin.name, sub_name);
240+
sub.slot_name = pstrdup(NameStr(slot_name));
241+
233242
create_subscription(&sub);
234243

235244
/* Create synchronization status for the subscription. */
@@ -763,8 +772,8 @@ pglogical_show_subscription_status(PG_FUNCTION_ARGS)
763772
{
764773
PGLogicalSubscription *sub = lfirst(lc);
765774
PGLogicalWorker *apply;
766-
Datum values[6];
767-
bool nulls[6];
775+
Datum values[7];
776+
bool nulls[7];
768777
char *status;
769778

770779
memset(values, 0, sizeof(values));
@@ -794,16 +803,17 @@ pglogical_show_subscription_status(PG_FUNCTION_ARGS)
794803
values[1] = CStringGetTextDatum(status);
795804
values[2] = CStringGetTextDatum(sub->origin->name);
796805
values[3] = CStringGetTextDatum(sub->origin_if->dsn);
806+
values[4] = CStringGetTextDatum(sub->slot_name);
797807
if (sub->replication_sets)
798-
values[4] =
808+
values[5] =
799809
PointerGetDatum(strlist_to_textarray(sub->replication_sets));
800810
else
801-
nulls[4] = true;
811+
nulls[5] = true;
802812
if (sub->forward_origins)
803-
values[5] =
813+
values[6] =
804814
PointerGetDatum(strlist_to_textarray(sub->forward_origins));
805815
else
806-
nulls[5] = true;
816+
nulls[6] = true;
807817

808818
tuplestore_putvalues(tupstore, tupdesc, values, nulls);
809819
}
@@ -1306,21 +1316,40 @@ pglogical_node_info(PG_FUNCTION_ARGS)
13061316
Datum
13071317
pglogical_gen_slot_name(PG_FUNCTION_ARGS)
13081318
{
1309-
char *subscription_name = NameStr(*PG_GETARG_NAME(0));
1319+
char *dbname = NameStr(*PG_GETARG_NAME(0));
1320+
char *provider_node_name = NameStr(*PG_GETARG_NAME(1));
1321+
char *subscription_name = NameStr(*PG_GETARG_NAME(2));
13101322
Name slot_name;
1311-
PGLogicalLocalNode *node;
1312-
1313-
node = get_local_node(false);
13141323

13151324
slot_name = (Name) palloc0(NAMEDATALEN);
13161325

1317-
/* This must be same as what is in subscription_fromtuple() */
1318-
snprintf(NameStr(*slot_name), NAMEDATALEN,
1319-
"pgl_%s_%s_%s",
1320-
shorten_hash(get_database_name(MyDatabaseId), 16),
1321-
shorten_hash(node->node->name, 16),
1322-
shorten_hash(subscription_name, 16));
1323-
NameStr(*slot_name)[NAMEDATALEN-1] = '\0';
1326+
gen_slot_name(slot_name, dbname, provider_node_name,
1327+
subscription_name);
13241328

13251329
PG_RETURN_NAME(slot_name);
13261330
}
1331+
1332+
1333+
/*
1334+
* Generate slot name (used also for origin identifier)
1335+
*
1336+
* The current format is:
1337+
* pgl_<subscriber database name>_<provider node name>_<subscription name>
1338+
*
1339+
* Note that we want to leave enough free space for 8 bytes of suffix
1340+
* which in practice means 9 bytes including the underscore.
1341+
*/
1342+
static void
1343+
gen_slot_name(Name slot_name, char *dbname, const char *provider_node,
1344+
const char *subscription_name)
1345+
{
1346+
memset(NameStr(*slot_name), 0, NAMEDATALEN);
1347+
snprintf(NameStr(*slot_name), NAMEDATALEN,
1348+
"pgl_%s_%s_%s",
1349+
shorten_hash(dbname, 16),
1350+
shorten_hash(provider_node, 16),
1351+
shorten_hash(subscription_name, 16));
1352+
1353+
NameStr(*slot_name)[NAMEDATALEN-1] = '\0';
1354+
}
1355+

pglogical_node.c

Lines changed: 12 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -84,24 +84,22 @@ typedef struct SubscriptionTuple
8484
Oid sub_origin_if;
8585
Oid sub_target_if;
8686
bool sub_enabled;
87-
bool sub_sync_structure;
88-
bool sub_sync_data;
87+
NameData slot_name;
8988
text sub_replication_sets[1];
9089
text sub_forward_origins[1];
9190
} SubscriptionTuple;
9291

93-
#define Natts_subscription 11
92+
#define Natts_subscription 10
9493
#define Anum_sub_id 1
9594
#define Anum_sub_name 2
9695
#define Anum_sub_origin 3
9796
#define Anum_sub_target 4
9897
#define Anum_sub_origin_if 5
9998
#define Anum_sub_target_if 6
10099
#define Anum_sub_enabled 7
101-
#define Anum_sub_sync_structure 8
102-
#define Anum_sub_sync_data 9
103-
#define Anum_sub_replication_sets 10
104-
#define Anum_sub_forward_origins 11
100+
#define Anum_sub_slot_name 8
101+
#define Anum_sub_replication_sets 9
102+
#define Anum_sub_forward_origins 10
105103

106104
/*
107105
* Add new node to catalog.
@@ -597,6 +595,7 @@ create_subscription(PGLogicalSubscription *sub)
597595
Datum values[Natts_subscription];
598596
bool nulls[Natts_subscription];
599597
NameData sub_name;
598+
NameData slot_name;
600599

601600
if (get_subscription_by_name(sub->name, true) != NULL)
602601
elog(ERROR, "subscription %s already exists", sub->name);
@@ -622,8 +621,8 @@ create_subscription(PGLogicalSubscription *sub)
622621
values[Anum_sub_origin_if - 1] = ObjectIdGetDatum(sub->origin_if->id);
623622
values[Anum_sub_target_if - 1] = ObjectIdGetDatum(sub->target_if->id);
624623
values[Anum_sub_enabled - 1] = BoolGetDatum(sub->enabled);
625-
values[Anum_sub_sync_structure - 1] = BoolGetDatum(sub->sync_structure);
626-
values[Anum_sub_sync_data - 1] = BoolGetDatum(sub->sync_data);
624+
namestrcpy(&slot_name, sub->slot_name);
625+
values[Anum_sub_slot_name - 1] = NameGetDatum(&slot_name);
627626

628627
if (list_length(sub->replication_sets) > 0)
629628
values[Anum_sub_replication_sets - 1] =
@@ -671,6 +670,7 @@ alter_subscription(PGLogicalSubscription *sub)
671670
Datum values[Natts_subscription];
672671
bool nulls[Natts_subscription];
673672
bool replaces[Natts_subscription];
673+
NameData slot_name;
674674

675675
rv = makeRangeVar(EXTENSION_NAME, CATALOG_SUBSCRIPTION, -1);
676676
rel = heap_openrv(rv, RowExclusiveLock);
@@ -706,8 +706,8 @@ alter_subscription(PGLogicalSubscription *sub)
706706
values[Anum_sub_origin_if - 1] = ObjectIdGetDatum(sub->origin_if->id);
707707
values[Anum_sub_target_if - 1] = ObjectIdGetDatum(sub->target_if->id);
708708
values[Anum_sub_enabled - 1] = BoolGetDatum(sub->enabled);
709-
values[Anum_sub_sync_structure - 1] = BoolGetDatum(sub->sync_structure);
710-
values[Anum_sub_sync_data - 1] = BoolGetDatum(sub->sync_data);
709+
namestrcpy(&slot_name, sub->slot_name);
710+
values[Anum_sub_slot_name - 1] = NameGetDatum(&slot_name);
711711

712712
if (list_length(sub->replication_sets) > 0)
713713
values[Anum_sub_replication_sets - 1] =
@@ -785,15 +785,13 @@ subscription_fromtuple(HeapTuple tuple, TupleDesc desc)
785785
SubscriptionTuple *subtup = (SubscriptionTuple *) GETSTRUCT(tuple);
786786
Datum d;
787787
bool isnull;
788-
NameData slot_name;
789788

790789
PGLogicalSubscription *sub =
791790
(PGLogicalSubscription *) palloc(sizeof(PGLogicalSubscription));
792791
sub->id = subtup->sub_id;
793792
sub->name = pstrdup(NameStr(subtup->sub_name));
794793
sub->enabled = subtup->sub_enabled;
795-
sub->sync_structure = subtup->sub_sync_structure;
796-
sub->sync_data = subtup->sub_sync_data;
794+
sub->slot_name = pstrdup(NameStr(subtup->slot_name));
797795

798796
sub->origin = get_node(subtup->sub_origin);
799797
sub->target = get_node(subtup->sub_target);
@@ -822,19 +820,6 @@ subscription_fromtuple(HeapTuple tuple, TupleDesc desc)
822820
sub->forward_origins = forward_origin_names;
823821
}
824822

825-
/*
826-
* Generate slot name, leaving 8 bytes for suffix
827-
* (_suffix is actually 9 bytes)
828-
*/
829-
snprintf(NameStr(slot_name), NAMEDATALEN,
830-
"pgl_%s_%s_%s",
831-
shorten_hash(get_database_name(MyDatabaseId), 16),
832-
shorten_hash(sub->origin->name, 16),
833-
shorten_hash(sub->name, 16));
834-
NameStr(slot_name)[NAMEDATALEN-1] = '\0';
835-
836-
sub->slot_name = pstrdup(NameStr(slot_name));
837-
838823
return sub;
839824
}
840825

pglogical_node.h

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,6 @@ typedef struct PGLogicalSubscription
4242
PGlogicalInterface *origin_if;
4343
PGlogicalInterface *target_if;
4444
bool enabled;
45-
bool sync_structure;
46-
bool sync_data;
4745
List *replication_sets;
4846
List *forward_origins;
4947
char *slot_name;

0 commit comments

Comments
 (0)