Skip to content

Commit

Permalink
TestCase resolved
Browse files Browse the repository at this point in the history
  • Loading branch information
sean-k1 committed Sep 18, 2023
1 parent 8a0280b commit 3700960
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 97 deletions.
2 changes: 1 addition & 1 deletion pymysqlreplication/constants/CHARSET.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ def by_name(self, name, dbms="mysql"):
) as f:
f.readline() # pass header
for line in f:
lines = line.split(",")
lines = line.rstrip("\n").split(",")
if len(lines) != 5:
continue

Expand Down
25 changes: 9 additions & 16 deletions pymysqlreplication/row_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -259,14 +259,20 @@ def charset_to_encoding(name):

def __read_string(self, size, column):
string = self.packet.read_length_coded_pascal_string(size)
origin_string = string
if column.character_set_name is not None:
encoding = self.charset_to_encoding(column.character_set_name)
decode_errors = "ignore" if self._ignore_decode_errors else "strict"
try:
string = string.decode(encoding, decode_errors)
except LookupError:
# If Python does not support the MySQL encoding type (e.g. swe7), it cannot decode with it
string = string.decode(errors=decode_errors)
try:
# If UTF-8 decoding also fails, return the original (raw) string
string = string.decode(errors=decode_errors)
except UnicodeDecodeError:
string = origin_string

return string

def __read_bit(self, column):
Expand Down Expand Up @@ -688,13 +694,6 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs)
self.table_obj = Table(self.table_id, self.schema, self.table, self.columns)
table_map[self.table_id] = self.table_obj
self.optional_metadata = self._get_optional_meta_data()

# We exclude 'CHAR' and 'INTERVAL' as they map to 'TINY' and 'ENUM' respectively
self.reverse_field_type = {
v: k
for k, v in vars(FIELD_TYPE).items()
if isinstance(v, int) and k not in ["CHAR", "INTERVAL"]
}
self._sync_column_info()

def get_table(self):
Expand Down Expand Up @@ -810,7 +809,6 @@ def _sync_column_info(self):
if not self.__optional_meta_data:
# If optional_meta_data is False, do not sync the event-time column schemas
return

charset_pos = 0
enum_or_set_pos = 0
enum_pos = 0
Expand All @@ -825,11 +823,9 @@ def _sync_column_info(self):
if self._is_character_column(column_type, dbms=self.dbms):
charset_id = self.optional_metadata.charset_collation_list[charset_pos]
charset_pos += 1

encode_name, collation_name, charset_name = find_charset(
charset_id, dbms=self.dbms
str(charset_id), dbms=self.dbms
)

self.columns[column_idx].collation_name = collation_name
self.columns[column_idx].character_set_name = encode_name

Expand All @@ -840,7 +836,7 @@ def _sync_column_info(self):
enum_or_set_pos += 1

encode_name, collation_name, charset_name = find_charset(
charset_id, dbms=self.dbms
str(charset_id), dbms=self.dbms
)

self.columns[column_idx].collation_name = collation_name
Expand Down Expand Up @@ -1049,9 +1045,6 @@ def _is_numeric_column(column_type):
return True
return False

def _get_field_type_key(self, field_type_value):
return self.reverse_field_type.get(field_type_value, None)


def find_encoding(charset: CHARSET.Charset):
encode = None
Expand Down
98 changes: 48 additions & 50 deletions pymysqlreplication/tests/test_basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -605,6 +605,12 @@ def create_binlog_packet_wrapper(pkt):


class TestMultipleRowBinLogStreamReader(base.PyMySQLReplicationTestCase):
def setUp(self):
super(TestMultipleRowBinLogStreamReader, self).setUp()
if self.isMySQL8014AndMore():
self.execute("SET GLOBAL binlog_row_metadata='FULL';")
self.execute("SET GLOBAL binlog_row_image='FULL';")

def ignoredEvents(self):
return [GtidEvent, PreviousGtidsEvent]

Expand Down Expand Up @@ -714,45 +720,44 @@ def test_delete_multiple_row_event(self):
self.assertEqual(event.rows[1]["values"]["id"], 2)
self.assertEqual(event.rows[1]["values"]["data"], "World")

