Skip to content

Commit 561030c

Browse files
committed
Changes based on review by Tomasz Rybak
* Better comments on whole old tuple support * Use named defines for startup params version, startup reply msg version Additionally, use decoding context for relmetacache lifetime and delete the cache from the shutdown hook.
1 parent 6294165 commit 561030c

File tree

7 files changed

+103
-75
lines changed

7 files changed

+103
-75
lines changed

contrib/pglogical_output/pglogical_config.c

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,6 @@ void
129129
process_parameters_v1(List *options, PGLogicalOutputData *data)
130130
{
131131
Datum val;
132-
bool found;
133132
ListCell *lc;
134133

135134
/*
@@ -272,13 +271,13 @@ process_parameters(List *options, PGLogicalOutputData *data)
272271

273272
params_format = DatumGetUInt32(val);
274273

275-
if (params_format == 1)
274+
if (params_format == PGLOGICAL_STARTUP_PARAM_FORMAT_FLAT)
276275
process_parameters_v1(options, data);
277276
else
278277
ereport(ERROR,
279278
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
280-
errmsg("startup_params_format %d not supported, only version 1 supported",
281-
params_format)));
279+
errmsg("startup_params_format %d not supported, only version %d supported",
280+
params_format, PGLOGICAL_STARTUP_PARAM_FORMAT_FLAT)));
282281

283282
return params_format;
284283
}

contrib/pglogical_output/pglogical_output.c

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -376,7 +376,7 @@ pg_decode_startup(LogicalDecodingContext * ctx, OutputPluginOptions *opt,
376376

377377
/* if cache enabled, init it */
378378
if (data->relmeta_cache_size != 0)
379-
pglogical_init_relmetacache();
379+
pglogical_init_relmetacache(ctx->context);
380380
}
381381
}
382382

@@ -561,6 +561,8 @@ static void pg_decode_shutdown(LogicalDecodingContext * ctx)
561561

562562
call_shutdown_hook(data);
563563

564+
pglogical_destroy_relmetacache();
565+
564566
/*
565567
* no need to delete data->context or data->hooks_mctxt as they're children
566568
* of ctx->context which will expire on return.

contrib/pglogical_output/pglogical_output.h

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,17 @@
3939
#define PGLOGICAL_PROTO_VERSION_NUM 1
4040
#define PGLOGICAL_PROTO_MIN_VERSION_NUM 1
4141

42+
/*
43+
* The startup parameter format is versioned separately to the rest of the wire
44+
* protocol because we negotiate the wire protocol version using the startup
45+
* parameters sent to us. It hopefully won't ever need to change, but this
46+
* field is present in case we do need to change it, e.g. to a structured json
47+
* object. We can look at the startup params version to see whether we can
48+
* understand the startup params sent by the client and to fall back to
49+
* reading an older format if needed.
50+
*/
51+
#define PGLOGICAL_STARTUP_PARAM_FORMAT_FLAT 1
52+
4253
struct PGLogicalProtoAPI;
4354

4455
typedef struct PGLogicalOutputData

