Skip to content

Commit ac8f2e1

Browse files
committed
In walreceiver, don't try to do ereport() in a signal handler.
This is quite unsafe, even for the case of ereport(FATAL) where we won't return control to the interrupted code, and despite this code's use of a flag to restrict the areas where we'd try to do it. It's possible for example that we interrupt malloc or free while that's holding a lock that's meant to protect against cross-thread interference. Then, any attempt to do malloc or free within ereport() will result in a deadlock, preventing the walreceiver process from exiting in response to SIGTERM. We hypothesize that this explains some hard-to-reproduce failures seen in the buildfarm. Hence, get rid of the immediate-exit code in WalRcvShutdownHandler, as well as the logic associated with WalRcvImmediateInterruptOK. Instead, we need to take care that potentially-blocking operations in the walreceiver's data transmission logic (libpqwalreceiver.c) will respond reasonably promptly to the process's latch becoming set and then call ProcessWalRcvInterrupts. Much of the needed code for that was already present in libpqwalreceiver.c. I refactored things a bit so that all the uses of PQgetResult use latch-aware waiting, but didn't need to do much more. These changes should be enough to ensure that libpqwalreceiver.c will respond promptly to SIGTERM whenever it's waiting to receive data. In principle, it could block for a long time while waiting to send data too, and this patch does nothing to guard against that. I think that that hazard is mostly theoretical though: such blocking should occur only if we fill the kernel's data transmission buffers, and we don't generally send enough data to make that happen without waiting for input. If we find out that the hazard isn't just theoretical, we could fix it by using PQsetnonblocking, but that would require more ticklish changes than I care to make now. Back-patch of commit a1a789e. This problem goes all the way back to the origins of walreceiver; but given the substantial reworking the module received during the v10 cycle, it seems unsafe to assume that our testing on HEAD validates this patch for pre-v10 branches. And we'd need to back-patch some prerequisite patches (at least 597a87c and its followups, maybe other things), increasing the risk of problems. Given the dearth of field reports matching this problem, it's not worth much risk. Hence back-patch to v10 and v11 only. Patch by me; thanks to Thomas Munro for review. Discussion: https://postgr.es/m/20190416070119.GK2673@paquier.xyz
1 parent 2981e5a commit ac8f2e1

File tree

3 files changed

+89
-105
lines changed

3 files changed

+89
-105
lines changed

src/backend/replication/libpqwalreceiver/libpqwalreceiver.c

Lines changed: 71 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@ static WalReceiverFunctionsType PQWalReceiverFunctions = {
9595

9696
/* Prototypes for private functions */
9797
static PGresult *libpqrcv_PQexec(PGconn *streamConn, const char *query);
98+
static PGresult *libpqrcv_PQgetResult(PGconn *streamConn);
9899
static char *stringlist_to_identifierstr(PGconn *conn, List *strings);
99100

100101
/*
@@ -196,7 +197,7 @@ libpqrcv_connect(const char *conninfo, bool logical, const char *appname,
196197
if (rc & WL_LATCH_SET)
197198
{
198199
ResetLatch(MyLatch);
199-
CHECK_FOR_INTERRUPTS();
200+
ProcessWalRcvInterrupts();
200201
}
201202

202203
/* If socket is ready, advance the libpq state machine */
@@ -427,6 +428,10 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
427428
{
428429
PGresult *res;
429430

431+
/*
432+
* Send copy-end message. As in libpqrcv_PQexec, this could theoretically
433+
* block, but the risk seems small.
434+
*/
430435
if (PQputCopyEnd(conn->streamConn, NULL) <= 0 ||
431436
PQflush(conn->streamConn))
432437
ereport(ERROR,
@@ -443,7 +448,7 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
443448
* If we had not yet received CopyDone from the backend, PGRES_COPY_OUT is
444449
* also possible in case we aborted the copy in mid-stream.
445450
*/
446-
res = PQgetResult(conn->streamConn);
451+
res = libpqrcv_PQgetResult(conn->streamConn);
447452
if (PQresultStatus(res) == PGRES_TUPLES_OK)
448453
{
449454
/*
@@ -457,7 +462,7 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
457462
PQclear(res);
458463

459464
/* the result set should be followed by CommandComplete */
460-
res = PQgetResult(conn->streamConn);
465+
res = libpqrcv_PQgetResult(conn->streamConn);
461466
}
462467
else if (PQresultStatus(res) == PGRES_COPY_OUT)
463468
{
@@ -470,7 +475,7 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
470475
pchomp(PQerrorMessage(conn->streamConn)))));
471476

472477
/* CommandComplete should follow */
473-
res = PQgetResult(conn->streamConn);
478+
res = libpqrcv_PQgetResult(conn->streamConn);
474479
}
475480

