Skip to content

Commit

Permalink
Merge pull request #355 from dongwook-chan/extra-data
Browse files Browse the repository at this point in the history
Fix parsing of row events for MySQL8 partitioned table
  • Loading branch information
julien-duponchelle authored Aug 30, 2021
2 parents 8b67828 + 6e9be94 commit c24032b
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 1 deletion.
17 changes: 16 additions & 1 deletion pymysqlreplication/row_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,22 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs)
self.event_type == BINLOG.DELETE_ROWS_EVENT_V2 or \
self.event_type == BINLOG.UPDATE_ROWS_EVENT_V2:
self.flags, self.extra_data_length = struct.unpack('<HH', self.packet.read(4))
self.extra_data = self.packet.read(self.extra_data_length / 8)
if self.extra_data_length > 2:
self.extra_data_type = struct.unpack('<B', self.packet.read(1))[0]

# ndb information
if self.extra_data_type == 0:
self.nbd_info_length, self.nbd_info_format = struct.unpack('<BB', self.packet.read(1))
self.nbd_info = self.packet.read(self.nbd_info_length - 2)
# partition information
elif self.extra_data_type == 1:
if self.event_type == BINLOG.UPDATE_ROWS_EVENT_V2:
self.partition_id, self.source_partition_id = struct.unpack('<HH', self.packet.read(4))
else:
self.partition_id = struct.unpack('<H', self.packet.read(2))[0]
# etc
else:
self.extra_data = self.packet.read(self.extra_info_length - 3)
else:
self.flags = struct.unpack('<H', self.packet.read(2))[0]

Expand Down
4 changes: 4 additions & 0 deletions pymysqlreplication/tests/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ def isMySQL57(self):
version = float(self.getMySQLVersion().rsplit('.', 1)[0])
return version == 5.7

def isMySQL80AndMore(self):
version = float(self.getMySQLVersion().rsplit('.', 1)[0])
return version >= 8.0

@property
def supportsGTID(self):
if not self.isMySQL56AndMore():
Expand Down
16 changes: 16 additions & 0 deletions pymysqlreplication/tests/test_data_type.py
Original file line number Diff line number Diff line change
Expand Up @@ -625,5 +625,21 @@ def test_zerofill(self):
self.assertEqual(event.rows[0]["values"]["test4"], '0000000001')
self.assertEqual(event.rows[0]["values"]["test5"], '00000000000000000001')

def test_partition_id(self):
if not self.isMySQL80AndMore():
self.skipTest("Not supported in this version of MySQL")
create_query = "CREATE TABLE test (id INTEGER) \
PARTITION BY RANGE (id) ( \
PARTITION p0 VALUES LESS THAN (1), \
PARTITION p1 VALUES LESS THAN (2), \
PARTITION p2 VALUES LESS THAN (3), \
PARTITION p3 VALUES LESS THAN (4), \
PARTITION p4 VALUES LESS THAN (5) \
)"
insert_query = "INSERT INTO test (id) VALUES(3)"
event = self.create_and_insert_value(create_query, insert_query)
self.assertEqual(event.extra_data_type, 1)
self.assertEqual(event.partition_id, 3)

if __name__ == "__main__":
unittest.main()

0 comments on commit c24032b

Please sign in to comment.