Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parse status variables in query event #360

Merged
merged 5 commits into from
Oct 14, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions pymysqlreplication/constants/STATUS_VAR_KEY.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
#from enum import IntEnum

#class StatusVarsKey(IntEnum):
"""List of Query_event_status_vars

A status variable in query events is a sequence of status KEY-VALUE pairs.
The class variables enumerated below are KEYs.
Each KEY determines the length of corresponding VALUE.

For further details refer to:
mysql-server: https://github.com/mysql/mysql-server/blob/beb865a960b9a8a16cf999c323e46c5b0c67f21f/libbinlogevents/include/statement_events.h#L463-L532
MySQL Documentation: https://dev.mysql.com/doc/internals/en/query-event.html

Status variable key names From mysql-server source code, edited by dongwook-chan
"""

# KEY
Q_FLAGS2_CODE = 0x00
Q_SQL_MODE_CODE = 0X01
Q_CATALOG_CODE = 0x02
Q_AUTO_INCREMENT = 0x03
Q_CHARSET_CODE = 0x04
Q_TIME_ZONE_CODE = 0x05
Q_CATALOG_NZ_CODE = 0x06
Q_LC_TIME_NAMES_CODE = 0x07
Q_CHARSET_DATABASE_CODE = 0x08
Q_TABLE_MAP_FOR_UPDATE_CODE = 0x09
Q_MASTER_DATA_WRITTEN_CODE = 0x0A
Q_INVOKER = 0x0B
Q_UPDATED_DB_NAMES = 0x0C
Q_MICROSECONDS = 0x0D
Q_COMMIT_TS = 0x0E
Q_COMMIT_TS2 = 0X0F
Q_EXPLICIT_DEFAULTS_FOR_TIMESTAMP = 0X10
Q_DDL_LOGGED_WITH_XID = 0X11
Q_DEFAULT_COLLATION_FOR_UTF8MB4 = 0X12
Q_SQL_REQUIRE_PRIMARY_KEY = 0X13
Q_DEFAULT_TABLE_ENCRYPTION = 0X14
1 change: 1 addition & 0 deletions pymysqlreplication/constants/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@

from .BINLOG import *
from .FIELD_TYPE import *
from .STATUS_VAR_KEY import *
83 changes: 82 additions & 1 deletion pymysqlreplication/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import binascii
import struct
import datetime
from pymysqlreplication.constants.STATUS_VAR_KEY import *


class BinLogEvent(object):
Expand Down Expand Up @@ -167,7 +168,13 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs)
self.status_vars_length = self.packet.read_uint16()

# Payload
self.status_vars = self.packet.read(self.status_vars_length)
status_vars_end_pos = self.packet.read_bytes + self.status_vars_length
while self.packet.read_bytes < status_vars_end_pos: # while 남은 data length가 얼마만큼? OR read_bytes
# read KEY for status variable
status_vars_key = self.packet.read_uint8()
# read VALUE for status variable
self._read_status_vars_value_for_key(status_vars_key)

self.schema = self.packet.read(self.schema_length)
self.packet.advance(1)

Expand All @@ -181,6 +188,80 @@ def _dump(self):
print("Execution time: %d" % (self.execution_time))
print("Query: %s" % (self.query))