# erase temporary
# def test_ignore_decode_errors(self):
# problematic_unicode_string = (
# b'[{"text":"\xed\xa0\xbd \xed\xb1\x8d Some string"}]'
# )
# self.stream.close()
# self.execute("CREATE TABLE test (data VARCHAR(50) CHARACTER SET utf8mb4)")
# self.execute_with_args(
# "INSERT INTO test (data) VALUES (%s)", (problematic_unicode_string)
# )
# self.execute("COMMIT")
#
# # Initialize with ignore_decode_errors=False
# self.stream = BinLogStreamReader(
# self.database,
# server_id=1024,
# only_events=(WriteRowsEvent,),
# ignore_decode_errors=False,
# )
# event = self.stream.fetchone()
# event = self.stream.fetchone()
# with self.assertRaises(UnicodeError):
# event = self.stream.fetchone()
# if event.table_map[event.table_id].column_name_flag:
# data = event.rows[0]["values"]["data"]
#
# # Initialize with ignore_decode_errors=True
# self.stream = BinLogStreamReader(
# self.database,
# server_id=1024,
# only_events=(WriteRowsEvent,),
# ignore_decode_errors=True,
# )
# self.stream.fetchone()
# self.stream.fetchone()
# event = self.stream.fetchone()
# if event.table_map[event.table_id].column_name_flag:
# data = event.rows[0]["values"]["data"]
# self.assertEqual(data, '[{"text":" Some string"}]')
def test_ignore_decode_errors(self):
problematic_unicode_string = (
b'[{"text":"\xed\xa0\xbd \xed\xb1\x8d Some string"}]'
)
self.stream.close()
self.execute("CREATE TABLE test (data VARCHAR(50) CHARACTER SET utf8mb4)")
self.execute_with_args(
"INSERT INTO test (data) VALUES (%s)", (problematic_unicode_string)
)
self.execute("COMMIT")

# Initialize with ignore_decode_errors=False
self.stream = BinLogStreamReader(
self.database,
server_id=1024,
only_events=(WriteRowsEvent,),
ignore_decode_errors=False,
)
event = self.stream.fetchone()
event = self.stream.fetchone()
with self.assertRaises(UnicodeError):
event = self.stream.fetchone()
if event.table_map[event.table_id].column_name_flag:
data = event.rows[0]["values"]["data"]

# Initialize with ignore_decode_errors=True
self.stream = BinLogStreamReader(
self.database,
server_id=1024,
only_events=(WriteRowsEvent,),
ignore_decode_errors=True,
)
self.stream.fetchone()
self.stream.fetchone()
event = self.stream.fetchone()
if event.table_map[event.table_id].column_name_flag:
data = event.rows[0]["values"]["data"]
self.assertEqual(data, '[{"text":" Some string"}]')

def test_drop_column(self):
self.stream.close()
Expand All @@ -774,15 +779,13 @@ def test_drop_column(self):
finally:
self.resetBinLog()

@unittest.expectedFailure
def test_alter_column(self):
self.stream.close()
self.execute(
"CREATE TABLE test_alter_column (id INTEGER(11), data VARCHAR(50))"
)
self.execute("INSERT INTO test_alter_column VALUES (1, 'A value')")
self.execute("COMMIT")
# this is a problem only when column is added in position other than at the end
self.execute(
"ALTER TABLE test_alter_column ADD COLUMN another_data VARCHAR(50) AFTER id"
)
Expand All @@ -796,16 +799,11 @@ def test_alter_column(self):
server_id=1024,
only_events=(WriteRowsEvent,),
)
event = self.stream.fetchone() # insert with two values
# both of these asserts fail because of the underlying problem described in issue #118
# because it got table schema info after the alter table, it wrongly assumes the second
# column of the first insert is 'another_data'
# ER: {'id': 1, 'data': 'A value'}
# AR: {'id': 1, 'another_data': 'A value'}
self.assertIn("data", event.rows[0]["values"])
self.assertNot("another_data", event.rows[0]["values"])
event = self.stream.fetchone()
self.assertEqual(event.rows[0]["values"]["data"], "A value")
event = self.stream.fetchone() # insert with three values
self.assertEqual(event.rows[0]["values"]["another_data"], "Another value")
self.assertEqual(event.rows[0]["values"]["data"], "A value")
self.stream.fetchone() # insert with three values


class TestCTLConnectionSettings(base.PyMySQLReplicationTestCase):
Expand Down
40 changes: 10 additions & 30 deletions pymysqlreplication/tests/test_data_type.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,12 @@ def encode_value(v):


class TestDataType(base.PyMySQLReplicationTestCase):
def setUp(self):
super(TestDataType, self).setUp()
if self.isMySQL8014AndMore():
self.execute("SET GLOBAL binlog_row_metadata='FULL';")
self.execute("SET GLOBAL binlog_row_image='FULL';")

def ignoredEvents(self):
return [GtidEvent, PreviousGtidsEvent]