476481
if (PQresultStatus(res) != PGRES_COMMAND_OK)
@@ -480,7 +485,7 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
480485
PQclear(res);
481486

482487
/* Verify that there are no more results */
483-
res = PQgetResult(conn->streamConn);
488+
res = libpqrcv_PQgetResult(conn->streamConn);
484489
if (res != NULL)
485490
ereport(ERROR,
486491
(errmsg("unexpected result after CommandComplete: %s",
@@ -543,12 +548,11 @@ libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn,
543548
* The function is modeled on PQexec() in libpq, but only implements
544549
* those parts that are in use in the walreceiver api.
545550
*
546-
* Queries are always executed on the connection in streamConn.
551+
* May return NULL, rather than an error result, on failure.
547552
*/
548553
static PGresult *
549554
libpqrcv_PQexec(PGconn *streamConn, const char *query)
550555
{
551-
PGresult *result = NULL;
552556
PGresult *lastResult = NULL;
553557

554558
/*
@@ -559,64 +563,26 @@ libpqrcv_PQexec(PGconn *streamConn, const char *query)
559563
*/
560564

561565
/*
562-
* Submit a query. Since we don't use non-blocking mode, this also can
563-
* block. But its risk is relatively small, so we ignore that for now.
566+
* Submit the query. Since we don't use non-blocking mode, this could
567+
* theoretically block. In practice, since we don't send very long query
568+
* strings, the risk seems negligible.
564569
*/
565570
if (!PQsendQuery(streamConn, query))
566571
return NULL;
567572

568573
for (;;)
569574
{
570-
/*
571-
* Receive data until PQgetResult is ready to get the result without
572-
* blocking.
573-
*/
574-
while (PQisBusy(streamConn))
575-
{
576-
int rc;
577-
578-
/*
579-
* We don't need to break down the sleep into smaller increments,
580-
* since we'll get interrupted by signals and can either handle
581-
* interrupts here or elog(FATAL) within SIGTERM signal handler if
582-
* the signal arrives in the middle of establishment of
583-
* replication connection.
584-
*/
585-
rc = WaitLatchOrSocket(MyLatch,
586-
WL_POSTMASTER_DEATH | WL_SOCKET_READABLE |
587-
WL_LATCH_SET,
588-
PQsocket(streamConn),
589-
0,
590-
WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
591-
592-
/* Emergency bailout? */
593-
if (rc & WL_POSTMASTER_DEATH)
594-
exit(1);
595-
596-
/* Interrupted? */
597-
if (rc & WL_LATCH_SET)
598-
{
599-
ResetLatch(MyLatch);
600-
CHECK_FOR_INTERRUPTS();
601-
}
575+
/* Wait for, and collect, the next PGresult. */
576+
PGresult *result;
602577

603-
/* Consume whatever data is available from the socket */
604-
if (PQconsumeInput(streamConn) == 0)
605-
{
606-
/* trouble; drop whatever we had and return NULL */
607-
PQclear(lastResult);
608-
return NULL;
609-
}
610-
}
578+
result = libpqrcv_PQgetResult(streamConn);
579+
if (result == NULL)
580+
break; /* query is complete, or failure */
611581

612582
/*
613583
* Emulate PQexec()'s behavior of returning the last result when there
614584
* are many. We are fine with returning just last error message.
615585
*/
616-
result = PQgetResult(streamConn);
617-
if (result == NULL)
618-
break; /* query is complete */
619-
620586
PQclear(lastResult);
621587
lastResult = result;
622588

@@ -630,6 +596,55 @@ libpqrcv_PQexec(PGconn *streamConn, const char *query)
630596
return lastResult;
631597
}
632598

599+
/*
600+
* Perform the equivalent of PQgetResult(), but watch for interrupts.
601+
*/
602+
static PGresult *
603+
libpqrcv_PQgetResult(PGconn *streamConn)
604+
{
605+
/*
606+
* Collect data until PQgetResult is ready to get the result without
607+
* blocking.
608+
*/
609+
while (PQisBusy(streamConn))
610+
{
611+
int rc;
612+
613+
/*
614+
* We don't need to break down the sleep into smaller increments,
615+
* since we'll get interrupted by signals and can handle any
616+
* interrupts here.
617+
*/
618+
rc = WaitLatchOrSocket(MyLatch,
619+
WL_POSTMASTER_DEATH | WL_SOCKET_READABLE |
620+
WL_LATCH_SET,
621+
PQsocket(streamConn),
622+
0,
623+
WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
624+
625+
/* Emergency bailout? */
626+
if (rc & WL_POSTMASTER_DEATH)
627+
exit(1);
628+
629+
/* Interrupted? */
630+
if (rc & WL_LATCH_SET)
631+
{
632+
ResetLatch(MyLatch);
633+
ProcessWalRcvInterrupts();
634+
}
635+
636+
/* Consume whatever data is available from the socket */
637+
if (PQconsumeInput(streamConn) == 0)
638+
{
639+
/* trouble; return NULL */
640+
return NULL;
641+
}
642+
}
643+
644+
/* Now we can collect and return the next PGresult */
645+
return PQgetResult(streamConn);
646+
}
647+
633648
/*
634649
* Disconnect connection to primary, if any.
635650
*/
@@ -691,13 +706,13 @@ libpqrcv_receive(WalReceiverConn *conn, char **buffer,
691706
{
692707
PGresult *res;
693708

694-
res = PQgetResult(conn->streamConn);
709+
res = libpqrcv_PQgetResult(conn->streamConn);
695710
if (PQresultStatus(res) == PGRES_COMMAND_OK)
696711
{
697712
PQclear(res);
698713

699714
/* Verify that there are no more results. */
700-
res = PQgetResult(conn->streamConn);
715+
res = libpqrcv_PQgetResult(conn->streamConn);
701716
if (res != NULL)
702717
{
703718
PQclear(res);
@@ -861,7 +876,7 @@ libpqrcv_processTuples(PGresult *pgres, WalRcvExecResult *walres,
861876
{
862877
char *cstrs[MaxTupleAttributeNumber];
863878

864-
CHECK_FOR_INTERRUPTS();
879+
ProcessWalRcvInterrupts();
865880

866881
/* Do the allocations in temporary context. */
867882
oldcontext = MemoryContextSwitchTo(rowcontext);

src/backend/replication/walreceiver.c

Lines changed: 17 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -111,28 +111,7 @@ static struct
111111
static StringInfoData reply_message;
112112
static StringInfoData incoming_message;
113113

114-
/*
115-
* About SIGTERM handling:
116-
*
117-
* We can't just exit(1) within SIGTERM signal handler, because the signal
118-
* might arrive in the middle of some critical operation, like while we're
119-
* holding a spinlock. We also can't just set a flag in signal handler and
120-
* check it in the main loop, because we perform some blocking operations
121-
* like libpqrcv_PQexec(), which can take a long time to finish.
122-
*
123-
* We use a combined approach: When WalRcvImmediateInterruptOK is true, it's
124-
* safe for the signal handler to elog(FATAL) immediately. Otherwise it just
125-
* sets got_SIGTERM flag, which is checked in the main loop when convenient.
126-
*
127-
* This is very much like what regular backends do with ImmediateInterruptOK,
128-
* ProcessInterrupts() etc.
129-
*/
130-
static volatile bool WalRcvImmediateInterruptOK = false;
131-
132114
/* Prototypes for private functions */
133-
static void ProcessWalRcvInterrupts(void);
134-
static void EnableWalRcvImmediateExit(void);
135-
static void DisableWalRcvImmediateExit(void);
136115
static void WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last);
137116
static void WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI);
138117
static void WalRcvDie(int code, Datum arg);
@@ -150,7 +129,20 @@ static void WalRcvShutdownHandler(SIGNAL_ARGS);
150129
static void WalRcvQuickDieHandler(SIGNAL_ARGS);
151130

152131

153-
static void
132+
/*
133+
* Process any interrupts the walreceiver process may have received.
134+
* This should be called any time the process's latch has become set.
135+
*
136+
* Currently, only SIGTERM is of interest. We can't just exit(1) within the
137+
* SIGTERM signal handler, because the signal might arrive in the middle of
138+
* some critical operation, like while we're holding a spinlock. Instead, the
139+
* signal handler sets a flag variable as well as setting the process's latch.
140+
* We must check the flag (by calling ProcessWalRcvInterrupts) anytime the
141+
* latch has become set. Operations that could block for a long time, such as
142+
* reading from a remote server, must pay attention to the latch too; see
143+
* libpqrcv_PQgetResult for example.
144+
*/
145+
void
154146
ProcessWalRcvInterrupts(void)
155147
{
156148
/*
@@ -162,26 +154,12 @@ ProcessWalRcvInterrupts(void)
162154

163155
if (got_SIGTERM)
164156
{
165-
WalRcvImmediateInterruptOK = false;
166157
ereport(FATAL,
167158
(errcode(ERRCODE_ADMIN_SHUTDOWN),
168159
errmsg("terminating walreceiver process due to administrator command")));
169160
}
170161
}
171162

172-
static void
173-
EnableWalRcvImmediateExit(void)
174-
{
175-
WalRcvImmediateInterruptOK = true;
176-
ProcessWalRcvInterrupts();
177-
}
178-
179-
static void
180-
DisableWalRcvImmediateExit(void)
181-
{
182-
WalRcvImmediateInterruptOK = false;
183-
ProcessWalRcvInterrupts();
184-
}
185163

186164
/* Main entry point for walreceiver process */
187165
void
@@ -299,12 +277,10 @@ WalReceiverMain(void)
299277
PG_SETMASK(&UnBlockSig);
300278

301279
/* Establish the connection to the primary for XLOG streaming */
302-
EnableWalRcvImmediateExit();
303280
wrconn = walrcv_connect(conninfo, false, "walreceiver", &err);
304281
if (!wrconn)
305282
ereport(ERROR,
306283
(errmsg("could not connect to the primary server: %s", err)));
307-
DisableWalRcvImmediateExit();
308284

309285
/*
310286
* Save user-visible connection string. This clobbers the original
@@ -333,7 +309,6 @@ WalReceiverMain(void)
333309
* Check that we're connected to a valid server using the
334310
* IDENTIFY_SYSTEM replication command.
335311
*/
336-
EnableWalRcvImmediateExit();
337312
primary_sysid = walrcv_identify_system(wrconn, &primaryTLI,
338313
&server_version);
339314

@@ -346,7 +321,6 @@ WalReceiverMain(void)
346321
errdetail("The primary's identifier is %s, the standby's identifier is %s.",
347322
primary_sysid, standby_sysid)));
348323
}
349-
DisableWalRcvImmediateExit();
350324

351325
/*
352326
* Confirm that the current timeline of the primary is the same or
@@ -507,6 +481,8 @@ WalReceiverMain(void)
507481
if (rc & WL_LATCH_SET)
508482
{
509483
ResetLatch(walrcv->latch);
484+
ProcessWalRcvInterrupts();
485+
510486
if (walrcv->force_reply)
511487
{
512488
/*
@@ -584,9 +560,7 @@ WalReceiverMain(void)
584560
* The backend finished streaming. Exit streaming COPY-mode from
585561
* our side, too.
586562
*/
587-
EnableWalRcvImmediateExit();
588563
walrcv_endstreaming(wrconn, &primaryTLI);
589-
DisableWalRcvImmediateExit();
590564

591565
/*
592566
* If the server had switched to a new timeline that we didn't
@@ -740,9 +714,7 @@ WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last)
740714
(errmsg("fetching timeline history file for timeline %u from primary server",
741715
tli)));
742716

743-
EnableWalRcvImmediateExit();
744717
walrcv_readtimelinehistoryfile(wrconn, tli, &fname, &content, &len);
745-
DisableWalRcvImmediateExit();
746718

747719
/*
748720
* Check that the filename on the master matches what we
@@ -819,7 +791,7 @@ WalRcvSigUsr1Handler(SIGNAL_ARGS)
819791
errno = save_errno;
820792
}
821793

822-
/* SIGTERM: set flag for main loop, or shutdown immediately if safe */
794+
/* SIGTERM: set flag for ProcessWalRcvInterrupts */
823795
static void
824796
WalRcvShutdownHandler(SIGNAL_ARGS)
825797
{
@@ -830,10 +802,6 @@ WalRcvShutdownHandler(SIGNAL_ARGS)
830802
if (WalRcv->latch)
831803
SetLatch(WalRcv->latch);
832804

833-
/* Don't joggle the elbow of proc_exit */
834-
if (!proc_exit_inprogress && WalRcvImmediateInterruptOK)
835-
ProcessWalRcvInterrupts();
836-
837805
errno = save_errno;
838806
}
839807

src/include/replication/walreceiver.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -285,6 +285,7 @@ walrcv_clear_result(WalRcvExecResult *walres)
285285

286286
/* prototypes for functions in walreceiver.c */
287287
extern void WalReceiverMain(void) pg_attribute_noreturn();
288+
extern void ProcessWalRcvInterrupts(void);
288289

289290
/* prototypes for functions in walreceiverfuncs.c */
290291
extern Size WalRcvShmemSize(void);

0 commit comments

Comments
 (0)