From 29d5d8a36075d9092c79dec90950d148c0283d44 Mon Sep 17 00:00:00 2001 From: mpforce1 Date: Sat, 15 Jul 2023 08:48:33 +0100 Subject: [PATCH] Fix for reconnection bug. Fixed typo in TwitchWebSocket.is_reconnecting. pre-commit changes. --- .pre-commit-config.yaml | 4 +- .../TwitchChannelPointsMiner.py | 38 ++++---- .../classes/TwitchWebSocket.py | 2 +- .../classes/WebSocketsPool.py | 87 ++++++++++--------- 4 files changed, 63 insertions(+), 68 deletions(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 4ada90a6..5a07a6c4 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -6,7 +6,7 @@ repos: - id: end-of-file-fixer - id: check-added-large-files - repo: https://github.com/pycqa/isort - rev: 5.10.1 + rev: 5.12.0 hooks: - id: isort files: ^TwitchChannelPointsMiner/ @@ -16,7 +16,7 @@ repos: hooks: - id: black files: ^TwitchChannelPointsMiner/ -- repo: https://gitlab.com/pycqa/flake8 +- repo: https://github.com/pycqa/flake8 rev: 3.9.2 hooks: - id: flake8 diff --git a/TwitchChannelPointsMiner/TwitchChannelPointsMiner.py b/TwitchChannelPointsMiner/TwitchChannelPointsMiner.py index 9aed9a55..f35b44de 100644 --- a/TwitchChannelPointsMiner/TwitchChannelPointsMiner.py +++ b/TwitchChannelPointsMiner/TwitchChannelPointsMiner.py @@ -88,8 +88,7 @@ def __init__( ): # Fixes TypeError: 'NoneType' object is not subscriptable if not username or username == "your-twitch-username": - logger.error( - "Please edit your runner file (usually run.py) and try again.") + logger.error("Please edit your runner file (usually run.py) and try again.") logger.error("No username, exiting...") sys.exit(0) @@ -122,7 +121,8 @@ def is_connected(): if enable_analytics is True: Settings.analytics_path = os.path.join( - Path().absolute(), "analytics", username) + Path().absolute(), "analytics", username + ) Path(Settings.analytics_path).mkdir(parents=True, exist_ok=True) self.username = username @@ -161,17 +161,16 @@ def is_connected(): current_version, github_version = check_versions() logger.info( - f"Twitch Channel Points Miner v2-{current_version} (fork by rdavydov)") - logger.info( - "https://github.com/rdavydov/Twitch-Channel-Points-Miner-v2") + f"Twitch Channel Points Miner v2-{current_version} (fork by rdavydov)" + ) + logger.info("https://github.com/rdavydov/Twitch-Channel-Points-Miner-v2") if github_version == "0.0.0": logger.error( "Unable to detect if you have the latest version of this script" ) elif current_version != github_version: - logger.info( - f"You are running the version {current_version} of this script") + logger.info(f"You are running the version {current_version} of this script") logger.info(f"The latest version on GitHub is: {github_version}") for sign in [signal.SIGINT, signal.SIGSEGV, signal.SIGTERM]: @@ -195,8 +194,7 @@ def analytics( http_server.name = "Analytics Thread" http_server.start() else: - logger.error( - "Can't start analytics(), please set enable_analytics=True") + logger.error("Can't start analytics(), please set enable_analytics=True") def mine( self, @@ -242,8 +240,7 @@ def run( streamers_dict[username] = streamer if followers is True: - followers_array = self.twitch.get_followers( - order=followers_order) + followers_array = self.twitch.get_followers(order=followers_order) logger.info( f"Load {len(followers_array)} followers from your profile!", extra={"emoji": ":clipboard:"}, @@ -266,8 +263,7 @@ def run( if isinstance(streamers_dict[username], Streamer) is True else Streamer(username) ) - streamer.channel_id = self.twitch.get_channel_id( - username) + streamer.channel_id = self.twitch.get_channel_id(username) streamer.settings = set_default_settings( streamer.settings, Settings.streamer_settings ) @@ -309,8 +305,7 @@ def run( # If we have at least one streamer with settings = claim_drops True # Spawn a thread for sync inventory and dashboard if ( - at_least_one_value_in_settings_is( - self.streamers, "claim_drops", True) + at_least_one_value_in_settings_is(self.streamers, "claim_drops", True) is True ): self.sync_campaigns_thread = threading.Thread( @@ -369,14 +364,12 @@ def run( if streamer.settings.make_predictions is True: self.ws_pool.submit( - PubsubTopic("predictions-channel-v1", - streamer=streamer) + PubsubTopic("predictions-channel-v1", streamer=streamer) ) if streamer.settings.claim_moments is True: self.ws_pool.submit( - PubsubTopic("community-moments-channel-v1", - streamer=streamer) + PubsubTopic("community-moments-channel-v1", streamer=streamer) ) refresh_context = time.time() @@ -386,15 +379,14 @@ def run( # Check if is not None because maybe we have already created a new connection on array+1 and now index is None for index in range(0, len(self.ws_pool.ws)): if ( - self.ws_pool.ws[index].is_reconneting is False + self.ws_pool.ws[index].is_reconnecting is False and self.ws_pool.ws[index].elapsed_last_ping() > 10 and internet_connection_available() is True ): logger.info( f"#{index} - The last PING was sent more than 10 minutes ago. Reconnecting to the WebSocket..." ) - WebSocketsPool.handle_reconnection( - self.ws_pool.ws[index]) + WebSocketsPool.handle_reconnection(self.ws_pool.ws[index]) if ((time.time() - refresh_context) // 60) >= 30: refresh_context = time.time() diff --git a/TwitchChannelPointsMiner/classes/TwitchWebSocket.py b/TwitchChannelPointsMiner/classes/TwitchWebSocket.py index 2b3dc334..f81a398c 100644 --- a/TwitchChannelPointsMiner/classes/TwitchWebSocket.py +++ b/TwitchChannelPointsMiner/classes/TwitchWebSocket.py @@ -18,7 +18,7 @@ def __init__(self, index, parent_pool, *args, **kw): self.is_closed = False self.is_opened = False - self.is_reconneting = False + self.is_reconnecting = False self.forced_close = False # Custom attribute diff --git a/TwitchChannelPointsMiner/classes/WebSocketsPool.py b/TwitchChannelPointsMiner/classes/WebSocketsPool.py index a19923fc..f08f9d22 100644 --- a/TwitchChannelPointsMiner/classes/WebSocketsPool.py +++ b/TwitchChannelPointsMiner/classes/WebSocketsPool.py @@ -52,8 +52,7 @@ def __submit(self, index, topic): if self.ws[index].is_opened is False: self.ws[index].pending_topics.append(topic) else: - self.ws[index].listen( - topic, self.twitch.twitch_login.get_auth_token()) + self.ws[index].listen(topic, self.twitch.twitch_login.get_auth_token()) def __new(self, index): return TwitchWebSocket( @@ -70,8 +69,12 @@ def __new(self, index): def __start(self, index): if Settings.disable_ssl_cert_verification is True: import ssl - thread_ws = Thread(target=lambda: self.ws[index].run_forever( - sslopt={"cert_reqs": ssl.CERT_NONE})) + + thread_ws = Thread( + target=lambda: self.ws[index].run_forever( + sslopt={"cert_reqs": ssl.CERT_NONE} + ) + ) logger.warn("SSL certificate verification is disabled! Be aware!") else: thread_ws = Thread(target=lambda: self.ws[index].run_forever()) @@ -96,7 +99,7 @@ def run(): while ws.is_closed is False: # Else: the ws is currently in reconnecting phase, you can't do ping or other operation. # Probably this ws will be closed very soon with ws.is_closed = True - if ws.is_reconneting is False: + if ws.is_reconnecting is False: ws.ping() # We need ping for keep the connection alive time.sleep(random.uniform(25, 30)) @@ -124,38 +127,40 @@ def on_close(ws, close_status_code, close_reason): @staticmethod def handle_reconnection(ws): - # Close the current WebSocket. - ws.is_closed = True - ws.keep_running = False - # Reconnect only if ws.forced_close is False (replace the keep_running) - - # Set the current socket as reconnecting status - # So the external ping check will be locked - ws.is_reconneting = True - - if ws.forced_close is False: - logger.info( - f"#{ws.index} - Reconnecting to Twitch PubSub server in ~60 seconds" - ) - time.sleep(30) - - while internet_connection_available() is False: - random_sleep = random.randint(1, 3) - logger.warning( - f"#{ws.index} - No internet connection available! Retry after {random_sleep}m" + # Reconnect only if ws.is_reconnecting is False to prevent more than 1 ws from being created + if ws.is_reconnecting is False: + # Close the current WebSocket. + ws.is_closed = True + ws.keep_running = False + # Reconnect only if ws.forced_close is False (replace the keep_running) + + # Set the current socket as reconnecting status + # So the external ping check will be locked + ws.is_reconnecting = True + + if ws.forced_close is False: + logger.info( + f"#{ws.index} - Reconnecting to Twitch PubSub server in ~60 seconds" ) - time.sleep(random_sleep * 60) + time.sleep(30) + + while internet_connection_available() is False: + random_sleep = random.randint(1, 3) + logger.warning( + f"#{ws.index} - No internet connection available! Retry after {random_sleep}m" + ) + time.sleep(random_sleep * 60) - # Why not create a new ws on the same array index? Let's try. - self = ws.parent_pool - # Create a new connection. - self.ws[ws.index] = self.__new(ws.index) + # Why not create a new ws on the same array index? Let's try. + self = ws.parent_pool + # Create a new connection. + self.ws[ws.index] = self.__new(ws.index) - self.__start(ws.index) # Start a new thread. - time.sleep(30) + self.__start(ws.index) # Start a new thread. + time.sleep(30) - for topic in ws.topics: - self.__submit(ws.index, topic) + for topic in ws.topics: + self.__submit(ws.index, topic) @staticmethod def on_message(ws, message): @@ -179,8 +184,7 @@ def on_message(ws, message): ws.last_message_timestamp = message.timestamp ws.last_message_type_channel = message.identifier - streamer_index = get_streamer_index( - ws.streamers, message.channel_id) + streamer_index = get_streamer_index(ws.streamers, message.channel_id) if streamer_index != -1: try: if message.topic == "community-points-user-v1": @@ -239,14 +243,12 @@ def on_message(ws, message): message.message["raid"]["id"], message.message["raid"]["target_login"], ) - ws.twitch.update_raid( - ws.streamers[streamer_index], raid) + ws.twitch.update_raid(ws.streamers[streamer_index], raid) elif message.topic == "community-moments-channel-v1": if message.type == "active": ws.twitch.claim_moment( - ws.streamers[streamer_index], - message.data["moment_id"] + ws.streamers[streamer_index], message.data["moment_id"] ) elif message.topic == "predictions-channel-v1": @@ -381,7 +383,9 @@ def on_message(ws, message): if event_prediction.result["type"] != "LOSE": # Analytics switch if Settings.enable_analytics is True: - ws.streamers[streamer_index].persistent_annotations( + ws.streamers[ + streamer_index + ].persistent_annotations( event_prediction.result["type"], f"{ws.events_predictions[event_id].title}", ) @@ -400,8 +404,7 @@ def on_message(ws, message): ) elif response["type"] == "RESPONSE" and len(response.get("error", "")) > 0: - raise RuntimeError( - f"Error while trying to listen for a topic: {response}") + raise RuntimeError(f"Error while trying to listen for a topic: {response}") elif response["type"] == "RECONNECT": logger.info(f"#{ws.index} - Reconnection required")