From 10e90a18fcb97344163817c1d5a7f465820552c7 Mon Sep 17 00:00:00 2001 From: sean Date: Mon, 18 Sep 2023 00:20:01 +0900 Subject: [PATCH] check possible optional metadata version And delete get Table information check possible optional metadata version And delete get Table information check possible optional metadata version And delete get Table information --- pymysqlreplication/binlogstream.py | 87 ++++++++++++++++++------------ 1 file changed, 54 insertions(+), 33 deletions(-) diff --git a/pymysqlreplication/binlogstream.py b/pymysqlreplication/binlogstream.py index fd75e1b4..0da4440f 100644 --- a/pymysqlreplication/binlogstream.py +++ b/pymysqlreplication/binlogstream.py @@ -290,7 +290,6 @@ def close(self): if self.__connected_ctl: # break reference cycle between stream reader and underlying # mysql connection object - self._ctl_connection._get_table_information = None self._ctl_connection.close() self.__connected_ctl = False @@ -301,9 +300,9 @@ def __connect_to_ctl(self): self._ctl_connection_settings["cursorclass"] = DictCursor self._ctl_connection_settings["autocommit"] = True self._ctl_connection = self.pymysql_wrapper(**self._ctl_connection_settings) - self._ctl_connection._get_table_information = self.__get_table_information self._ctl_connection._get_dbms = self.__get_dbms self.__connected_ctl = True + self.__check_optional_meta_data() def __checksum_enabled(self): """Return True if binlog-checksum = CRC32. Only for MySQL > 5.6""" @@ -548,6 +547,28 @@ def __set_mariadb_settings(self): return prelude + def __check_optional_meta_data(self): + cur = self._ctl_connection.cursor() + cur.execute("SHOW VARIABLES LIKE 'BINLOG_ROW_METADATA';") + value = cur.fetchone() + if value is None: + logging.log( + logging.WARN, + """ + Before using MARIADB 10.5.0 and MYSQL 8.0.14 versions, + use python-mysql-replication version Before 1.0 version """, + ) + else: + value = value.get("Value", "") + if value.upper() != "FULL": + logging.log( + logging.WARN, + """ + Setting The Variable Value BINLOG_ROW_METADATA = FULL + By Applying this, provide properly mapped column information on UPDATE,DELETE,INSERT. + """, + ) + def fetchone(self): while True: if self.end_log_pos and self.is_past_end_log_pos: @@ -709,37 +730,37 @@ def _allowed_event_list( pass return frozenset(events) - def __get_table_information(self, schema, table): - for i in range(1, 3): - try: - if not self.__connected_ctl: - self.__connect_to_ctl() - - cur = self._ctl_connection.cursor() - cur.execute( - """ - SELECT - COLUMN_NAME, COLLATION_NAME, CHARACTER_SET_NAME, - COLUMN_COMMENT, COLUMN_TYPE, COLUMN_KEY, ORDINAL_POSITION, - DATA_TYPE, CHARACTER_OCTET_LENGTH - FROM - information_schema.columns - WHERE - table_schema = %s AND table_name = %s - """, - (schema, table), - ) - result = sorted(cur.fetchall(), key=lambda x: x["ORDINAL_POSITION"]) - cur.close() - - return result - except pymysql.OperationalError as error: - code, message = error.args - if code in MYSQL_EXPECTED_ERROR_CODES: - self.__connected_ctl = False - continue - else: - raise error + # def __get_table_information(self, schema, table): + # for i in range(1, 3): + # try: + # if not self.__connected_ctl: + # self.__connect_to_ctl() + # + # cur = self._ctl_connection.cursor() + # cur.execute( + # """ + # SELECT + # COLUMN_NAME, COLLATION_NAME, CHARACTER_SET_NAME, + # COLUMN_COMMENT, COLUMN_TYPE, COLUMN_KEY, ORDINAL_POSITION, + # DATA_TYPE, CHARACTER_OCTET_LENGTH + # FROM + # information_schema.columns + # WHERE + # table_schema = %s AND table_name = %s + # """, + # (schema, table), + # ) + # result = sorted(cur.fetchall(), key=lambda x: x["ORDINAL_POSITION"]) + # cur.close() + # + # return result + # except pymysql.OperationalError as error: + # code, message = error.args + # if code in MYSQL_EXPECTED_ERROR_CODES: + # self.__connected_ctl = False + # continue + # else: + # raise error def __get_dbms(self): if not self.__connected_ctl: