Skip to content

Commit 45fc4c3

Browse files
author
Arthur Zakirov
committed
Use wait_replica_wal_lsn() in pg_start_backup() and pg_stop_backup()
1 parent a0c7fcd commit 45fc4c3

File tree

1 file changed

+53
-88
lines changed

1 file changed

+53
-88
lines changed

src/backup.c

Lines changed: 53 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -640,6 +640,18 @@ pg_start_backup(const char *label, bool smooth, pgBackup *backup)
640640

641641
PQclear(res);
642642

643+
/*
644+
* Switch to a new WAL segment. It is necessary to get archived WAL
645+
* segment, which includes start LSN of current backup.
646+
*
647+
* Do not switch for a standby node or if the backup is in stream mode.
648+
*/
649+
if (!stream_wal)
650+
pg_switch_wal(conn);
651+
/* Wait for start_lsn to be received by replica */
652+
if (from_replica)
653+
wait_replica_wal_lsn(backup->start_lsn, true);
654+
643655
if (!stream_wal)
644656
/*
645657
* Do not wait start_lsn for stream backup.
@@ -658,16 +670,15 @@ pg_switch_wal(PGconn *conn)
658670
PGresult *res;
659671

660672
/* Remove annoying NOTICE messages generated by backend */
661-
res = pgut_execute(conn, "SET client_min_messages = warning;", 0,
662-
NULL);
673+
res = pgut_execute(conn, "SET client_min_messages = warning;", 0, NULL);
663674
PQclear(res);
664675

665676
if (server_version >= 100000)
666677
res = pgut_execute(conn, "SELECT * FROM pg_switch_wal()", 0,
667-
NULL);
678+
NULL);
668679
else
669680
res = pgut_execute(conn, "SELECT * FROM pg_switch_xlog()", 0,
670-
NULL);
681+
NULL);
671682

672683
PQclear(res);
673684
}
@@ -908,7 +919,7 @@ wait_wal_lsn(XLogRecPtr lsn)
908919
}
909920

