Skip to content

Commit e874370

Browse files
committed
Add support for diff constraints and defaults on subscription
1 parent 7bbf688 commit e874370

File tree

4 files changed

+109
-10
lines changed

4 files changed

+109
-10
lines changed

expected/basic.out

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ SELECT pg_xlog_wait_remote_apply(pg_current_xlog_location(), 0);
2727

2828
\c postgres
2929
ALTER TABLE public.basic_dml ADD COLUMN subonly integer;
30+
ALTER TABLE public.basic_dml ADD COLUMN subonly_def integer DEFAULT 99;
3031
\c regression
3132
-- check basic insert replication
3233
INSERT INTO basic_dml(other, data, something)
@@ -102,14 +103,14 @@ SELECT pg_xlog_wait_remote_apply(pg_current_xlog_location(), 0);
102103
(1 row)
103104

104105
\c postgres
105-
SELECT id, other, data, something FROM basic_dml ORDER BY id;
106-
id | other | data | something
107-
----+-------+------+--------------------
108-
1 | 1 | foo1 | @ 50 secs
109-
2 | 2 | bar2 | @ 84 days -10 secs
110-
3 | 3 | baz3 | @ 2 years 1 hour
111-
4 | 4 | | @ 3 days 10 secs
112-
5 | 5 | |
106+
SELECT id, other, data, something, subonly, subonly_def FROM basic_dml ORDER BY id;
107+
id | other | data | something | subonly | subonly_def
108+
----+-------+------+--------------------+---------+-------------
109+
1 | 1 | foo1 | @ 50 secs | | 99
110+
2 | 2 | bar2 | @ 84 days -10 secs | | 99
111+
3 | 3 | baz3 | @ 2 years 1 hour | | 99
112+
4 | 4 | | @ 3 days 10 secs | | 99
113+
5 | 5 | | | | 99
113114
(5 rows)
114115

115116
-- delete one row

pglogical_apply.c

Lines changed: 96 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,12 @@
3333
#include "nodes/makefuncs.h"
3434
#include "nodes/parsenodes.h"
3535

36+
#include "optimizer/planner.h"
37+
3638
#include "replication/origin.h"
3739

40+
#include "rewrite/rewriteHandler.h"
41+
3842
#include "storage/ipc.h"
3943
#include "storage/lmgr.h"
4044
#include "storage/proc.h"
@@ -229,11 +233,22 @@ create_estate_for_relation(Relation rel)
229233
{
230234
EState *estate;
231235
ResultRelInfo *resultRelInfo;
236+
RangeTblEntry *rte;
232237

233238
estate = CreateExecutorState();
234239

240+
rte = makeNode(RangeTblEntry);
241+
rte->rtekind = RTE_RELATION;
242+
rte->relid = RelationGetRelid(rel);
243+
rte->relkind = rel->rd_rel->relkind;
244+
estate->es_range_table = list_make1(rte);
245+
235246
resultRelInfo = makeNode(ResultRelInfo);
236-
resultRelInfo->ri_RangeTableIndex = 1; /* dummy */
247+
/* InitResultRelInfo(resultRelInfo,
248+
rel,
249+
1,
250+
0);*/
251+
resultRelInfo->ri_RangeTableIndex = 1;
237252
resultRelInfo->ri_RelationDesc = rel;
238253
resultRelInfo->ri_TrigInstrument = NULL;
239254

@@ -272,6 +287,69 @@ UserTableUpdateOpenIndexes(EState *estate, TupleTableSlot *slot)
272287
}
273288
}
274289

290+
static bool
291+
physatt_in_attmap(PGLogicalRelation *rel, int attid)
292+
{
293+
AttrNumber i;
294+
295+
for (i = 0; i < rel->natts; i++)
296+
if (rel->attmap[i] == attid)
297+
return true;
298+
299+
return false;
300+
}
301+
302+
/*
303+
* Executes default values for columns for which we didn't get any data.
304+
*
305+
* TODO: this needs caching, it's not exactly fast.
306+
*/
307+
static void
308+
fill_tuple_defaults(PGLogicalRelation *rel, ExprContext *econtext,
309+
PGLogicalTupleData *tuple)
310+
{
311+
TupleDesc desc = RelationGetDescr(rel->rel);
312+
AttrNumber num_phys_attrs = desc->natts;
313+
int i;
314+
AttrNumber attnum,
315+
num_defaults = 0;
316+
int *defmap;
317+
ExprState **defexprs;
318+
319+
defmap = (int *) palloc(num_phys_attrs * sizeof(int));
320+
defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *));
321+
322+
for (attnum = 0; attnum < num_phys_attrs; attnum++)
323+
{
324+
Expr *defexpr;
325+
326+
if (desc->attrs[attnum]->attisdropped)
327+
continue;
328+
329+
if (physatt_in_attmap(rel, attnum))
330+
continue;
331+
332+
defexpr = (Expr *) build_column_default(rel->rel, attnum + 1);
333+
334+
if (defexpr != NULL)
335+
{
336+
/* Run the expression through planner */
337+
defexpr = expression_planner(defexpr);
338+
339+
/* Initialize executable expression in copycontext */
340+
defexprs[num_defaults] = ExecInitExpr(defexpr, NULL);
341+
defmap[num_defaults] = attnum;
342+
num_defaults++;
343+
}
344+
345+
}
346+
347+
for (i = 0; i < num_defaults; i++)
348+
tuple->values[defmap[i]] = ExecEvalExpr(defexprs[i],
349+
econtext,
350+
&tuple->nulls[defmap[i]],
351+
NULL);
352+
}
275353

