Skip to content

Commit ad05a52

Browse files
committed
Add show_subscription_status to show info about local subscriptions
1 parent 8a6de8a commit ad05a52

File tree

7 files changed

+150
-4
lines changed

7 files changed

+150
-4
lines changed

README.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,14 @@ Nodes can be added and removed dynamically using the SQL interfaces.
181181
- `subscription_name` - name of the existing subscription
182182
- `relation` - name of existing table, optionally qualified
183183

184+
- `pglogical.show_subscription_status(subscription_name name)`
185+
Shows status and basic information about subscription.
186+
187+
Parameters:
188+
- `subscription_name` - optional name of the existing subscription, when no
189+
name was provided, the function will show status for all subscriptions on
190+
local node
191+
184192
- `pglogical.show_subscription_table(subscription_name name,
185193
relation regclass)`
186194
Shows synchronization status of a table.

expected/init.out

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,12 @@ SELECT sync_kind, sync_subid, sync_nspname, sync_relname, sync_status FROM pglog
5959
f | 3848008564 | | | r
6060
(1 row)
6161

62+
SELECT * FROM pglogical.show_subscription_status();
63+
subscription_name | status | provider_node | provider_dsn | replication_sets | forward_origins
64+
-------------------+-------------+---------------+------------------------------+------------------+-----------------
65+
test_subscription | replicating | test_provider | dbname=regression user=super | {default} |
66+
(1 row)
67+
6268
-- Make sure we see the slot and active connection
6369
\c regression
6470
SELECT plugin, slot_type, database, active FROM pg_replication_slots;

pglogical--1.0.sql

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,12 @@ RETURNS boolean STRICT VOLATILE LANGUAGE c AS 'MODULE_PATHNAME', 'pglogical_alte
6666
CREATE FUNCTION pglogical.alter_subscription_remove_replication_set(subscription_name name, replication_set name)
6767
RETURNS boolean STRICT VOLATILE LANGUAGE c AS 'MODULE_PATHNAME', 'pglogical_alter_subscription_remove_replication_set';
6868

69+
CREATE FUNCTION pglogical.show_subscription_status(subscription_name name DEFAULT NULL,
70+
OUT subscription_name text, OUT status text, OUT provider_node text,
71+
OUT provider_dsn text, OUT replication_sets text[],
72+
OUT forward_origins text[])
73+
RETURNS SETOF record STABLE LANGUAGE c AS 'MODULE_PATHNAME', 'pglogical_show_subscription_status';
74+
6975
CREATE TABLE pglogical.replication_set (
7076
set_id oid NOT NULL PRIMARY KEY,
7177
set_nodeid oid NOT NULL,

pglogical_functions.c

Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ PG_FUNCTION_INFO_V1(pglogical_alter_subscription_synchronize);
8080
PG_FUNCTION_INFO_V1(pglogical_alter_subscription_resynchronize_table);
8181

8282
PG_FUNCTION_INFO_V1(pglogical_show_subscription_table);
83+
PG_FUNCTION_INFO_V1(pglogical_show_subscription_status);
8384

8485
/* Replication set manipulation. */
8586
PG_FUNCTION_INFO_V1(pglogical_create_replication_set);
@@ -699,6 +700,119 @@ pglogical_show_subscription_table(PG_FUNCTION_ARGS)
699700
PG_RETURN_VOID();
700701
}
701702