910921
/*
911-
* Wait for target 'lsn' on replica instance.
922+
* Wait until target 'lsn' is received by the replica instance from master.
912923
*/
913924
static void
914925
wait_replica_wal_lsn(XLogRecPtr lsn, bool is_start_backup)
@@ -973,6 +984,7 @@ wait_replica_wal_lsn(XLogRecPtr lsn, bool is_start_backup)
973984
static void
974985
pg_stop_backup(pgBackup *backup)
975986
{
987+
PGconn *conn;
976988
PGresult *res;
977989
uint32 xlogid;
978990
uint32 xrecoff;
@@ -990,8 +1002,11 @@ pg_stop_backup(pgBackup *backup)
9901002
if (!backup_in_progress)
9911003
elog(FATAL, "backup is not in progress");
9921004

1005+
/* For replica we call pg_stop_backup() on master */
1006+
conn = (from_replica) ? master_conn : backup_conn;
1007+
9931008
/* Remove annoying NOTICE messages generated by backend */
994-
res = pgut_execute(backup_conn, "SET client_min_messages = warning;",
1009+
res = pgut_execute(conn, "SET client_min_messages = warning;",
9951010
0, NULL);
9961011
PQclear(res);
9971012

@@ -1005,69 +1020,16 @@ pg_stop_backup(pgBackup *backup)
10051020
backup_id = base36enc(backup->start_time);
10061021

10071022
if (!from_replica)
1008-
{
10091023
snprintf(name, lengthof(name), "pg_probackup, backup_id %s",
10101024
backup_id);
1011-
params[0] = name;
1012-
1013-
res = pgut_execute(backup_conn, "SELECT pg_create_restore_point($1)",
1014-
1, params);
1015-
PQclear(res);
1016-
}
10171025
else
1018-
{
1019-
uint32 try_count = 0;
1020-
10211026
snprintf(name, lengthof(name), "pg_probackup, backup_id %s. Replica Backup",
10221027
backup_id);
1023-
params[0] = name;
1028+
params[0] = name;
10241029

1025-
res = pgut_execute(master_conn, "SELECT pg_create_restore_point($1)",
1026-
1, params);
1027-
/* Extract timeline and LSN from result */
1028-
XLogDataFromLSN(PQgetvalue(res, 0, 0), &xlogid, &xrecoff);
1029-
/* Calculate LSN */
1030-
restore_lsn = (XLogRecPtr) ((uint64) xlogid << 32) | xrecoff;
1031-
PQclear(res);
1032-
1033-
/* Switch WAL on master to retrieve restore_lsn */
1034-
pg_switch_wal(master_conn);
1035-
1036-
/* Wait for restore_lsn from master */
1037-
while (true)
1038-
{
1039-
XLogRecPtr min_recovery_lsn;
1040-
1041-
res = pgut_execute(backup_conn, "SELECT min_recovery_end_location from pg_control_recovery()",
1042-
0, NULL);
1043-
/* Extract timeline and LSN from result */
1044-
XLogDataFromLSN(PQgetvalue(res, 0, 0), &xlogid, &xrecoff);
1045-
/* Calculate LSN */
1046-
min_recovery_lsn = (XLogRecPtr) ((uint64) xlogid << 32) | xrecoff;
1047-
PQclear(res);
1048-
1049-
/* restore_lsn was streamed and applied to the replica */
1050-
if (min_recovery_lsn >= restore_lsn)
1051-
break;
1052-
1053-
sleep(1);
1054-
if (interrupted)
1055-
elog(ERROR, "Interrupted during waiting for restore point LSN");
1056-
try_count++;
1057-
1058-
/* Inform user if restore_lsn is absent in first attempt */
1059-
if (try_count == 1)
1060-
elog(INFO, "Wait for restore point LSN %X/%X to be streamed "
1061-
"to replica",
1062-
(uint32) (restore_lsn >> 32), (uint32) restore_lsn);
1063-
1064-
if (replica_timeout > 0 && try_count > replica_timeout)
1065-
elog(ERROR, "Restore point LSN %X/%X could not be "
1066-
"streamed to replica in %d seconds",
1067-
(uint32) (restore_lsn >> 32), (uint32) restore_lsn,
1068-
replica_timeout);
1069-
}
1070-
}
1030+
res = pgut_execute(conn, "SELECT pg_create_restore_point($1)",
1031+
1, params);
1032+
PQclear(res);
10711033

10721034
pfree(backup_id);
10731035
}
@@ -1084,13 +1046,13 @@ pg_stop_backup(pgBackup *backup)
10841046
* pg_stop_backup(false) copy of the backup label and tablespace map
10851047
* so they can be written to disk by the caller.
10861048
*/
1087-
sent = pgut_send(backup_conn,
1049+
sent = pgut_send(conn,
10881050
"SELECT *, txid_snapshot_xmax(txid_current_snapshot()),"
10891051
" current_timestamp(0)::timestamp"
10901052
" FROM pg_stop_backup(false)",
10911053
0, NULL, WARNING);
10921054
else
1093-
sent = pgut_send(backup_conn,
1055+
sent = pgut_send(conn,
10941056
"SELECT *, txid_snapshot_xmax(txid_current_snapshot()),"
10951057
" current_timestamp(0)::timestamp"
10961058
" FROM pg_stop_backup()",
@@ -1108,30 +1070,30 @@ pg_stop_backup(pgBackup *backup)
11081070

11091071
while (1)
11101072
{
1111-
if (!PQconsumeInput(backup_conn) || PQisBusy(backup_conn))
1073+
if (!PQconsumeInput(conn) || PQisBusy(conn))
11121074
{
1113-
pg_stop_backup_timeout++;
1114-
sleep(1);
1075+
pg_stop_backup_timeout++;
1076+
sleep(1);
11151077

1116-
if (interrupted)
1117-
{
1118-
pgut_cancel(backup_conn);
1119-
elog(ERROR, "interrupted during waiting for pg_stop_backup");
1120-
}
1121-
/*
1122-
* If postgres hasn't answered in PG_STOP_BACKUP_TIMEOUT seconds,
1123-
* send an interrupt.
1124-
*/
1125-
if (pg_stop_backup_timeout > PG_STOP_BACKUP_TIMEOUT)
1126-
{
1127-
pgut_cancel(backup_conn);
1128-
elog(ERROR, "pg_stop_backup doesn't answer in %d seconds, cancel it",
1129-
PG_STOP_BACKUP_TIMEOUT);
1130-
}
1078+
if (interrupted)
1079+
{
1080+
pgut_cancel(conn);
1081+
elog(ERROR, "interrupted during waiting for pg_stop_backup");
1082+
}
1083+
/*
1084+
* If postgres hasn't answered in PG_STOP_BACKUP_TIMEOUT seconds,
1085+
* send an interrupt.
1086+
*/
1087+
if (pg_stop_backup_timeout > PG_STOP_BACKUP_TIMEOUT)
1088+
{
1089+
pgut_cancel(conn);
1090+
elog(ERROR, "pg_stop_backup doesn't answer in %d seconds, cancel it",
1091+
PG_STOP_BACKUP_TIMEOUT);
1092+
}
11311093
}
11321094
else
11331095
{
1134-
res = PQgetResult(backup_conn);
1096+
res = PQgetResult(conn);
11351097
break;
11361098
}
11371099
}
@@ -1228,22 +1190,22 @@ pg_stop_backup(pgBackup *backup)
12281190
if (sscanf(PQgetvalue(res, 0, 3), XID_FMT, &recovery_xid) != 1)
12291191
elog(ERROR,
12301192
"result of txid_snapshot_xmax() is invalid: %s",
1231-
PQerrorMessage(backup_conn));
1193+
PQerrorMessage(conn));
12321194
if (!parse_time(PQgetvalue(res, 0, 4), &recovery_time))
12331195
elog(ERROR,
12341196
"result of current_timestamp is invalid: %s",
1235-
PQerrorMessage(backup_conn));
1197+
PQerrorMessage(conn));
12361198
}
12371199
else
12381200
{
12391201
if (sscanf(PQgetvalue(res, 0, 1), XID_FMT, &recovery_xid) != 1)
12401202
elog(ERROR,
12411203
"result of txid_snapshot_xmax() is invalid: %s",
1242-
PQerrorMessage(backup_conn));
1204+
PQerrorMessage(conn));
12431205
if (!parse_time(PQgetvalue(res, 0, 2), &recovery_time))
12441206
elog(ERROR,
12451207
"result of current_timestamp is invalid: %s",
1246-
PQerrorMessage(backup_conn));
1208+
PQerrorMessage(conn));
12471209
}
12481210

12491211
PQclear(res);
@@ -1258,6 +1220,9 @@ pg_stop_backup(pgBackup *backup)
12581220
char *xlog_path,
12591221
stream_xlog_path[MAXPGPATH];
12601222

1223+
/* Wait for stop_lsn to be received by replica */
1224+
if (from_replica)
1225+
wait_replica_wal_lsn(stop_backup_lsn, false);
12611226
/*
12621227
* Wait for stop_lsn to be archived or streamed.
12631228
* We wait for stop_lsn in stream mode just in case.

0 commit comments

Comments
 (0)