Skip to content

Commit eea081a

Browse files
committed
Rearrange logrep worker's snapshot handling some more.
It turns out that worker.c's code path for TRUNCATE was also careless about establishing a snapshot while executing user-defined code, allowing the checks added by commit 84f5c29 to fail when a trigger is fired in that context. We could just wrap Push/PopActiveSnapshot around the truncate call, but it seems better to establish a policy of holding a snapshot throughout execution of a replication step. To help with that and possible future requirements, replace the previous ensure_transaction calls with pairs of begin/end_replication_step calls. Per report from Mark Dilger. Back-patch to v11, like the previous changes. Discussion: https://postgr.es/m/B4A3AF82-79ED-4F4C-A4E5-CD2622098972@enterprisedb.com
1 parent 534b9be commit eea081a

File tree

1 file changed

+38
-32
lines changed

1 file changed

+38
-32
lines changed

src/backend/replication/logical/worker.c

Lines changed: 38 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -154,30 +154,41 @@ should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
154154
}
155155

156156
/*
157-
* Make sure that we started local transaction.
157+
* Begin one step (one INSERT, UPDATE, etc) of a replication transaction.
158158
*
159-
* Also switches to ApplyMessageContext as necessary.
159+
* Start a transaction, if this is the first step (else we keep using the
160+
* existing transaction).
161+
* Also provide a global snapshot and ensure we run in ApplyMessageContext.
160162
*/
161-
static bool
162-
ensure_transaction(void)
163+
static void
164+
begin_replication_step(void)
163165
{
164-
if (IsTransactionState())
165-
{
166-
SetCurrentStatementStartTimestamp();
167-
168-
if (CurrentMemoryContext != ApplyMessageContext)
169-
MemoryContextSwitchTo(ApplyMessageContext);
166+
SetCurrentStatementStartTimestamp();
170167

171-
return false;
168+
if (!IsTransactionState())
169+
{
170+
StartTransactionCommand();
171+
maybe_reread_subscription();
172172
}
173173

174-
SetCurrentStatementStartTimestamp();
175-
StartTransactionCommand();
176-
177-
maybe_reread_subscription();
174+
PushActiveSnapshot(GetTransactionSnapshot());
178175

179176
MemoryContextSwitchTo(ApplyMessageContext);
180-
return true;
177+
}
178+
179+
/*
180+
* Finish up one step of a replication transaction.
181+
* Callers of begin_replication_step() must also call this.
182+
*
183+
* We don't close out the transaction here, but we should increment
184+
* the command counter to make the effects of this step visible.
185+
*/
186+
static void
187+
end_replication_step(void)
188+
{
189+
PopActiveSnapshot();
190+
191+
CommandCounterIncrement();
181192
}
182193

183194

@@ -194,13 +205,6 @@ create_estate_for_relation(LogicalRepRelMapEntry *rel)
194205
ResultRelInfo *resultRelInfo;
195206
RangeTblEntry *rte;
196207

197-
/*
198-
* Input functions may need an active snapshot, as may AFTER triggers
199-
* invoked during finish_estate. For safety, ensure an active snapshot
200-
* exists throughout all our usage of the executor.
201-
*/
202-
PushActiveSnapshot(GetTransactionSnapshot());
203-
204208
estate = CreateExecutorState();
205209

206210
rte = makeNode(RangeTblEntry);
@@ -241,7 +245,6 @@ finish_estate(EState *estate)
241245
/* Cleanup. */
242246
ExecResetTupleTable(estate->es_tupleTable, false);
243247
FreeExecutorState(estate);
244-
PopActiveSnapshot();
245248
}
246249

247250
/*
@@ -631,7 +634,7 @@ apply_handle_insert(StringInfo s)
631634
TupleTableSlot *remoteslot;
632635
MemoryContext oldctx;
633636

634-
ensure_transaction();
637+
begin_replication_step();
635638

636639
relid = logicalrep_read_insert(s, &newtup);
637640
rel = logicalrep_rel_open(relid, RowExclusiveLock);
@@ -642,6 +645,7 @@ apply_handle_insert(StringInfo s)
642645
* transaction so it's safe to unlock it.
643646
*/
644647
logicalrep_rel_close(rel, RowExclusiveLock);
648+
end_replication_step();
645649
return;
646650
}
647651

@@ -668,7 +672,7 @@ apply_handle_insert(StringInfo s)
668672

669673
logicalrep_rel_close(rel, NoLock);
670674

671-
CommandCounterIncrement();
675+
end_replication_step();
672676
}
673677

674678
/*
@@ -727,7 +731,7 @@ apply_handle_update(StringInfo s)
727731
bool found;
728732
MemoryContext oldctx;
729733

730-
ensure_transaction();
734+
begin_replication_step();
731735

732736
relid = logicalrep_read_update(s, &has_oldtup, &oldtup,
733737
&newtup);
@@ -739,6 +743,7 @@ apply_handle_update(StringInfo s)
739743
* transaction so it's safe to unlock it.
740744
*/
741745
logicalrep_rel_close(rel, RowExclusiveLock);
746+
end_replication_step();
742747
return;
743748
}
744749

@@ -840,7 +845,7 @@ apply_handle_update(StringInfo s)
840845

841846
logicalrep_rel_close(rel, NoLock);
842847

843-
CommandCounterIncrement();
848+
end_replication_step();
844849
}
845850

846851
/*
@@ -862,7 +867,7 @@ apply_handle_delete(StringInfo s)
862867
bool found;
863868
MemoryContext oldctx;
864869

865-
ensure_transaction();
870+
begin_replication_step();
866871

867872
relid = logicalrep_read_delete(s, &oldtup);
868873
rel = logicalrep_rel_open(relid, RowExclusiveLock);
@@ -873,6 +878,7 @@ apply_handle_delete(StringInfo s)
873878
* transaction so it's safe to unlock it.
874879
*/
875880
logicalrep_rel_close(rel, RowExclusiveLock);
881+
end_replication_step();
876882
return;
877883
}
878884

@@ -934,7 +940,7 @@ apply_handle_delete(StringInfo s)
934940

935941
logicalrep_rel_close(rel, NoLock);
936942

937-
CommandCounterIncrement();
943+
end_replication_step();
938944
}
939945

940946
/*
@@ -955,7 +961,7 @@ apply_handle_truncate(StringInfo s)
955961
ListCell *lc;
956962
LOCKMODE lockmode = AccessExclusiveLock;
957963

958-
ensure_transaction();
964+
begin_replication_step();
959965

960966
remote_relids = logicalrep_read_truncate(s, &cascade, &restart_seqs);
961967

@@ -996,7 +1002,7 @@ apply_handle_truncate(StringInfo s)
9961002
logicalrep_rel_close(rel, NoLock);
9971003
}
9981004

999-
CommandCounterIncrement();
1005+
end_replication_step();
10001006
}
10011007

10021008

0 commit comments

Comments
 (0)