Skip to content

Commit ca53820

Browse files
committed
Drop replication sets and slots when dropping node
1 parent 4c16014 commit ca53820

File tree

6 files changed

+101
-9
lines changed

6 files changed

+101
-9
lines changed

Makefile

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -18,7 +18,7 @@ SCRIPTS_built = pglogical_create_subscriber
1818
PG_CPPFLAGS = -I$(libpq_srcdir)
1919
SHLIB_LINK = $(libpq)
2020

21-
REGRESS = init basic extended toasted replication_set add_table matview bidirectional
21+
REGRESS = init basic extended toasted replication_set add_table matview bidirectional drop
2222

2323
ifdef PG94
2424
PG_CPPFLAGS += -Icompat

pglogical_functions.c

Lines changed: 49 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -37,6 +37,7 @@
3737

3838
#include "replication/origin.h"
3939
#include "replication/reorderbuffer.h"
40+
#include "replication/slot.h"
4041

4142
#include "storage/latch.h"
4243
#include "storage/proc.h"
@@ -140,7 +141,7 @@ pglogical_create_node(PG_FUNCTION_ARGS)
140141
/*
141142
* Drop the named node.
142143
*
143-
* TODO: cascade support
144+
* TODO: support cascade (drop subscribers)
144145
*/
145146
Datum
146147
pglogical_drop_node(PG_FUNCTION_ARGS)
@@ -165,13 +166,57 @@ pglogical_drop_node(PG_FUNCTION_ARGS)
165166
errmsg("cannot drop node \"%s\" because it still has subscriptions associated with it", node_name),
166167
errhint("drop the subscriptions first")));
167168

168-
/* Drop all the interfaces. */
169-
drop_node_interfaces(node->id);
170-
171169
/* If the node is the local node, drop the local node record as well. */
172170
local_node = get_local_node(true);
173171
if (local_node && local_node->node->id == node->id)
172+
{
173+
int slotno;
174+
175+
/* Also drop all the slots associated with the node. */
176+
for (slotno = 0; slotno < max_replication_slots; slotno++)
177+
{
178+
ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[slotno];
179+
180+
LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
181+
SpinLockAcquire(&slot->mutex);
182+
/*
183+
* Check that the slot is in use and that it belongs to the current
184+
* pglogical node.
185+
*/
186+
if (!slot->in_use ||
187+
slot->data.database != MyDatabaseId ||
188+
namestrcmp(&slot->data.plugin, "pglogical_output") != 0 ||
189+
strncmp(NameStr(slot->data.name), "pgl_", 4) != 0)
190+
{
191+
SpinLockRelease(&slot->mutex);
192+
LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
193+
continue;
194+
}
195+
196+
if (slot->active_pid != 0)
197+
{
198+
SpinLockRelease(&slot->mutex);
199+
LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
200+
ereport(ERROR,
201+
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
202+
errmsg("cannot drop node \"%s\" because replication slot \"%s\" on the node is still active",
203+
node_name, NameStr(slot->data.name)),
204+
errhint("drop the subscriptions first")));
205+
}
206+
SpinLockRelease(&slot->mutex);
207+
LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
208+
209+
ReplicationSlotDrop(NameStr(slot->data.name));
210+
}
211+
174212
drop_local_node();
213+
}
214+
215+
/* Drop all the interfaces. */
216+
drop_node_interfaces(node->id);
217+
218+
/* Drop replication sets associated with the node. */
219+
drop_node_replication_sets(node->id);
175220

176221
/* Drop the node itself. */
177222
drop_node(node->id);

pglogical_hooks.c

Lines changed: 19 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -209,18 +209,35 @@ pglogical_row_filter_hook(struct PGLogicalRowFilterArgs *rowfilter_args)
209209
* We can use this to update our cached replication set info, without
210210
* having to deal with cache invalidation callbacks.
211211
*/
212-
HeapTuple tup = &rowfilter_args->change->data.tp.newtuple->tuple;
212+
HeapTuple tup;
213213
PGLogicalRepSet *replicated_set;
214214
ListCell *plc;
215215

