diff --git a/CMakeLists.txt b/CMakeLists.txt index a90c0fb..d98f543 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -224,6 +224,10 @@ set(source_files src/binsrv/basic_storage_backend.hpp src/binsrv/basic_storage_backend.cpp + src/binsrv/binlog_file_metadata_fwd.hpp + src/binsrv/binlog_file_metadata.hpp + src/binsrv/binlog_file_metadata.cpp + src/binsrv/cout_logger.hpp src/binsrv/cout_logger.cpp diff --git a/mtr/binlog_streaming/include/set_up_binsrv_environment.inc b/mtr/binlog_streaming/include/set_up_binsrv_environment.inc index 3c08ae2..7c611b1 100644 --- a/mtr/binlog_streaming/include/set_up_binsrv_environment.inc +++ b/mtr/binlog_streaming/include/set_up_binsrv_environment.inc @@ -14,6 +14,7 @@ # --let $binsrv_tls_version = TLSv1.2 (optional) # --let $binsrv_idle_time = 10 # --let $binsrv_verify_checksum = TRUE | FALSE +# --let $binsrv_replication_mode = position | gtid # --let $binsrv_checkpoint_size = 2M (optional) # --let $binsrv_checkpoint_interval = 30s (optional) # --source set_up_binsrv_environment.inc @@ -82,7 +83,7 @@ eval SET @binsrv_config_json = JSON_OBJECT( 'server_id', @@server_id + 1, 'idle_time', $binsrv_idle_time, 'verify_checksum', $binsrv_verify_checksum, - 'mode', 'position' + 'mode', '$binsrv_replication_mode' ), 'storage', JSON_OBJECT( 'backend', '$storage_backend', diff --git a/mtr/binlog_streaming/r/resume_streaming.result b/mtr/binlog_streaming/r/resume_streaming.result new file mode 100644 index 0000000..cfce614 --- /dev/null +++ b/mtr/binlog_streaming/r/resume_streaming.result @@ -0,0 +1,105 @@ +*** Resetting replication at the very beginning of the test. + +*** Generating a configuration file in JSON format for the Binlog +*** Server utility. + +*** Determining binlog file directory from the server. + +*** Creating a temporary directory for storing +*** binlog files downloaded via the Binlog Server utility. + +*** 1. 
Executing the Binlog Server utility to for the very first +*** time on an empty storage and receive no real events. + +*** 1a. Executing the Binlog Server utility on a storage that has +*** only one binlog file that has only magic payload in it and +*** receive no events. + +*** Creating a simple table +CREATE TABLE t1(id INT UNSIGNED NOT NULL AUTO_INCREMENT, PRIMARY KEY(id)) ENGINE=InnoDB; + +*** 2. Executing the Binlog Server utility on a storage that has +*** only one binlog file that has only magic payload in it and +*** receive events from the CREATE TABLE transaction. + +*** 2a. Executing the Binlog Server utility on a storage that has +*** only one binlog file that has CREATE TABLE transaction and +*** receive no events. + +*** Filling the table with some data (one transaction with one +*** insert and another one with two inserts). +INSERT INTO t1 VALUES(DEFAULT); +START TRANSACTION; +INSERT INTO t1 VALUES(DEFAULT); +INSERT INTO t1 VALUES(DEFAULT); +COMMIT; + +*** 3. Executing the Binlog Server utility on a storage that has +*** only one binlog file with some data and receive events from +*** the INSERT transactions. + +*** 3a. Executing the Binlog Server utility on a storage that has +*** only one binlog file with some data and receive no events. + +*** Flushing the first binary log and switching to the second one. +FLUSH BINARY LOGS; + +*** 4. Executing the Binlog Server utility on a storage that has +*** only one binlog file with some data and receive a single +*** ROTATE event. + +*** 4a. Executing the Binlog Server utility on a storage that has +*** one binlog file with some data and another one with just +*** magic payload and receive no events. + +*** Filling the table with some more data (one transaction with one +*** insert and another one with two inserts). +INSERT INTO t1 VALUES(DEFAULT); +START TRANSACTION; +INSERT INTO t1 VALUES(DEFAULT); +INSERT INTO t1 VALUES(DEFAULT); +COMMIT; + +*** 5. 
Executing the Binlog Server utility on a storage that has +*** one binlog file with some data and another one with just +*** magic payload and receive events from the second group of +*** INSERT transactions. + +*** 5a. Executing the Binlog Server utility on a storage that has +*** two binlog files with some data and receive no events + +*** Flushing the second binary log and switching to the third one. +*** Immediately after that updating data inserted previously. +FLUSH BINARY LOGS; +UPDATE t1 SET id = id + 100; + +*** 6. Executing the Binlog Server utility on a storage that has +*** two binlog files with some data and receive a ROTATE event +*** followed by events from the UPDATE transaction + +*** 6a. Executing the Binlog Server utility on a storage that has +*** three binlog files with some data and receive no events. + +*** Flushing the third binary log and switching to the fourth one. +*** Immediately after that deleting some data updated previously and +*** flushing one more time switching to the fifth binary log file. +FLUSH BINARY LOGS; +DELETE FROM t1 WHERE id <= 103; +FLUSH BINARY LOGS; + +*** 7. Executing the Binlog Server utility on a storage that has +*** three binlog files with some data and receive a ROTATE +*** event followed by a events from the DELETE transaction +*** followed by another ROTATE event + +*** 7a. Executing the Binlog Server utility on a storage that has +*** five binlog files with some data and receive no events. + +*** Dropping the table. +DROP TABLE t1; + +*** Removing the Binlog Server utility storage directory. + +*** Removing the Binlog Server utility log file. + +*** Removing the Binlog Server utility configuration file. 
diff --git a/mtr/binlog_streaming/t/binsrv.test b/mtr/binlog_streaming/t/binsrv.test index e10ef3c..a1c30e5 100644 --- a/mtr/binlog_streaming/t/binsrv.test +++ b/mtr/binlog_streaming/t/binsrv.test @@ -42,6 +42,7 @@ INSERT INTO t1 VALUES(DEFAULT); --let $binsrv_read_timeout = 60 --let $binsrv_idle_time = 10 --let $binsrv_verify_checksum = $extracted_init_connect_variable_value +--let $binsrv_replication_mode = position --let $binsrv_checkpoint_size = 1 --source ../include/set_up_binsrv_environment.inc diff --git a/mtr/binlog_streaming/t/checkpointing.test b/mtr/binlog_streaming/t/checkpointing.test index da76476..e20bf54 100644 --- a/mtr/binlog_streaming/t/checkpointing.test +++ b/mtr/binlog_streaming/t/checkpointing.test @@ -58,7 +58,8 @@ DROP TABLE t1; --let $binsrv_read_timeout = 60 --let $binsrv_idle_time = 10 --let $binsrv_verify_checksum = TRUE -# Enabling checkointing at about 40% of expected single binlog file size, so that it +--let $binsrv_replication_mode = position +# Enabling checkpointing at about 40% of expected single binlog file size, so that it # will happen twice before rotation. # Like wise, from the time point of view, make interval pretty low so that # at lease a few interval checkpointing events will happen. 
diff --git a/mtr/binlog_streaming/t/kill_and_restart.test b/mtr/binlog_streaming/t/kill_and_restart.test index 3580346..3f89348 100644 --- a/mtr/binlog_streaming/t/kill_and_restart.test +++ b/mtr/binlog_streaming/t/kill_and_restart.test @@ -130,6 +130,7 @@ INSERT INTO t1 VALUES(DEFAULT); --let $binsrv_read_timeout = 60 --let $binsrv_idle_time = 10 --let $binsrv_verify_checksum = TRUE +--let $binsrv_replication_mode = position --source ../include/set_up_binsrv_environment.inc --echo diff --git a/mtr/binlog_streaming/t/pull_mode.test b/mtr/binlog_streaming/t/pull_mode.test index ef8f86e..e5c2ccb 100644 --- a/mtr/binlog_streaming/t/pull_mode.test +++ b/mtr/binlog_streaming/t/pull_mode.test @@ -19,6 +19,7 @@ eval $stmt_reset_binary_logs_and_gtids; --let $binsrv_read_timeout = 3 --let $binsrv_idle_time = 1 --let $binsrv_verify_checksum = TRUE +--let $binsrv_replication_mode = position --source ../include/set_up_binsrv_environment.inc --echo diff --git a/mtr/binlog_streaming/t/resume_streaming.combinations b/mtr/binlog_streaming/t/resume_streaming.combinations new file mode 100644 index 0000000..fa1c14d --- /dev/null +++ b/mtr/binlog_streaming/t/resume_streaming.combinations @@ -0,0 +1,15 @@ +[position_buffered] +init-connect = SET @binsrv_buffering_mode = 'buffered' + +[position_unbuffered] +init-connect = SET @binsrv_buffering_mode = 'unbuffered' + +[gtid_buffered] +gtid-mode=on +enforce-gtid-consistency +init-connect = SET @binsrv_buffering_mode = 'buffered' + +[gtid_unbuffered] +gtid-mode=on +enforce-gtid-consistency +init-connect = SET @binsrv_buffering_mode = 'unbuffered' diff --git a/mtr/binlog_streaming/t/resume_streaming.test b/mtr/binlog_streaming/t/resume_streaming.test new file mode 100644 index 0000000..4b1ddb3 --- /dev/null +++ b/mtr/binlog_streaming/t/resume_streaming.test @@ -0,0 +1,161 @@ +--source ../include/have_binsrv.inc + +--source ../include/v80_v84_compatibility_defines.inc + +# in case of --repeat=N, we need to start from a fresh binary log to 
make +# this test deterministic +--echo *** Resetting replication at the very beginning of the test. +--disable_query_log +eval $stmt_reset_binary_logs_and_gtids; +--enable_query_log + +# identifying backend storage type ('file' or 's3') +--source ../include/identify_storage_backend.inc + +# identifying utility buffering mode from the conbination +--let $extracted_init_connect_variable_name = binsrv_buffering_mode +--source ../include/extract_init_connect_variable_value.inc + +# creating data directory, configuration file, etc. +--let $binsrv_connect_timeout = 20 +--let $binsrv_read_timeout = 60 +--let $binsrv_idle_time = 10 +--let $binsrv_verify_checksum = TRUE +--let $binsrv_replication_mode = `SELECT IF(@@global.gtid_mode, 'gtid', 'position')` +if ($extracted_init_connect_variable_value == 'buffered') +{ + --let $binsrv_checkpoint_size = 1G +} +if ($extracted_init_connect_variable_value == 'unbuffered') +{ + --let $binsrv_checkpoint_size = 1 +} +--source ../include/set_up_binsrv_environment.inc + +--echo +--echo *** 1. Executing the Binlog Server utility to for the very first +--echo *** time on an empty storage and receive no real events. +--exec $BINSRV fetch $binsrv_config_file_path > /dev/null + +--echo +--echo *** 1a. Executing the Binlog Server utility on a storage that has +--echo *** only one binlog file that has only magic payload in it and +--echo *** receive no events. +--exec $BINSRV fetch $binsrv_config_file_path > /dev/null + +--echo +--echo *** Creating a simple table +CREATE TABLE t1(id INT UNSIGNED NOT NULL AUTO_INCREMENT, PRIMARY KEY(id)) ENGINE=InnoDB; + +--echo +--echo *** 2. Executing the Binlog Server utility on a storage that has +--echo *** only one binlog file that has only magic payload in it and +--echo *** receive events from the CREATE TABLE transaction. +--exec $BINSRV fetch $binsrv_config_file_path > /dev/null + +--echo +--echo *** 2a. 
Executing the Binlog Server utility on a storage that has +--echo *** only one binlog file that has CREATE TABLE transaction and +--echo *** receive no events. +--exec $BINSRV fetch $binsrv_config_file_path > /dev/null + +--echo +--echo *** Filling the table with some data (one transaction with one +--echo *** insert and another one with two inserts). +INSERT INTO t1 VALUES(DEFAULT); +START TRANSACTION; +INSERT INTO t1 VALUES(DEFAULT); +INSERT INTO t1 VALUES(DEFAULT); +COMMIT; + +--echo +--echo *** 3. Executing the Binlog Server utility on a storage that has +--echo *** only one binlog file with some data and receive events from +--echo *** the INSERT transactions. +--exec $BINSRV fetch $binsrv_config_file_path > /dev/null + +--echo +--echo *** 3a. Executing the Binlog Server utility on a storage that has +--echo *** only one binlog file with some data and receive no events. +--exec $BINSRV fetch $binsrv_config_file_path > /dev/null + +--echo +--echo *** Flushing the first binary log and switching to the second one. +FLUSH BINARY LOGS; + +--echo +--echo *** 4. Executing the Binlog Server utility on a storage that has +--echo *** only one binlog file with some data and receive a single +--echo *** ROTATE event. +--exec $BINSRV fetch $binsrv_config_file_path > /dev/null + +--echo +--echo *** 4a. Executing the Binlog Server utility on a storage that has +--echo *** one binlog file with some data and another one with just +--echo *** magic payload and receive no events. +--exec $BINSRV fetch $binsrv_config_file_path > /dev/null + +--echo +--echo *** Filling the table with some more data (one transaction with one +--echo *** insert and another one with two inserts). +INSERT INTO t1 VALUES(DEFAULT); +START TRANSACTION; +INSERT INTO t1 VALUES(DEFAULT); +INSERT INTO t1 VALUES(DEFAULT); +COMMIT; + +--echo +--echo *** 5. 
Executing the Binlog Server utility on a storage that has +--echo *** one binlog file with some data and another one with just +--echo *** magic payload and receive events from the second group of +--echo *** INSERT transactions. +--exec $BINSRV fetch $binsrv_config_file_path > /dev/null + +--echo +--echo *** 5a. Executing the Binlog Server utility on a storage that has +--echo *** two binlog files with some data and receive no events +--exec $BINSRV fetch $binsrv_config_file_path > /dev/null + +--echo +--echo *** Flushing the second binary log and switching to the third one. +--echo *** Immediately after that updating data inserted previously. +FLUSH BINARY LOGS; +UPDATE t1 SET id = id + 100; + +--echo +--echo *** 6. Executing the Binlog Server utility on a storage that has +--echo *** two binlog files with some data and receive a ROTATE event +--echo *** followed by events from the UPDATE transaction +--exec $BINSRV fetch $binsrv_config_file_path > /dev/null + +--echo +--echo *** 6a. Executing the Binlog Server utility on a storage that has +--echo *** three binlog files with some data and receive no events. +--exec $BINSRV fetch $binsrv_config_file_path > /dev/null + +--echo +--echo *** Flushing the third binary log and switching to the fourth one. +--echo *** Immediately after that deleting some data updated previously and +--echo *** flushing one more time switching to the fifth binary log file. +FLUSH BINARY LOGS; +DELETE FROM t1 WHERE id <= 103; +FLUSH BINARY LOGS; + +--echo +--echo *** 7. Executing the Binlog Server utility on a storage that has +--echo *** three binlog files with some data and receive a ROTATE +--echo *** event followed by a events from the DELETE transaction +--echo *** followed by another ROTATE event +--exec $BINSRV fetch $binsrv_config_file_path > /dev/null + +--echo +--echo *** 7a. Executing the Binlog Server utility on a storage that has +--echo *** five binlog files with some data and receive no events. 
+--exec $BINSRV fetch $binsrv_config_file_path > /dev/null + +--echo +--echo *** Dropping the table. +DROP TABLE t1; + +# cleaning up +--source ../include/tear_down_binsrv_environment.inc diff --git a/mtr/binlog_streaming/t/ssl_connection.test b/mtr/binlog_streaming/t/ssl_connection.test index b669fc6..8b59c29 100644 --- a/mtr/binlog_streaming/t/ssl_connection.test +++ b/mtr/binlog_streaming/t/ssl_connection.test @@ -36,6 +36,7 @@ eval GRANT REPLICATION SLAVE ON *.* TO '$x509_user_name'@'$localhost_ip'; --let $binsrv_read_timeout = 60 --let $binsrv_idle_time = 10 --let $binsrv_verify_checksum = TRUE +--let $binsrv_replication_mode = position # a query that checks SSL connection status --let $ssl_status_query = SELECT IF(VARIABLE_VALUE = '', '', VARIABLE_VALUE) AS SSL_status FROM performance_schema.session_status WHERE VARIABLE_NAME = 'Ssl_version' diff --git a/src/app.cpp b/src/app.cpp index 35856ee..ed8a1be 100644 --- a/src/app.cpp +++ b/src/app.cpp @@ -355,66 +355,143 @@ void log_span_dump(binsrv::basic_logger &logger, } } -void process_binlog_event(const binsrv::event::event ¤t_event, - util::const_byte_span portion, - const binsrv::event::reader_context &context, - binsrv::storage &storage, bool &skip_open_binlog) { - const auto ¤t_common_header = current_event.get_common_header(); - const auto code = current_common_header.get_type_code(); - - const auto is_artificial{current_common_header.get_flags().has_element( - binsrv::event::common_header_flag_type::artificial)}; - const auto is_pseudo{current_common_header.get_next_event_position_raw() == - 0U}; - - if (code == binsrv::event::code_type::rotate && is_artificial) { - const auto ¤t_rotate_body = - current_event.get_body(); - if (skip_open_binlog) { - // we are supposed to get here after reconnection, so doing - // basic integrity checks - if (current_rotate_body.get_binlog() != - storage.get_current_binlog_name()) { - util::exception_location().raise( - "unexpected binlog name in artificial rotate event 
after " - "reconnection"); - } +void process_artificial_rotate_event(const binsrv::event::event ¤t_event, + binsrv::basic_logger &logger, + binsrv::event::reader_context &context, + binsrv::storage &storage) { + assert(current_event.get_common_header().get_type_code() == + binsrv::event::code_type::rotate); + assert(current_event.get_common_header().get_flags().has_element( + binsrv::event::common_header_flag_type::artificial)); + + const auto ¤t_rotate_body = + current_event.get_body(); + + bool binlog_opening_needed{true}; + + if (storage.is_binlog_open()) { + // here we take a "shortcut" path - upon losing connection to the MySQL + // server, we do not close storage's binlog file immediately expecting + // that upon reconnection we will be able to continue writing to the + // same file + + // so, here we just need to make sure that (binlog name, position) pair + // in the artificial ROTATE event matches the current storage state + + // also, in case when the server was not shut down properly, it won't + // have ROTATE or STOP event as the last one in the binlog, so here we + // handle this case by closing the old binlog and opening a new one + + if (current_rotate_body.get_binlog() == storage.get_current_binlog_name()) { + // in addition, in position-based replication mode we also need to check + // the position const auto ¤t_rotate_post_header = current_event.get_post_header(); if (current_rotate_post_header.get_position_raw() != storage.get_current_position()) { util::exception_location().raise( - "unexpected binlog position in artificial rotate event after " - "reconnection"); + "unexpected binlog position in artificial rotate event"); } - skip_open_binlog = false; + binlog_opening_needed = false; + // after reusing the existing storage binlog file, we should instruct + // the reader context to mark the upcoming FDE and PREVIOUS_GTIDS_LOG + // events as info-only + context.set_expect_ignorable_preamble_events(); + + const std::string 
current_binlog_name{storage.get_current_binlog_name()}; logger.log(binsrv::log_severity::info, "storage: reused already open binlog file: " + current_binlog_name); + + } else { + // if names do not match, we need to close the currently open + // binlog and make sure that binlog_opening_needed is set to true, so + // that we will open a new one later + const std::string old_binlog_name{storage.get_current_binlog_name()}; + storage.close_binlog(); + logger.log(binsrv::log_severity::info, + "storage: closed binlog file left open: " + old_binlog_name); + // binlog_opening_needed remains true in this branch + assert(binlog_opening_needed); + } + } + if (binlog_opening_needed) { + const auto binlog_open_result{ + storage.open_binlog(current_rotate_body.get_binlog())}; + + // we also need to instruct the reader context that we opened an + // existing file (the one that was neither empty nor just had the + // magic payload written to it), so that it would mark the upcoming FDE + // and PREVIOUS_GTIDS_LOG events as info-only + + if (binlog_open_result == + binsrv::open_binlog_status::opened_with_data_present) { + context.set_expect_ignorable_preamble_events(); + } + + std::string message{"storage: "}; + if (binlog_open_result == binsrv::open_binlog_status::created) { + message += "created a new"; } else { - // in case when the server was not shut down properly, it won't have - // ROTATE or STOP event as the last one in the binlog, so here we - // handle this case by closing the old binlog and opening a new one - if (storage.is_binlog_open()) { - storage.close_binlog(); + message += "opened an existing"; + if (binlog_open_result == binsrv::open_binlog_status::opened_empty) { + message += " (empty)"; + } else if (binlog_open_result == + binsrv::open_binlog_status::opened_at_magic_paylod_offset) { + message += " (with magic payload only)"; - storage.open_binlog(current_rotate_body.get_binlog()); } } + message += 
current_rotate_body.get_binlog(); + logger.log(binsrv::log_severity::info, message); } - if (!is_artificial && !is_pseudo) { - storage.write_event(portion, context.is_at_transaction_boundary()); +} + +void process_rotate_or_stop_event(binsrv::basic_logger &logger, + binsrv::storage &storage) { + const std::string old_binlog_name{storage.get_current_binlog_name()}; + storage.close_binlog(); + logger.log(binsrv::log_severity::info, + "storage: closed binlog file: " + old_binlog_name); +} + +void process_binlog_event(const binsrv::event::event ¤t_event, + util::const_byte_span portion, + binsrv::basic_logger &logger, + binsrv::event::reader_context &context, + binsrv::storage &storage) { + const auto ¤t_common_header = current_event.get_common_header(); + const auto code = current_common_header.get_type_code(); + + const auto is_artificial{current_common_header.get_flags().has_element( + binsrv::event::common_header_flag_type::artificial)}; + + // processing the very first event in the sequence - artificial ROTATE event + if (code == binsrv::event::code_type::rotate && is_artificial) { + process_artificial_rotate_event(current_event, logger, context, storage); + } + + // checking if the event needs to be written to the binlog + if (!context.is_event_info_only()) { + storage.write_event(portion, context.is_at_transaction_boundary(), + context.get_transaction_gtid()); } + + // processing the very last event in the sequence - either a non-artificial + // ROTATE event or a STOP event if ((code == binsrv::event::code_type::rotate && !is_artificial) || code == binsrv::event::code_type::stop) { - storage.close_binlog(); + process_rotate_or_stop_event(logger, storage); } } -void receive_binlog_events( - binsrv::operation_mode_type operation_mode, - const volatile std::atomic_flag &termination_flag, - binsrv::basic_logger &logger, const easymysql::library &mysql_lib, +bool open_connection_and_switch_to_replication( + binsrv::operation_mode_type operation_mode, 
binsrv::basic_logger &logger, + const easymysql::library &mysql_lib, const easymysql::connection_config &connection_config, - std::uint32_t server_id, bool verify_checksum, binsrv::storage &storage) { - easymysql::connection connection{}; + std::uint32_t server_id, bool verify_checksum, binsrv::storage &storage, + easymysql::connection &connection) { try { connection = mysql_lib.create_connection(connection_config); } catch (const easymysql::core_error &) { @@ -423,7 +500,7 @@ void receive_binlog_events( } logger.log(binsrv::log_severity::info, "unable to establish connection to mysql server"); - return; + return false; } logger.log(binsrv::log_severity::info, @@ -431,14 +508,24 @@ void receive_binlog_events( log_connection_info(logger, connection); - const auto replication_mode{storage.get_replication_mode()}; const auto blocking_mode{ operation_mode == binsrv::operation_mode_type::fetch ? easymysql::connection_replication_mode_type::non_blocking : easymysql::connection_replication_mode_type::blocking}; try { - if (replication_mode == binsrv::replication_mode_type::position) { + if (storage.is_in_gtid_replication_mode()) { + const auto >ids{storage.get_gtids()}; + const auto encoded_size{gtids.calculate_encoded_size()}; + + binsrv::gtids::gtid_set_storage encoded_gtids_buffer(encoded_size); + util::byte_span destination{encoded_gtids_buffer}; + gtids.encode_to(destination); + + connection.switch_to_gtid_replication( + server_id, util::const_byte_span{encoded_gtids_buffer}, + verify_checksum, blocking_mode); + } else { if (storage.has_current_binlog_name()) { connection.switch_to_position_replication( server_id, storage.get_current_binlog_name(), @@ -447,28 +534,32 @@ void receive_binlog_events( connection.switch_to_position_replication(server_id, verify_checksum, blocking_mode); } - } else { - const binsrv::gtids::gtid_set empty_gtids{}; - const auto encoded_size{empty_gtids.calculate_encoded_size()}; - - binsrv::gtids::gtid_set_storage buffer(encoded_size); - 
util::byte_span destination{buffer}; - empty_gtids.encode_to(destination); - - connection.switch_to_gtid_replication(server_id, - util::const_byte_span{buffer}, - verify_checksum, blocking_mode); } } catch (const easymysql::core_error &) { if (operation_mode == binsrv::operation_mode_type::fetch) { throw; } logger.log(binsrv::log_severity::info, "unable to switch to replication"); - return; + return false; } log_replication_info(logger, server_id, storage, verify_checksum, blocking_mode); + return true; +} + +void receive_binlog_events( + binsrv::operation_mode_type operation_mode, + const volatile std::atomic_flag &termination_flag, + binsrv::basic_logger &logger, const easymysql::library &mysql_lib, + const easymysql::connection_config &connection_config, + std::uint32_t server_id, bool verify_checksum, binsrv::storage &storage) { + easymysql::connection connection{}; + if (!open_connection_and_switch_to_replication( + operation_mode, logger, mysql_lib, connection_config, server_id, + verify_checksum, storage, connection)) { + return; + } // Network streams are requested with COM_BINLOG_DUMP and // each Binlog Event response is prepended with 00 OK-byte. @@ -477,12 +568,9 @@ void receive_binlog_events( util::const_byte_span portion; binsrv::event::reader_context context{connection.get_server_version(), - verify_checksum, replication_mode}; + verify_checksum, + storage.get_replication_mode()}; - // if binlog is still open, there is no sense to close it and re-open - // instead, we will just instruct this loop to process the - // very first artificial rotate event in a special way - bool skip_open_binlog{storage.is_binlog_open()}; bool fetch_result{}; while (!termination_flag.test() && @@ -505,7 +593,8 @@ void receive_binlog_events( logger.log( binsrv::log_severity::info, "event: " + std::string{current_header.get_readable_type_code()} + - (readable_flags.empty() ? "" : "(" + readable_flags + ")")); + (readable_flags.empty() ? 
"" : " (" + readable_flags + ")") + + (context.is_event_info_only() ? " [info_only]" : "")); logger.log(binsrv::log_severity::debug, "Parsed event:\n" + boost::lexical_cast(current_event)); @@ -516,8 +605,7 @@ void receive_binlog_events( boost::lexical_cast(context.get_transaction_gtid())); } - process_binlog_event(current_event, portion, context, storage, - skip_open_binlog); + process_binlog_event(current_event, portion, logger, context, storage); } if (termination_flag.test()) { logger.log(binsrv::log_severity::info, @@ -533,6 +621,18 @@ void receive_binlog_events( util::exception_location().raise( "fetch operation did not reach EOF reading binlog events"); } + + // in GTID-based replication mode we also need to discard some data in the + // transaction event buffer to make sure that upon reconnection we will + // continue operation from the transaction boundary + + // in position-based replication mode this is not needed as it is not a + // problem to resume streaming fron a position that does not correspond to + // transaction boundary + if (storage.is_in_gtid_replication_mode()) { + storage.discard_incomplete_transaction_events(); + } + // TODO: here (upon timing out) we also need to flush internal buffers in // the storage logger.log(binsrv::log_severity::info, diff --git a/src/binsrv/basic_storage_backend.cpp b/src/binsrv/basic_storage_backend.cpp index 142a841..3159923 100644 --- a/src/binsrv/basic_storage_backend.cpp +++ b/src/binsrv/basic_storage_backend.cpp @@ -15,6 +15,7 @@ #include "binsrv/basic_storage_backend.hpp" +#include #include #include #include @@ -39,15 +40,17 @@ void basic_storage_backend::put_object(std::string_view name, do_put_object(name, content); } -void basic_storage_backend::open_stream(std::string_view name, - storage_backend_open_stream_mode mode) { +[[nodiscard]] std::uint64_t +basic_storage_backend::open_stream(std::string_view name, + storage_backend_open_stream_mode mode) { if (stream_open_) { util::exception_location().raise( 
"cannot open a new stream as the previous one has not been closed"); } - do_open_stream(name, mode); + const auto result{do_open_stream(name, mode)}; stream_open_ = true; + return result; } void basic_storage_backend::write_data_to_stream(util::const_byte_span data) { diff --git a/src/binsrv/basic_storage_backend.hpp b/src/binsrv/basic_storage_backend.hpp index 5d224ed..bc09690 100644 --- a/src/binsrv/basic_storage_backend.hpp +++ b/src/binsrv/basic_storage_backend.hpp @@ -40,8 +40,8 @@ class basic_storage_backend { void put_object(std::string_view name, util::const_byte_span content); [[nodiscard]] bool is_stream_open() const noexcept { return stream_open_; } - void open_stream(std::string_view name, - storage_backend_open_stream_mode mode); + [[nodiscard]] std::uint64_t + open_stream(std::string_view name, storage_backend_open_stream_mode mode); void write_data_to_stream(util::const_byte_span data); void close_stream(); @@ -55,8 +55,9 @@ class basic_storage_backend { virtual void do_put_object(std::string_view name, util::const_byte_span content) = 0; - virtual void do_open_stream(std::string_view name, - storage_backend_open_stream_mode mode) = 0; + [[nodiscard]] virtual std::uint64_t + do_open_stream(std::string_view name, + storage_backend_open_stream_mode mode) = 0; virtual void do_write_data_to_stream(util::const_byte_span data) = 0; virtual void do_close_stream() = 0; diff --git a/src/binsrv/binlog_file_metadata.cpp b/src/binsrv/binlog_file_metadata.cpp new file mode 100644 index 0000000..a9f2678 --- /dev/null +++ b/src/binsrv/binlog_file_metadata.cpp @@ -0,0 +1,90 @@ +// Copyright (c) 2023-2024 Percona and/or its affiliates. +// +// This program is free software; you can redistribute it and/or modify +// it under the terms of the GNU General Public License, version 2.0, +// as published by the Free Software Foundation. 
+// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License, version 2.0, for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program; if not, write to the Free Software +// Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + +#include "binsrv/binlog_file_metadata.hpp" + +#include +#include +#include +#include + +#include + +#include +#include +#include + +// needed for binsrv::gtids::gtid_set_storage +#include // IWYU pragma: keep + +#include "binsrv/gtids/common_types.hpp" +#include "binsrv/gtids/gtid_set.hpp" + +#include "util/byte_span.hpp" +#include "util/nv_tuple_from_json.hpp" +#include "util/nv_tuple_to_json.hpp" + +namespace binsrv { + +binlog_file_metadata::binlog_file_metadata() + : impl_{{expected_binlog_file_metadata_version}, {}, {}} {} + +binlog_file_metadata::binlog_file_metadata(std::string_view data) : impl_{} { + auto json_value = boost::json::parse(data); + util::nv_tuple_from_json(json_value, impl_); + + validate(); +} + +[[nodiscard]] std::string binlog_file_metadata::str() const { + boost::json::value json_value; + util::nv_tuple_to_json(json_value, impl_); + + return boost::json::serialize(json_value); +} + +[[nodiscard]] gtids::gtid_set binlog_file_metadata::get_gtids() const { + const auto &optional_gtids{root().get<"gtids">()}; + if (!optional_gtids.has_value()) { + return {}; + } + + const auto &encoded_gtids{optional_gtids.value()}; + std::string decoded_gtids(std::size(encoded_gtids) / 2U, 'x'); + boost::algorithm::unhex(encoded_gtids, std::data(decoded_gtids)); + return gtids::gtid_set{util::as_const_byte_span(decoded_gtids)}; +} + +void binlog_file_metadata::set_gtids(const gtids::gtid_set >ids) { + const auto encoded_size{gtids.calculate_encoded_size()}; + + 
gtids::gtid_set_storage buffer(encoded_size); + util::byte_span destination{buffer}; + gtids.encode_to(destination); + + std::string encoded_gtids(std::size(buffer) * 2U, 'x'); + const auto buffer_sv{util::as_string_view(std::as_const(buffer))}; + boost::algorithm::hex_lower(buffer_sv, std::data(encoded_gtids)); + root().get<"gtids">().emplace(std::move(encoded_gtids)); +} + +void binlog_file_metadata::validate() const { + if (root().get<"version">() != expected_binlog_file_metadata_version) { + util::exception_location().raise( + "unsupported binlog file metadata version"); + } +} + +} // namespace binsrv diff --git a/src/binsrv/binlog_file_metadata.hpp b/src/binsrv/binlog_file_metadata.hpp new file mode 100644 index 0000000..30d6e84 --- /dev/null +++ b/src/binsrv/binlog_file_metadata.hpp @@ -0,0 +1,63 @@ +// Copyright (c) 2023-2024 Percona and/or its affiliates. +// +// This program is free software; you can redistribute it and/or modify +// it under the terms of the GNU General Public License, version 2.0, +// as published by the Free Software Foundation. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License, version 2.0, for more details. 
+// +// You should have received a copy of the GNU General Public License +// along with this program; if not, write to the Free Software +// Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + +#ifndef BINSRV_BINLOG_FILE_METADATA_HPP +#define BINSRV_BINLOG_FILE_METADATA_HPP + +#include "binsrv/binlog_file_metadata_fwd.hpp" // IWYU pragma: export + +#include +#include +#include + +#include "binsrv/gtids/gtid_set.hpp" + +#include "util/common_optional_types.hpp" +#include "util/nv_tuple.hpp" + +namespace binsrv { + +class [[nodiscard]] binlog_file_metadata { +private: + using impl_type = util::nv_tuple< + // clang-format off + util::nv<"version", std::uint32_t>, + util::nv<"size", std::uint64_t>, + util::nv<"gtids", util::optional_string> + // clang-format on + >; + +public: + binlog_file_metadata(); + + explicit binlog_file_metadata(std::string_view data); + + [[nodiscard]] std::string str() const; + + [[nodiscard]] auto &root() noexcept { return impl_; } + [[nodiscard]] const auto &root() const noexcept { return impl_; } + + [[nodiscard]] gtids::gtid_set get_gtids() const; + void set_gtids(const gtids::gtid_set >ids); + +private: + impl_type impl_; + + void validate() const; +}; + +} // namespace binsrv + +#endif // BINSRV_BINLOG_FILE_METADATA_HPP diff --git a/src/binsrv/binlog_file_metadata_fwd.hpp b/src/binsrv/binlog_file_metadata_fwd.hpp new file mode 100644 index 0000000..cc20108 --- /dev/null +++ b/src/binsrv/binlog_file_metadata_fwd.hpp @@ -0,0 +1,29 @@ +// Copyright (c) 2023-2024 Percona and/or its affiliates. +// +// This program is free software; you can redistribute it and/or modify +// it under the terms of the GNU General Public License, version 2.0, +// as published by the Free Software Foundation. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. 
See the +// GNU General Public License, version 2.0, for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program; if not, write to the Free Software +// Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + +#ifndef BINSRV_BINLOG_FILE_METADATA_FWD_HPP +#define BINSRV_BINLOG_FILE_METADATA_FWD_HPP + +#include + +namespace binsrv { + +class binlog_file_metadata; + +inline constexpr std::uint32_t expected_binlog_file_metadata_version{1U}; + +} // namespace binsrv + +#endif // BINSRV_BINLOG_FILE_METADATA_FWD_HPP diff --git a/src/binsrv/event/reader_context.cpp b/src/binsrv/event/reader_context.cpp index 5b372af..018fda5 100644 --- a/src/binsrv/event/reader_context.cpp +++ b/src/binsrv/event/reader_context.cpp @@ -118,6 +118,7 @@ reader_context::process_event_in_rotate_artificial_expected_state( position_ = static_cast( current_event.get_post_header().get_position_raw()); + info_only_event_ = true; // transition to the next state state_ = state_type::format_description_expected; return true; @@ -183,17 +184,57 @@ reader_context::process_event_in_format_description_expected_state( // from a particular position (created only for the purpose of making // clients aware of the protocol specifics) - // the latter can be distinguished by having next_event_position set to - // zero - const auto is_pseudo{common_header.get_next_event_position_raw() == 0U}; - if (!is_pseudo) { + // in position-based mode we have 2 cases: + // 1) Starting from the beginning of a binary log file + // artificial ROTATE - should not be written + // FDE (common_header.next_event_position != 0) - should be written + // PREVIOUS_GTIDS_LOG - should be written + // ... + // 2) Resuming from the middle of a binary log file + // artificial ROTATE - should not be written + // FDE (common_header.next_event_position == 0) - should not be written + // ... 
+ + // in GTID-based mode we have 2 cases: + // 1) Starting from the beginning of a binary log file + // artificial ROTATE - should not be written + // FDE (common_header.next_event_position != 0) - should be written + // PREVIOUS_GTIDS_LOG - should be written + // ... + // 2) Resuming from the middle of a binary log file + // artificial ROTATE - should not be written + // FDE (common_header.next_event_position != 0) - should not be written + // PREVIOUS_GTIDS_LOG - should not be written + // ... + + // in other words, in GTID-based mode there is no way to distinguish whether + // the FDE / PREVIOUS_GTIDS_LOG is pseudo and should not be written, or not - + // that is why we rely only on externally supplied + // "start_from_new_binlog_file" constructor's argument + info_only_event_ = expect_ignorable_preamble_events_; + if (replication_mode_ == replication_mode_type::position && + info_only_event_) { + if (common_header.get_next_event_position_raw() != 0U) { + util::exception_location().raise( + "expected next event position set to zero in pseudo format " + "description event"); + } + } + if (!info_only_event_) { validate_position_and_advance(common_header); } - // transition to the next state: if the current format description event - // is a pseudo one, then the next event is expected to be one of the - // gtid log events, otherwise previous gtids log event - state_ = (is_pseudo ? 
state_type::gtid_log_expected - : state_type::previous_gtids_expected); + + // transition to the next state: + // the next expected event is PREVIOUS_GTIDS_LOG, unless we are in + // position-based replication mode and this we resumed streaming from the + // middle of a binlog file + if (replication_mode_ == replication_mode_type::position && + info_only_event_) { + state_ = state_type::gtid_log_expected; + expect_ignorable_preamble_events_ = false; + } else { + state_ = state_type::previous_gtids_expected; + } return true; } @@ -217,7 +258,11 @@ reader_context::process_event_in_previous_gtids_expected_state( "previous gtids log event is not expected to be artificial"); } - validate_position_and_advance(common_header); + info_only_event_ = expect_ignorable_preamble_events_; + expect_ignorable_preamble_events_ = false; + if (!info_only_event_) { + validate_position_and_advance(common_header); + } state_ = state_type::gtid_log_expected; return true; @@ -266,6 +311,7 @@ reader_context::process_event_in_previous_gtids_expected_state( start_transaction(current_event); validate_position_and_advance(common_header); + info_only_event_ = false; state_ = state_type::any_other_expected; return true; } @@ -326,6 +372,8 @@ reader_context::process_event_in_previous_gtids_expected_state( } update_transaction(common_header); validate_position_and_advance(common_header); + + info_only_event_ = false; // not changing the state here - remain in 'any_other_expected' return true; } @@ -370,6 +418,8 @@ reader_context::process_event_in_rotate_or_stop_expected_state( // FORMAT_DESCRIPTION validate_position(common_header); reset_position(); + + info_only_event_ = false; state_ = state_type::rotate_artificial_expected; return true; diff --git a/src/binsrv/event/reader_context.hpp b/src/binsrv/event/reader_context.hpp index 45cf998..c1c5d91 100644 --- a/src/binsrv/event/reader_context.hpp +++ b/src/binsrv/event/reader_context.hpp @@ -63,6 +63,14 @@ class [[nodiscard]] reader_context { 
(state_ == state_type::rotate_artificial_expected); } + [[nodiscard]] bool is_event_info_only() const noexcept { + return info_only_event_; + } + + void set_expect_ignorable_preamble_events() noexcept { + expect_ignorable_preamble_events_ = true; + } + private: // this class implements the logic of the following state machine // ( @@ -91,6 +99,9 @@ class [[nodiscard]] reader_context { std::uint32_t expected_transaction_length_{0U}; std::uint32_t current_transaction_length_{0U}; + bool expect_ignorable_preamble_events_{false}; + bool info_only_event_{false}; + void process_event(const event ¤t_event); [[nodiscard]] bool process_event_in_rotate_artificial_expected_state(const event ¤t_event); diff --git a/src/binsrv/filesystem_storage_backend.cpp b/src/binsrv/filesystem_storage_backend.cpp index 5de6820..7638e6a 100644 --- a/src/binsrv/filesystem_storage_backend.cpp +++ b/src/binsrv/filesystem_storage_backend.cpp @@ -16,6 +16,7 @@ #include "binsrv/filesystem_storage_backend.hpp" #include +#include #include #include #include @@ -154,20 +155,24 @@ void filesystem_storage_backend::do_put_object(std::string_view name, } } -void filesystem_storage_backend::do_open_stream( +[[nodiscard]] std::uint64_t filesystem_storage_backend::do_open_stream( std::string_view name, storage_backend_open_stream_mode mode) { assert(!ofs_.is_open()); const std::filesystem::path current_file_path{get_object_path(name)}; - auto open_mode{std::ios_base::out | std::ios_base::binary | - (mode == storage_backend_open_stream_mode::create - ? std::ios_base::trunc - : std::ios_base::app)}; + const auto open_mode{std::ios_base::out | std::ios_base::binary | + (mode == storage_backend_open_stream_mode::create + ? 
std::ios_base::trunc + : std::ios_base::app | std::ios_base::ate)}; ofs_.open(current_file_path, open_mode); if (!ofs_.is_open()) { util::exception_location().raise( "cannot open underlying file for the stream"); } + + const auto open_position{static_cast(ofs_.tellp())}; + + return static_cast(open_position); } void filesystem_storage_backend::do_write_data_to_stream( diff --git a/src/binsrv/filesystem_storage_backend.hpp b/src/binsrv/filesystem_storage_backend.hpp index 1b1f01c..b83c8bc 100644 --- a/src/binsrv/filesystem_storage_backend.hpp +++ b/src/binsrv/filesystem_storage_backend.hpp @@ -48,8 +48,9 @@ class [[nodiscard]] filesystem_storage_backend final void do_put_object(std::string_view name, util::const_byte_span content) override; - void do_open_stream(std::string_view name, - storage_backend_open_stream_mode mode) override; + [[nodiscard]] std::uint64_t + do_open_stream(std::string_view name, + storage_backend_open_stream_mode mode) override; void do_write_data_to_stream(util::const_byte_span data) override; void do_close_stream() override; diff --git a/src/binsrv/s3_storage_backend.cpp b/src/binsrv/s3_storage_backend.cpp index bd8f19e..6344f3e 100644 --- a/src/binsrv/s3_storage_backend.cpp +++ b/src/binsrv/s3_storage_backend.cpp @@ -17,6 +17,7 @@ #include #include +#include #include #include #include @@ -650,8 +651,9 @@ void s3_storage_backend::do_put_object(std::string_view name, {.bucket = bucket_, .object_path = get_object_path(name)}, content); } -void s3_storage_backend::do_open_stream(std::string_view name, - storage_backend_open_stream_mode mode) { +[[nodiscard]] std::uint64_t +s3_storage_backend::do_open_stream(std::string_view name, + storage_backend_open_stream_mode mode) { assert(!tmp_fstream_.is_open()); current_name_ = name; current_tmp_file_path_ = generate_tmp_file_path(); @@ -662,13 +664,20 @@ void s3_storage_backend::do_open_stream(std::string_view name, current_tmp_file_path_); } - tmp_fstream_.open(current_tmp_file_path_, - 
std::ios_base::in | std::ios_base::out | - std::ios_base::binary | std::ios_base::app); + const auto open_mode{std::ios_base::in | std::ios_base::out | + std::ios_base::binary | + (mode == storage_backend_open_stream_mode::create + ? std::ios_base::trunc + : std::ios_base::app | std::ios_base::ate)}; + tmp_fstream_.open(current_tmp_file_path_, open_mode); if (!tmp_fstream_.is_open()) { util::exception_location().raise( "cannot open temporary file for S3 object body stream"); } + + const auto open_position{static_cast(tmp_fstream_.tellp())}; + + return static_cast(open_position); } void s3_storage_backend::do_write_data_to_stream(util::const_byte_span data) { diff --git a/src/binsrv/s3_storage_backend.hpp b/src/binsrv/s3_storage_backend.hpp index f088f0a..421f097 100644 --- a/src/binsrv/s3_storage_backend.hpp +++ b/src/binsrv/s3_storage_backend.hpp @@ -73,8 +73,9 @@ class [[nodiscard]] s3_storage_backend final : public basic_storage_backend { void do_put_object(std::string_view name, util::const_byte_span content) override; - void do_open_stream(std::string_view name, - storage_backend_open_stream_mode mode) override; + [[nodiscard]] std::uint64_t + do_open_stream(std::string_view name, + storage_backend_open_stream_mode mode) override; void do_write_data_to_stream(util::const_byte_span data) override; void do_close_stream() override; diff --git a/src/binsrv/storage.cpp b/src/binsrv/storage.cpp index 9d52c18..f751acc 100644 --- a/src/binsrv/storage.cpp +++ b/src/binsrv/storage.cpp @@ -28,13 +28,17 @@ #include #include "binsrv/basic_storage_backend.hpp" -#include "binsrv/replication_mode_type_fwd.hpp" +#include "binsrv/binlog_file_metadata.hpp" +#include "binsrv/replication_mode_type.hpp" #include "binsrv/storage_backend_factory.hpp" #include "binsrv/storage_config.hpp" #include "binsrv/storage_metadata.hpp" #include "binsrv/event/protocol_traits_fwd.hpp" +#include "binsrv/gtids/gtid.hpp" +#include "binsrv/gtids/gtid_set.hpp" + #include "util/byte_span.hpp" #include 
"util/exception_location_helpers.hpp" @@ -56,42 +60,56 @@ storage::storage(const storage_config &config, backend_ = storage_backend_factory::create(config); - const auto storage_objects{backend_->list_objects()}; + auto storage_objects{backend_->list_objects()}; if (storage_objects.empty()) { // initialized on a new / empty storage - just save metadata and return save_metadata(); return; } - if (!storage_objects.contains(metadata_name)) { + const auto metadata_it{storage_objects.find(metadata_name)}; + if (metadata_it == std::end(storage_objects)) { util::exception_location().raise( "storage is not empty but does not contain metadata"); } + storage_objects.erase(metadata_it); + load_metadata(); validate_metadata(replication_mode); - if (!storage_objects.contains(default_binlog_index_name)) { + const auto binlog_index_it{storage_objects.find(default_binlog_index_name)}; + if (binlog_index_it == std::end(storage_objects)) { util::exception_location().raise( "storage is not empty but does not contain binlog index"); } - + storage_objects.erase(binlog_index_it); + + // extracting all binlog file metadata files into a separate container + storage_object_name_container storage_metadata_objects; + for (auto storage_object_it{std::begin(storage_objects)}; + storage_object_it != std::end(storage_objects);) { + const std::filesystem::path object_name{storage_object_it->first}; + if (object_name.has_extension() && + object_name.extension() == binlog_metadata_extension) { + auto object_node = storage_objects.extract(storage_object_it++); + storage_metadata_objects.insert(std::move(object_node)); + } else { + ++storage_object_it; + } + } load_binlog_index(); validate_binlog_index(storage_objects); - if (!binlog_names_.empty()) { - // call to validate_binlog_index() guarantees that the name will be - // found here - position_ = storage_objects.at(binlog_names_.back()); - } + load_and_validate_binlog_metadata_set(storage_objects, + storage_metadata_objects); } storage::~storage() 
{ - if (!backend_->is_stream_open()) { - return; - } // bugprone-empty-catch should not be that strict in destructors try { - backend_->close_stream(); + if (has_event_data_to_flush()) { + flush_event_buffer(); + } } catch (...) { // NOLINT(bugprone-empty-catch) } } @@ -100,6 +118,10 @@ storage::~storage() { return backend_->get_description(); } +[[nodiscard]] bool storage::is_in_gtid_replication_mode() const noexcept { + return replication_mode_ == replication_mode_type::gtid; +} + [[nodiscard]] bool storage::check_binlog_name(std::string_view binlog_name) noexcept { // TODO: parse binlog name into "base name" and "rotation number" @@ -115,17 +137,40 @@ storage::check_binlog_name(std::string_view binlog_name) noexcept { return backend_->is_stream_open(); } -void storage::open_binlog(std::string_view binlog_name) { +[[nodiscard]] open_binlog_status +storage::open_binlog(std::string_view binlog_name) { + auto result{open_binlog_status::opened_with_data_present}; + if (!check_binlog_name(binlog_name)) { util::exception_location().raise( "cannot create a binlog with invalid name"); } - const auto mode{position_ == 0ULL ? 
storage_backend_open_stream_mode::create - : storage_backend_open_stream_mode::append}; - backend_->open_stream(binlog_name, mode); + // here we either create a new binlog file if its name is not presen in the + // "binlog_names_", or we open an existing one and append to it, in which + // case we need to make sure that the current position is properly set + const bool binlog_exists{std::ranges::find(binlog_names_, binlog_name) != + std::end(binlog_names_)}; - if (mode == storage_backend_open_stream_mode::create) { + // in the case when binlog exists, the name must be equal to the last item in + // "binlog_names_" list and "position_" must be set to a non-zero value + if (binlog_exists) { + if (binlog_name != get_current_binlog_name()) { + util::exception_location().raise( + "cannot open an existing binlog that is not the latest one for " + "append"); + } + if (get_current_position() == 0ULL) { + util::exception_location().raise( + "invalid position set when opening an existing binlog"); + } + } + + const auto mode{binlog_exists ? 
storage_backend_open_stream_mode::append + : storage_backend_open_stream_mode::create}; + const auto open_stream_offset{backend_->open_stream(binlog_name, mode)}; + + if (!binlog_exists) { // writing the magic binlog footprint only if this is a newly // created file backend_->write_data_to_stream(event::magic_binlog_payload); @@ -133,25 +178,48 @@ void storage::open_binlog(std::string_view binlog_name) { binlog_names_.emplace_back(binlog_name); save_binlog_index(); position_ = event::magic_binlog_offset; + save_binlog_metadata(get_current_binlog_name()); + result = open_binlog_status::created; + } else { + assert(position_ == open_stream_offset); + if (open_stream_offset == 0ULL) { + backend_->write_data_to_stream(event::magic_binlog_payload); + position_ = event::magic_binlog_offset; + result = open_binlog_status::opened_empty; + } else if (open_stream_offset == event::magic_binlog_offset) { + result = open_binlog_status::opened_at_magic_paylod_offset; + } else { + // position is beyond magic payload offset + assert(open_stream_offset > event::magic_binlog_offset); + result = open_binlog_status::opened_with_data_present; + } } if (size_checkpointing_enabled()) { - last_checkpoint_position_ = position_; + last_checkpoint_position_ = get_current_position(); } if (interval_checkpointing_enabled()) { last_checkpoint_timestamp_ = std::chrono::steady_clock::now(); } + assert(std::size(event_buffer_) == 0U); event_buffer_.reserve(default_event_buffer_size_in_bytes); - assert(last_transaction_boundary_position_in_event_buffer_ == 0U); + assert(!has_event_data_to_flush()); + assert(gtids_in_event_buffer_.is_empty()); + + return result; } void storage::write_event(util::const_byte_span event_data, - bool at_transaction_boundary) { + bool at_transaction_boundary, + const gtids::gtid &transaction_gtid) { event_buffer_.insert(std::end(event_buffer_), std::cbegin(event_data), std::cend(event_data)); if (at_transaction_boundary) { 
last_transaction_boundary_position_in_event_buffer_ = std::size(event_buffer_); + if (is_in_gtid_replication_mode() && !transaction_gtid.is_empty()) { + gtids_in_event_buffer_ += transaction_gtid; + } } const auto event_data_size{std::size(event_data)}; @@ -160,25 +228,43 @@ void storage::write_event(util::const_byte_span event_data, // now we are writing data from the event buffer to the storage backend if // event buffer has some data in it that can be considered a complete // transaction and a checkpoint event (either size-based or time-based) - // occurred - if (last_transaction_boundary_position_in_event_buffer_ != 0U) { + // occurred or we are processing the very last event in the binlog file + + if (has_event_data_to_flush()) { + const auto ready_to_flush_position{get_ready_to_flush_position()}; const auto now_ts{std::chrono::steady_clock::now()}; - if ((size_checkpointing_enabled() && - (position_ >= last_checkpoint_position_ + checkpoint_size_bytes_)) || - (interval_checkpointing_enabled() && - (now_ts >= - last_checkpoint_timestamp_ + checkpoint_interval_seconds_))) { + bool needs_flush{false}; + if (at_transaction_boundary && transaction_gtid.is_empty()) { + // a special combination of parameters when at_transaction_boundary is + // true and transaction_gtid is empty means that we received either ROTATE + // or STOP event at the very end of a binary log file - in this case we + // need to flush the event data buffer immediately regardless of whether + // one of the checkpointing events occurred or not + needs_flush = true; + } else { + // here we perform size-based checkpointing calculations based on + // calculated "ready_to_flush_position" instead of + // "get_current_position()" directly to take into account that some event + // data may remain buffered + needs_flush = (size_checkpointing_enabled() && + (ready_to_flush_position >= + last_checkpoint_position_ + checkpoint_size_bytes_)) || + (interval_checkpointing_enabled() && + (now_ts >= 
last_checkpoint_timestamp_ + + checkpoint_interval_seconds_)); + } + if (needs_flush) { flush_event_buffer(); - last_checkpoint_position_ = position_; + last_checkpoint_position_ = ready_to_flush_position; last_checkpoint_timestamp_ = now_ts; } } } void storage::close_binlog() { - if (last_transaction_boundary_position_in_event_buffer_ != 0U) { + if (has_event_data_to_flush()) { flush_event_buffer(); } event_buffer_.clear(); @@ -187,22 +273,37 @@ void storage::close_binlog() { backend_->close_stream(); position_ = 0ULL; if (size_checkpointing_enabled()) { - last_checkpoint_position_ = position_; + last_checkpoint_position_ = get_current_position(); } if (interval_checkpointing_enabled()) { last_checkpoint_timestamp_ = std::chrono::steady_clock::now(); } } +void storage::discard_incomplete_transaction_events() { + const std::size_t bytes_to_discard{ + std::size(event_buffer_) - + last_transaction_boundary_position_in_event_buffer_}; + position_ -= bytes_to_discard; + event_buffer_.resize(last_transaction_boundary_position_in_event_buffer_); +} + void storage::flush_event_buffer() { + assert(!event_buffer_.empty()); assert(last_transaction_boundary_position_in_event_buffer_ <= std::size(event_buffer_)); + const util::const_byte_span transactions_data{ std::data(event_buffer_), last_transaction_boundary_position_in_event_buffer_}; // writing bytes from // the beginning of the event buffer backend_->write_data_to_stream(transactions_data); + if (is_in_gtid_replication_mode()) { + gtids_ += gtids_in_event_buffer_; + } + save_binlog_metadata(get_current_binlog_name()); + const auto begin_it{std::begin(event_buffer_)}; const auto portion_it{std::next( begin_it, static_cast( @@ -211,6 +312,9 @@ void storage::flush_event_buffer() { // from the beginning of this buffer event_buffer_.erase(begin_it, portion_it); last_transaction_boundary_position_in_event_buffer_ = 0U; + if (is_in_gtid_replication_mode()) { + gtids_in_event_buffer_.clear(); + } } void 
storage::load_binlog_index() { @@ -248,23 +352,17 @@ void storage::load_binlog_index() { } void storage::validate_binlog_index( - const storage_object_name_container &object_names) { - std::size_t known_entries{0U}; + const storage_object_name_container &object_names) const { for (auto const &[object_name, object_size] : object_names) { - if (object_name == default_binlog_index_name || - object_name == metadata_name) { - continue; - } if (std::ranges::find(binlog_names_, object_name) == std::end(binlog_names_)) { util::exception_location().raise( "storage contains an object that is not " "referenced in the binlog index"); } - ++known_entries; } - if (known_entries != std::size(binlog_names_)) { + if (std::size(object_names) != std::size(binlog_names_)) { util::exception_location().raise( "binlog index contains a reference to a non-existing object"); } @@ -273,7 +371,7 @@ void storage::validate_binlog_index( // files in the index } -void storage::save_binlog_index() { +void storage::save_binlog_index() const { std::ostringstream oss; for (const auto &binlog_name : binlog_names_) { std::filesystem::path binlog_path{default_binlog_index_entry_path}; @@ -291,7 +389,7 @@ void storage::load_metadata() { replication_mode_ = metadata.root().get<"mode">(); } -void storage::validate_metadata(replication_mode_type replication_mode) { +void storage::validate_metadata(replication_mode_type replication_mode) const { if (replication_mode != replication_mode_) { util::exception_location().raise( "replication mode provided to initialize storage differs from the one " @@ -299,11 +397,70 @@ void storage::validate_metadata(replication_mode_type replication_mode) { } } -void storage::save_metadata() { +void storage::save_metadata() const { storage_metadata metadata{}; metadata.root().get<"mode">() = replication_mode_; const auto content{metadata.str()}; backend_->put_object(metadata_name, util::as_const_byte_span(content)); } +[[nodiscard]] std::string 
+storage::generate_binlog_metadata_name(std::string_view binlog_name) { + std::string binlog_metadata_name{binlog_name}; + binlog_metadata_name += storage::binlog_metadata_extension; + return binlog_metadata_name; +} + +void storage::load_binlog_metadata(std::string_view binlog_name) { + const auto content{ + backend_->get_object(generate_binlog_metadata_name(binlog_name))}; + binlog_file_metadata metadata{content}; + position_ = metadata.root().get<"size">(); + if (is_in_gtid_replication_mode()) { + gtids_ = metadata.get_gtids(); + } +} + +void storage::save_binlog_metadata(std::string_view binlog_name) const { + binlog_file_metadata metadata{}; + metadata.root().get<"size">() = get_ready_to_flush_position(); + if (is_in_gtid_replication_mode()) { + metadata.set_gtids(get_gtids()); + } + const auto content{metadata.str()}; + backend_->put_object(generate_binlog_metadata_name(binlog_name), + util::as_const_byte_span(content)); +} + +void storage::load_and_validate_binlog_metadata_set( + // NOLINTNEXTLINE(bugprone-easily-swappable-parameters) + const storage_object_name_container &object_names, + const storage_object_name_container &object_metadata_names) { + for (const auto &binlog_name : binlog_names_) { + const auto binlog_metadata_name{generate_binlog_metadata_name(binlog_name)}; + if (!object_metadata_names.contains(binlog_metadata_name)) { + util::exception_location().raise( + "missing metadata for a binlog listed in the binlog index"); + } + load_binlog_metadata(binlog_name); + if (get_current_position() != object_names.at(binlog_name)) { + util::exception_location().raise( + "size from the binlog metadata does not match the actual binlog " + "size"); + } + if (!is_in_gtid_replication_mode() && !gtids_.is_empty()) { + util::exception_location().raise( + "found non-empty GTID set in the binlog metadata while in position " + "replication mode"); + } + } + // after this loop position_ and gtids_ should store the values from the last + // binlog file metadata + + 
if (std::size(object_metadata_names) != std::size(binlog_names_)) { + util::exception_location().raise( + "found metadata for a non-existing binlog"); + } +} + } // namespace binsrv diff --git a/src/binsrv/storage.hpp b/src/binsrv/storage.hpp index 8ac0036..4ce277a 100644 --- a/src/binsrv/storage.hpp +++ b/src/binsrv/storage.hpp @@ -27,6 +27,7 @@ #include "binsrv/replication_mode_type_fwd.hpp" #include "binsrv/storage_config_fwd.hpp" +#include "binsrv/gtids/gtid_fwd.hpp" #include "binsrv/gtids/gtid_set.hpp" #include "util/byte_span_fwd.hpp" @@ -38,6 +39,7 @@ class [[nodiscard]] storage { static constexpr std::string_view default_binlog_index_name{"binlog.index"}; static constexpr std::string_view default_binlog_index_entry_path{"."}; static constexpr std::string_view metadata_name{"metadata.json"}; + static constexpr std::string_view binlog_metadata_extension{".json"}; static constexpr std::size_t default_event_buffer_size_in_bytes{16384U}; @@ -57,6 +59,7 @@ class [[nodiscard]] storage { [[nodiscard]] replication_mode_type get_replication_mode() const noexcept { return replication_mode_; } + [[nodiscard]] bool is_in_gtid_replication_mode() const noexcept; [[nodiscard]] bool has_current_binlog_name() const noexcept { return !binlog_names_.empty(); @@ -77,11 +80,15 @@ class [[nodiscard]] storage { check_binlog_name(std::string_view binlog_name) noexcept; [[nodiscard]] bool is_binlog_open() const noexcept; - void open_binlog(std::string_view binlog_name); + + [[nodiscard]] open_binlog_status open_binlog(std::string_view binlog_name); void write_event(util::const_byte_span event_data, - bool at_transaction_boundary); + bool at_transaction_boundary, + const gtids::gtid &transaction_gtid); void close_binlog(); + void discard_incomplete_transaction_events(); + private: basic_storage_backend_ptr backend_; @@ -101,9 +108,10 @@ class [[nodiscard]] storage { using event_buffer_type = std::vector; event_buffer_type event_buffer_{}; std::size_t 
last_transaction_boundary_position_in_event_buffer_{}; + gtids::gtid_set gtids_in_event_buffer_{}; [[nodiscard]] bool size_checkpointing_enabled() const noexcept { - return checkpoint_size_bytes_ != 0; + return checkpoint_size_bytes_ != 0ULL; } [[nodiscard]] bool interval_checkpointing_enabled() const noexcept { @@ -111,14 +119,35 @@ class [[nodiscard]] storage { std::chrono::steady_clock::duration{}; } + [[nodiscard]] bool has_event_data_to_flush() const noexcept { + return last_transaction_boundary_position_in_event_buffer_ != 0ULL; + } + [[nodiscard]] std::uint64_t get_flushed_position() const noexcept { + return get_current_position() - std::size(event_buffer_); + } + [[nodiscard]] std::uint64_t get_ready_to_flush_position() const noexcept { + return get_flushed_position() + + last_transaction_boundary_position_in_event_buffer_; + } void flush_event_buffer(); void load_binlog_index(); - void validate_binlog_index(const storage_object_name_container &object_names); - void save_binlog_index(); + void validate_binlog_index( + const storage_object_name_container &object_names) const; + void save_binlog_index() const; + void load_metadata(); - void validate_metadata(replication_mode_type replication_mode); - void save_metadata(); + void validate_metadata(replication_mode_type replication_mode) const; + void save_metadata() const; + + [[nodiscard]] static std::string + generate_binlog_metadata_name(std::string_view binlog_name); + void load_binlog_metadata(std::string_view binlog_name); + void save_binlog_metadata(std::string_view binlog_name) const; + + void load_and_validate_binlog_metadata_set( + const storage_object_name_container &object_names, + const storage_object_name_container &object_metadata_names); }; } // namespace binsrv diff --git a/src/binsrv/storage_fwd.hpp b/src/binsrv/storage_fwd.hpp index 0f7c972..84b0268 100644 --- a/src/binsrv/storage_fwd.hpp +++ b/src/binsrv/storage_fwd.hpp @@ -16,8 +16,17 @@ #ifndef BINSRV_STORAGE_FWD_HPP #define 
BINSRV_STORAGE_FWD_HPP +#include + namespace binsrv { +enum class open_binlog_status : std::uint8_t { + created, + opened_empty, + opened_at_magic_paylod_offset, + opened_with_data_present +}; + class storage; } // namespace binsrv diff --git a/src/binsrv/storage_metadata.cpp b/src/binsrv/storage_metadata.cpp index 408f235..3b68336 100644 --- a/src/binsrv/storage_metadata.cpp +++ b/src/binsrv/storage_metadata.cpp @@ -31,9 +31,8 @@ namespace binsrv { -storage_metadata::storage_metadata() : impl_{} { - root().get<"version">() = expected_metadata_version; -} +storage_metadata::storage_metadata() + : impl_{{expected_storage_metadata_version}, {}} {} storage_metadata::storage_metadata(std::string_view data) : impl_{} { auto json_value = boost::json::parse(data); @@ -50,7 +49,7 @@ storage_metadata::storage_metadata(std::string_view data) : impl_{} { } void storage_metadata::validate() const { - if (root().get<"version">() != expected_metadata_version) { + if (root().get<"version">() != expected_storage_metadata_version) { util::exception_location().raise( "unsupported storage metadata version"); } diff --git a/src/binsrv/storage_metadata_fwd.hpp b/src/binsrv/storage_metadata_fwd.hpp index f86aeee..30125c3 100644 --- a/src/binsrv/storage_metadata_fwd.hpp +++ b/src/binsrv/storage_metadata_fwd.hpp @@ -22,7 +22,8 @@ namespace binsrv { class storage_metadata; -inline constexpr std::uint32_t expected_metadata_version{1U}; +inline constexpr std::uint32_t expected_storage_metadata_version{1U}; + } // namespace binsrv #endif // BINSRV_STORAGE_METADATA_FWD_HPP diff --git a/src/util/nv_tuple_to_json.hpp b/src/util/nv_tuple_to_json.hpp index 9a33dcb..f91d8ab 100644 --- a/src/util/nv_tuple_to_json.hpp +++ b/src/util/nv_tuple_to_json.hpp @@ -34,16 +34,26 @@ namespace detail { // classes declared in different namespaces. 
struct insertion_context {}; +// A helper function that emplaces a single nv value to the specified JSON +// object and is used by the tag_invoke() overload for nv_tuple<..>. +template +void emplace_element_into_json_value(boost::json::object &json_object, + const nv_tuple &obj, + const insertion_context &insertion_ctx) { + auto subvalue{ + boost::json::value_from(obj.template get(), insertion_ctx)}; + if (!subvalue.is_null()) { + json_object.emplace(NV::name.sv(), std::move(subvalue)); + } +} + // The tag_invoke() overload for nv_tuple<..>. template void tag_invoke(boost::json::value_from_tag /*unused*/, boost::json::value &json_value, const nv_tuple &obj, const insertion_context &insertion_ctx) { boost::json::object json_object{}; - - (json_object.emplace(NVPack::name.sv(), - boost::json::value_from(obj.template get(), - insertion_ctx)), + (emplace_element_into_json_value(json_object, obj, insertion_ctx), ...); json_value = std::move(json_object); }