Skip to content

Commit

Permalink
check possible optional metadata version And delete get Table informa…
Browse files Browse the repository at this point in the history
…tion

check possible optional metadata version And delete get Table information

check possible optional metadata version And delete get Table information
  • Loading branch information
sean-k1 committed Sep 17, 2023
1 parent 06ddd9f commit 10e90a1
Showing 1 changed file with 54 additions and 33 deletions.
87 changes: 54 additions & 33 deletions pymysqlreplication/binlogstream.py
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,6 @@ def close(self):
if self.__connected_ctl:
# break reference cycle between stream reader and underlying
# mysql connection object
self._ctl_connection._get_table_information = None
self._ctl_connection.close()
self.__connected_ctl = False

Expand All @@ -301,9 +300,9 @@ def __connect_to_ctl(self):
self._ctl_connection_settings["cursorclass"] = DictCursor
self._ctl_connection_settings["autocommit"] = True
self._ctl_connection = self.pymysql_wrapper(**self._ctl_connection_settings)
self._ctl_connection._get_table_information = self.__get_table_information
self._ctl_connection._get_dbms = self.__get_dbms
self.__connected_ctl = True
self.__check_optional_meta_data()

def __checksum_enabled(self):
"""Return True if binlog-checksum = CRC32. Only for MySQL > 5.6"""
Expand Down Expand Up @@ -548,6 +547,28 @@ def __set_mariadb_settings(self):

return prelude

def __check_optional_meta_data(self):
cur = self._ctl_connection.cursor()
cur.execute("SHOW VARIABLES LIKE 'BINLOG_ROW_METADATA';")
value = cur.fetchone()
if value is None:
logging.log(
logging.WARN,
"""
Before using MARIADB 10.5.0 and MYSQL 8.0.14 versions,
use python-mysql-replication version Before 1.0 version """,
)
else:
value = value.get("Value", "")
if value.upper() != "FULL":
logging.log(
logging.WARN,
"""
Setting The Variable Value BINLOG_ROW_METADATA = FULL
By Applying this, provide properly mapped column information on UPDATE,DELETE,INSERT.
""",
)

def fetchone(self):
while True:
if self.end_log_pos and self.is_past_end_log_pos:
Expand Down Expand Up @@ -709,37 +730,37 @@ def _allowed_event_list(
pass
return frozenset(events)

def __get_table_information(self, schema, table):
for i in range(1, 3):
try:
if not self.__connected_ctl:
self.__connect_to_ctl()

cur = self._ctl_connection.cursor()
cur.execute(
"""
SELECT
COLUMN_NAME, COLLATION_NAME, CHARACTER_SET_NAME,
COLUMN_COMMENT, COLUMN_TYPE, COLUMN_KEY, ORDINAL_POSITION,
DATA_TYPE, CHARACTER_OCTET_LENGTH
FROM
information_schema.columns
WHERE
table_schema = %s AND table_name = %s
""",
(schema, table),
)
result = sorted(cur.fetchall(), key=lambda x: x["ORDINAL_POSITION"])
cur.close()

return result
except pymysql.OperationalError as error:
code, message = error.args
if code in MYSQL_EXPECTED_ERROR_CODES:
self.__connected_ctl = False
continue
else:
raise error
# def __get_table_information(self, schema, table):
# for i in range(1, 3):
# try:
# if not self.__connected_ctl:
# self.__connect_to_ctl()
#
# cur = self._ctl_connection.cursor()
# cur.execute(
# """
# SELECT
# COLUMN_NAME, COLLATION_NAME, CHARACTER_SET_NAME,
# COLUMN_COMMENT, COLUMN_TYPE, COLUMN_KEY, ORDINAL_POSITION,
# DATA_TYPE, CHARACTER_OCTET_LENGTH
# FROM
# information_schema.columns
# WHERE
# table_schema = %s AND table_name = %s
# """,
# (schema, table),
# )
# result = sorted(cur.fetchall(), key=lambda x: x["ORDINAL_POSITION"])
# cur.close()
#
# return result
# except pymysql.OperationalError as error:
# code, message = error.args
# if code in MYSQL_EXPECTED_ERROR_CODES:
# self.__connected_ctl = False
# continue
# else:
# raise error

def __get_dbms(self):
if not self.__connected_ctl:
Expand Down

0 comments on commit 10e90a1

Please sign in to comment.