216+
if (rowfilter_args->change_type == REORDER_BUFFER_CHANGE_UPDATE)
217+
tup = &rowfilter_args->change->data.tp.newtuple->tuple;
218+
else if (rowfilter_args->change_type == REORDER_BUFFER_CHANGE_DELETE)
219+
tup = &rowfilter_args->change->data.tp.oldtuple->tuple;
220+
else
221+
return false;
222+
216223
replicated_set = replication_set_from_tuple(tup);
217224
foreach (plc, private->replication_sets)
218225
{
219226
PGLogicalRepSet *rs = lfirst(plc);
220227

221-
/* Our cached replication set was updated, update local cache. */
228+
/* Check if the changed repset is used by us. */
222229
if (rs->id == replicated_set->id)
223230
{
231+
/*
232+
* If this was a delete, somebody deleted one of our
233+
* rep sets, bail here and let reconnect logic handle any
234+
* potential issues.
235+
*/
236+
if (rowfilter_args->change_type == REORDER_BUFFER_CHANGE_DELETE)
237+
elog(ERROR, "replication set \"%s\" used by this connection was deleted, existing",
238+
rs->name);
239+
240+
/* This was an update of our repset; update the cache. */
224241
rs->replicate_insert = replicated_set->replicate_insert;
225242
rs->replicate_update = replicated_set->replicate_update;
226243
rs->replicate_delete = replicated_set->replicate_delete;

pglogical_node.c

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -332,7 +332,7 @@ drop_local_node(void)
332332
HeapTuple tuple;
333333

334334
rv = makeRangeVar(EXTENSION_NAME, CATALOG_LOCAL_NODE, -1);
335-
rel = heap_openrv(rv, RowExclusiveLock);
335+
rel = heap_openrv(rv, AccessExclusiveLock);
336336

337337
/* Find the local node tuple. */
338338
scan = systable_beginscan(rel, 0, true, NULL, 0, NULL);
@@ -349,7 +349,7 @@ drop_local_node(void)
349349

350350
/* Cleanup. */
351351
systable_endscan(scan);
352-
heap_close(rel, RowExclusiveLock);
352+
heap_close(rel, NoLock);
353353

354354
CommandCounterIncrement();
355355
}

pglogical_repset.c

Lines changed: 29 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -792,6 +792,35 @@ drop_replication_set(Oid setid)
792792
heap_close(rel, RowExclusiveLock);
793793
}
794794

795+
void
796+
drop_node_replication_sets(Oid nodeid)
797+
{
798+
RangeVar *rv;
799+
Relation rel;
800+
SysScanDesc scan;
801+
HeapTuple tuple;
802+
ScanKeyData key[1];
803+
804+
Assert(IsTransactionState());
805+
806+
rv = makeRangeVar(EXTENSION_NAME, CATALOG_REPSET, -1);
807+
rel = heap_openrv(rv, RowExclusiveLock);
808+
809+
ScanKeyInit(&key[0],
810+
Anum_repset_nodeid,
811+
BTEqualStrategyNumber, F_OIDEQ,
812+
ObjectIdGetDatum(nodeid));
813+
814+
scan = systable_beginscan(rel, 0, true, NULL, 1, key);
815+
816+
/* Remove matching tuples. */
817+
while (HeapTupleIsValid(tuple = systable_getnext(scan)))
818+
simple_heap_delete(rel, &tuple->t_self);
819+
820+
systable_endscan(scan);
821+
heap_close(rel, RowExclusiveLock);
822+
}
823+
795824
/*
796825
* Insert new replication set / relation mapping.
797826
*

pglogical_repset.h

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -67,6 +67,7 @@ extern bool relation_is_replicated(Relation rel, Oid nodeid,
6767
extern void create_replication_set(PGLogicalRepSet *repset);
6868
extern void alter_replication_set(PGLogicalRepSet *repset);
6969
extern void drop_replication_set(Oid setid);
70+
extern void drop_node_replication_sets(Oid nodeid);
7071

7172
extern void replication_set_add_table(Oid setid, Oid reloid);
7273
extern void replication_set_remove_table(Oid setid, Oid reloid,

0 commit comments

Comments
 (0)