@@ -640,6 +640,18 @@ pg_start_backup(const char *label, bool smooth, pgBackup *backup)
640640
641641 PQclear (res );
642642
643+ /*
644+ * Switch to a new WAL segment. This is necessary to get an archived WAL
645+ * segment that includes the start LSN of the current backup.
646+ *
647+ * Do not switch for a standby node or if the backup is in stream mode.
648+ */
649+ if (!stream_wal )
650+ pg_switch_wal (conn );
651+ /* Wait for start_lsn to be received by replica */
652+ if (from_replica )
653+ wait_replica_wal_lsn (backup -> start_lsn , true);
654+
643655 if (!stream_wal )
644656 /*
645657 * Do not wait start_lsn for stream backup.
@@ -658,16 +670,15 @@ pg_switch_wal(PGconn *conn)
658670 PGresult * res ;
659671
660672 /* Remove annoying NOTICE messages generated by backend */
661- res = pgut_execute (conn , "SET client_min_messages = warning;" , 0 ,
662- NULL );
673+ res = pgut_execute (conn , "SET client_min_messages = warning;" , 0 , NULL );
663674 PQclear (res );
664675
665676 if (server_version >= 100000 )
666677 res = pgut_execute (conn , "SELECT * FROM pg_switch_wal()" , 0 ,
667- NULL );
678+ NULL );
668679 else
669680 res = pgut_execute (conn , "SELECT * FROM pg_switch_xlog()" , 0 ,
670- NULL );
681+ NULL );
671682
672683 PQclear (res );
673684}
@@ -908,7 +919,7 @@ wait_wal_lsn(XLogRecPtr lsn)
908919}
909920
910921/*
911- * Wait for target 'lsn' on replica instance.
922+ * Wait for target 'lsn' from master to be received by the replica instance.
912923 */
913924static void
914925wait_replica_wal_lsn (XLogRecPtr lsn , bool is_start_backup )
@@ -973,6 +984,7 @@ wait_replica_wal_lsn(XLogRecPtr lsn, bool is_start_backup)
973984static void
974985pg_stop_backup (pgBackup * backup )
975986{
987+ PGconn * conn ;
976988 PGresult * res ;
977989 uint32 xlogid ;
978990 uint32 xrecoff ;
@@ -990,8 +1002,11 @@ pg_stop_backup(pgBackup *backup)
9901002 if (!backup_in_progress )
9911003 elog (FATAL , "backup is not in progress" );
9921004
1005+ /* For replica we call pg_stop_backup() on master */
1006+ conn = (from_replica ) ? master_conn : backup_conn ;
1007+
9931008 /* Remove annoying NOTICE messages generated by backend */
994- res = pgut_execute (backup_conn , "SET client_min_messages = warning;" ,
1009+ res = pgut_execute (conn , "SET client_min_messages = warning;" ,
9951010 0 , NULL );
9961011 PQclear (res );
9971012
@@ -1005,69 +1020,16 @@ pg_stop_backup(pgBackup *backup)
10051020 backup_id = base36enc (backup -> start_time );
10061021
10071022 if (!from_replica )
1008- {
10091023 snprintf (name , lengthof (name ), "pg_probackup, backup_id %s" ,
10101024 backup_id );
1011- params [0 ] = name ;
1012-
1013- res = pgut_execute (backup_conn , "SELECT pg_create_restore_point($1)" ,
1014- 1 , params );
1015- PQclear (res );
1016- }
10171025 else
1018- {
1019- uint32 try_count = 0 ;
1020-
10211026 snprintf (name , lengthof (name ), "pg_probackup, backup_id %s. Replica Backup" ,
10221027 backup_id );
1023- params [0 ] = name ;
1028+ params [0 ] = name ;
10241029
1025- res = pgut_execute (master_conn , "SELECT pg_create_restore_point($1)" ,
1026- 1 , params );
1027- /* Extract timeline and LSN from result */
1028- XLogDataFromLSN (PQgetvalue (res , 0 , 0 ), & xlogid , & xrecoff );
1029- /* Calculate LSN */
1030- restore_lsn = (XLogRecPtr ) ((uint64 ) xlogid << 32 ) | xrecoff ;
1031- PQclear (res );
1032-
1033- /* Switch WAL on master to retreive restore_lsn */
1034- pg_switch_wal (master_conn );
1035-
1036- /* Wait for restore_lsn from master */
1037- while (true)
1038- {
1039- XLogRecPtr min_recovery_lsn ;
1040-
1041- res = pgut_execute (backup_conn , "SELECT min_recovery_end_location from pg_control_recovery()" ,
1042- 0 , NULL );
1043- /* Extract timeline and LSN from result */
1044- XLogDataFromLSN (PQgetvalue (res , 0 , 0 ), & xlogid , & xrecoff );
1045- /* Calculate LSN */
1046- min_recovery_lsn = (XLogRecPtr ) ((uint64 ) xlogid << 32 ) | xrecoff ;
1047- PQclear (res );
1048-
1049- /* restore_lsn was streamed and applied to the replica */
1050- if (min_recovery_lsn >= restore_lsn )
1051- break ;
1052-
1053- sleep (1 );
1054- if (interrupted )
1055- elog (ERROR , "Interrupted during waiting for restore point LSN" );
1056- try_count ++ ;
1057-
1058- /* Inform user if restore_lsn is absent in first attempt */
1059- if (try_count == 1 )
1060- elog (INFO , "Wait for restore point LSN %X/%X to be streamed "
1061- "to replica" ,
1062- (uint32 ) (restore_lsn >> 32 ), (uint32 ) restore_lsn );
1063-
1064- if (replica_timeout > 0 && try_count > replica_timeout )
1065- elog (ERROR , "Restore point LSN %X/%X could not be "
1066- "streamed to replica in %d seconds" ,
1067- (uint32 ) (restore_lsn >> 32 ), (uint32 ) restore_lsn ,
1068- replica_timeout );
1069- }
1070- }
1030+ res = pgut_execute (conn , "SELECT pg_create_restore_point($1)" ,
1031+ 1 , params );
1032+ PQclear (res );
10711033
10721034 pfree (backup_id );
10731035 }
@@ -1084,13 +1046,13 @@ pg_stop_backup(pgBackup *backup)
10841046 * pg_stop_backup(false) copy of the backup label and tablespace map
10851047 * so they can be written to disk by the caller.
10861048 */
1087- sent = pgut_send (backup_conn ,
1049+ sent = pgut_send (conn ,
10881050 "SELECT *, txid_snapshot_xmax(txid_current_snapshot()),"
10891051 " current_timestamp(0)::timestamp"
10901052 " FROM pg_stop_backup(false)" ,
10911053 0 , NULL , WARNING );
10921054 else
1093- sent = pgut_send (backup_conn ,
1055+ sent = pgut_send (conn ,
10941056 "SELECT *, txid_snapshot_xmax(txid_current_snapshot()),"
10951057 " current_timestamp(0)::timestamp"
10961058 " FROM pg_stop_backup()" ,
@@ -1108,30 +1070,30 @@ pg_stop_backup(pgBackup *backup)
11081070
11091071 while (1 )
11101072 {
1111- if (!PQconsumeInput (backup_conn ) || PQisBusy (backup_conn ))
1073+ if (!PQconsumeInput (conn ) || PQisBusy (conn ))
11121074 {
1113- pg_stop_backup_timeout ++ ;
1114- sleep (1 );
1075+ pg_stop_backup_timeout ++ ;
1076+ sleep (1 );
11151077
1116- if (interrupted )
1117- {
1118- pgut_cancel (backup_conn );
1119- elog (ERROR , "interrupted during waiting for pg_stop_backup" );
1120- }
1121- /*
1122- * If postgres haven't answered in PG_STOP_BACKUP_TIMEOUT seconds,
1123- * send an interrupt.
1124- */
1125- if (pg_stop_backup_timeout > PG_STOP_BACKUP_TIMEOUT )
1126- {
1127- pgut_cancel (backup_conn );
1128- elog (ERROR , "pg_stop_backup doesn't answer in %d seconds, cancel it" ,
1129- PG_STOP_BACKUP_TIMEOUT );
1130- }
1078+ if (interrupted )
1079+ {
1080+ pgut_cancel (conn );
1081+ elog (ERROR , "interrupted during waiting for pg_stop_backup" );
1082+ }
1083+ /*
1084+ * If postgres hasn't answered in PG_STOP_BACKUP_TIMEOUT seconds,
1085+ * send an interrupt.
1086+ */
1087+ if (pg_stop_backup_timeout > PG_STOP_BACKUP_TIMEOUT )
1088+ {
1089+ pgut_cancel (conn );
1090+ elog (ERROR , "pg_stop_backup doesn't answer in %d seconds, cancel it" ,
1091+ PG_STOP_BACKUP_TIMEOUT );
1092+ }
11311093 }
11321094 else
11331095 {
1134- res = PQgetResult (backup_conn );
1096+ res = PQgetResult (conn );
11351097 break ;
11361098 }
11371099 }
@@ -1228,22 +1190,22 @@ pg_stop_backup(pgBackup *backup)
12281190 if (sscanf (PQgetvalue (res , 0 , 3 ), XID_FMT , & recovery_xid ) != 1 )
12291191 elog (ERROR ,
12301192 "result of txid_snapshot_xmax() is invalid: %s" ,
1231- PQerrorMessage (backup_conn ));
1193+ PQerrorMessage (conn ));
12321194 if (!parse_time (PQgetvalue (res , 0 , 4 ), & recovery_time ))
12331195 elog (ERROR ,
12341196 "result of current_timestamp is invalid: %s" ,
1235- PQerrorMessage (backup_conn ));
1197+ PQerrorMessage (conn ));
12361198 }
12371199 else
12381200 {
12391201 if (sscanf (PQgetvalue (res , 0 , 1 ), XID_FMT , & recovery_xid ) != 1 )
12401202 elog (ERROR ,
12411203 "result of txid_snapshot_xmax() is invalid: %s" ,
1242- PQerrorMessage (backup_conn ));
1204+ PQerrorMessage (conn ));
12431205 if (!parse_time (PQgetvalue (res , 0 , 2 ), & recovery_time ))
12441206 elog (ERROR ,
12451207 "result of current_timestamp is invalid: %s" ,
1246- PQerrorMessage (backup_conn ));
1208+ PQerrorMessage (conn ));
12471209 }
12481210
12491211 PQclear (res );
@@ -1258,6 +1220,9 @@ pg_stop_backup(pgBackup *backup)
12581220 char * xlog_path ,
12591221 stream_xlog_path [MAXPGPATH ];
12601222
1223+ /* Wait for stop_lsn to be received by replica */
1224+ if (from_replica )
1225+ wait_replica_wal_lsn (stop_backup_lsn , false);
12611226 /*
12621227 * Wait for stop_lsn to be archived or streamed.
12631228 * We wait for stop_lsn in stream mode just in case.
0 commit comments