contrib/pglogical_output/pglogical_proto_native.c

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -271,7 +271,16 @@ pglogical_write_update(StringInfo out, PGLogicalOutputData *data,
271271
/* use Oid as relation identifier */
272272
pq_sendint(out, RelationGetRelid(rel), 4);
273273

274-
/* FIXME support whole tuple (O tuple type) */
274+
/*
275+
* TODO: support whole tuple (O tuple type)
276+
*
277+
* Right now we can only write the key-part since logical decoding
278+
* doesn't know how to record the whole old tuple for us in WAL.
279+
* We can't use REPLICA IDENTITY FULL for this, since that makes
280+
* the key-part the whole tuple, causing issues with conflict
281+
* resultion and index lookups. We need a separate decoding option
282+
* to record whole tuples.
283+
*/
275284
if (oldtuple != NULL)
276285
{
277286
pq_sendbyte(out, 'K'); /* old key follows */
@@ -299,7 +308,11 @@ pglogical_write_delete(StringInfo out, PGLogicalOutputData *data,
299308
/* use Oid as relation identifier */
300309
pq_sendint(out, RelationGetRelid(rel), 4);
301310

302-
/* FIXME support whole tuple (O tuple type) */
311+
/*
312+
* TODO support whole tuple ('O' tuple type)
313+
*
314+
* See notes on update for details
315+
*/
303316
pq_sendbyte(out, 'K'); /* old key follows */
304317
pglogical_write_tuple(out, data, rel, oldtuple);
305318
}
@@ -314,7 +327,7 @@ write_startup_message(StringInfo out, List *msg)
314327
ListCell *lc;
315328

316329
pq_sendbyte(out, 'S'); /* message type field */
317-
pq_sendbyte(out, 1); /* startup message version */
330+
pq_sendbyte(out, PGLOGICAL_STARTUP_MSG_FORMAT_FLAT); /* startup message version */
318331
foreach (lc, msg)
319332
{
320333
DefElem *param = (DefElem*)lfirst(lc);

contrib/pglogical_output/pglogical_proto_native.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,15 @@
1414
#define PG_LOGICAL_PROTO_NATIVE_H
1515

1616

17+
/*
18+
* For similar reasons to the startup params
19+
* (PGLOGICAL_STARTUP_PARAM_FORMAT_FLAT) the startup reply message format is
20+
* versioned separately to the rest of the protocol. The client has to be able
21+
* to read it to find out what protocol version was selected by the upstream
22+
* when using the native protocol.
23+
*/
24+
#define PGLOGICAL_STARTUP_MSG_FORMAT_FLAT 1
25+
1726
extern void pglogical_write_rel(StringInfo out, PGLogicalOutputData *data, Relation rel,
1827
struct PGLRelMetaCacheEntry *cache_entry);
1928

contrib/pglogical_output/pglogical_relmetacache.c

Lines changed: 58 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -23,88 +23,71 @@
2323
static void relmeta_cache_callback(Datum arg, Oid relid);
2424

2525
/*
26-
* We need a global hash table that invalidation callbacks can
27-
* access because they survive past the logical decoding context and
28-
* therefore past our local PGLogicalOutputData's lifetime when
29-
* using the SQL interface. We cannot just pass them a pointer to a
30-
* palloc'd struct.
26+
* We need a global that invalidation callbacks can access because they
27+
* survive past the logical decoding context and therefore past our
28+
* local PGLogicalOutputData's lifetime when using the SQL interface. We
29+
* cannot just pass them a pointer to a palloc'd struct.
30+
*
31+
* If the hash table has been destroyed the callbacks know to do nothing.
3132
*/
3233
static HTAB *RelMetaCache = NULL;
3334

35+
/*
36+
* The callback persists across decoding sessions so we should only
37+
* register it once.
38+
*/
39+
static bool callback_registered = false;
40+
3441

3542
/*
36-
* Initialize the relation metadata cache if not already initialized.
37-
*
38-
* Purge it if it already exists.
43+
* Initialize the relation metadata cache for a decoding session.
3944
*
40-
* The hash table its self must be in CacheMemoryContext or TopMemoryContext
41-
* since it persists outside the decoding session.
45+
* The hash table is destoyed at the end of a decoding session. While
46+
* relcache invalidations still exist and will still be invoked, they
47+
* will just see the null hash table global and take no action.
4248
*/
4349
void
44-
pglogical_init_relmetacache(void)
50+
pglogical_init_relmetacache(MemoryContext decoding_context)
4551
{
4652
HASHCTL ctl;
4753

48-
if (RelMetaCache == NULL)
49-
{
50-
/* first time, init the cache */
51-
int hash_flags = HASH_ELEM | HASH_CONTEXT;
54+
Assert(RelMetaCache == NULL);
5255

53-
/* Make sure we've initialized CacheMemoryContext. */
54-
if (CacheMemoryContext == NULL)
55-
CreateCacheMemoryContext();
56+
/* Make a new hash table for the cache */
57+
int hash_flags = HASH_ELEM | HASH_CONTEXT;
5658

57-
MemSet(&ctl, 0, sizeof(ctl));
58-
ctl.keysize = sizeof(Oid);
59-
ctl.entrysize = sizeof(struct PGLRelMetaCacheEntry);
60-
/* safe to allocate to CacheMemoryContext since it's never reset */
61-
ctl.hcxt = CacheMemoryContext;
59+
MemSet(&ctl, 0, sizeof(ctl));
60+
ctl.keysize = sizeof(Oid);
61+
ctl.entrysize = sizeof(struct PGLRelMetaCacheEntry);
62+
ctl.hcxt = decoding_context;
6263

6364
#if PG_VERSION_NUM >= 90500
64-
hash_flags |= HASH_BLOBS;
65+
hash_flags |= HASH_BLOBS;
6566
#else
66-
ctl.hash = tag_hash;
67-
hash_flags |= HASH_FUNCTION;
67+
ctl.hash = tag_hash;
68+
hash_flags |= HASH_FUNCTION;
6869
#endif
6970

70-
RelMetaCache = hash_create("pglogical relation metadata cache", 128,
71-
&ctl, hash_flags);
71+
RelMetaCache = hash_create("pglogical relation metadata cache", 128,
72+
&ctl, hash_flags);
7273

73-
Assert(RelMetaCache != NULL);
74+
Assert(RelMetaCache != NULL);
7475

75-
/*
76-
* Watch for invalidation events.
77-
*
78-
* We don't pass PGLogicalOutputData here because it's scoped to the
79-
* individual decoding session, which with the SQL interface has a shorter
80-
* lifetime than the relcache invalidation callback registration. We have
81-
* no way to remove invalidation callbacks at the end of the decoding
82-
* session so we have to cope with them being called later.
83-
*/
84-
CacheRegisterRelcacheCallback(relmeta_cache_callback, (Datum)0);
85-
}
86-
else
76+
/*
77+
* Watch for invalidation events.
78+
*
79+
* We don't pass PGLogicalOutputData here because it's scoped to the
80+
* individual decoding session, which with the SQL interface has a
81+
* shorter lifetime than the relcache invalidation callback
82+
* registration. We have no way to remove invalidation callbacks at
83+
* the end of the decoding session or change them - so we have to
84+
* cope with them being called later. If we wanted to pass the
85+
* decoding private data we'd need to stash it in a global.
86+
*/
87+
if (!callback_registered)
8788
{
88-
/*
89-
* On re-init we must flush the cache since there could be
90-
* dangling pointers to api_private data in the freed
91-
* decoding context of a prior session. We could go through
92-
* and clear them and the is_cached flag but it seems best
93-
* to have a clean slate.
94-
*/
95-
HASH_SEQ_STATUS status;
96-
struct PGLRelMetaCacheEntry *hentry;
97-
hash_seq_init(&status, RelMetaCache);
98-
99-
while ((hentry = (struct PGLRelMetaCacheEntry*) hash_seq_search(&status)) != NULL)
100-
{
101-
if (hash_search(RelMetaCache,
102-
(void *) &hentry->relid,
103-
HASH_REMOVE, NULL) == NULL)
104-
elog(ERROR, "pglogical RelMetaCache hash table corrupted");
105-
}
106-
107-
return;
89+
CacheRegisterRelcacheCallback(relmeta_cache_callback, (Datum)0);
90+
callback_registered = true;
10891
}
10992
}
11093

@@ -115,6 +98,14 @@ pglogical_init_relmetacache(void)
11598
static void
11699
relmeta_cache_callback(Datum arg, Oid relid)
117100
{
101+
/*
102+
* We can be called after decoding session teardown becaues the
103+
* relcache callback isn't cleared. In that case there's no action
104+
* to take.
105+
*/
106+
if (RelMetaCache == NULL)
107+
return;
108+
118109
/*
119110
* Nobody keeps pointers to entries in this hash table around so
120111
* it's safe to directly HASH_REMOVE the entries as soon as they are
@@ -176,12 +167,13 @@ pglogical_cache_relmeta(struct PGLogicalOutputData *data,
176167

177168

178169
/*
179-
* Tear down the relation metadata cache.
170+
* Tear down the relation metadata cache at the end of a decoding
171+
* session.
180172
*
181-
* Do *not* call this at decoding shutdown. The hash table must
182-
* continue to exist so that relcache invalidation callbacks can
183-
* continue to reference it after a SQL decoding session finishes.
184-
* It must be called at backend shutdown only.
173+
* The api_private data need not be freed explicitly; it'll be purged
174+
* by destruction of the memory context. The main reason we do this
175+
* much is to make sure we nullify the global, making sure that
176+
* callbacks will see that there's nothing to do.
185177
*/
186178
void
187179
pglogical_destroy_relmetacache(void)

contrib/pglogical_output/pglogical_relmetacache.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
#ifndef PGLOGICAL_RELMETA_CACHE_H
22
#define PGLOGICAL_RELMETA_CACHE_H
33

4+
#include "nodes/memnodes.h"
5+
46
struct PGLRelMetaCacheEntry
57
{
68
Oid relid;
@@ -12,7 +14,7 @@ struct PGLRelMetaCacheEntry
1214

1315
struct PGLogicalOutputData;
1416

15-
extern void pglogical_init_relmetacache(void);
17+
extern void pglogical_init_relmetacache(MemoryContext decoding_context);
1618
extern bool pglogical_cache_relmeta(struct PGLogicalOutputData *data, Relation rel, struct PGLRelMetaCacheEntry **entry);
1719
extern void pglogical_destroy_relmetacache(void);
1820

0 commit comments

Comments
 (0)