From 1d5fda100a2bdb6bb590e55ec7f5029293738fab Mon Sep 17 00:00:00 2001 From: dongwook-chan Date: Fri, 24 Sep 2021 15:55:23 +0900 Subject: [PATCH 1/5] Parse status variables in query event packet Status variables contain useful information: 1. List of databases affected by the query 2. Runtime value of system variables (SQL_MODE, AUTOCOMMIT, CHARSET_SERVER, etc.) 3. timezone of the master For further information please refer to; [Syntax of system variables field in query event](https://dev.mysql.com/doc/internals/en/query-event.html#q-microseconds) [Definition of enumeration for system variable keys](https://github.com/mysql/mysql-server/blob/beb865a960b9a8a16cf999c323e46c5b0c67f21f/libbinlogevents/include/statement_events.h#L463-L532) [Semantics of system variable values](https://github.com/mysql/mysql-server/blob/beb865a960b9a8a16cf999c323e46c5b0c67f21f/libbinlogevents/include/statement_events.h#L156-L448) --- .../constants/STATUS_VAR_KEY.py | 68 +++++++++++++++ pymysqlreplication/constants/__init__.py | 1 + pymysqlreplication/event.py | 84 ++++++++++++++++++- pymysqlreplication/packet.py | 29 +++++++ pymysqlreplication/tests/test_data_type.py | 7 ++ 5 files changed, 188 insertions(+), 1 deletion(-) create mode 100644 pymysqlreplication/constants/STATUS_VAR_KEY.py diff --git a/pymysqlreplication/constants/STATUS_VAR_KEY.py b/pymysqlreplication/constants/STATUS_VAR_KEY.py new file mode 100644 index 00000000..e1320278 --- /dev/null +++ b/pymysqlreplication/constants/STATUS_VAR_KEY.py @@ -0,0 +1,68 @@ +#from enum import IntEnum + +#class StatusVarsKey(IntEnum): +"""List of Query_event_status_vars + + A status variable in query events is a sequence of status KEY-VALUE pairs. + The class variables enumerated below are KEYs. + Each KEY determines the length of corresponding VALUE. + + For further details refer to: + mysql-server: https://github.com/mysql/mysql-server/blob/beb865a960b9a8a16cf999c323e46c5b0c67f21f/libbinlogevents/include/statement_events.h#L463-L532 + MySQL Documentation: https://dev.mysql.com/doc/internals/en/query-event.html + + From mysql-server source code, edited buy dongwook-chan +""" + +# KEY +Q_FLAGS2_CODE = 0x00 +Q_SQL_MODE_CODE = 0X01 +Q_CATALOG_CODE = 0x02 +Q_AUTO_INCREMENT = 0x03 +Q_CHARSET_CODE = 0x04 +Q_TIME_ZONE_CODE = 0x05 +Q_CATALOG_NZ_CODE = 0x06 +Q_LC_TIME_NAMES_CODE = 0x07 +Q_CHARSET_DATABASE_CODE = 0x08 +Q_TABLE_MAP_FOR_UPDATE_CODE = 0x09 +Q_MASTER_DATA_WRITTEN_CODE = 0x0A +Q_INVOKER = 0x0B +Q_UPDATED_DB_NAMES = 0x0C +Q_MICROSECONDS = 0x0D +Q_COMMIT_TS = 0x0E +Q_COMMIT_TS2 = 0X0F +Q_EXPLICIT_DEFAULTS_FOR_TIMESTAMP = 0X10 +Q_DDL_LOGGED_WITH_XID = 0X11 +Q_DEFAULT_COLLATION_FOR_UTF8MB4 = 0X12 +Q_SQL_REQUIRE_PRIMARY_KEY = 0X13 +Q_DEFAULT_TABLE_ENCRYPTION = 0X14 + +""" + is_fixed_length_value = [ + True, + True, + False, + True, + True, + + ] + + # length for VALUE + value_length_fixed_for_key = [ + + ] + + value_length_multiplier_for_key = [ + + ] + + value_length_remainder_for_key = [ + + ] +""" +""" + #@classmethod + @staticmethod + def value_length_for_key(): + pass +""" \ No newline at end of file diff --git a/pymysqlreplication/constants/__init__.py b/pymysqlreplication/constants/__init__.py index 0c9d19fd..11bff9d8 100644 --- a/pymysqlreplication/constants/__init__.py +++ b/pymysqlreplication/constants/__init__.py @@ -2,3 +2,4 @@ from .BINLOG import * from .FIELD_TYPE import * +from .STATUS_VAR_KEY import * diff --git a/pymysqlreplication/event.py b/pymysqlreplication/event.py index dbba589d..43670c74 100644 --- a/pymysqlreplication/event.py +++ b/pymysqlreplication/event.py @@ -3,6 +3,8 @@ import binascii import struct import datetime +#from .constants import STATUS_VAR_KEY +from pymysqlreplication.constants.STATUS_VAR_KEY import * class BinLogEvent(object): @@ -167,7 +169,15 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs) self.status_vars_length = self.packet.read_uint16() # Payload - self.status_vars = self.packet.read(self.status_vars_length) + status_vars_end_pos = self.packet.read_bytes + self.status_vars_length + self.keys = [] #@ + while self.packet.read_bytes < status_vars_end_pos: # while 남은 data length가 얼마만큼? OR read_bytes + # read KEY for status variable + status_vars_key = self.packet.read_uint8() + self.keys.append(status_vars_key) #@ + # read VALUE for status variable + self._read_status_vars_value_for_key(status_vars_key) + self.schema = self.packet.read(self.schema_length) self.packet.advance(1) @@ -181,6 +191,78 @@ def _dump(self): print("Execution time: %d" % (self.execution_time)) print("Query: %s" % (self.query)) + + # TODO: check if instance attribute with the same name already exists + # TODO: put all the instace attribute in separate class? called status_vars + # TODO: does length need to be remembered? + # TODO: ref(mysql doc. and mysql-server) for each hunk + def _read_status_vars_value_for_key(self, key): + """ + https://github.com/mysql/mysql-server/blob/beb865a960b9a8a16cf999c323e46c5b0c67f21f/libbinlogevents/src/statement_events.cpp#L181-L336 + + From mysql-server source code edited by dongwook-chan + """ + if key == Q_FLAGS2_CODE: # 0x00 + self.flags2 = self.packet.read_uint32() + elif key == Q_SQL_MODE_CODE: # 0x01 + self.sql_mode = self.packet.read_uint64() + elif key == Q_CATALOG_CODE: # 0x02 + catalog_len = self.packet.read_uint8() + if catalog_len: + self.catalog_nz_code = self.packet.read(catalog_len + 1) + elif key == Q_AUTO_INCREMENT: # 0x03 + self.auto_increment_increment = self.packet.read_uint16() + self.auto_increment_offset = self.packet.read_uint16() + elif key == Q_CHARSET_CODE: # 0x04 + self.character_set_client = self.packet.read_uint16() + self.collation_connection = self.packet.read_uint16() + self.collation_server = self.packet.read_uint16() + elif key == Q_TIME_ZONE_CODE: # 0x05 + time_zone_len = self.packet.read_uint8() + if time_zone_len: + self.time_zone = self.packet.read(time_zone_len) + elif key == Q_CATALOG_NZ_CODE: # 0x06 + catalog_len = self.packet.read_uint8() + if catalog_len: + self.catalog_nz_code = self.packet.read(catalog_len) + elif key == Q_LC_TIME_NAMES_CODE: # 0x07 + self.lc_time_names_number = self.packet.read_uint16() + elif key == Q_CHARSET_DATABASE_CODE: # 0x08 + self.charset_database_number = self.packet.read_uint16() + elif key == Q_TABLE_MAP_FOR_UPDATE_CODE: # 0x09 + self.table_map_for_update = self.packet.read_uint64() + elif key == Q_MASTER_DATA_WRITTEN_CODE: # 0x0A + pass + elif key == Q_INVOKER: # 0x0B + user_len = self.packet.read_uint8() + if user_len: + self.user = self.packet.read(user_len) + host_len = self.packet.read_uint8() + if host_len: + self.host = self.packet.read(host_len) + elif key == Q_UPDATED_DB_NAMES: # 0x0C + mts_accessed_dbs = self.packet.read_uint8() + dbs = [] + for i in range(mts_accessed_dbs): + db = self.packet.read_string() + dbs.append(db) + self.mts_accessed_db_names = dbs + elif key == Q_MICROSECONDS: # 0x0D + self.microseconds = self.packet.read_uint24() + elif key == Q_COMMIT_TS: # 0x0E + pass + elif key == Q_COMMIT_TS2: # 0x0F + pass + elif key == Q_EXPLICIT_DEFAULTS_FOR_TIMESTAMP:# 0x10 + self.explicit_defaults_ts = self.packet.read_uint8() + elif key == Q_DDL_LOGGED_WITH_XID: # 0x11 + self.ddl_xid = self.packet.read_uint64() + elif key == Q_DEFAULT_COLLATION_FOR_UTF8MB4: # 0x12 + self.default_collation_for_utf8mb4_number = self.packet.read_uint16() + elif key == Q_SQL_REQUIRE_PRIMARY_KEY: # 0x13 + self.sql_require_primary_key = self.packet.read_uint8() + elif key == Q_DEFAULT_TABLE_ENCRYPTION: # 0x14 + self.default_table_encryption = self.packet.read_uint8() class BeginLoadQueryEvent(BinLogEvent): """ diff --git a/pymysqlreplication/packet.py b/pymysqlreplication/packet.py index de29f4f7..4bc58094 100644 --- a/pymysqlreplication/packet.py +++ b/pymysqlreplication/packet.py @@ -461,3 +461,32 @@ def _read(x): return self.read_binary_json_type(x[0], length) return [_read(x) for x in values_type_offset_inline] + + def read_string(self): + """Read a 'Length Coded String' from the data buffer. + + A 'Length Coded String' consists first of a length coded + (unsigned, positive) integer represented in 1-9 bytes followed by + that many bytes of binary data. (For example "cat" would be "3cat".) + + From PyMYSQL source code edited by dongwook-chan + """ + string = b'' + while True: + char = self.read(1) + if char == b'\0': + break + string += char + + return string + + """ + #self.buf = __data_buffer #@ + end_pos = self.__data_buffer.find(b"\0") + self.read_bytes += end_pos + 1 + if end_pos < 0: + return None + result = self.__data_buffer[ : end_pos] + self.__data_buffer = self.__data_buffer[end_pos + 1 : ] + return result + """ diff --git a/pymysqlreplication/tests/test_data_type.py b/pymysqlreplication/tests/test_data_type.py index 169e581e..80201e95 100644 --- a/pymysqlreplication/tests/test_data_type.py +++ b/pymysqlreplication/tests/test_data_type.py @@ -641,5 +641,12 @@ def test_partition_id(self): self.assertEqual(event.extra_data_type, 1) self.assertEqual(event.partition_id, 3) + def test_status_vars(self): + create_query = "CREATE TABLE test (id INTEGER)" + insert_query = "SELECT null" + event = self.create_and_insert_value(create_query, insert_query) + self.assertEqual(event.catalog_nz_code, b'std') + self.assertEqual(event.mts_accessed_db_names, [b'pymysqlreplication_test']) + if __name__ == "__main__": unittest.main() From bdb8492ec15b5c6775db07fd00d51d4e5254c3ca Mon Sep 17 00:00:00 2001 From: dongwook-chan Date: Fri, 24 Sep 2021 16:37:40 +0900 Subject: [PATCH 2/5] Clean codes for parsing status variables --- pymysqlreplication/tests/test_data_type.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pymysqlreplication/tests/test_data_type.py b/pymysqlreplication/tests/test_data_type.py index 80201e95..93325399 100644 --- a/pymysqlreplication/tests/test_data_type.py +++ b/pymysqlreplication/tests/test_data_type.py @@ -643,7 +643,7 @@ def test_partition_id(self): def test_status_vars(self): create_query = "CREATE TABLE test (id INTEGER)" - insert_query = "SELECT null" + insert_query = "insert into test values (1)" # not necessary event = self.create_and_insert_value(create_query, insert_query) self.assertEqual(event.catalog_nz_code, b'std') self.assertEqual(event.mts_accessed_db_names, [b'pymysqlreplication_test']) From 1b789e6570c0f94902c989834efc7148fee2a86e Mon Sep 17 00:00:00 2001 From: dongwook-chan Date: Fri, 24 Sep 2021 19:03:29 +0900 Subject: [PATCH 3/5] Add create_table function for testing query events only --- .../constants/STATUS_VAR_KEY.py | 30 ------------------- pymysqlreplication/event.py | 1 - pymysqlreplication/packet.py | 17 +---------- pymysqlreplication/tests/test_data_type.py | 15 ++++++++-- 4 files changed, 14 insertions(+), 49 deletions(-) diff --git a/pymysqlreplication/constants/STATUS_VAR_KEY.py b/pymysqlreplication/constants/STATUS_VAR_KEY.py index e1320278..46bce077 100644 --- a/pymysqlreplication/constants/STATUS_VAR_KEY.py +++ b/pymysqlreplication/constants/STATUS_VAR_KEY.py @@ -36,33 +36,3 @@ Q_DEFAULT_COLLATION_FOR_UTF8MB4 = 0X12 Q_SQL_REQUIRE_PRIMARY_KEY = 0X13 Q_DEFAULT_TABLE_ENCRYPTION = 0X14 - -""" - is_fixed_length_value = [ - True, - True, - False, - True, - True, - - ] - - # length for VALUE - value_length_fixed_for_key = [ - - ] - - value_length_multiplier_for_key = [ - - ] - - value_length_remainder_for_key = [ - - ] -""" -""" - #@classmethod - @staticmethod - def value_length_for_key(): - pass -""" \ No newline at end of file diff --git a/pymysqlreplication/event.py b/pymysqlreplication/event.py index 43670c74..9ffa0336 100644 --- a/pymysqlreplication/event.py +++ b/pymysqlreplication/event.py @@ -3,7 +3,6 @@ import binascii import struct import datetime -#from .constants import STATUS_VAR_KEY from pymysqlreplication.constants.STATUS_VAR_KEY import * diff --git a/pymysqlreplication/packet.py b/pymysqlreplication/packet.py index 4bc58094..c027bcf1 100644 --- a/pymysqlreplication/packet.py +++ b/pymysqlreplication/packet.py @@ -465,11 +465,7 @@ def _read(x): def read_string(self): """Read a 'Length Coded String' from the data buffer. - A 'Length Coded String' consists first of a length coded - (unsigned, positive) integer represented in 1-9 bytes followed by - that many bytes of binary data. (For example "cat" would be "3cat".) - - From PyMYSQL source code edited by dongwook-chan + """ string = b'' while True: @@ -479,14 +475,3 @@ def read_string(self): string += char return string - - """ - #self.buf = __data_buffer #@ - end_pos = self.__data_buffer.find(b"\0") - self.read_bytes += end_pos + 1 - if end_pos < 0: - return None - result = self.__data_buffer[ : end_pos] - self.__data_buffer = self.__data_buffer[end_pos + 1 : ] - return result - """ diff --git a/pymysqlreplication/tests/test_data_type.py b/pymysqlreplication/tests/test_data_type.py index 93325399..eb7efced 100644 --- a/pymysqlreplication/tests/test_data_type.py +++ b/pymysqlreplication/tests/test_data_type.py @@ -57,6 +57,18 @@ def create_and_insert_value(self, create_query, insert_query): self.assertIsInstance(event, WriteRowsEvent) return event + def create_table(self, create_query): + self.execute(create_query) + + self.assertIsInstance(self.stream.fetchone(), RotateEvent) + self.assertIsInstance(self.stream.fetchone(), FormatDescriptionEvent) + + event = self.stream.fetchone() + + self.assertEqual(event.event_type, QueryEvent) + + return event + def test_decimal(self): create_query = "CREATE TABLE test (test DECIMAL(2,1))" insert_query = "INSERT INTO test VALUES(4.2)" @@ -643,8 +655,7 @@ def test_partition_id(self): def test_status_vars(self): create_query = "CREATE TABLE test (id INTEGER)" - insert_query = "insert into test values (1)" # not necessary - event = self.create_and_insert_value(create_query, insert_query) + event = self.create_table(create_query) self.assertEqual(event.catalog_nz_code, b'std') self.assertEqual(event.mts_accessed_db_names, [b'pymysqlreplication_test']) From b7f85af33340fce4dfe4cc3fb85dbe7a92904f0d Mon Sep 17 00:00:00 2001 From: dongwook-chan Date: Fri, 24 Sep 2021 19:41:46 +0900 Subject: [PATCH 4/5] Correct comparison object for status variable test --- pymysqlreplication/tests/test_data_type.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pymysqlreplication/tests/test_data_type.py b/pymysqlreplication/tests/test_data_type.py index eb7efced..c4ffd153 100644 --- a/pymysqlreplication/tests/test_data_type.py +++ b/pymysqlreplication/tests/test_data_type.py @@ -65,7 +65,7 @@ def create_table(self, create_query): event = self.stream.fetchone() - self.assertEqual(event.event_type, QueryEvent) + self.assertEqual(event.event_type, QUERY_EVENT) return event From d8bef6a93a3bbdb49e0ac09cf15a9532d3524e19 Mon Sep 17 00:00:00 2001 From: dongwook-chan Date: Fri, 24 Sep 2021 20:17:30 +0900 Subject: [PATCH 5/5] Add comments for parse of status variables in query events --- .../constants/STATUS_VAR_KEY.py | 2 +- pymysqlreplication/event.py | 18 ++++++++-------- pymysqlreplication/packet.py | 5 ++++- pymysqlreplication/tests/test_data_type.py | 21 +++++++++++++++++++ 4 files changed, 35 insertions(+), 11 deletions(-) diff --git a/pymysqlreplication/constants/STATUS_VAR_KEY.py b/pymysqlreplication/constants/STATUS_VAR_KEY.py index 46bce077..aa0ffe78 100644 --- a/pymysqlreplication/constants/STATUS_VAR_KEY.py +++ b/pymysqlreplication/constants/STATUS_VAR_KEY.py @@ -11,7 +11,7 @@ mysql-server: https://github.com/mysql/mysql-server/blob/beb865a960b9a8a16cf999c323e46c5b0c67f21f/libbinlogevents/include/statement_events.h#L463-L532 MySQL Documentation: https://dev.mysql.com/doc/internals/en/query-event.html - From mysql-server source code, edited buy dongwook-chan + Status variable key names From mysql-server source code, edited by dongwook-chan """ # KEY diff --git a/pymysqlreplication/event.py b/pymysqlreplication/event.py index 9ffa0336..d5ee1060 100644 --- a/pymysqlreplication/event.py +++ b/pymysqlreplication/event.py @@ -169,11 +169,9 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs) # Payload status_vars_end_pos = self.packet.read_bytes + self.status_vars_length - self.keys = [] #@ while self.packet.read_bytes < status_vars_end_pos: # while 남은 data length가 얼마만큼? OR read_bytes # read KEY for status variable status_vars_key = self.packet.read_uint8() - self.keys.append(status_vars_key) #@ # read VALUE for status variable self._read_status_vars_value_for_key(status_vars_key) @@ -196,19 +194,21 @@ def _dump(self): # TODO: does length need to be remembered? # TODO: ref(mysql doc. and mysql-server) for each hunk def _read_status_vars_value_for_key(self, key): - """ + """parse status variable VALUE for given KEY + + A status variable in query events is a sequence of status KEY-VALUE pairs. + Parsing logic from mysql-server source code edited by dongwook-chan https://github.com/mysql/mysql-server/blob/beb865a960b9a8a16cf999c323e46c5b0c67f21f/libbinlogevents/src/statement_events.cpp#L181-L336 - From mysql-server source code edited by dongwook-chan + Args: + key: key for status variable """ if key == Q_FLAGS2_CODE: # 0x00 self.flags2 = self.packet.read_uint32() elif key == Q_SQL_MODE_CODE: # 0x01 self.sql_mode = self.packet.read_uint64() - elif key == Q_CATALOG_CODE: # 0x02 - catalog_len = self.packet.read_uint8() - if catalog_len: - self.catalog_nz_code = self.packet.read(catalog_len + 1) + elif key == Q_CATALOG_CODE: # 0x02 for MySQL 5.0.x + pass elif key == Q_AUTO_INCREMENT: # 0x03 self.auto_increment_increment = self.packet.read_uint16() self.auto_increment_offset = self.packet.read_uint16() @@ -226,7 +226,7 @@ def _read_status_vars_value_for_key(self, key): self.catalog_nz_code = self.packet.read(catalog_len) elif key == Q_LC_TIME_NAMES_CODE: # 0x07 self.lc_time_names_number = self.packet.read_uint16() - elif key == Q_CHARSET_DATABASE_CODE: # 0x08 + elif key == Q_CHARSET_DATABASE_CODE: # 0x08 self.charset_database_number = self.packet.read_uint16() elif key == Q_TABLE_MAP_FOR_UPDATE_CODE: # 0x09 self.table_map_for_update = self.packet.read_uint64() diff --git a/pymysqlreplication/packet.py b/pymysqlreplication/packet.py index c027bcf1..a5a7c0fd 100644 --- a/pymysqlreplication/packet.py +++ b/pymysqlreplication/packet.py @@ -465,7 +465,10 @@ def _read(x): def read_string(self): """Read a 'Length Coded String' from the data buffer. - + Read __data_buffer until NULL character (0 = \0 = \x00) + + Returns: + Binary string parsed from __data_buffer """ string = b'' while True: diff --git a/pymysqlreplication/tests/test_data_type.py b/pymysqlreplication/tests/test_data_type.py index c4ffd153..37873444 100644 --- a/pymysqlreplication/tests/test_data_type.py +++ b/pymysqlreplication/tests/test_data_type.py @@ -58,6 +58,14 @@ def create_and_insert_value(self, create_query, insert_query): return event def create_table(self, create_query): + """Create table + + Create table in db and return query event. + + Returns: + Query event + """ + self.execute(create_query) self.assertIsInstance(self.stream.fetchone(), RotateEvent) @@ -654,6 +662,19 @@ def test_partition_id(self): self.assertEqual(event.partition_id, 3) def test_status_vars(self): + """Test parse of status variables in query events + + Majority of status variables available depends on the settings of db. + Therefore, this test only tests system variable values independent from settings of db. + Note that if you change default db name 'pymysqlreplication_test', + event.mts_accessed_db_names MUST be asserted against the changed db name. + + Returns: + binary string parsed from __data_buffer + + Raises: + AssertionError: if no + """ create_query = "CREATE TABLE test (id INTEGER)" event = self.create_table(create_query) self.assertEqual(event.catalog_nz_code, b'std')