Expand Down Expand Up @@ -111,13 +117,6 @@ def test_varbinary(self):
if event.table_map[event.table_id].column_name_flag:
self.assertEqual(event.rows[0]["values"]["b"], b"\xff\x01\x00\x00")

def test_fixed_length_binary(self):
create_query = "CREATE TABLE test(b BINARY(4))"
insert_query = "INSERT INTO test VALUES(UNHEX('ff010000'))"
event = self.create_and_insert_value(create_query, insert_query)
if event.table_map[event.table_id].column_name_flag:
self.assertEqual(event.rows[0]["values"]["b"], b"\xff\x01\x00\x00")

def test_decimal(self):
create_query = "CREATE TABLE test (test DECIMAL(2,1))"
insert_query = "INSERT INTO test VALUES(4.2)"
Expand Down Expand Up @@ -539,31 +538,31 @@ def test_tiny_blob(self):
insert_query = "INSERT INTO test VALUES('Hello', 'World')"
event = self.create_and_insert_value(create_query, insert_query)
if event.table_map[event.table_id].column_name_flag:
self.assertEqual(event.rows[0]["values"]["test"], b"Hello")
self.assertEqual(event.rows[0]["values"]["test"], "Hello")
self.assertEqual(event.rows[0]["values"]["test2"], "World")

def test_medium_blob(self):
create_query = "CREATE TABLE test (test MEDIUMBLOB, test2 MEDIUMTEXT) CHARACTER SET latin1 COLLATE latin1_bin;"
insert_query = "INSERT INTO test VALUES('Hello', 'World')"
event = self.create_and_insert_value(create_query, insert_query)
if event.table_map[event.table_id].column_name_flag:
self.assertEqual(event.rows[0]["values"]["test"], b"Hello")
self.assertEqual(event.rows[0]["values"]["test"], "Hello")
self.assertEqual(event.rows[0]["values"]["test2"], "World")

def test_long_blob(self):
create_query = "CREATE TABLE test (test LONGBLOB, test2 LONGTEXT) CHARACTER SET latin1 COLLATE latin1_bin;"
insert_query = "INSERT INTO test VALUES('Hello', 'World')"
event = self.create_and_insert_value(create_query, insert_query)
if event.table_map[event.table_id].column_name_flag:
self.assertEqual(event.rows[0]["values"]["test"], b"Hello")
self.assertEqual(event.rows[0]["values"]["test"], "Hello")
self.assertEqual(event.rows[0]["values"]["test2"], "World")

def test_blob(self):
create_query = "CREATE TABLE test (test BLOB, test2 TEXT) CHARACTER SET latin1 COLLATE latin1_bin;"
insert_query = "INSERT INTO test VALUES('Hello', 'World')"
event = self.create_and_insert_value(create_query, insert_query)
if event.table_map[event.table_id].column_name_flag:
self.assertEqual(event.rows[0]["values"]["test"], b"Hello")
self.assertEqual(event.rows[0]["values"]["test"], "Hello")
self.assertEqual(event.rows[0]["values"]["test2"], "World")

def test_string(self):
Expand Down Expand Up @@ -804,25 +803,6 @@ def test_encoding_utf8(self):
if event.table_map[event.table_id].column_name_flag:
self.assertMultiLineEqual(event.rows[0]["values"]["test"], string)

def test_zerofill(self):
create_query = "CREATE TABLE test ( \
test TINYINT UNSIGNED ZEROFILL DEFAULT NULL, \
test2 SMALLINT UNSIGNED ZEROFILL DEFAULT NULL, \
test3 MEDIUMINT UNSIGNED ZEROFILL DEFAULT NULL, \
test4 INT UNSIGNED ZEROFILL DEFAULT NULL, \
test5 BIGINT UNSIGNED ZEROFILL DEFAULT NULL \
)"
insert_query = (
"INSERT INTO test (test, test2, test3, test4, test5) VALUES(1, 1, 1, 1, 1)"
)
event = self.create_and_insert_value(create_query, insert_query)
if event.table_map[event.table_id].column_name_flag:
self.assertEqual(event.rows[0]["values"]["test"], "001")
self.assertEqual(event.rows[0]["values"]["test2"], "00001")
self.assertEqual(event.rows[0]["values"]["test3"], "00000001")
self.assertEqual(event.rows[0]["values"]["test4"], "0000000001")
self.assertEqual(event.rows[0]["values"]["test5"], "00000000000000000001")

def test_partition_id(self):
if not self.isMySQL80AndMore():
self.skipTest("Not supported in this version of MySQL")
Expand Down

0 comments on commit 3700960

Please sign in to comment.