703+
/*
704+
* Show info about subscribtion.
705+
*/
706+
Datum
707+
pglogical_show_subscription_status(PG_FUNCTION_ARGS)
708+
{
709+
List *subscriptions;
710+
ListCell *lc;
711+
ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
712+
TupleDesc tupdesc;
713+
Tuplestorestate *tupstore;
714+
PGLogicalLocalNode *node;
715+
MemoryContext per_query_ctx;
716+
MemoryContext oldcontext;
717+
718+
/* check to see if caller supports us returning a tuplestore */
719+
if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
720+
ereport(ERROR,
721+
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
722+
errmsg("set-valued function called in context that cannot accept a set")));
723+
if (!(rsinfo->allowedModes & SFRM_Materialize))
724+
ereport(ERROR,
725+
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
726+
errmsg("materialize mode required, but it is not " \
727+
"allowed in this context")));
728+
729+
node = get_local_node(true);
730+
if (!node)
731+
ereport(ERROR,
732+
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
733+
errmsg("current database is not configured as pglogical node"),
734+
errhint("create pglogical node first")));
735+
736+
if (PG_ARGISNULL(0))
737+
{
738+
subscriptions = get_node_subscriptions(node->node->id, false);
739+
}
740+
else
741+
{
742+
PGLogicalSubscription *sub;
743+
sub = get_subscription_by_name(NameStr(*PG_GETARG_NAME(0)), false);
744+
subscriptions = list_make1(sub);
745+
}
746+
747+
/* Switch into long-lived context to construct returned data structures */
748+
per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
749+
oldcontext = MemoryContextSwitchTo(per_query_ctx);
750+
751+
/* Build a tuple descriptor for our result type */
752+
if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
753+
elog(ERROR, "return type must be a row type");
754+
755+
tupstore = tuplestore_begin_heap(true, false, work_mem);
756+
rsinfo->returnMode = SFRM_Materialize;
757+
rsinfo->setResult = tupstore;
758+
rsinfo->setDesc = tupdesc;
759+
760+
MemoryContextSwitchTo(oldcontext);
761+
762+
foreach (lc, subscriptions)
763+
{
764+
PGLogicalSubscription *sub = lfirst(lc);
765+
PGLogicalWorker *apply;
766+
Datum values[6];
767+
bool nulls[6];
768+
char *status;
769+
770+
memset(values, 0, sizeof(values));
771+
memset(nulls, 0, sizeof(nulls));
772+
773+
LWLockAcquire(PGLogicalCtx->lock, LW_EXCLUSIVE);
774+
apply = pglogical_apply_find(MyDatabaseId, sub->id);
775+
if (pglogical_worker_running(apply))
776+
{
777+
PGLogicalSyncStatus *sync;
778+
sync = get_subscription_sync_status(sub->id, true);
779+
780+
if (!sync)
781+
status = "unknown";
782+
else if (sync->status == SYNC_STATUS_READY)
783+
status = "replicating";
784+
else
785+
status = "initializing";
786+
}
787+
else if (!sub->enabled)
788+
status = "disabled";
789+
else
790+
status = "down";
791+
LWLockRelease(PGLogicalCtx->lock);
792+
793+
values[0] = CStringGetTextDatum(sub->name);
794+
values[1] = CStringGetTextDatum(status);
795+
values[2] = CStringGetTextDatum(sub->origin->name);
796+
values[3] = CStringGetTextDatum(sub->origin_if->dsn);
797+
if (sub->replication_sets)
798+
values[4] =
799+
PointerGetDatum(strlist_to_textarray(sub->replication_sets));
800+
else
801+
nulls[4] = true;
802+
if (sub->forward_origins)
803+
values[5] =
804+
PointerGetDatum(strlist_to_textarray(sub->forward_origins));
805+
else
806+
nulls[5] = true;
807+
808+
tuplestore_putvalues(tupstore, tupdesc, values, nulls);
809+
}
810+
811+
tuplestore_donestoring(tupstore);
812+
813+
PG_RETURN_VOID();
814+
}
815+
702816
/*
703817
* Create new replication set.
704818
*/

pglogical_sync.c

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -470,7 +470,7 @@ pglogical_sync_subscription(PGLogicalSubscription *sub)
470470

471471
StartTransactionCommand();
472472
oldctx = MemoryContextSwitchTo(myctx);
473-
sync = get_subscription_sync_status(sub->id);
473+
sync = get_subscription_sync_status(sub->id, false);
474474
MemoryContextSwitchTo(oldctx);
475475
CommitTransactionCommand();
476476

@@ -631,7 +631,7 @@ pglogical_sync_table(PGLogicalSubscription *sub, RangeVar *table)
631631
StartTransactionCommand();
632632

633633
/* Sanity check. */
634-
sync = get_subscription_sync_status(sub->id);
634+
sync = get_subscription_sync_status(sub->id, false);
635635
if (sync->status != SYNC_STATUS_READY)
636636
{
637637
elog(ERROR,
@@ -925,7 +925,7 @@ syncstatus_fromtuple(HeapTuple tuple, TupleDesc desc)
925925

926926
/* Get the sync status for a subscription. */
927927
PGLogicalSyncStatus *
928-
get_subscription_sync_status(Oid subid)
928+
get_subscription_sync_status(Oid subid, bool missing_ok)
929929
{
930930
PGLogicalSyncStatus *sync;
931931
RangeVar *rv;
@@ -953,7 +953,16 @@ get_subscription_sync_status(Oid subid)
953953
}
954954

955955
if (!HeapTupleIsValid(tuple))
956+
{
957+
if (missing_ok)
958+
{
959+
systable_endscan(scan);
960+
heap_close(rel, RowExclusiveLock);
961+
return NULL;
962+
}
963+
956964
elog(ERROR, "subscription %u status not found", subid);
965+
}
957966

958967
sync = syncstatus_fromtuple(tuple, tupDesc);
959968

pglogical_sync.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,8 @@ extern void pglogical_sync_table(PGLogicalSubscription *sub, RangeVar *table);
5555
extern void create_local_sync_status(PGLogicalSyncStatus *sync);
5656
extern void drop_subscription_sync_status(Oid subid);
5757

58-
extern PGLogicalSyncStatus *get_subscription_sync_status(Oid subid);
58+
extern PGLogicalSyncStatus *get_subscription_sync_status(Oid subid,
59+
bool missing_ok);
5960
extern void set_subscription_sync_status(Oid subid, char status);
6061

6162
extern void drop_table_sync_status(const char *nspname, const char *relname);

sql/init.sql

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,8 @@ END;$$;
5252

5353
SELECT sync_kind, sync_subid, sync_nspname, sync_relname, sync_status FROM pglogical.local_sync_status ORDER BY 2,3,4;
5454

55+
SELECT * FROM pglogical.show_subscription_status();
56+
5557
-- Make sure we see the slot and active connection
5658
\c regression
5759
SELECT plugin, slot_type, database, active FROM pg_replication_slots;

0 commit comments

Comments
 (0)