From 3700960f8fcfc20b65199c17f7e3e9052c34d7e2 Mon Sep 17 00:00:00 2001 From: sean Date: Mon, 18 Sep 2023 17:55:24 +0900 Subject: [PATCH] TestCase resolved --- pymysqlreplication/constants/CHARSET.py | 2 +- pymysqlreplication/row_event.py | 25 ++---- pymysqlreplication/tests/test_basic.py | 98 +++++++++++----------- pymysqlreplication/tests/test_data_type.py | 40 +++------ 4 files changed, 68 insertions(+), 97 deletions(-) diff --git a/pymysqlreplication/constants/CHARSET.py b/pymysqlreplication/constants/CHARSET.py index 457d43e5..f4e8fb9d 100644 --- a/pymysqlreplication/constants/CHARSET.py +++ b/pymysqlreplication/constants/CHARSET.py @@ -59,7 +59,7 @@ def by_name(self, name, dbms="mysql"): ) as f: f.readline() # pass header for line in f: - lines = line.split(",") + lines = line.rstrip("\n").split(",") if len(lines) != 5: continue diff --git a/pymysqlreplication/row_event.py b/pymysqlreplication/row_event.py index 38e72640..a88b07ed 100644 --- a/pymysqlreplication/row_event.py +++ b/pymysqlreplication/row_event.py @@ -259,6 +259,7 @@ def charset_to_encoding(name): def __read_string(self, size, column): string = self.packet.read_length_coded_pascal_string(size) + origin_string = string if column.character_set_name is not None: encoding = self.charset_to_encoding(column.character_set_name) decode_errors = "ignore" if self._ignore_decode_errors else "strict" @@ -266,7 +267,12 @@ def __read_string(self, size, column): string = string.decode(encoding, decode_errors) except LookupError: # If python does not support Mysql encoding type ex)swe7 it will not decoding - string = string.decode(errors=decode_errors) + try: + # If Utf-8 decode error show origin String + string = string.decode(errors=decode_errors) + except UnicodeDecodeError: + string = origin_string + return string def __read_bit(self, column): @@ -688,13 +694,6 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs) self.table_obj = Table(self.table_id, self.schema, self.table, self.columns) table_map[self.table_id] = self.table_obj self.optional_metadata = self._get_optional_meta_data() - - # We exclude 'CHAR' and 'INTERVAL' as they map to 'TINY' and 'ENUM' respectively - self.reverse_field_type = { - v: k - for k, v in vars(FIELD_TYPE).items() - if isinstance(v, int) and k not in ["CHAR", "INTERVAL"] - } self._sync_column_info() def get_table(self): @@ -810,7 +809,6 @@ def _sync_column_info(self): if not self.__optional_meta_data: # If optional_meta_data is False Do not sync Event Time Column Schemas return - charset_pos = 0 enum_or_set_pos = 0 enum_pos = 0 @@ -825,11 +823,9 @@ def _sync_column_info(self): if self._is_character_column(column_type, dbms=self.dbms): charset_id = self.optional_metadata.charset_collation_list[charset_pos] charset_pos += 1 - encode_name, collation_name, charset_name = find_charset( - charset_id, dbms=self.dbms + str(charset_id), dbms=self.dbms ) - self.columns[column_idx].collation_name = collation_name self.columns[column_idx].character_set_name = encode_name @@ -840,7 +836,7 @@ def _sync_column_info(self): enum_or_set_pos += 1 encode_name, collation_name, charset_name = find_charset( - charset_id, dbms=self.dbms + str(charset_id), dbms=self.dbms ) self.columns[column_idx].collation_name = collation_name @@ -1049,9 +1045,6 @@ def _is_numeric_column(column_type): return True return False - def _get_field_type_key(self, field_type_value): - return self.reverse_field_type.get(field_type_value, None) - def find_encoding(charset: CHARSET.Charset): encode = None diff --git a/pymysqlreplication/tests/test_basic.py b/pymysqlreplication/tests/test_basic.py index c6c51c09..353af2ac 100644 --- a/pymysqlreplication/tests/test_basic.py +++ b/pymysqlreplication/tests/test_basic.py @@ -605,6 +605,12 @@ def create_binlog_packet_wrapper(pkt): class TestMultipleRowBinLogStreamReader(base.PyMySQLReplicationTestCase): + def setUp(self): + super(TestMultipleRowBinLogStreamReader, self).setUp() + if self.isMySQL8014AndMore(): + self.execute("SET GLOBAL binlog_row_metadata='FULL';") + self.execute("SET GLOBAL binlog_row_image='FULL';") + def ignoredEvents(self): return [GtidEvent, PreviousGtidsEvent] @@ -714,45 +720,44 @@ def test_delete_multiple_row_event(self): self.assertEqual(event.rows[1]["values"]["id"], 2) self.assertEqual(event.rows[1]["values"]["data"], "World") - # erase temporary - # def test_ignore_decode_errors(self): - # problematic_unicode_string = ( - # b'[{"text":"\xed\xa0\xbd \xed\xb1\x8d Some string"}]' - # ) - # self.stream.close() - # self.execute("CREATE TABLE test (data VARCHAR(50) CHARACTER SET utf8mb4)") - # self.execute_with_args( - # "INSERT INTO test (data) VALUES (%s)", (problematic_unicode_string) - # ) - # self.execute("COMMIT") - # - # # Initialize with ignore_decode_errors=False - # self.stream = BinLogStreamReader( - # self.database, - # server_id=1024, - # only_events=(WriteRowsEvent,), - # ignore_decode_errors=False, - # ) - # event = self.stream.fetchone() - # event = self.stream.fetchone() - # with self.assertRaises(UnicodeError): - # event = self.stream.fetchone() - # if event.table_map[event.table_id].column_name_flag: - # data = event.rows[0]["values"]["data"] - # - # # Initialize with ignore_decode_errors=True - # self.stream = BinLogStreamReader( - # self.database, - # server_id=1024, - # only_events=(WriteRowsEvent,), - # ignore_decode_errors=True, - # ) - # self.stream.fetchone() - # self.stream.fetchone() - # event = self.stream.fetchone() - # if event.table_map[event.table_id].column_name_flag: - # data = event.rows[0]["values"]["data"] - # self.assertEqual(data, '[{"text":" Some string"}]') + def test_ignore_decode_errors(self): + problematic_unicode_string = ( + b'[{"text":"\xed\xa0\xbd \xed\xb1\x8d Some string"}]' + ) + self.stream.close() + self.execute("CREATE TABLE test (data VARCHAR(50) CHARACTER SET utf8mb4)") + self.execute_with_args( + "INSERT INTO test (data) VALUES (%s)", (problematic_unicode_string) + ) + self.execute("COMMIT") + + # Initialize with ignore_decode_errors=False + self.stream = BinLogStreamReader( + self.database, + server_id=1024, + only_events=(WriteRowsEvent,), + ignore_decode_errors=False, + ) + event = self.stream.fetchone() + event = self.stream.fetchone() + with self.assertRaises(UnicodeError): + event = self.stream.fetchone() + if event.table_map[event.table_id].column_name_flag: + data = event.rows[0]["values"]["data"] + + # Initialize with ignore_decode_errors=True + self.stream = BinLogStreamReader( + self.database, + server_id=1024, + only_events=(WriteRowsEvent,), + ignore_decode_errors=True, + ) + self.stream.fetchone() + self.stream.fetchone() + event = self.stream.fetchone() + if event.table_map[event.table_id].column_name_flag: + data = event.rows[0]["values"]["data"] + self.assertEqual(data, '[{"text":" Some string"}]') def test_drop_column(self): self.stream.close() @@ -774,7 +779,6 @@ def test_drop_column(self): finally: self.resetBinLog() - @unittest.expectedFailure def test_alter_column(self): self.stream.close() self.execute( @@ -782,7 +786,6 @@ def test_alter_column(self): ) self.execute("INSERT INTO test_alter_column VALUES (1, 'A value')") self.execute("COMMIT") - # this is a problem only when column is added in position other than at the end self.execute( "ALTER TABLE test_alter_column ADD COLUMN another_data VARCHAR(50) AFTER id" ) @@ -796,16 +799,11 @@ def test_alter_column(self): server_id=1024, only_events=(WriteRowsEvent,), ) - event = self.stream.fetchone() # insert with two values - # both of these asserts fail because of issue underlying proble described in issue #118 - # because it got table schema info after the alter table, it wrongly assumes the second - # column of the first insert is 'another_data' - # ER: {'id': 1, 'data': 'A value'} - # AR: {'id': 1, 'another_data': 'A value'} - self.assertIn("data", event.rows[0]["values"]) - self.assertNot("another_data", event.rows[0]["values"]) + event = self.stream.fetchone() + self.assertEqual(event.rows[0]["values"]["data"], "A value") + event = self.stream.fetchone() # insert with three values + self.assertEqual(event.rows[0]["values"]["another_data"], "Another value") self.assertEqual(event.rows[0]["values"]["data"], "A value") - self.stream.fetchone() # insert with three values class TestCTLConnectionSettings(base.PyMySQLReplicationTestCase): diff --git a/pymysqlreplication/tests/test_data_type.py b/pymysqlreplication/tests/test_data_type.py index 0e0c8570..75a56465 100644 --- a/pymysqlreplication/tests/test_data_type.py +++ b/pymysqlreplication/tests/test_data_type.py @@ -34,6 +34,12 @@ def encode_value(v): class TestDataType(base.PyMySQLReplicationTestCase): + def setUp(self): + super(TestDataType, self).setUp() + if self.isMySQL8014AndMore(): + self.execute("SET GLOBAL binlog_row_metadata='FULL';") + self.execute("SET GLOBAL binlog_row_image='FULL';") + def ignoredEvents(self): return [GtidEvent, PreviousGtidsEvent] @@ -111,13 +117,6 @@ def test_varbinary(self): if event.table_map[event.table_id].column_name_flag: self.assertEqual(event.rows[0]["values"]["b"], b"\xff\x01\x00\x00") - def test_fixed_length_binary(self): - create_query = "CREATE TABLE test(b BINARY(4))" - insert_query = "INSERT INTO test VALUES(UNHEX('ff010000'))" - event = self.create_and_insert_value(create_query, insert_query) - if event.table_map[event.table_id].column_name_flag: - self.assertEqual(event.rows[0]["values"]["b"], b"\xff\x01\x00\x00") - def test_decimal(self): create_query = "CREATE TABLE test (test DECIMAL(2,1))" insert_query = "INSERT INTO test VALUES(4.2)" @@ -539,7 +538,7 @@ def test_tiny_blob(self): insert_query = "INSERT INTO test VALUES('Hello', 'World')" event = self.create_and_insert_value(create_query, insert_query) if event.table_map[event.table_id].column_name_flag: - self.assertEqual(event.rows[0]["values"]["test"], b"Hello") + self.assertEqual(event.rows[0]["values"]["test"], "Hello") self.assertEqual(event.rows[0]["values"]["test2"], "World") def test_medium_blob(self): @@ -547,7 +546,7 @@ def test_medium_blob(self): insert_query = "INSERT INTO test VALUES('Hello', 'World')" event = self.create_and_insert_value(create_query, insert_query) if event.table_map[event.table_id].column_name_flag: - self.assertEqual(event.rows[0]["values"]["test"], b"Hello") + self.assertEqual(event.rows[0]["values"]["test"], "Hello") self.assertEqual(event.rows[0]["values"]["test2"], "World") def test_long_blob(self): @@ -555,7 +554,7 @@ def test_long_blob(self): insert_query = "INSERT INTO test VALUES('Hello', 'World')" event = self.create_and_insert_value(create_query, insert_query) if event.table_map[event.table_id].column_name_flag: - self.assertEqual(event.rows[0]["values"]["test"], b"Hello") + self.assertEqual(event.rows[0]["values"]["test"], "Hello") self.assertEqual(event.rows[0]["values"]["test2"], "World") def test_blob(self): @@ -563,7 +562,7 @@ def test_blob(self): insert_query = "INSERT INTO test VALUES('Hello', 'World')" event = self.create_and_insert_value(create_query, insert_query) if event.table_map[event.table_id].column_name_flag: - self.assertEqual(event.rows[0]["values"]["test"], b"Hello") + self.assertEqual(event.rows[0]["values"]["test"], "Hello") self.assertEqual(event.rows[0]["values"]["test2"], "World") def test_string(self): @@ -804,25 +803,6 @@ def test_encoding_utf8(self): if event.table_map[event.table_id].column_name_flag: self.assertMultiLineEqual(event.rows[0]["values"]["test"], string) - def test_zerofill(self): - create_query = "CREATE TABLE test ( \ - test TINYINT UNSIGNED ZEROFILL DEFAULT NULL, \ - test2 SMALLINT UNSIGNED ZEROFILL DEFAULT NULL, \ - test3 MEDIUMINT UNSIGNED ZEROFILL DEFAULT NULL, \ - test4 INT UNSIGNED ZEROFILL DEFAULT NULL, \ - test5 BIGINT UNSIGNED ZEROFILL DEFAULT NULL \ - )" - insert_query = ( - "INSERT INTO test (test, test2, test3, test4, test5) VALUES(1, 1, 1, 1, 1)" - ) - event = self.create_and_insert_value(create_query, insert_query) - if event.table_map[event.table_id].column_name_flag: - self.assertEqual(event.rows[0]["values"]["test"], "001") - self.assertEqual(event.rows[0]["values"]["test2"], "00001") - self.assertEqual(event.rows[0]["values"]["test3"], "00000001") - self.assertEqual(event.rows[0]["values"]["test4"], "0000000001") - self.assertEqual(event.rows[0]["values"]["test5"], "00000000000000000001") - def test_partition_id(self): if not self.isMySQL80AndMore(): self.skipTest("Not supported in this version of MySQL")