Skip to content

Commit 53f7137

Browse files
committed
Make synchronous commit optional for apply
The default synchronous commit is now false. We also have to keep track of commit lsn mapping between remote and local so that we can report correct flush position to the provider.
1 parent 8796abd commit 53f7137

File tree

3 files changed

+106
-1
lines changed

3 files changed

+106
-1
lines changed

pglogical.c

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@ static const struct config_enum_entry PGLogicalConflictResolvers[] = {
4747
{NULL, 0, false}
4848
};
4949

50+
bool pglogical_synchronous_commit = false;
51+
5052
void _PG_init(void);
5153
void pglogical_supervisor_main(Datum main_arg);
5254

@@ -398,6 +400,14 @@ _PG_init(void)
398400
pglogical_conflict_resolver_check_hook,
399401
NULL, NULL);
400402

403+
DefineCustomBoolVariable("pglogical.synchronous_commit",
404+
"pglogical specific synchronous commit value",
405+
NULL,
406+
&pglogical_synchronous_commit,
407+
false, PGC_POSTMASTER,
408+
0,
409+
NULL, NULL, NULL);
410+
401411
if (IsBinaryUpgrade)
402412
return;
403413

pglogical.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@
3434

3535
#define REPLICATION_ORIGIN_ALL "all"
3636

37+
extern bool pglogical_synchronous_commit;
38+
3739
extern char *shorten_hash(const char *str, int maxlen);
3840

3941
extern List *textarray_to_list(ArrayType *textarray);

pglogical_apply.c

Lines changed: 94 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@
6262
#include "pglogical_worker.h"
6363
#include "pglogical.h"
6464

65+
6566
void pglogical_apply_main(Datum main_arg);
6667

6768
static bool in_remote_transaction = false;
@@ -77,6 +78,15 @@ PGLogicalSubscription *MySubscription = NULL;
7778

7879
static PGconn *applyconn = NULL;
7980

81+
typedef struct PGLFlushPosition
82+
{
83+
dlist_node node;
84+
XLogRecPtr local_end;
85+
XLogRecPtr remote_end;
86+
} PGLFlushPosition;
87+
88+
dlist_head lsn_mapping = DLIST_STATIC_INIT(lsn_mapping);
89+
8090
static void handle_queued_message(HeapTuple msgtup, bool tx_just_started);
8191
static void handle_startup_param(const char *key, const char *value);
8292
static bool parse_bool_param(const char *key, const char *value);
@@ -144,7 +154,17 @@ handle_commit(StringInfo s)
144154

145155
if (IsTransactionState())
146156
{
157+
PGLFlushPosition *flushpos;
158+
147159
CommitTransactionCommand();
160+
MemoryContextSwitchTo(TopMemoryContext);
161+
162+
/* Track commit lsn */
163+
flushpos = (PGLFlushPosition *) palloc(sizeof(PGLFlushPosition));
164+
flushpos->local_end = XactLastCommitEnd;
165+
flushpos->remote_end = end_lsn;
166+
167+
dlist_push_tail(&lsn_mapping, &flushpos->node);
148168
MemoryContextSwitchTo(MessageContext);
149169
}
150170

@@ -1032,6 +1052,58 @@ replication_handler(StringInfo s)
10321052
}
10331053
}
10341054

1055+
/*
1056+
* Figure out which write/flush positions to report to the walsender process.
1057+
*
1058+
* We can't simply report back the last LSN the walsender sent us because the
1059+
* local transaction might not yet be flushed to disk locally. Instead we
1060+
* build a list that associates local with remote LSNs for every commit. When
1061+
* reporting back the flush position to the sender we iterate that list and
1062+
* check which entries on it are already locally flushed. Those we can report
1063+
* as having been flushed.
1064+
*
1065+
* Returns true if there's no outstanding transactions that need to be
1066+
* flushed.
1067+
*/
1068+
static bool
1069+
get_flush_position(XLogRecPtr *write, XLogRecPtr *flush)
1070+
{
1071+
dlist_mutable_iter iter;
1072+
XLogRecPtr local_flush = GetFlushRecPtr();
1073+
1074+
*write = InvalidXLogRecPtr;
1075+
*flush = InvalidXLogRecPtr;
1076+
1077+
dlist_foreach_modify(iter, &lsn_mapping)
1078+
{
1079+
PGLFlushPosition *pos =
1080+
dlist_container(PGLFlushPosition, node, iter.cur);
1081+
1082+
*write = pos->remote_end;
1083+
1084+
if (pos->local_end <= local_flush)
1085+
{
1086+
*flush = pos->remote_end;
1087+
dlist_delete(iter.cur);
1088+
pfree(pos);
1089+
}
1090+
else
1091+
{
1092+
/*
1093+
* Don't want to uselessly iterate over the rest of the list which
1094+
* could potentially be long. Instead get the last element and
1095+
* grab the write position from there.
1096+
*/
1097+
pos = dlist_tail_element(PGLFlushPosition, node,
1098+
&lsn_mapping);
1099+
*write = pos->remote_end;
1100+
return false;
1101+
}
1102+
}
1103+
1104+
return dlist_is_empty(&lsn_mapping);
1105+
}
1106+
10351107
/*
10361108
* Send a Standby Status Update message to server.
10371109
*
@@ -1054,7 +1126,14 @@ send_feedback(PGconn *conn, XLogRecPtr recvpos, int64 now, bool force)
10541126
if (recvpos < last_recvpos)
10551127
recvpos = last_recvpos;
10561128

1057-
flushpos = writepos = recvpos;
1129+
if (get_flush_position(&writepos, &flushpos))
1130+
{
1131+
/*
1132+
* No outstanding transactions to flush, we can report the latest
1133+
* received position. This is important for synchronous replication.
1134+
*/
1135+
flushpos = writepos = recvpos;
1136+
}
10581137

10591138
if (writepos < last_writepos)
10601139
writepos = last_writepos;
@@ -1554,6 +1633,20 @@ pglogical_apply_main(Datum main_arg)
15541633
/* Connect to our database. */
15551634
BackgroundWorkerInitializeConnectionByOid(MyPGLogicalWorker->dboid, InvalidOid);
15561635

1636+
/* setup synchronous commit according to the user's wishes */
1637+
SetConfigOption("synchronous_commit",
1638+
pglogical_synchronous_commit ? "local" : "off",
1639+
PGC_BACKEND, PGC_S_OVERRIDE); /* other context? */
1640+
1641+
/*
1642+
* Disable function body checks during replay. That's necessary because a)
1643+
* the creator of the function might have had it disabled b) the function
1644+
* might be search_path dependant and we don't fix the contents of
1645+
* functions.
1646+
*/
1647+
SetConfigOption("check_function_bodies", "off",
1648+
PGC_INTERNAL, PGC_S_OVERRIDE);
1649+
15571650
/* Load the subscription. */
15581651
StartTransactionCommand();
15591652
saved_ctx = MemoryContextSwitchTo(TopMemoryContext);

0 commit comments

Comments
 (0)