62
62
#include "pglogical_worker.h"
63
63
#include "pglogical.h"
64
64
65
+
65
66
void pglogical_apply_main (Datum main_arg );
66
67
67
68
static bool in_remote_transaction = false;
@@ -77,6 +78,15 @@ PGLogicalSubscription *MySubscription = NULL;
77
78
78
79
static PGconn * applyconn = NULL ;
79
80
81
+ typedef struct PGLFlushPosition
82
+ {
83
+ dlist_node node ;
84
+ XLogRecPtr local_end ;
85
+ XLogRecPtr remote_end ;
86
+ } PGLFlushPosition ;
87
+
88
+ dlist_head lsn_mapping = DLIST_STATIC_INIT (lsn_mapping );
89
+
80
90
static void handle_queued_message (HeapTuple msgtup , bool tx_just_started );
81
91
static void handle_startup_param (const char * key , const char * value );
82
92
static bool parse_bool_param (const char * key , const char * value );
@@ -144,7 +154,17 @@ handle_commit(StringInfo s)
144
154
145
155
if (IsTransactionState ())
146
156
{
157
+ PGLFlushPosition * flushpos ;
158
+
147
159
CommitTransactionCommand ();
160
+ MemoryContextSwitchTo (TopMemoryContext );
161
+
162
+ /* Track commit lsn */
163
+ flushpos = (PGLFlushPosition * ) palloc (sizeof (PGLFlushPosition ));
164
+ flushpos -> local_end = XactLastCommitEnd ;
165
+ flushpos -> remote_end = end_lsn ;
166
+
167
+ dlist_push_tail (& lsn_mapping , & flushpos -> node );
148
168
MemoryContextSwitchTo (MessageContext );
149
169
}
150
170
@@ -1032,6 +1052,58 @@ replication_handler(StringInfo s)
1032
1052
}
1033
1053
}
1034
1054
1055
+ /*
1056
+ * Figure out which write/flush positions to report to the walsender process.
1057
+ *
1058
+ * We can't simply report back the last LSN the walsender sent us because the
1059
+ * local transaction might not yet be flushed to disk locally. Instead we
1060
+ * build a list that associates local with remote LSNs for every commit. When
1061
+ * reporting back the flush position to the sender we iterate that list and
1062
+ * check which entries on it are already locally flushed. Those we can report
1063
+ * as having been flushed.
1064
+ *
1065
+ * Returns true if there's no outstanding transactions that need to be
1066
+ * flushed.
1067
+ */
1068
+ static bool
1069
+ get_flush_position (XLogRecPtr * write , XLogRecPtr * flush )
1070
+ {
1071
+ dlist_mutable_iter iter ;
1072
+ XLogRecPtr local_flush = GetFlushRecPtr ();
1073
+
1074
+ * write = InvalidXLogRecPtr ;
1075
+ * flush = InvalidXLogRecPtr ;
1076
+
1077
+ dlist_foreach_modify (iter , & lsn_mapping )
1078
+ {
1079
+ PGLFlushPosition * pos =
1080
+ dlist_container (PGLFlushPosition , node , iter .cur );
1081
+
1082
+ * write = pos -> remote_end ;
1083
+
1084
+ if (pos -> local_end <= local_flush )
1085
+ {
1086
+ * flush = pos -> remote_end ;
1087
+ dlist_delete (iter .cur );
1088
+ pfree (pos );
1089
+ }
1090
+ else
1091
+ {
1092
+ /*
1093
+ * Don't want to uselessly iterate over the rest of the list which
1094
+ * could potentially be long. Instead get the last element and
1095
+ * grab the write position from there.
1096
+ */
1097
+ pos = dlist_tail_element (PGLFlushPosition , node ,
1098
+ & lsn_mapping );
1099
+ * write = pos -> remote_end ;
1100
+ return false;
1101
+ }
1102
+ }
1103
+
1104
+ return dlist_is_empty (& lsn_mapping );
1105
+ }
1106
+
1035
1107
/*
1036
1108
* Send a Standby Status Update message to server.
1037
1109
*
@@ -1054,7 +1126,14 @@ send_feedback(PGconn *conn, XLogRecPtr recvpos, int64 now, bool force)
1054
1126
if (recvpos < last_recvpos )
1055
1127
recvpos = last_recvpos ;
1056
1128
1057
- flushpos = writepos = recvpos ;
1129
+ if (get_flush_position (& writepos , & flushpos ))
1130
+ {
1131
+ /*
1132
+ * No outstanding transactions to flush, we can report the latest
1133
+ * received position. This is important for synchronous replication.
1134
+ */
1135
+ flushpos = writepos = recvpos ;
1136
+ }
1058
1137
1059
1138
if (writepos < last_writepos )
1060
1139
writepos = last_writepos ;
@@ -1554,6 +1633,20 @@ pglogical_apply_main(Datum main_arg)
1554
1633
/* Connect to our database. */
1555
1634
BackgroundWorkerInitializeConnectionByOid (MyPGLogicalWorker -> dboid , InvalidOid );
1556
1635
1636
+ /* setup synchronous commit according to the user's wishes */
1637
+ SetConfigOption ("synchronous_commit" ,
1638
+ pglogical_synchronous_commit ? "local" : "off" ,
1639
+ PGC_BACKEND , PGC_S_OVERRIDE ); /* other context? */
1640
+
1641
+ /*
1642
+ * Disable function body checks during replay. That's necessary because a)
1643
+ * the creator of the function might have had it disabled b) the function
1644
+ * might be search_path dependant and we don't fix the contents of
1645
+ * functions.
1646
+ */
1647
+ SetConfigOption ("check_function_bodies" , "off" ,
1648
+ PGC_INTERNAL , PGC_S_OVERRIDE );
1649
+
1557
1650
/* Load the subscription. */
1558
1651
StartTransactionCommand ();
1559
1652
saved_ctx = MemoryContextSwitchTo (TopMemoryContext );
0 commit comments