From 5a01caeb4f5e2f6ca1773f0d2edb1a31a6dcac85 Mon Sep 17 00:00:00 2001 From: Yura Sorokin Date: Mon, 19 Jan 2026 00:32:54 +0100 Subject: [PATCH] PS-10245 feature: Implement receiving binlog events in GTID mode (part 5) https://perconadev.atlassian.net/browse/PS-10245 GTID-based replication mode is now supported. Along with writing data to binlog files 'binsrv::storage' class now also creates and maintains binlog file metadata - separate files that follow the '.json' naming convention and store actual flushed file size (in both position-based and GTID-based replication modes) and accumulated GTID set (only in GTID-based replication mode). In addition, upon storage initialization, we now restore the GTID set from the latest binlog metadata file in order to continue GTID-based replication from this set. Implemented additional logic that discards events from the latest incomplete transaction from the storage buffer upon MySQL server disconnect. Main application extended with additional diagnostic messages informing about binlog files creation / opening / closing / reusing. In GTID mode 'binsrv::storage' class now also keeps track of the GTIDs associated with processed events that helps with updating binlog metadata files. 'binsrv::storage' constructor now performs additional checks to ensure that each file in the binlog index has a corresponding binlog file metadata ('.json') and the size from that metadata file matches the actual binlog file size. 'binsrv::basic_storage_backend::open_stream()' method now returns the size of the opened file or 0 if the file was created. Concrete implementations ('binsrv::filesystem_storage_backend' and 'binsrv::s3_storage_backend') updated correspondingly. 'binsrv::storage::open_binlog()' method now returns extra info about file creation / opening: - created - opened_empty - opened_at_magic_paylod_offset - opened_with_data_present This helps with making a decision which events 'binsrv::event::reader_context' should expect next. 
'binsrv::event::reader_context' extended with additional logic that helps to determine if the processed event is "info-only" and should not be written to the binlog files (this concerns artificial ROTATE, FORMAT_DESCRIPTION and PREVIOUS_GTIDS_LOG events). Improved 'util::nv_tuple_to_json()' function - we now create keys for optional objects only when they are non-empty. Added new 'binlog_streaming.resume_streaming' MTR test case that checks various continuation scenarios for the Binlog Server utility (starting from an empty storage, starting from a file that has just magic payload in it, receiving ROTATE as the last event, etc.). The testcase has 4 combinations for position-based / GTID-based replication in buffered / unbuffered (immediately flushing) mode. It is now possible to set '$binsrv_replication_mode' (to either 'position' or 'gtid') before including the 'set_up_binsrv_environment.inc' MTR include file to set desired Binlog Server utility replication mode. --- CMakeLists.txt | 4 + .../include/set_up_binsrv_environment.inc | 3 +- .../r/resume_streaming.result | 105 ++++++++ mtr/binlog_streaming/t/binsrv.test | 1 + mtr/binlog_streaming/t/checkpointing.test | 3 +- mtr/binlog_streaming/t/kill_and_restart.test | 1 + mtr/binlog_streaming/t/pull_mode.test | 1 + .../t/resume_streaming.combinations | 15 ++ mtr/binlog_streaming/t/resume_streaming.test | 161 ++++++++++++ mtr/binlog_streaming/t/ssl_connection.test | 1 + src/app.cpp | 230 ++++++++++++----- src/binsrv/basic_storage_backend.cpp | 9 +- src/binsrv/basic_storage_backend.hpp | 9 +- src/binsrv/binlog_file_metadata.cpp | 90 +++++++ src/binsrv/binlog_file_metadata.hpp | 63 +++++ src/binsrv/binlog_file_metadata_fwd.hpp | 29 +++ src/binsrv/event/reader_context.cpp | 70 ++++- src/binsrv/event/reader_context.hpp | 11 + src/binsrv/filesystem_storage_backend.cpp | 15 +- src/binsrv/filesystem_storage_backend.hpp | 5 +- src/binsrv/s3_storage_backend.cpp | 19 +- src/binsrv/s3_storage_backend.hpp | 5 +- 
src/binsrv/storage.cpp | 243 ++++++++++++++---- src/binsrv/storage.hpp | 43 +++- src/binsrv/storage_fwd.hpp | 9 + src/binsrv/storage_metadata.cpp | 7 +- src/binsrv/storage_metadata_fwd.hpp | 3 +- src/util/nv_tuple_to_json.hpp | 18 +- 28 files changed, 1016 insertions(+), 157 deletions(-) create mode 100644 mtr/binlog_streaming/r/resume_streaming.result create mode 100644 mtr/binlog_streaming/t/resume_streaming.combinations create mode 100644 mtr/binlog_streaming/t/resume_streaming.test create mode 100644 src/binsrv/binlog_file_metadata.cpp create mode 100644 src/binsrv/binlog_file_metadata.hpp create mode 100644 src/binsrv/binlog_file_metadata_fwd.hpp diff --git a/CMakeLists.txt b/CMakeLists.txt index a90c0fb..d98f543 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -224,6 +224,10 @@ set(source_files src/binsrv/basic_storage_backend.hpp src/binsrv/basic_storage_backend.cpp + src/binsrv/binlog_file_metadata_fwd.hpp + src/binsrv/binlog_file_metadata.hpp + src/binsrv/binlog_file_metadata.cpp + src/binsrv/cout_logger.hpp src/binsrv/cout_logger.cpp diff --git a/mtr/binlog_streaming/include/set_up_binsrv_environment.inc b/mtr/binlog_streaming/include/set_up_binsrv_environment.inc index 3c08ae2..7c611b1 100644 --- a/mtr/binlog_streaming/include/set_up_binsrv_environment.inc +++ b/mtr/binlog_streaming/include/set_up_binsrv_environment.inc @@ -14,6 +14,7 @@ # --let $binsrv_tls_version = TLSv1.2 (optional) # --let $binsrv_idle_time = 10 # --let $binsrv_verify_checksum = TRUE | FALSE +# --let $binsrv_replication_mode = position | gtid # --let $binsrv_checkpoint_size = 2M (optional) # --let $binsrv_checkpoint_interval = 30s (optional) # --source set_up_binsrv_environment.inc @@ -82,7 +83,7 @@ eval SET @binsrv_config_json = JSON_OBJECT( 'server_id', @@server_id + 1, 'idle_time', $binsrv_idle_time, 'verify_checksum', $binsrv_verify_checksum, - 'mode', 'position' + 'mode', '$binsrv_replication_mode' ), 'storage', JSON_OBJECT( 'backend', '$storage_backend', diff --git 
a/mtr/binlog_streaming/r/resume_streaming.result b/mtr/binlog_streaming/r/resume_streaming.result new file mode 100644 index 0000000..cfce614 --- /dev/null +++ b/mtr/binlog_streaming/r/resume_streaming.result @@ -0,0 +1,105 @@ +*** Resetting replication at the very beginning of the test. + +*** Generating a configuration file in JSON format for the Binlog +*** Server utility. + +*** Determining binlog file directory from the server. + +*** Creating a temporary directory for storing +*** binlog files downloaded via the Binlog Server utility. + +*** 1. Executing the Binlog Server utility to for the very first +*** time on an empty storage and receive no real events. + +*** 1a. Executing the Binlog Server utility on a storage that has +*** only one binlog file that has only magic payload in it and +*** receive no events. + +*** Creating a simple table +CREATE TABLE t1(id INT UNSIGNED NOT NULL AUTO_INCREMENT, PRIMARY KEY(id)) ENGINE=InnoDB; + +*** 2. Executing the Binlog Server utility on a storage that has +*** only one binlog file that has only magic payload in it and +*** receive events from the CREATE TABLE transaction. + +*** 2a. Executing the Binlog Server utility on a storage that has +*** only one binlog file that has CREATE TABLE transaction and +*** receive no events. + +*** Filling the table with some data (one transaction with one +*** insert and another one with two inserts). +INSERT INTO t1 VALUES(DEFAULT); +START TRANSACTION; +INSERT INTO t1 VALUES(DEFAULT); +INSERT INTO t1 VALUES(DEFAULT); +COMMIT; + +*** 3. Executing the Binlog Server utility on a storage that has +*** only one binlog file with some data and receive events from +*** the INSERT transactions. + +*** 3a. Executing the Binlog Server utility on a storage that has +*** only one binlog file with some data and receive no events. + +*** Flushing the first binary log and switching to the second one. +FLUSH BINARY LOGS; + +*** 4. 
Executing the Binlog Server utility on a storage that has +*** only one binlog file with some data and receive a single +*** ROTATE event. + +*** 4a. Executing the Binlog Server utility on a storage that has +*** one binlog file with some data and another one with just +*** magic payload and receive no events. + +*** Filling the table with some more data (one transaction with one +*** insert and another one with two inserts). +INSERT INTO t1 VALUES(DEFAULT); +START TRANSACTION; +INSERT INTO t1 VALUES(DEFAULT); +INSERT INTO t1 VALUES(DEFAULT); +COMMIT; + +*** 5. Executing the Binlog Server utility on a storage that has +*** one binlog file with some data and another one with just +*** magic payload and receive events from the second group of +*** INSERT transactions. + +*** 5a. Executing the Binlog Server utility on a storage that has +*** two binlog files with some data and receive no events + +*** Flushing the second binary log and switching to the third one. +*** Immediately after that updating data inserted previously. +FLUSH BINARY LOGS; +UPDATE t1 SET id = id + 100; + +*** 6. Executing the Binlog Server utility on a storage that has +*** two binlog files with some data and receive a ROTATE event +*** followed by events from the UPDATE transaction + +*** 6a. Executing the Binlog Server utility on a storage that has +*** three binlog files with some data and receive no events. + +*** Flushing the third binary log and switching to the fourth one. +*** Immediately after that deleting some data updated previously and +*** flushing one more time switching to the fifth binary log file. +FLUSH BINARY LOGS; +DELETE FROM t1 WHERE id <= 103; +FLUSH BINARY LOGS; + +*** 7. Executing the Binlog Server utility on a storage that has +*** three binlog files with some data and receive a ROTATE +*** event followed by a events from the DELETE transaction +*** followed by another ROTATE event + +*** 7a. 
Executing the Binlog Server utility on a storage that has +*** five binlog files with some data and receive no events. + +*** Dropping the table. +DROP TABLE t1; + +*** Removing the Binlog Server utility storage directory. + +*** Removing the Binlog Server utility log file. + +*** Removing the Binlog Server utility configuration file. diff --git a/mtr/binlog_streaming/t/binsrv.test b/mtr/binlog_streaming/t/binsrv.test index e10ef3c..a1c30e5 100644 --- a/mtr/binlog_streaming/t/binsrv.test +++ b/mtr/binlog_streaming/t/binsrv.test @@ -42,6 +42,7 @@ INSERT INTO t1 VALUES(DEFAULT); --let $binsrv_read_timeout = 60 --let $binsrv_idle_time = 10 --let $binsrv_verify_checksum = $extracted_init_connect_variable_value +--let $binsrv_replication_mode = position --let $binsrv_checkpoint_size = 1 --source ../include/set_up_binsrv_environment.inc diff --git a/mtr/binlog_streaming/t/checkpointing.test b/mtr/binlog_streaming/t/checkpointing.test index da76476..e20bf54 100644 --- a/mtr/binlog_streaming/t/checkpointing.test +++ b/mtr/binlog_streaming/t/checkpointing.test @@ -58,7 +58,8 @@ DROP TABLE t1; --let $binsrv_read_timeout = 60 --let $binsrv_idle_time = 10 --let $binsrv_verify_checksum = TRUE -# Enabling checkointing at about 40% of expected single binlog file size, so that it +--let $binsrv_replication_mode = position +# Enabling checkpointing at about 40% of expected single binlog file size, so that it # will happen twice before rotation. # Like wise, from the time point of view, make interval pretty low so that # at lease a few interval checkpointing events will happen. 
diff --git a/mtr/binlog_streaming/t/kill_and_restart.test b/mtr/binlog_streaming/t/kill_and_restart.test index 3580346..3f89348 100644 --- a/mtr/binlog_streaming/t/kill_and_restart.test +++ b/mtr/binlog_streaming/t/kill_and_restart.test @@ -130,6 +130,7 @@ INSERT INTO t1 VALUES(DEFAULT); --let $binsrv_read_timeout = 60 --let $binsrv_idle_time = 10 --let $binsrv_verify_checksum = TRUE +--let $binsrv_replication_mode = position --source ../include/set_up_binsrv_environment.inc --echo diff --git a/mtr/binlog_streaming/t/pull_mode.test b/mtr/binlog_streaming/t/pull_mode.test index ef8f86e..e5c2ccb 100644 --- a/mtr/binlog_streaming/t/pull_mode.test +++ b/mtr/binlog_streaming/t/pull_mode.test @@ -19,6 +19,7 @@ eval $stmt_reset_binary_logs_and_gtids; --let $binsrv_read_timeout = 3 --let $binsrv_idle_time = 1 --let $binsrv_verify_checksum = TRUE +--let $binsrv_replication_mode = position --source ../include/set_up_binsrv_environment.inc --echo diff --git a/mtr/binlog_streaming/t/resume_streaming.combinations b/mtr/binlog_streaming/t/resume_streaming.combinations new file mode 100644 index 0000000..fa1c14d --- /dev/null +++ b/mtr/binlog_streaming/t/resume_streaming.combinations @@ -0,0 +1,15 @@ +[position_buffered] +init-connect = SET @binsrv_buffering_mode = 'buffered' + +[position_unbuffered] +init-connect = SET @binsrv_buffering_mode = 'unbuffered' + +[gtid_buffered] +gtid-mode=on +enforce-gtid-consistency +init-connect = SET @binsrv_buffering_mode = 'buffered' + +[gtid_unbuffered] +gtid-mode=on +enforce-gtid-consistency +init-connect = SET @binsrv_buffering_mode = 'unbuffered' diff --git a/mtr/binlog_streaming/t/resume_streaming.test b/mtr/binlog_streaming/t/resume_streaming.test new file mode 100644 index 0000000..4b1ddb3 --- /dev/null +++ b/mtr/binlog_streaming/t/resume_streaming.test @@ -0,0 +1,161 @@ +--source ../include/have_binsrv.inc + +--source ../include/v80_v84_compatibility_defines.inc + +# in case of --repeat=N, we need to start from a fresh binary log to 
make +# this test deterministic +--echo *** Resetting replication at the very beginning of the test. +--disable_query_log +eval $stmt_reset_binary_logs_and_gtids; +--enable_query_log + +# identifying backend storage type ('file' or 's3') +--source ../include/identify_storage_backend.inc + +# identifying utility buffering mode from the conbination +--let $extracted_init_connect_variable_name = binsrv_buffering_mode +--source ../include/extract_init_connect_variable_value.inc + +# creating data directory, configuration file, etc. +--let $binsrv_connect_timeout = 20 +--let $binsrv_read_timeout = 60 +--let $binsrv_idle_time = 10 +--let $binsrv_verify_checksum = TRUE +--let $binsrv_replication_mode = `SELECT IF(@@global.gtid_mode, 'gtid', 'position')` +if ($extracted_init_connect_variable_value == 'buffered') +{ + --let $binsrv_checkpoint_size = 1G +} +if ($extracted_init_connect_variable_value == 'unbuffered') +{ + --let $binsrv_checkpoint_size = 1 +} +--source ../include/set_up_binsrv_environment.inc + +--echo +--echo *** 1. Executing the Binlog Server utility to for the very first +--echo *** time on an empty storage and receive no real events. +--exec $BINSRV fetch $binsrv_config_file_path > /dev/null + +--echo +--echo *** 1a. Executing the Binlog Server utility on a storage that has +--echo *** only one binlog file that has only magic payload in it and +--echo *** receive no events. +--exec $BINSRV fetch $binsrv_config_file_path > /dev/null + +--echo +--echo *** Creating a simple table +CREATE TABLE t1(id INT UNSIGNED NOT NULL AUTO_INCREMENT, PRIMARY KEY(id)) ENGINE=InnoDB; + +--echo +--echo *** 2. Executing the Binlog Server utility on a storage that has +--echo *** only one binlog file that has only magic payload in it and +--echo *** receive events from the CREATE TABLE transaction. +--exec $BINSRV fetch $binsrv_config_file_path > /dev/null + +--echo +--echo *** 2a. 
Executing the Binlog Server utility on a storage that has +--echo *** only one binlog file that has CREATE TABLE transaction and +--echo *** receive no events. +--exec $BINSRV fetch $binsrv_config_file_path > /dev/null + +--echo +--echo *** Filling the table with some data (one transaction with one +--echo *** insert and another one with two inserts). +INSERT INTO t1 VALUES(DEFAULT); +START TRANSACTION; +INSERT INTO t1 VALUES(DEFAULT); +INSERT INTO t1 VALUES(DEFAULT); +COMMIT; + +--echo +--echo *** 3. Executing the Binlog Server utility on a storage that has +--echo *** only one binlog file with some data and receive events from +--echo *** the INSERT transactions. +--exec $BINSRV fetch $binsrv_config_file_path > /dev/null + +--echo +--echo *** 3a. Executing the Binlog Server utility on a storage that has +--echo *** only one binlog file with some data and receive no events. +--exec $BINSRV fetch $binsrv_config_file_path > /dev/null + +--echo +--echo *** Flushing the first binary log and switching to the second one. +FLUSH BINARY LOGS; + +--echo +--echo *** 4. Executing the Binlog Server utility on a storage that has +--echo *** only one binlog file with some data and receive a single +--echo *** ROTATE event. +--exec $BINSRV fetch $binsrv_config_file_path > /dev/null + +--echo +--echo *** 4a. Executing the Binlog Server utility on a storage that has +--echo *** one binlog file with some data and another one with just +--echo *** magic payload and receive no events. +--exec $BINSRV fetch $binsrv_config_file_path > /dev/null + +--echo +--echo *** Filling the table with some more data (one transaction with one +--echo *** insert and another one with two inserts). +INSERT INTO t1 VALUES(DEFAULT); +START TRANSACTION; +INSERT INTO t1 VALUES(DEFAULT); +INSERT INTO t1 VALUES(DEFAULT); +COMMIT; + +--echo +--echo *** 5. 
Executing the Binlog Server utility on a storage that has +--echo *** one binlog file with some data and another one with just +--echo *** magic payload and receive events from the second group of +--echo *** INSERT transactions. +--exec $BINSRV fetch $binsrv_config_file_path > /dev/null + +--echo +--echo *** 5a. Executing the Binlog Server utility on a storage that has +--echo *** two binlog files with some data and receive no events +--exec $BINSRV fetch $binsrv_config_file_path > /dev/null + +--echo +--echo *** Flushing the second binary log and switching to the third one. +--echo *** Immediately after that updating data inserted previously. +FLUSH BINARY LOGS; +UPDATE t1 SET id = id + 100; + +--echo +--echo *** 6. Executing the Binlog Server utility on a storage that has +--echo *** two binlog files with some data and receive a ROTATE event +--echo *** followed by events from the UPDATE transaction +--exec $BINSRV fetch $binsrv_config_file_path > /dev/null + +--echo +--echo *** 6a. Executing the Binlog Server utility on a storage that has +--echo *** three binlog files with some data and receive no events. +--exec $BINSRV fetch $binsrv_config_file_path > /dev/null + +--echo +--echo *** Flushing the third binary log and switching to the fourth one. +--echo *** Immediately after that deleting some data updated previously and +--echo *** flushing one more time switching to the fifth binary log file. +FLUSH BINARY LOGS; +DELETE FROM t1 WHERE id <= 103; +FLUSH BINARY LOGS; + +--echo +--echo *** 7. Executing the Binlog Server utility on a storage that has +--echo *** three binlog files with some data and receive a ROTATE +--echo *** event followed by a events from the DELETE transaction +--echo *** followed by another ROTATE event +--exec $BINSRV fetch $binsrv_config_file_path > /dev/null + +--echo +--echo *** 7a. Executing the Binlog Server utility on a storage that has +--echo *** five binlog files with some data and receive no events. 
+--exec $BINSRV fetch $binsrv_config_file_path > /dev/null + +--echo +--echo *** Dropping the table. +DROP TABLE t1; + +# cleaning up +--source ../include/tear_down_binsrv_environment.inc diff --git a/mtr/binlog_streaming/t/ssl_connection.test b/mtr/binlog_streaming/t/ssl_connection.test index b669fc6..8b59c29 100644 --- a/mtr/binlog_streaming/t/ssl_connection.test +++ b/mtr/binlog_streaming/t/ssl_connection.test @@ -36,6 +36,7 @@ eval GRANT REPLICATION SLAVE ON *.* TO '$x509_user_name'@'$localhost_ip'; --let $binsrv_read_timeout = 60 --let $binsrv_idle_time = 10 --let $binsrv_verify_checksum = TRUE +--let $binsrv_replication_mode = position # a query that checks SSL connection status --let $ssl_status_query = SELECT IF(VARIABLE_VALUE = '', '', VARIABLE_VALUE) AS SSL_status FROM performance_schema.session_status WHERE VARIABLE_NAME = 'Ssl_version' diff --git a/src/app.cpp b/src/app.cpp index 35856ee..ed8a1be 100644 --- a/src/app.cpp +++ b/src/app.cpp @@ -355,66 +355,143 @@ void log_span_dump(binsrv::basic_logger &logger, } } -void process_binlog_event(const binsrv::event::event ¤t_event, - util::const_byte_span portion, - const binsrv::event::reader_context &context, - binsrv::storage &storage, bool &skip_open_binlog) { - const auto ¤t_common_header = current_event.get_common_header(); - const auto code = current_common_header.get_type_code(); - - const auto is_artificial{current_common_header.get_flags().has_element( - binsrv::event::common_header_flag_type::artificial)}; - const auto is_pseudo{current_common_header.get_next_event_position_raw() == - 0U}; - - if (code == binsrv::event::code_type::rotate && is_artificial) { - const auto ¤t_rotate_body = - current_event.get_body(); - if (skip_open_binlog) { - // we are supposed to get here after reconnection, so doing - // basic integrity checks - if (current_rotate_body.get_binlog() != - storage.get_current_binlog_name()) { - util::exception_location().raise( - "unexpected binlog name in artificial rotate event 
after " - "reconnection"); - } +void process_artificial_rotate_event(const binsrv::event::event ¤t_event, + binsrv::basic_logger &logger, + binsrv::event::reader_context &context, + binsrv::storage &storage) { + assert(current_event.get_common_header().get_type_code() == + binsrv::event::code_type::rotate); + assert(current_event.get_common_header().get_flags().has_element( + binsrv::event::common_header_flag_type::artificial)); + + const auto ¤t_rotate_body = + current_event.get_body(); + + bool binlog_opening_needed{true}; + + if (storage.is_binlog_open()) { + // here we take a "shortcut" path - upon losing connection to the MySQL + // server, we do not close storage's binlog file immediately expecting + // that upon reconnection we will be able to continue writing to the + // same file + + // so, here we just need to make sure that (binlog name, position) pair + // in the artificial ROTATE event matches the current storage state + + // also, in case when the server was not shut down properly, it won't + // have ROTATE or STOP event as the last one in the binlog, so here we + // handle this case by closing the old binlog and opening a new one + + if (current_rotate_body.get_binlog() == storage.get_current_binlog_name()) { + // in addition, in position-based replication mode we also need to check + // the position const auto ¤t_rotate_post_header = current_event.get_post_header(); if (current_rotate_post_header.get_position_raw() != storage.get_current_position()) { util::exception_location().raise( - "unexpected binlog position in artificial rotate event after " - "reconnection"); + "unexpected binlog position in artificial rotate event"); } - skip_open_binlog = false; + binlog_opening_needed = false; + // after reusing the existing storage binlog file, we should instruct + // the reader context to mark the upcoming FDE and PREVIOUS_GTIDS_LOG + // events as info-only + context.set_expect_ignorable_preamble_events(); + + const std::string 
current_binlog_name{storage.get_current_binlog_name()}; + logger.log(binsrv::log_severity::info, + "storage: reused already open binlog file: " + + current_binlog_name); + + } else { + // if names do not match, we need to close the currently open + // binlog and make sure that binlog_opening_needed is set to true, so + // that we will open a new one later + const std::string old_binlog_name{storage.get_current_binlog_name()}; + storage.close_binlog(); + logger.log(binsrv::log_severity::info, + "storage: closed binlog file left open: " + old_binlog_name); + // binlog_opening_needed remains true in this branch + assert(binlog_opening_needed); + } + } + if (binlog_opening_needed) { + const auto binlog_open_result{ + storage.open_binlog(current_rotate_body.get_binlog())}; + + // we also need to instruct the reader context that we opened an + // existing file (the one that was neither empty nor just had the + // magic payload writtent to it), so that it would mark the upcoming FDE + // and PREVIOUS_GTIDS_LOG events as info-only + + if (binlog_open_result == + binsrv::open_binlog_status::opened_with_data_present) { + context.set_expect_ignorable_preamble_events(); + } + + std::string message{"storage: "}; + if (binlog_open_result == binsrv::open_binlog_status::created) { + message += "created a new"; } else { - // in case when the server was not shut down properly, it won't have - // ROTATE or STOP event as the last one in the binlog, so here we - // handle this case by closing the old binlog and opening a new one - if (storage.is_binlog_open()) { - storage.close_binlog(); + message += "opened an existing"; + if (binlog_open_result == binsrv::open_binlog_status::opened_empty) { + message += " (empty)"; + } else if (binlog_open_result == + binsrv::open_binlog_status::opened_at_magic_paylod_offset) { + message += " (with magic payload only)"; } - storage.open_binlog(current_rotate_body.get_binlog()); } + message += " binlog file: "; + message += 
current_rotate_body.get_binlog(); + logger.log(binsrv::log_severity::info, message); } - if (!is_artificial && !is_pseudo) { - storage.write_event(portion, context.is_at_transaction_boundary()); +} + +void process_rotate_or_stop_event(binsrv::basic_logger &logger, + binsrv::storage &storage) { + const std::string old_binlog_name{storage.get_current_binlog_name()}; + storage.close_binlog(); + logger.log(binsrv::log_severity::info, + "storage: closed binlog file: " + old_binlog_name); +} + +void process_binlog_event(const binsrv::event::event ¤t_event, + util::const_byte_span portion, + binsrv::basic_logger &logger, + binsrv::event::reader_context &context, + binsrv::storage &storage) { + const auto ¤t_common_header = current_event.get_common_header(); + const auto code = current_common_header.get_type_code(); + + const auto is_artificial{current_common_header.get_flags().has_element( + binsrv::event::common_header_flag_type::artificial)}; + + // processing the very first event in the sequence - artificial ROTATE event + if (code == binsrv::event::code_type::rotate && is_artificial) { + process_artificial_rotate_event(current_event, logger, context, storage); + } + + // checking if the event needs to be written to the binlog + if (!context.is_event_info_only()) { + storage.write_event(portion, context.is_at_transaction_boundary(), + context.get_transaction_gtid()); } + + // processing the very last event in the sequence - either a non-artificial + // ROTATE event or a STOP event if ((code == binsrv::event::code_type::rotate && !is_artificial) || code == binsrv::event::code_type::stop) { - storage.close_binlog(); + process_rotate_or_stop_event(logger, storage); } } -void receive_binlog_events( - binsrv::operation_mode_type operation_mode, - const volatile std::atomic_flag &termination_flag, - binsrv::basic_logger &logger, const easymysql::library &mysql_lib, +bool open_connection_and_switch_to_replication( + binsrv::operation_mode_type operation_mode, 
binsrv::basic_logger &logger, + const easymysql::library &mysql_lib, const easymysql::connection_config &connection_config, - std::uint32_t server_id, bool verify_checksum, binsrv::storage &storage) { - easymysql::connection connection{}; + std::uint32_t server_id, bool verify_checksum, binsrv::storage &storage, + easymysql::connection &connection) { try { connection = mysql_lib.create_connection(connection_config); } catch (const easymysql::core_error &) { @@ -423,7 +500,7 @@ void receive_binlog_events( } logger.log(binsrv::log_severity::info, "unable to establish connection to mysql server"); - return; + return false; } logger.log(binsrv::log_severity::info, @@ -431,14 +508,24 @@ void receive_binlog_events( log_connection_info(logger, connection); - const auto replication_mode{storage.get_replication_mode()}; const auto blocking_mode{ operation_mode == binsrv::operation_mode_type::fetch ? easymysql::connection_replication_mode_type::non_blocking : easymysql::connection_replication_mode_type::blocking}; try { - if (replication_mode == binsrv::replication_mode_type::position) { + if (storage.is_in_gtid_replication_mode()) { + const auto >ids{storage.get_gtids()}; + const auto encoded_size{gtids.calculate_encoded_size()}; + + binsrv::gtids::gtid_set_storage encoded_gtids_buffer(encoded_size); + util::byte_span destination{encoded_gtids_buffer}; + gtids.encode_to(destination); + + connection.switch_to_gtid_replication( + server_id, util::const_byte_span{encoded_gtids_buffer}, + verify_checksum, blocking_mode); + } else { if (storage.has_current_binlog_name()) { connection.switch_to_position_replication( server_id, storage.get_current_binlog_name(), @@ -447,28 +534,32 @@ void receive_binlog_events( connection.switch_to_position_replication(server_id, verify_checksum, blocking_mode); } - } else { - const binsrv::gtids::gtid_set empty_gtids{}; - const auto encoded_size{empty_gtids.calculate_encoded_size()}; - - binsrv::gtids::gtid_set_storage buffer(encoded_size); - 
util::byte_span destination{buffer}; - empty_gtids.encode_to(destination); - - connection.switch_to_gtid_replication(server_id, - util::const_byte_span{buffer}, - verify_checksum, blocking_mode); } } catch (const easymysql::core_error &) { if (operation_mode == binsrv::operation_mode_type::fetch) { throw; } logger.log(binsrv::log_severity::info, "unable to switch to replication"); - return; + return false; } log_replication_info(logger, server_id, storage, verify_checksum, blocking_mode); + return true; +} + +void receive_binlog_events( + binsrv::operation_mode_type operation_mode, + const volatile std::atomic_flag &termination_flag, + binsrv::basic_logger &logger, const easymysql::library &mysql_lib, + const easymysql::connection_config &connection_config, + std::uint32_t server_id, bool verify_checksum, binsrv::storage &storage) { + easymysql::connection connection{}; + if (!open_connection_and_switch_to_replication( + operation_mode, logger, mysql_lib, connection_config, server_id, + verify_checksum, storage, connection)) { + return; + } // Network streams are requested with COM_BINLOG_DUMP and // each Binlog Event response is prepended with 00 OK-byte. @@ -477,12 +568,9 @@ void receive_binlog_events( util::const_byte_span portion; binsrv::event::reader_context context{connection.get_server_version(), - verify_checksum, replication_mode}; + verify_checksum, + storage.get_replication_mode()}; - // if binlog is still open, there is no sense to close it and re-open - // instead, we will just instruct this loop to process the - // very first artificial rotate event in a special way - bool skip_open_binlog{storage.is_binlog_open()}; bool fetch_result{}; while (!termination_flag.test() && @@ -505,7 +593,8 @@ void receive_binlog_events( logger.log( binsrv::log_severity::info, "event: " + std::string{current_header.get_readable_type_code()} + - (readable_flags.empty() ? "" : "(" + readable_flags + ")")); + (readable_flags.empty() ? 
"" : " (" + readable_flags + ")") + + (context.is_event_info_only() ? " [info_only]" : "")); logger.log(binsrv::log_severity::debug, "Parsed event:\n" + boost::lexical_cast(current_event)); @@ -516,8 +605,7 @@ void receive_binlog_events( boost::lexical_cast(context.get_transaction_gtid())); } - process_binlog_event(current_event, portion, context, storage, - skip_open_binlog); + process_binlog_event(current_event, portion, logger, context, storage); } if (termination_flag.test()) { logger.log(binsrv::log_severity::info, @@ -533,6 +621,18 @@ void receive_binlog_events( util::exception_location().raise( "fetch operation did not reach EOF reading binlog events"); } + + // in GTID-based replication mode we also need to discard some data in the + // transaction event buffer to make sure that upon reconnection we will + // continue operation from the transaction boundary + + // in position-based replication mode this is not needed as it is not a + // problem to resume streaming fron a position that does not correspond to + // transaction boundary + if (storage.is_in_gtid_replication_mode()) { + storage.discard_incomplete_transaction_events(); + } + // TODO: here (upon timing out) we also need to flush internal buffers in // the storage logger.log(binsrv::log_severity::info, diff --git a/src/binsrv/basic_storage_backend.cpp b/src/binsrv/basic_storage_backend.cpp index 142a841..3159923 100644 --- a/src/binsrv/basic_storage_backend.cpp +++ b/src/binsrv/basic_storage_backend.cpp @@ -15,6 +15,7 @@ #include "binsrv/basic_storage_backend.hpp" +#include #include #include #include @@ -39,15 +40,17 @@ void basic_storage_backend::put_object(std::string_view name, do_put_object(name, content); } -void basic_storage_backend::open_stream(std::string_view name, - storage_backend_open_stream_mode mode) { +[[nodiscard]] std::uint64_t +basic_storage_backend::open_stream(std::string_view name, + storage_backend_open_stream_mode mode) { if (stream_open_) { util::exception_location().raise( 
"cannot open a new stream as the previous one has not been closed"); } - do_open_stream(name, mode); + const auto result{do_open_stream(name, mode)}; stream_open_ = true; + return result; } void basic_storage_backend::write_data_to_stream(util::const_byte_span data) { diff --git a/src/binsrv/basic_storage_backend.hpp b/src/binsrv/basic_storage_backend.hpp index 5d224ed..bc09690 100644 --- a/src/binsrv/basic_storage_backend.hpp +++ b/src/binsrv/basic_storage_backend.hpp @@ -40,8 +40,8 @@ class basic_storage_backend { void put_object(std::string_view name, util::const_byte_span content); [[nodiscard]] bool is_stream_open() const noexcept { return stream_open_; } - void open_stream(std::string_view name, - storage_backend_open_stream_mode mode); + [[nodiscard]] std::uint64_t + open_stream(std::string_view name, storage_backend_open_stream_mode mode); void write_data_to_stream(util::const_byte_span data); void close_stream(); @@ -55,8 +55,9 @@ class basic_storage_backend { virtual void do_put_object(std::string_view name, util::const_byte_span content) = 0; - virtual void do_open_stream(std::string_view name, - storage_backend_open_stream_mode mode) = 0; + [[nodiscard]] virtual std::uint64_t + do_open_stream(std::string_view name, + storage_backend_open_stream_mode mode) = 0; virtual void do_write_data_to_stream(util::const_byte_span data) = 0; virtual void do_close_stream() = 0; diff --git a/src/binsrv/binlog_file_metadata.cpp b/src/binsrv/binlog_file_metadata.cpp new file mode 100644 index 0000000..a9f2678 --- /dev/null +++ b/src/binsrv/binlog_file_metadata.cpp @@ -0,0 +1,90 @@ +// Copyright (c) 2023-2024 Percona and/or its affiliates. +// +// This program is free software; you can redistribute it and/or modify +// it under the terms of the GNU General Public License, version 2.0, +// as published by the Free Software Foundation. 
+// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License, version 2.0, for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program; if not, write to the Free Software +// Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + +#include "binsrv/binlog_file_metadata.hpp" + +#include +#include +#include +#include + +#include + +#include +#include +#include + +// needed for binsrv::gtids::gtid_set_storage +#include // IWYU pragma: keep + +#include "binsrv/gtids/common_types.hpp" +#include "binsrv/gtids/gtid_set.hpp" + +#include "util/byte_span.hpp" +#include "util/nv_tuple_from_json.hpp" +#include "util/nv_tuple_to_json.hpp" + +namespace binsrv { + +binlog_file_metadata::binlog_file_metadata() + : impl_{{expected_binlog_file_metadata_version}, {}, {}} {} + +binlog_file_metadata::binlog_file_metadata(std::string_view data) : impl_{} { + auto json_value = boost::json::parse(data); + util::nv_tuple_from_json(json_value, impl_); + + validate(); +} + +[[nodiscard]] std::string binlog_file_metadata::str() const { + boost::json::value json_value; + util::nv_tuple_to_json(json_value, impl_); + + return boost::json::serialize(json_value); +} + +[[nodiscard]] gtids::gtid_set binlog_file_metadata::get_gtids() const { + const auto &optional_gtids{root().get<"gtids">()}; + if (!optional_gtids.has_value()) { + return {}; + } + + const auto &encoded_gtids{optional_gtids.value()}; + std::string decoded_gtids(std::size(encoded_gtids) / 2U, 'x'); + boost::algorithm::unhex(encoded_gtids, std::data(decoded_gtids)); + return gtids::gtid_set{util::as_const_byte_span(decoded_gtids)}; +} + +void binlog_file_metadata::set_gtids(const gtids::gtid_set >ids) { + const auto encoded_size{gtids.calculate_encoded_size()}; + + 
gtids::gtid_set_storage buffer(encoded_size); + util::byte_span destination{buffer}; + gtids.encode_to(destination); + + std::string encoded_gtids(std::size(buffer) * 2U, 'x'); + const auto buffer_sv{util::as_string_view(std::as_const(buffer))}; + boost::algorithm::hex_lower(buffer_sv, std::data(encoded_gtids)); + root().get<"gtids">().emplace(std::move(encoded_gtids)); +} + +void binlog_file_metadata::validate() const { + if (root().get<"version">() != expected_binlog_file_metadata_version) { + util::exception_location().raise( + "unsupported binlog file metadata version"); + } +} + +} // namespace binsrv diff --git a/src/binsrv/binlog_file_metadata.hpp b/src/binsrv/binlog_file_metadata.hpp new file mode 100644 index 0000000..30d6e84 --- /dev/null +++ b/src/binsrv/binlog_file_metadata.hpp @@ -0,0 +1,63 @@ +// Copyright (c) 2023-2024 Percona and/or its affiliates. +// +// This program is free software; you can redistribute it and/or modify +// it under the terms of the GNU General Public License, version 2.0, +// as published by the Free Software Foundation. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License, version 2.0, for more details. 
+// +// You should have received a copy of the GNU General Public License +// along with this program; if not, write to the Free Software +// Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + +#ifndef BINSRV_BINLOG_FILE_METADATA_HPP +#define BINSRV_BINLOG_FILE_METADATA_HPP + +#include "binsrv/binlog_file_metadata_fwd.hpp" // IWYU pragma: export + +#include +#include +#include + +#include "binsrv/gtids/gtid_set.hpp" + +#include "util/common_optional_types.hpp" +#include "util/nv_tuple.hpp" + +namespace binsrv { + +class [[nodiscard]] binlog_file_metadata { +private: + using impl_type = util::nv_tuple< + // clang-format off + util::nv<"version", std::uint32_t>, + util::nv<"size", std::uint64_t>, + util::nv<"gtids", util::optional_string> + // clang-format on + >; + +public: + binlog_file_metadata(); + + explicit binlog_file_metadata(std::string_view data); + + [[nodiscard]] std::string str() const; + + [[nodiscard]] auto &root() noexcept { return impl_; } + [[nodiscard]] const auto &root() const noexcept { return impl_; } + + [[nodiscard]] gtids::gtid_set get_gtids() const; + void set_gtids(const gtids::gtid_set >ids); + +private: + impl_type impl_; + + void validate() const; +}; + +} // namespace binsrv + +#endif // BINSRV_BINLOG_FILE_METADATA_HPP diff --git a/src/binsrv/binlog_file_metadata_fwd.hpp b/src/binsrv/binlog_file_metadata_fwd.hpp new file mode 100644 index 0000000..cc20108 --- /dev/null +++ b/src/binsrv/binlog_file_metadata_fwd.hpp @@ -0,0 +1,29 @@ +// Copyright (c) 2023-2024 Percona and/or its affiliates. +// +// This program is free software; you can redistribute it and/or modify +// it under the terms of the GNU General Public License, version 2.0, +// as published by the Free Software Foundation. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. 
See the +// GNU General Public License, version 2.0, for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program; if not, write to the Free Software +// Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + +#ifndef BINSRV_BINLOG_FILE_METADATA_FWD_HPP +#define BINSRV_BINLOG_FILE_METADATA_FWD_HPP + +#include + +namespace binsrv { + +class binlog_file_metadata; + +inline constexpr std::uint32_t expected_binlog_file_metadata_version{1U}; + +} // namespace binsrv + +#endif // BINSRV_BINLOG_FILE_METADATA_FWD_HPP diff --git a/src/binsrv/event/reader_context.cpp b/src/binsrv/event/reader_context.cpp index 5b372af..018fda5 100644 --- a/src/binsrv/event/reader_context.cpp +++ b/src/binsrv/event/reader_context.cpp @@ -118,6 +118,7 @@ reader_context::process_event_in_rotate_artificial_expected_state( position_ = static_cast( current_event.get_post_header().get_position_raw()); + info_only_event_ = true; // transition to the next state state_ = state_type::format_description_expected; return true; @@ -183,17 +184,57 @@ reader_context::process_event_in_format_description_expected_state( // from a particular position (created only for the purpose of making // clients aware of the protocol specifics) - // the latter can be distinguished by having next_event_position set to - // zero - const auto is_pseudo{common_header.get_next_event_position_raw() == 0U}; - if (!is_pseudo) { + // in position-based mode we have 2 cases: + // 1) Starting from the beginning of a binary log file + // artificial ROTATE - should not be written + // FDE (common_header.next_event_position != 0) - should be written + // PREVIOUS_GTIDS_LOG - should be written + // ... + // 2) Resuming from the middle of a binary log file + // artificial ROTATE - should not be written + // FDE (common_header.next_event_position == 0) - should not be written + // ... 
+ + // in GTID-based mode we have 2 cases: + // 1) Starting from the beginning of a binary log file + // artificial ROTATE - should not be written + // FDE (common_header.next_event_position != 0) - should be written + // PREVIOUS_GTIDS_LOG - should be written + // ... + // 2) Resuming from the middle of a binary log file + // artificial ROTATE - should not be written + // FDE (common_header.next_event_position != 0) - should not be written + // PREVIOUS_GTIDS_LOG - should not be written + // ... + + // in other words, in GTID-based mode there is no way to distinguish whether + // the FDE / PREVIOUS_GTIDS_LOG is pseudo and should not be written, or not - + // that is why we rely only on externally supplied + // "start_from_new_binlog_file" constructor's argument + info_only_event_ = expect_ignorable_preamble_events_; + if (replication_mode_ == replication_mode_type::position && + info_only_event_) { + if (common_header.get_next_event_position_raw() != 0U) { + util::exception_location().raise( + "expected next event position set to zero in pseudo format " + "description event"); + } + } + if (!info_only_event_) { validate_position_and_advance(common_header); } - // transition to the next state: if the current format description event - // is a pseudo one, then the next event is expected to be one of the - // gtid log events, otherwise previous gtids log event - state_ = (is_pseudo ? 
state_type::gtid_log_expected - : state_type::previous_gtids_expected); + + // transition to the next state: + // the next expected event is PREVIOUS_GTIDS_LOG, unless we are in + // position-based replication mode and we resumed streaming from the + // middle of a binlog file + if (replication_mode_ == replication_mode_type::position && + info_only_event_) { + state_ = state_type::gtid_log_expected; + expect_ignorable_preamble_events_ = false; + } else { + state_ = state_type::previous_gtids_expected; + } return true; } @@ -217,7 +258,11 @@ reader_context::process_event_in_previous_gtids_expected_state( "previous gtids log event is not expected to be artificial"); } - validate_position_and_advance(common_header); + info_only_event_ = expect_ignorable_preamble_events_; + expect_ignorable_preamble_events_ = false; + if (!info_only_event_) { + validate_position_and_advance(common_header); + } state_ = state_type::gtid_log_expected; return true; @@ -266,6 +311,7 @@ reader_context::process_event_in_previous_gtids_expected_state( start_transaction(current_event); validate_position_and_advance(common_header); + info_only_event_ = false; state_ = state_type::any_other_expected; return true; } @@ -326,6 +372,8 @@ reader_context::process_event_in_previous_gtids_expected_state( } update_transaction(common_header); validate_position_and_advance(common_header); + + info_only_event_ = false; // not changing the state here - remain in 'any_other_expected' return true; } @@ -370,6 +418,8 @@ reader_context::process_event_in_rotate_or_stop_expected_state( // FORMAT_DESCRIPTION validate_position(common_header); reset_position(); + + info_only_event_ = false; state_ = state_type::rotate_artificial_expected; return true; diff --git a/src/binsrv/event/reader_context.hpp b/src/binsrv/event/reader_context.hpp index 45cf998..c1c5d91 100644 --- a/src/binsrv/event/reader_context.hpp +++ b/src/binsrv/event/reader_context.hpp @@ -63,6 +63,14 @@ class [[nodiscard]] reader_context { 
(state_ == state_type::rotate_artificial_expected); } + [[nodiscard]] bool is_event_info_only() const noexcept { + return info_only_event_; + } + + void set_expect_ignorable_preamble_events() noexcept { + expect_ignorable_preamble_events_ = true; + } + private: // this class implements the logic of the following state machine // ( @@ -91,6 +99,9 @@ class [[nodiscard]] reader_context { std::uint32_t expected_transaction_length_{0U}; std::uint32_t current_transaction_length_{0U}; + bool expect_ignorable_preamble_events_{false}; + bool info_only_event_{false}; + void process_event(const event ¤t_event); [[nodiscard]] bool process_event_in_rotate_artificial_expected_state(const event ¤t_event); diff --git a/src/binsrv/filesystem_storage_backend.cpp b/src/binsrv/filesystem_storage_backend.cpp index 5de6820..7638e6a 100644 --- a/src/binsrv/filesystem_storage_backend.cpp +++ b/src/binsrv/filesystem_storage_backend.cpp @@ -16,6 +16,7 @@ #include "binsrv/filesystem_storage_backend.hpp" #include +#include #include #include #include @@ -154,20 +155,24 @@ void filesystem_storage_backend::do_put_object(std::string_view name, } } -void filesystem_storage_backend::do_open_stream( +[[nodiscard]] std::uint64_t filesystem_storage_backend::do_open_stream( std::string_view name, storage_backend_open_stream_mode mode) { assert(!ofs_.is_open()); const std::filesystem::path current_file_path{get_object_path(name)}; - auto open_mode{std::ios_base::out | std::ios_base::binary | - (mode == storage_backend_open_stream_mode::create - ? std::ios_base::trunc - : std::ios_base::app)}; + const auto open_mode{std::ios_base::out | std::ios_base::binary | + (mode == storage_backend_open_stream_mode::create + ? 
std::ios_base::trunc + : std::ios_base::app | std::ios_base::ate)}; ofs_.open(current_file_path, open_mode); if (!ofs_.is_open()) { util::exception_location().raise( "cannot open underlying file for the stream"); } + + const auto open_position{static_cast(ofs_.tellp())}; + + return static_cast(open_position); } void filesystem_storage_backend::do_write_data_to_stream( diff --git a/src/binsrv/filesystem_storage_backend.hpp b/src/binsrv/filesystem_storage_backend.hpp index 1b1f01c..b83c8bc 100644 --- a/src/binsrv/filesystem_storage_backend.hpp +++ b/src/binsrv/filesystem_storage_backend.hpp @@ -48,8 +48,9 @@ class [[nodiscard]] filesystem_storage_backend final void do_put_object(std::string_view name, util::const_byte_span content) override; - void do_open_stream(std::string_view name, - storage_backend_open_stream_mode mode) override; + [[nodiscard]] std::uint64_t + do_open_stream(std::string_view name, + storage_backend_open_stream_mode mode) override; void do_write_data_to_stream(util::const_byte_span data) override; void do_close_stream() override; diff --git a/src/binsrv/s3_storage_backend.cpp b/src/binsrv/s3_storage_backend.cpp index bd8f19e..6344f3e 100644 --- a/src/binsrv/s3_storage_backend.cpp +++ b/src/binsrv/s3_storage_backend.cpp @@ -17,6 +17,7 @@ #include #include +#include #include #include #include @@ -650,8 +651,9 @@ void s3_storage_backend::do_put_object(std::string_view name, {.bucket = bucket_, .object_path = get_object_path(name)}, content); } -void s3_storage_backend::do_open_stream(std::string_view name, - storage_backend_open_stream_mode mode) { +[[nodiscard]] std::uint64_t +s3_storage_backend::do_open_stream(std::string_view name, + storage_backend_open_stream_mode mode) { assert(!tmp_fstream_.is_open()); current_name_ = name; current_tmp_file_path_ = generate_tmp_file_path(); @@ -662,13 +664,20 @@ void s3_storage_backend::do_open_stream(std::string_view name, current_tmp_file_path_); } - tmp_fstream_.open(current_tmp_file_path_, - 
std::ios_base::in | std::ios_base::out | - std::ios_base::binary | std::ios_base::app); + const auto open_mode{std::ios_base::in | std::ios_base::out | + std::ios_base::binary | + (mode == storage_backend_open_stream_mode::create + ? std::ios_base::trunc + : std::ios_base::app | std::ios_base::ate)}; + tmp_fstream_.open(current_tmp_file_path_, open_mode); if (!tmp_fstream_.is_open()) { util::exception_location().raise( "cannot open temporary file for S3 object body stream"); } + + const auto open_position{static_cast(tmp_fstream_.tellp())}; + + return static_cast(open_position); } void s3_storage_backend::do_write_data_to_stream(util::const_byte_span data) { diff --git a/src/binsrv/s3_storage_backend.hpp b/src/binsrv/s3_storage_backend.hpp index f088f0a..421f097 100644 --- a/src/binsrv/s3_storage_backend.hpp +++ b/src/binsrv/s3_storage_backend.hpp @@ -73,8 +73,9 @@ class [[nodiscard]] s3_storage_backend final : public basic_storage_backend { void do_put_object(std::string_view name, util::const_byte_span content) override; - void do_open_stream(std::string_view name, - storage_backend_open_stream_mode mode) override; + [[nodiscard]] std::uint64_t + do_open_stream(std::string_view name, + storage_backend_open_stream_mode mode) override; void do_write_data_to_stream(util::const_byte_span data) override; void do_close_stream() override; diff --git a/src/binsrv/storage.cpp b/src/binsrv/storage.cpp index 9d52c18..f751acc 100644 --- a/src/binsrv/storage.cpp +++ b/src/binsrv/storage.cpp @@ -28,13 +28,17 @@ #include #include "binsrv/basic_storage_backend.hpp" -#include "binsrv/replication_mode_type_fwd.hpp" +#include "binsrv/binlog_file_metadata.hpp" +#include "binsrv/replication_mode_type.hpp" #include "binsrv/storage_backend_factory.hpp" #include "binsrv/storage_config.hpp" #include "binsrv/storage_metadata.hpp" #include "binsrv/event/protocol_traits_fwd.hpp" +#include "binsrv/gtids/gtid.hpp" +#include "binsrv/gtids/gtid_set.hpp" + #include "util/byte_span.hpp" #include 
"util/exception_location_helpers.hpp" @@ -56,42 +60,56 @@ storage::storage(const storage_config &config, backend_ = storage_backend_factory::create(config); - const auto storage_objects{backend_->list_objects()}; + auto storage_objects{backend_->list_objects()}; if (storage_objects.empty()) { // initialized on a new / empty storage - just save metadata and return save_metadata(); return; } - if (!storage_objects.contains(metadata_name)) { + const auto metadata_it{storage_objects.find(metadata_name)}; + if (metadata_it == std::end(storage_objects)) { util::exception_location().raise( "storage is not empty but does not contain metadata"); } + storage_objects.erase(metadata_it); + load_metadata(); validate_metadata(replication_mode); - if (!storage_objects.contains(default_binlog_index_name)) { + const auto binlog_index_it{storage_objects.find(default_binlog_index_name)}; + if (binlog_index_it == std::end(storage_objects)) { util::exception_location().raise( "storage is not empty but does not contain binlog index"); } - + storage_objects.erase(binlog_index_it); + + // extracting all binlog file metadata files into a separate container + storage_object_name_container storage_metadata_objects; + for (auto storage_object_it{std::begin(storage_objects)}; + storage_object_it != std::end(storage_objects);) { + const std::filesystem::path object_name{storage_object_it->first}; + if (object_name.has_extension() && + object_name.extension() == binlog_metadata_extension) { + auto object_node = storage_objects.extract(storage_object_it++); + storage_metadata_objects.insert(std::move(object_node)); + } else { + ++storage_object_it; + } + } load_binlog_index(); validate_binlog_index(storage_objects); - if (!binlog_names_.empty()) { - // call to validate_binlog_index() guarantees that the name will be - // found here - position_ = storage_objects.at(binlog_names_.back()); - } + load_and_validate_binlog_metadata_set(storage_objects, + storage_metadata_objects); } storage::~storage() 
{ - if (!backend_->is_stream_open()) { - return; - } // bugprone-empty-catch should not be that strict in destructors try { - backend_->close_stream(); + if (has_event_data_to_flush()) { + flush_event_buffer(); + } } catch (...) { // NOLINT(bugprone-empty-catch) } } @@ -100,6 +118,10 @@ storage::~storage() { return backend_->get_description(); } +[[nodiscard]] bool storage::is_in_gtid_replication_mode() const noexcept { + return replication_mode_ == replication_mode_type::gtid; +} + [[nodiscard]] bool storage::check_binlog_name(std::string_view binlog_name) noexcept { // TODO: parse binlog name into "base name" and "rotation number" @@ -115,17 +137,40 @@ storage::check_binlog_name(std::string_view binlog_name) noexcept { return backend_->is_stream_open(); } -void storage::open_binlog(std::string_view binlog_name) { +[[nodiscard]] open_binlog_status +storage::open_binlog(std::string_view binlog_name) { + auto result{open_binlog_status::opened_with_data_present}; + if (!check_binlog_name(binlog_name)) { util::exception_location().raise( "cannot create a binlog with invalid name"); } - const auto mode{position_ == 0ULL ? 
storage_backend_open_stream_mode::create - : storage_backend_open_stream_mode::append}; - backend_->open_stream(binlog_name, mode); + // here we either create a new binlog file if its name is not present in the + // "binlog_names_", or we open an existing one and append to it, in which + // case we need to make sure that the current position is properly set + const bool binlog_exists{std::ranges::find(binlog_names_, binlog_name) != + std::end(binlog_names_)}; - if (mode == storage_backend_open_stream_mode::create) { + // in the case when binlog exists, the name must be equal to the last item in + // "binlog_names_" list and "position_" must be set to a non-zero value + if (binlog_exists) { + if (binlog_name != get_current_binlog_name()) { + util::exception_location().raise( + "cannot open an existing binlog that is not the latest one for " + "append"); + } + if (get_current_position() == 0ULL) { + util::exception_location().raise( + "invalid position set when opening an existing binlog"); + } + } + + const auto mode{binlog_exists ? 
storage_backend_open_stream_mode::append + : storage_backend_open_stream_mode::create}; + const auto open_stream_offset{backend_->open_stream(binlog_name, mode)}; + + if (!binlog_exists) { // writing the magic binlog footprint only if this is a newly // created file backend_->write_data_to_stream(event::magic_binlog_payload); @@ -133,25 +178,48 @@ void storage::open_binlog(std::string_view binlog_name) { binlog_names_.emplace_back(binlog_name); save_binlog_index(); position_ = event::magic_binlog_offset; + save_binlog_metadata(get_current_binlog_name()); + result = open_binlog_status::created; + } else { + assert(position_ == open_stream_offset); + if (open_stream_offset == 0ULL) { + backend_->write_data_to_stream(event::magic_binlog_payload); + position_ = event::magic_binlog_offset; + result = open_binlog_status::opened_empty; + } else if (open_stream_offset == event::magic_binlog_offset) { + result = open_binlog_status::opened_at_magic_paylod_offset; + } else { + // position is beyond magic payload offset + assert(open_stream_offset > event::magic_binlog_offset); + result = open_binlog_status::opened_with_data_present; + } } if (size_checkpointing_enabled()) { - last_checkpoint_position_ = position_; + last_checkpoint_position_ = get_current_position(); } if (interval_checkpointing_enabled()) { last_checkpoint_timestamp_ = std::chrono::steady_clock::now(); } + assert(std::size(event_buffer_) == 0U); event_buffer_.reserve(default_event_buffer_size_in_bytes); - assert(last_transaction_boundary_position_in_event_buffer_ == 0U); + assert(!has_event_data_to_flush()); + assert(gtids_in_event_buffer_.is_empty()); + + return result; } void storage::write_event(util::const_byte_span event_data, - bool at_transaction_boundary) { + bool at_transaction_boundary, + const gtids::gtid &transaction_gtid) { event_buffer_.insert(std::end(event_buffer_), std::cbegin(event_data), std::cend(event_data)); if (at_transaction_boundary) { 
last_transaction_boundary_position_in_event_buffer_ = std::size(event_buffer_); + if (is_in_gtid_replication_mode() && !transaction_gtid.is_empty()) { + gtids_in_event_buffer_ += transaction_gtid; + } } const auto event_data_size{std::size(event_data)}; @@ -160,25 +228,43 @@ void storage::write_event(util::const_byte_span event_data, // now we are writing data from the event buffer to the storage backend if // event buffer has some data in it that can be considered a complete // transaction and a checkpoint event (either size-based or time-based) - // occurred - if (last_transaction_boundary_position_in_event_buffer_ != 0U) { + // occurred or we are processing the very last event in the binlog file + + if (has_event_data_to_flush()) { + const auto ready_to_flush_position{get_ready_to_flush_position()}; const auto now_ts{std::chrono::steady_clock::now()}; - if ((size_checkpointing_enabled() && - (position_ >= last_checkpoint_position_ + checkpoint_size_bytes_)) || - (interval_checkpointing_enabled() && - (now_ts >= - last_checkpoint_timestamp_ + checkpoint_interval_seconds_))) { + bool needs_flush{false}; + if (at_transaction_boundary && transaction_gtid.is_empty()) { + // a special combination of parameters when at_transaction_boundary is + // true and transaction_gtid is empty means that we received either ROTATE + // or STOP event at the very end of a binary log file - in this case we + // need to flush the event data buffer immediately regardless of whether + // one of the checkpointing events occurred or not + needs_flush = true; + } else { + // here we perform size-based checkpointing calculations based on + // calculated "ready_to_flush_position" instead of + // "get_current_position()" directly to take into account that some event + // data may remain buffered + needs_flush = (size_checkpointing_enabled() && + (ready_to_flush_position >= + last_checkpoint_position_ + checkpoint_size_bytes_)) || + (interval_checkpointing_enabled() && + (now_ts >= 
last_checkpoint_timestamp_ + + checkpoint_interval_seconds_)); + } + if (needs_flush) { flush_event_buffer(); - last_checkpoint_position_ = position_; + last_checkpoint_position_ = ready_to_flush_position; last_checkpoint_timestamp_ = now_ts; } } } void storage::close_binlog() { - if (last_transaction_boundary_position_in_event_buffer_ != 0U) { + if (has_event_data_to_flush()) { flush_event_buffer(); } event_buffer_.clear(); @@ -187,22 +273,37 @@ void storage::close_binlog() { backend_->close_stream(); position_ = 0ULL; if (size_checkpointing_enabled()) { - last_checkpoint_position_ = position_; + last_checkpoint_position_ = get_current_position(); } if (interval_checkpointing_enabled()) { last_checkpoint_timestamp_ = std::chrono::steady_clock::now(); } } +void storage::discard_incomplete_transaction_events() { + const std::size_t bytes_to_discard{ + std::size(event_buffer_) - + last_transaction_boundary_position_in_event_buffer_}; + position_ -= bytes_to_discard; + event_buffer_.resize(last_transaction_boundary_position_in_event_buffer_); +} + void storage::flush_event_buffer() { + assert(!event_buffer_.empty()); assert(last_transaction_boundary_position_in_event_buffer_ <= std::size(event_buffer_)); + const util::const_byte_span transactions_data{ std::data(event_buffer_), last_transaction_boundary_position_in_event_buffer_}; // writing bytes from // the beginning of the event buffer backend_->write_data_to_stream(transactions_data); + if (is_in_gtid_replication_mode()) { + gtids_ += gtids_in_event_buffer_; + } + save_binlog_metadata(get_current_binlog_name()); + const auto begin_it{std::begin(event_buffer_)}; const auto portion_it{std::next( begin_it, static_cast( @@ -211,6 +312,9 @@ void storage::flush_event_buffer() { // from the beginning of this buffer event_buffer_.erase(begin_it, portion_it); last_transaction_boundary_position_in_event_buffer_ = 0U; + if (is_in_gtid_replication_mode()) { + gtids_in_event_buffer_.clear(); + } } void 
storage::load_binlog_index() { @@ -248,23 +352,17 @@ void storage::load_binlog_index() { } void storage::validate_binlog_index( - const storage_object_name_container &object_names) { - std::size_t known_entries{0U}; + const storage_object_name_container &object_names) const { for (auto const &[object_name, object_size] : object_names) { - if (object_name == default_binlog_index_name || - object_name == metadata_name) { - continue; - } if (std::ranges::find(binlog_names_, object_name) == std::end(binlog_names_)) { util::exception_location().raise( "storage contains an object that is not " "referenced in the binlog index"); } - ++known_entries; } - if (known_entries != std::size(binlog_names_)) { + if (std::size(object_names) != std::size(binlog_names_)) { util::exception_location().raise( "binlog index contains a reference to a non-existing object"); } @@ -273,7 +371,7 @@ void storage::validate_binlog_index( // files in the index } -void storage::save_binlog_index() { +void storage::save_binlog_index() const { std::ostringstream oss; for (const auto &binlog_name : binlog_names_) { std::filesystem::path binlog_path{default_binlog_index_entry_path}; @@ -291,7 +389,7 @@ void storage::load_metadata() { replication_mode_ = metadata.root().get<"mode">(); } -void storage::validate_metadata(replication_mode_type replication_mode) { +void storage::validate_metadata(replication_mode_type replication_mode) const { if (replication_mode != replication_mode_) { util::exception_location().raise( "replication mode provided to initialize storage differs from the one " @@ -299,11 +397,70 @@ void storage::validate_metadata(replication_mode_type replication_mode) { } } -void storage::save_metadata() { +void storage::save_metadata() const { storage_metadata metadata{}; metadata.root().get<"mode">() = replication_mode_; const auto content{metadata.str()}; backend_->put_object(metadata_name, util::as_const_byte_span(content)); } +[[nodiscard]] std::string 
+storage::generate_binlog_metadata_name(std::string_view binlog_name) { + std::string binlog_metadata_name{binlog_name}; + binlog_metadata_name += storage::binlog_metadata_extension; + return binlog_metadata_name; +} + +void storage::load_binlog_metadata(std::string_view binlog_name) { + const auto content{ + backend_->get_object(generate_binlog_metadata_name(binlog_name))}; + binlog_file_metadata metadata{content}; + position_ = metadata.root().get<"size">(); + if (is_in_gtid_replication_mode()) { + gtids_ = metadata.get_gtids(); + } +} + +void storage::save_binlog_metadata(std::string_view binlog_name) const { + binlog_file_metadata metadata{}; + metadata.root().get<"size">() = get_ready_to_flush_position(); + if (is_in_gtid_replication_mode()) { + metadata.set_gtids(get_gtids()); + } + const auto content{metadata.str()}; + backend_->put_object(generate_binlog_metadata_name(binlog_name), + util::as_const_byte_span(content)); +} + +void storage::load_and_validate_binlog_metadata_set( + // NOLINTNEXTLINE(bugprone-easily-swappable-parameters) + const storage_object_name_container &object_names, + const storage_object_name_container &object_metadata_names) { + for (const auto &binlog_name : binlog_names_) { + const auto binlog_metadata_name{generate_binlog_metadata_name(binlog_name)}; + if (!object_metadata_names.contains(binlog_metadata_name)) { + util::exception_location().raise( + "missing metadata for a binlog listed in the binlog index"); + } + load_binlog_metadata(binlog_name); + if (get_current_position() != object_names.at(binlog_name)) { + util::exception_location().raise( + "size from the binlog metadata does not match the actual binlog " + "size"); + } + if (!is_in_gtid_replication_mode() && !gtids_.is_empty()) { + util::exception_location().raise( + "found non-empty GTID set in the binlog metadata while in position " + "replication mode"); + } + } + // after this loop position_ and gtids_ should store the values from the last + // binlog file metadata + + 
if (std::size(object_metadata_names) != std::size(binlog_names_)) { + util::exception_location().raise( + "found metadata for a non-existing binlog"); + } +} + } // namespace binsrv diff --git a/src/binsrv/storage.hpp b/src/binsrv/storage.hpp index 8ac0036..4ce277a 100644 --- a/src/binsrv/storage.hpp +++ b/src/binsrv/storage.hpp @@ -27,6 +27,7 @@ #include "binsrv/replication_mode_type_fwd.hpp" #include "binsrv/storage_config_fwd.hpp" +#include "binsrv/gtids/gtid_fwd.hpp" #include "binsrv/gtids/gtid_set.hpp" #include "util/byte_span_fwd.hpp" @@ -38,6 +39,7 @@ class [[nodiscard]] storage { static constexpr std::string_view default_binlog_index_name{"binlog.index"}; static constexpr std::string_view default_binlog_index_entry_path{"."}; static constexpr std::string_view metadata_name{"metadata.json"}; + static constexpr std::string_view binlog_metadata_extension{".json"}; static constexpr std::size_t default_event_buffer_size_in_bytes{16384U}; @@ -57,6 +59,7 @@ class [[nodiscard]] storage { [[nodiscard]] replication_mode_type get_replication_mode() const noexcept { return replication_mode_; } + [[nodiscard]] bool is_in_gtid_replication_mode() const noexcept; [[nodiscard]] bool has_current_binlog_name() const noexcept { return !binlog_names_.empty(); @@ -77,11 +80,15 @@ class [[nodiscard]] storage { check_binlog_name(std::string_view binlog_name) noexcept; [[nodiscard]] bool is_binlog_open() const noexcept; - void open_binlog(std::string_view binlog_name); + + [[nodiscard]] open_binlog_status open_binlog(std::string_view binlog_name); void write_event(util::const_byte_span event_data, - bool at_transaction_boundary); + bool at_transaction_boundary, + const gtids::gtid &transaction_gtid); void close_binlog(); + void discard_incomplete_transaction_events(); + private: basic_storage_backend_ptr backend_; @@ -101,9 +108,10 @@ class [[nodiscard]] storage { using event_buffer_type = std::vector<std::byte>; event_buffer_type event_buffer_{}; std::size_t
last_transaction_boundary_position_in_event_buffer_{}; + gtids::gtid_set gtids_in_event_buffer_{}; [[nodiscard]] bool size_checkpointing_enabled() const noexcept { - return checkpoint_size_bytes_ != 0; + return checkpoint_size_bytes_ != 0ULL; } [[nodiscard]] bool interval_checkpointing_enabled() const noexcept { @@ -111,14 +119,35 @@ class [[nodiscard]] storage { std::chrono::steady_clock::duration{}; } + [[nodiscard]] bool has_event_data_to_flush() const noexcept { + return last_transaction_boundary_position_in_event_buffer_ != 0ULL; + } + [[nodiscard]] std::uint64_t get_flushed_position() const noexcept { + return get_current_position() - std::size(event_buffer_); + } + [[nodiscard]] std::uint64_t get_ready_to_flush_position() const noexcept { + return get_flushed_position() + + last_transaction_boundary_position_in_event_buffer_; + } void flush_event_buffer(); void load_binlog_index(); - void validate_binlog_index(const storage_object_name_container &object_names); - void save_binlog_index(); + void validate_binlog_index( + const storage_object_name_container &object_names) const; + void save_binlog_index() const; + void load_metadata(); - void validate_metadata(replication_mode_type replication_mode); - void save_metadata(); + void validate_metadata(replication_mode_type replication_mode) const; + void save_metadata() const; + + [[nodiscard]] static std::string + generate_binlog_metadata_name(std::string_view binlog_name); + void load_binlog_metadata(std::string_view binlog_name); + void save_binlog_metadata(std::string_view binlog_name) const; + + void load_and_validate_binlog_metadata_set( + const storage_object_name_container &object_names, + const storage_object_name_container &object_metadata_names); }; } // namespace binsrv diff --git a/src/binsrv/storage_fwd.hpp b/src/binsrv/storage_fwd.hpp index 0f7c972..84b0268 100644 --- a/src/binsrv/storage_fwd.hpp +++ b/src/binsrv/storage_fwd.hpp @@ -16,8 +16,17 @@ #ifndef BINSRV_STORAGE_FWD_HPP #define 
BINSRV_STORAGE_FWD_HPP +#include <cstdint> + namespace binsrv { +enum class open_binlog_status : std::uint8_t { + created, + opened_empty, + opened_at_magic_paylod_offset, + opened_with_data_present +}; + class storage; } // namespace binsrv diff --git a/src/binsrv/storage_metadata.cpp b/src/binsrv/storage_metadata.cpp index 408f235..3b68336 100644 --- a/src/binsrv/storage_metadata.cpp +++ b/src/binsrv/storage_metadata.cpp @@ -31,9 +31,8 @@ namespace binsrv { -storage_metadata::storage_metadata() : impl_{} { - root().get<"version">() = expected_metadata_version; -} +storage_metadata::storage_metadata() + : impl_{{expected_storage_metadata_version}, {}} {} storage_metadata::storage_metadata(std::string_view data) : impl_{} { auto json_value = boost::json::parse(data); @@ -50,7 +49,7 @@ storage_metadata::storage_metadata(std::string_view data) : impl_{} { } void storage_metadata::validate() const { - if (root().get<"version">() != expected_metadata_version) { + if (root().get<"version">() != expected_storage_metadata_version) { util::exception_location().raise( "unsupported storage metadata version"); } diff --git a/src/binsrv/storage_metadata_fwd.hpp b/src/binsrv/storage_metadata_fwd.hpp index f86aeee..30125c3 100644 --- a/src/binsrv/storage_metadata_fwd.hpp +++ b/src/binsrv/storage_metadata_fwd.hpp @@ -22,7 +22,8 @@ namespace binsrv { class storage_metadata; -inline constexpr std::uint32_t expected_metadata_version{1U}; +inline constexpr std::uint32_t expected_storage_metadata_version{1U}; + } // namespace binsrv #endif // BINSRV_STORAGE_METADATA_FWD_HPP diff --git a/src/util/nv_tuple_to_json.hpp b/src/util/nv_tuple_to_json.hpp index 9a33dcb..f91d8ab 100644 --- a/src/util/nv_tuple_to_json.hpp +++ b/src/util/nv_tuple_to_json.hpp @@ -34,16 +34,26 @@ namespace detail { // classes declared in different namespaces.
struct insertion_context {}; +// A helper function that emplaces a single nv value to the specified JSON +// object and is used by the tag_invoke() overload for nv_tuple<..>. +template <typename NV, typename... NVPack> +void emplace_element_into_json_value(boost::json::object &json_object, + const nv_tuple<NVPack...> &obj, + const insertion_context &insertion_ctx) { + auto subvalue{ + boost::json::value_from(obj.template get<NV>(), insertion_ctx)}; + if (!subvalue.is_null()) { + json_object.emplace(NV::name.sv(), std::move(subvalue)); + } +} + // The tag_invoke() overload for nv_tuple<..>. template <typename... NVPack> void tag_invoke(boost::json::value_from_tag /*unused*/, boost::json::value &json_value, const nv_tuple<NVPack...> &obj, const insertion_context &insertion_ctx) { boost::json::object json_object{}; - - (json_object.emplace(NVPack::name.sv(), - boost::json::value_from(obj.template get<NVPack>(), - insertion_ctx)), + (emplace_element_into_json_value<NVPack>(json_object, obj, insertion_ctx), ...); json_value = std::move(json_object); }