@@ -640,6 +640,18 @@ pg_start_backup(const char *label, bool smooth, pgBackup *backup)
640
640
641
641
PQclear (res );
642
642
643
+ /*
644
+ * Switch to a new WAL segment. It is necessary to get archived WAL
645
+ * segment, which includes start LSN of current backup.
646
+ *
647
+ * Do not switch for a standby node or if the backup is in stream mode.
648
+ */
649
+ if (!stream_wal )
650
+ pg_switch_wal (conn );
651
+ /* Wait for start_lsn to be received by replica */
652
+ if (from_replica )
653
+ wait_replica_wal_lsn (backup -> start_lsn , true);
654
+
643
655
if (!stream_wal )
644
656
/*
645
657
* Do not wait start_lsn for stream backup.
@@ -658,16 +670,15 @@ pg_switch_wal(PGconn *conn)
658
670
PGresult * res ;
659
671
660
672
/* Remove annoying NOTICE messages generated by backend */
661
- res = pgut_execute (conn , "SET client_min_messages = warning;" , 0 ,
662
- NULL );
673
+ res = pgut_execute (conn , "SET client_min_messages = warning;" , 0 , NULL );
663
674
PQclear (res );
664
675
665
676
if (server_version >= 100000 )
666
677
res = pgut_execute (conn , "SELECT * FROM pg_switch_wal()" , 0 ,
667
- NULL );
678
+ NULL );
668
679
else
669
680
res = pgut_execute (conn , "SELECT * FROM pg_switch_xlog()" , 0 ,
670
- NULL );
681
+ NULL );
671
682
672
683
PQclear (res );
673
684
}
@@ -908,7 +919,7 @@ wait_wal_lsn(XLogRecPtr lsn)
908
919
}
909
920
910
921
/*
911
- * Wait for target 'lsn' on replica instance.
922
+ * Wait for target 'lsn' to be received on the replica instance from master.
912
923
*/
913
924
static void
914
925
wait_replica_wal_lsn (XLogRecPtr lsn , bool is_start_backup )
@@ -973,6 +984,7 @@ wait_replica_wal_lsn(XLogRecPtr lsn, bool is_start_backup)
973
984
static void
974
985
pg_stop_backup (pgBackup * backup )
975
986
{
987
+ PGconn * conn ;
976
988
PGresult * res ;
977
989
uint32 xlogid ;
978
990
uint32 xrecoff ;
@@ -990,8 +1002,11 @@ pg_stop_backup(pgBackup *backup)
990
1002
if (!backup_in_progress )
991
1003
elog (FATAL , "backup is not in progress" );
992
1004
1005
+ /* For replica we call pg_stop_backup() on master */
1006
+ conn = (from_replica ) ? master_conn : backup_conn ;
1007
+
993
1008
/* Remove annoying NOTICE messages generated by backend */
994
- res = pgut_execute (backup_conn , "SET client_min_messages = warning;" ,
1009
+ res = pgut_execute (conn , "SET client_min_messages = warning;" ,
995
1010
0 , NULL );
996
1011
PQclear (res );
997
1012
@@ -1005,69 +1020,16 @@ pg_stop_backup(pgBackup *backup)
1005
1020
backup_id = base36enc (backup -> start_time );
1006
1021
1007
1022
if (!from_replica )
1008
- {
1009
1023
snprintf (name , lengthof (name ), "pg_probackup, backup_id %s" ,
1010
1024
backup_id );
1011
- params [0 ] = name ;
1012
-
1013
- res = pgut_execute (backup_conn , "SELECT pg_create_restore_point($1)" ,
1014
- 1 , params );
1015
- PQclear (res );
1016
- }
1017
1025
else
1018
- {
1019
- uint32 try_count = 0 ;
1020
-
1021
1026
snprintf (name , lengthof (name ), "pg_probackup, backup_id %s. Replica Backup" ,
1022
1027
backup_id );
1023
- params [0 ] = name ;
1028
+ params [0 ] = name ;
1024
1029
1025
- res = pgut_execute (master_conn , "SELECT pg_create_restore_point($1)" ,
1026
- 1 , params );
1027
- /* Extract timeline and LSN from result */
1028
- XLogDataFromLSN (PQgetvalue (res , 0 , 0 ), & xlogid , & xrecoff );
1029
- /* Calculate LSN */
1030
- restore_lsn = (XLogRecPtr ) ((uint64 ) xlogid << 32 ) | xrecoff ;
1031
- PQclear (res );
1032
-
1033
- /* Switch WAL on master to retreive restore_lsn */
1034
- pg_switch_wal (master_conn );
1035
-
1036
- /* Wait for restore_lsn from master */
1037
- while (true)
1038
- {
1039
- XLogRecPtr min_recovery_lsn ;
1040
-
1041
- res = pgut_execute (backup_conn , "SELECT min_recovery_end_location from pg_control_recovery()" ,
1042
- 0 , NULL );
1043
- /* Extract timeline and LSN from result */
1044
- XLogDataFromLSN (PQgetvalue (res , 0 , 0 ), & xlogid , & xrecoff );
1045
- /* Calculate LSN */
1046
- min_recovery_lsn = (XLogRecPtr ) ((uint64 ) xlogid << 32 ) | xrecoff ;
1047
- PQclear (res );
1048
-
1049
- /* restore_lsn was streamed and applied to the replica */
1050
- if (min_recovery_lsn >= restore_lsn )
1051
- break ;
1052
-
1053
- sleep (1 );
1054
- if (interrupted )
1055
- elog (ERROR , "Interrupted during waiting for restore point LSN" );
1056
- try_count ++ ;
1057
-
1058
- /* Inform user if restore_lsn is absent in first attempt */
1059
- if (try_count == 1 )
1060
- elog (INFO , "Wait for restore point LSN %X/%X to be streamed "
1061
- "to replica" ,
1062
- (uint32 ) (restore_lsn >> 32 ), (uint32 ) restore_lsn );
1063
-
1064
- if (replica_timeout > 0 && try_count > replica_timeout )
1065
- elog (ERROR , "Restore point LSN %X/%X could not be "
1066
- "streamed to replica in %d seconds" ,
1067
- (uint32 ) (restore_lsn >> 32 ), (uint32 ) restore_lsn ,
1068
- replica_timeout );
1069
- }
1070
- }
1030
+ res = pgut_execute (conn , "SELECT pg_create_restore_point($1)" ,
1031
+ 1 , params );
1032
+ PQclear (res );
1071
1033
1072
1034
pfree (backup_id );
1073
1035
}
@@ -1084,13 +1046,13 @@ pg_stop_backup(pgBackup *backup)
1084
1046
* pg_stop_backup(false) copy of the backup label and tablespace map
1085
1047
* so they can be written to disk by the caller.
1086
1048
*/
1087
- sent = pgut_send (backup_conn ,
1049
+ sent = pgut_send (conn ,
1088
1050
"SELECT *, txid_snapshot_xmax(txid_current_snapshot()),"
1089
1051
" current_timestamp(0)::timestamp"
1090
1052
" FROM pg_stop_backup(false)" ,
1091
1053
0 , NULL , WARNING );
1092
1054
else
1093
- sent = pgut_send (backup_conn ,
1055
+ sent = pgut_send (conn ,
1094
1056
"SELECT *, txid_snapshot_xmax(txid_current_snapshot()),"
1095
1057
" current_timestamp(0)::timestamp"
1096
1058
" FROM pg_stop_backup()" ,
@@ -1108,30 +1070,30 @@ pg_stop_backup(pgBackup *backup)
1108
1070
1109
1071
while (1 )
1110
1072
{
1111
- if (!PQconsumeInput (backup_conn ) || PQisBusy (backup_conn ))
1073
+ if (!PQconsumeInput (conn ) || PQisBusy (conn ))
1112
1074
{
1113
- pg_stop_backup_timeout ++ ;
1114
- sleep (1 );
1075
+ pg_stop_backup_timeout ++ ;
1076
+ sleep (1 );
1115
1077
1116
- if (interrupted )
1117
- {
1118
- pgut_cancel (backup_conn );
1119
- elog (ERROR , "interrupted during waiting for pg_stop_backup" );
1120
- }
1121
- /*
1122
- * If postgres haven't answered in PG_STOP_BACKUP_TIMEOUT seconds,
1123
- * send an interrupt.
1124
- */
1125
- if (pg_stop_backup_timeout > PG_STOP_BACKUP_TIMEOUT )
1126
- {
1127
- pgut_cancel (backup_conn );
1128
- elog (ERROR , "pg_stop_backup doesn't answer in %d seconds, cancel it" ,
1129
- PG_STOP_BACKUP_TIMEOUT );
1130
- }
1078
+ if (interrupted )
1079
+ {
1080
+ pgut_cancel (conn );
1081
+ elog (ERROR , "interrupted during waiting for pg_stop_backup" );
1082
+ }
1083
+ /*
1084
+ * If postgres hasn't answered in PG_STOP_BACKUP_TIMEOUT seconds,
1085
+ * send an interrupt.
1086
+ */
1087
+ if (pg_stop_backup_timeout > PG_STOP_BACKUP_TIMEOUT )
1088
+ {
1089
+ pgut_cancel (conn );
1090
+ elog (ERROR , "pg_stop_backup doesn't answer in %d seconds, cancel it" ,
1091
+ PG_STOP_BACKUP_TIMEOUT );
1092
+ }
1131
1093
}
1132
1094
else
1133
1095
{
1134
- res = PQgetResult (backup_conn );
1096
+ res = PQgetResult (conn );
1135
1097
break ;
1136
1098
}
1137
1099
}
@@ -1228,22 +1190,22 @@ pg_stop_backup(pgBackup *backup)
1228
1190
if (sscanf (PQgetvalue (res , 0 , 3 ), XID_FMT , & recovery_xid ) != 1 )
1229
1191
elog (ERROR ,
1230
1192
"result of txid_snapshot_xmax() is invalid: %s" ,
1231
- PQerrorMessage (backup_conn ));
1193
+ PQerrorMessage (conn ));
1232
1194
if (!parse_time (PQgetvalue (res , 0 , 4 ), & recovery_time ))
1233
1195
elog (ERROR ,
1234
1196
"result of current_timestamp is invalid: %s" ,
1235
- PQerrorMessage (backup_conn ));
1197
+ PQerrorMessage (conn ));
1236
1198
}
1237
1199
else
1238
1200
{
1239
1201
if (sscanf (PQgetvalue (res , 0 , 1 ), XID_FMT , & recovery_xid ) != 1 )
1240
1202
elog (ERROR ,
1241
1203
"result of txid_snapshot_xmax() is invalid: %s" ,
1242
- PQerrorMessage (backup_conn ));
1204
+ PQerrorMessage (conn ));
1243
1205
if (!parse_time (PQgetvalue (res , 0 , 2 ), & recovery_time ))
1244
1206
elog (ERROR ,
1245
1207
"result of current_timestamp is invalid: %s" ,
1246
- PQerrorMessage (backup_conn ));
1208
+ PQerrorMessage (conn ));
1247
1209
}
1248
1210
1249
1211
PQclear (res );
@@ -1258,6 +1220,9 @@ pg_stop_backup(pgBackup *backup)
1258
1220
char * xlog_path ,
1259
1221
stream_xlog_path [MAXPGPATH ];
1260
1222
1223
+ /* Wait for stop_lsn to be received by replica */
1224
+ if (from_replica )
1225
+ wait_replica_wal_lsn (stop_backup_lsn , false);
1261
1226
/*
1262
1227
* Wait for stop_lsn to be archived or streamed.
1263
1228
* We wait for stop_lsn in stream mode just in case.
0 commit comments