276354
static void
277355
handle_insert(StringInfo s)
@@ -298,6 +376,7 @@ handle_insert(StringInfo s)
298376

299377
/* Initialize the executor state. */
300378
estate = create_estate_for_relation(rel->rel);
379+
fill_tuple_defaults(rel, GetPerTupleExprContext(estate), &newtup);
301380
localslot = ExecInitExtraTupleSlot(estate);
302381
applyslot = ExecInitExtraTupleSlot(estate);
303382
ExecSetSlotDescriptor(localslot, RelationGetDescr(rel->rel));
@@ -328,6 +407,11 @@ handle_insert(StringInfo s)
328407
if (apply)
329408
{
330409
ExecStoreTuple(applytuple, applyslot, InvalidBuffer, true);
410+
/* Check the constraints of the tuple */
411+
if (rel->rel->rd_att->constr)
412+
ExecConstraints(estate->es_result_relation_info, applyslot,
413+
estate);
414+
331415
simple_heap_update(rel->rel, &localslot->tts_tuple->t_self,
332416
applytuple);
333417
/* TODO: check for HOT update? */
@@ -338,6 +422,11 @@ handle_insert(StringInfo s)
338422
{
339423
/* No conflict, insert the tuple. */
340424
ExecStoreTuple(remotetuple, applyslot, InvalidBuffer, true);
425+
/* Check the constraints of the tuple */
426+
if (rel->rel->rd_att->constr)
427+
ExecConstraints(estate->es_result_relation_info, applyslot,
428+
estate);
429+
341430
simple_heap_insert(rel->rel, remotetuple);
342431
UserTableUpdateOpenIndexes(estate, applyslot);
343432
}
@@ -414,6 +503,7 @@ handle_update(StringInfo s)
414503

415504
/* Initialize the executor state. */
416505
estate = create_estate_for_relation(rel->rel);
506+
fill_tuple_defaults(rel, GetPerTupleExprContext(estate), &newtup);
417507
localslot = ExecInitExtraTupleSlot(estate);
418508
applyslot = ExecInitExtraTupleSlot(estate);
419509
ExecSetSlotDescriptor(localslot, RelationGetDescr(rel->rel));
@@ -469,6 +559,11 @@ handle_update(StringInfo s)
469559
if (apply)
470560
{
471561
ExecStoreTuple(applytuple, applyslot, InvalidBuffer, true);
562+
/* Check the constraints of the tuple */
563+
if (rel->rel->rd_att->constr)
564+
ExecConstraints(estate->es_result_relation_info, applyslot,
565+
estate);
566+
472567
simple_heap_update(rel->rel, &localslot->tts_tuple->t_self,
473568
applytuple);
474569

pglogical_relcache.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,4 +38,6 @@ extern void pglogical_relation_close(PGLogicalRelation * rel,
3838
LOCKMODE lockmode);
3939
extern void pglogical_relation_invalidate_cb(Datum arg, Oid reloid);
4040

41+
struct PGLogicalTupleData;
42+
4143
#endif /* PGLOGICAL_RELCACHE_H */

sql/basic.sql

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ SELECT pg_xlog_wait_remote_apply(pg_current_xlog_location(), 0);
1616
\c postgres
1717

1818
ALTER TABLE public.basic_dml ADD COLUMN subonly integer;
19+
ALTER TABLE public.basic_dml ADD COLUMN subonly_def integer DEFAULT 99;
1920

2021
\c regression
2122

@@ -49,7 +50,7 @@ UPDATE basic_dml SET other = id, something = something - '10 seconds'::interval
4950
UPDATE basic_dml SET other = id, something = something + '10 seconds'::interval WHERE id > 3;
5051
SELECT pg_xlog_wait_remote_apply(pg_current_xlog_location(), 0);
5152
\c postgres
52-
SELECT id, other, data, something FROM basic_dml ORDER BY id;
53+
SELECT id, other, data, something, subonly, subonly_def FROM basic_dml ORDER BY id;
5354

5455
-- delete one row
5556
\c regression

0 commit comments

Comments
 (0)