# TODO: check if instance attribute with the same name already exists
# TODO: put all the instace attribute in separate class? called status_vars
# TODO: does length need to be remembered?
# TODO: ref(mysql doc. and mysql-server) for each hunk
def _read_status_vars_value_for_key(self, key):
"""parse status variable VALUE for given KEY

A status variable in query events is a sequence of status KEY-VALUE pairs.
Parsing logic from mysql-server source code edited by dongwook-chan
https://github.com/mysql/mysql-server/blob/beb865a960b9a8a16cf999c323e46c5b0c67f21f/libbinlogevents/src/statement_events.cpp#L181-L336

Args:
key: key for status variable
"""
if key == Q_FLAGS2_CODE: # 0x00
self.flags2 = self.packet.read_uint32()
elif key == Q_SQL_MODE_CODE: # 0x01
self.sql_mode = self.packet.read_uint64()
elif key == Q_CATALOG_CODE: # 0x02 for MySQL 5.0.x
pass
elif key == Q_AUTO_INCREMENT: # 0x03
self.auto_increment_increment = self.packet.read_uint16()
self.auto_increment_offset = self.packet.read_uint16()
elif key == Q_CHARSET_CODE: # 0x04
self.character_set_client = self.packet.read_uint16()
self.collation_connection = self.packet.read_uint16()
self.collation_server = self.packet.read_uint16()
elif key == Q_TIME_ZONE_CODE: # 0x05
time_zone_len = self.packet.read_uint8()
if time_zone_len:
self.time_zone = self.packet.read(time_zone_len)
elif key == Q_CATALOG_NZ_CODE: # 0x06
catalog_len = self.packet.read_uint8()
if catalog_len:
self.catalog_nz_code = self.packet.read(catalog_len)
elif key == Q_LC_TIME_NAMES_CODE: # 0x07
self.lc_time_names_number = self.packet.read_uint16()
elif key == Q_CHARSET_DATABASE_CODE: # 0x08
self.charset_database_number = self.packet.read_uint16()
elif key == Q_TABLE_MAP_FOR_UPDATE_CODE: # 0x09
self.table_map_for_update = self.packet.read_uint64()
elif key == Q_MASTER_DATA_WRITTEN_CODE: # 0x0A
pass
elif key == Q_INVOKER: # 0x0B
user_len = self.packet.read_uint8()
if user_len:
self.user = self.packet.read(user_len)
host_len = self.packet.read_uint8()
if host_len:
self.host = self.packet.read(host_len)
elif key == Q_UPDATED_DB_NAMES: # 0x0C
mts_accessed_dbs = self.packet.read_uint8()
dbs = []
for i in range(mts_accessed_dbs):
db = self.packet.read_string()
dbs.append(db)
self.mts_accessed_db_names = dbs
elif key == Q_MICROSECONDS: # 0x0D
self.microseconds = self.packet.read_uint24()
elif key == Q_COMMIT_TS: # 0x0E
pass
elif key == Q_COMMIT_TS2: # 0x0F
pass
elif key == Q_EXPLICIT_DEFAULTS_FOR_TIMESTAMP:# 0x10
self.explicit_defaults_ts = self.packet.read_uint8()
elif key == Q_DDL_LOGGED_WITH_XID: # 0x11
self.ddl_xid = self.packet.read_uint64()
elif key == Q_DEFAULT_COLLATION_FOR_UTF8MB4: # 0x12
self.default_collation_for_utf8mb4_number = self.packet.read_uint16()
elif key == Q_SQL_REQUIRE_PRIMARY_KEY: # 0x13
self.sql_require_primary_key = self.packet.read_uint8()
elif key == Q_DEFAULT_TABLE_ENCRYPTION: # 0x14
self.default_table_encryption = self.packet.read_uint8()

class BeginLoadQueryEvent(BinLogEvent):
"""
Expand Down
17 changes: 17 additions & 0 deletions pymysqlreplication/packet.py
Original file line number Diff line number Diff line change
Expand Up @@ -461,3 +461,20 @@ def _read(x):
return self.read_binary_json_type(x[0], length)

return [_read(x) for x in values_type_offset_inline]

def read_string(self):
"""Read a 'Length Coded String' from the data buffer.

Read __data_buffer until NULL character (0 = \0 = \x00)

Returns:
Binary string parsed from __data_buffer
"""
string = b''
while True:
char = self.read(1)
if char == b'\0':
break
string += char

return string
39 changes: 39 additions & 0 deletions pymysqlreplication/tests/test_data_type.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,26 @@ def create_and_insert_value(self, create_query, insert_query):
self.assertIsInstance(event, WriteRowsEvent)
return event

def create_table(self, create_query):
"""Create table

Create table in db and return query event.

Returns:
Query event
"""

self.execute(create_query)

self.assertIsInstance(self.stream.fetchone(), RotateEvent)
self.assertIsInstance(self.stream.fetchone(), FormatDescriptionEvent)

event = self.stream.fetchone()

self.assertEqual(event.event_type, QUERY_EVENT)

return event

def test_decimal(self):
create_query = "CREATE TABLE test (test DECIMAL(2,1))"
insert_query = "INSERT INTO test VALUES(4.2)"
Expand Down Expand Up @@ -641,5 +661,24 @@ def test_partition_id(self):
self.assertEqual(event.extra_data_type, 1)
self.assertEqual(event.partition_id, 3)

def test_status_vars(self):
"""Test parse of status variables in query events

Majority of status variables available depends on the settings of db.
Therefore, this test only tests system variable values independent from settings of db.
Note that if you change default db name 'pymysqlreplication_test',
event.mts_accessed_db_names MUST be asserted against the changed db name.

Returns:
binary string parsed from __data_buffer

Raises:
AssertionError: if no
"""
create_query = "CREATE TABLE test (id INTEGER)"
event = self.create_table(create_query)
self.assertEqual(event.catalog_nz_code, b'std')
self.assertEqual(event.mts_accessed_db_names, [b'pymysqlreplication_test'])

if __name__ == "__main__":
unittest.main()