Skip to content

Commit

Permalink
Merge pull request #313 from mpforce1/patch-1
Browse files Browse the repository at this point in the history
Fix for PubSub reconnection bug
  • Loading branch information
rdavydov authored Aug 8, 2023
2 parents 19fbf42 + 29d5d8a commit 4fd9125
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 68 deletions.
4 changes: 2 additions & 2 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ repos:
- id: end-of-file-fixer
- id: check-added-large-files
- repo: https://github.com/pycqa/isort
rev: 5.10.1
rev: 5.12.0
hooks:
- id: isort
files: ^TwitchChannelPointsMiner/
Expand All @@ -16,7 +16,7 @@ repos:
hooks:
- id: black
files: ^TwitchChannelPointsMiner/
- repo: https://gitlab.com/pycqa/flake8
- repo: https://github.com/pycqa/flake8
rev: 3.9.2
hooks:
- id: flake8
Expand Down
38 changes: 15 additions & 23 deletions TwitchChannelPointsMiner/TwitchChannelPointsMiner.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,7 @@ def __init__(
):
# Fixes TypeError: 'NoneType' object is not subscriptable
if not username or username == "your-twitch-username":
logger.error(
"Please edit your runner file (usually run.py) and try again.")
logger.error("Please edit your runner file (usually run.py) and try again.")
logger.error("No username, exiting...")
sys.exit(0)

Expand Down Expand Up @@ -122,7 +121,8 @@ def is_connected():

if enable_analytics is True:
Settings.analytics_path = os.path.join(
Path().absolute(), "analytics", username)
Path().absolute(), "analytics", username
)
Path(Settings.analytics_path).mkdir(parents=True, exist_ok=True)

self.username = username
Expand Down Expand Up @@ -161,17 +161,16 @@ def is_connected():
current_version, github_version = check_versions()

logger.info(
f"Twitch Channel Points Miner v2-{current_version} (fork by rdavydov)")
logger.info(
"https://github.com/rdavydov/Twitch-Channel-Points-Miner-v2")
f"Twitch Channel Points Miner v2-{current_version} (fork by rdavydov)"
)
logger.info("https://github.com/rdavydov/Twitch-Channel-Points-Miner-v2")

if github_version == "0.0.0":
logger.error(
"Unable to detect if you have the latest version of this script"
)
elif current_version != github_version:
logger.info(
f"You are running the version {current_version} of this script")
logger.info(f"You are running the version {current_version} of this script")
logger.info(f"The latest version on GitHub is: {github_version}")

for sign in [signal.SIGINT, signal.SIGSEGV, signal.SIGTERM]:
Expand All @@ -195,8 +194,7 @@ def analytics(
http_server.name = "Analytics Thread"
http_server.start()
else:
logger.error(
"Can't start analytics(), please set enable_analytics=True")
logger.error("Can't start analytics(), please set enable_analytics=True")

def mine(
self,
Expand Down Expand Up @@ -242,8 +240,7 @@ def run(
streamers_dict[username] = streamer

if followers is True:
followers_array = self.twitch.get_followers(
order=followers_order)
followers_array = self.twitch.get_followers(order=followers_order)
logger.info(
f"Load {len(followers_array)} followers from your profile!",
extra={"emoji": ":clipboard:"},
Expand All @@ -266,8 +263,7 @@ def run(
if isinstance(streamers_dict[username], Streamer) is True
else Streamer(username)
)
streamer.channel_id = self.twitch.get_channel_id(
username)
streamer.channel_id = self.twitch.get_channel_id(username)
streamer.settings = set_default_settings(
streamer.settings, Settings.streamer_settings
)
Expand Down Expand Up @@ -309,8 +305,7 @@ def run(
# If we have at least one streamer with settings = claim_drops True
# Spawn a thread for sync inventory and dashboard
if (
at_least_one_value_in_settings_is(
self.streamers, "claim_drops", True)
at_least_one_value_in_settings_is(self.streamers, "claim_drops", True)
is True
):
self.sync_campaigns_thread = threading.Thread(
Expand Down Expand Up @@ -369,14 +364,12 @@ def run(

if streamer.settings.make_predictions is True:
self.ws_pool.submit(
PubsubTopic("predictions-channel-v1",
streamer=streamer)
PubsubTopic("predictions-channel-v1", streamer=streamer)
)

if streamer.settings.claim_moments is True:
self.ws_pool.submit(
PubsubTopic("community-moments-channel-v1",
streamer=streamer)
PubsubTopic("community-moments-channel-v1", streamer=streamer)
)

refresh_context = time.time()
Expand All @@ -386,15 +379,14 @@ def run(
# Check if is not None because maybe we have already created a new connection on array+1 and now index is None
for index in range(0, len(self.ws_pool.ws)):
if (
self.ws_pool.ws[index].is_reconneting is False
self.ws_pool.ws[index].is_reconnecting is False
and self.ws_pool.ws[index].elapsed_last_ping() > 10
and internet_connection_available() is True
):
logger.info(
f"#{index} - The last PING was sent more than 10 minutes ago. Reconnecting to the WebSocket..."
)
WebSocketsPool.handle_reconnection(
self.ws_pool.ws[index])
WebSocketsPool.handle_reconnection(self.ws_pool.ws[index])

if ((time.time() - refresh_context) // 60) >= 30:
refresh_context = time.time()
Expand Down
2 changes: 1 addition & 1 deletion TwitchChannelPointsMiner/classes/TwitchWebSocket.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ def __init__(self, index, parent_pool, *args, **kw):
self.is_closed = False
self.is_opened = False

self.is_reconneting = False
self.is_reconnecting = False
self.forced_close = False

# Custom attribute
Expand Down
87 changes: 45 additions & 42 deletions TwitchChannelPointsMiner/classes/WebSocketsPool.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,7 @@ def __submit(self, index, topic):
if self.ws[index].is_opened is False:
self.ws[index].pending_topics.append(topic)
else:
self.ws[index].listen(
topic, self.twitch.twitch_login.get_auth_token())
self.ws[index].listen(topic, self.twitch.twitch_login.get_auth_token())

def __new(self, index):
return TwitchWebSocket(
Expand All @@ -70,8 +69,12 @@ def __new(self, index):
def __start(self, index):
if Settings.disable_ssl_cert_verification is True:
import ssl
thread_ws = Thread(target=lambda: self.ws[index].run_forever(
sslopt={"cert_reqs": ssl.CERT_NONE}))

thread_ws = Thread(
target=lambda: self.ws[index].run_forever(
sslopt={"cert_reqs": ssl.CERT_NONE}
)
)
logger.warn("SSL certificate verification is disabled! Be aware!")
else:
thread_ws = Thread(target=lambda: self.ws[index].run_forever())
Expand All @@ -96,7 +99,7 @@ def run():
while ws.is_closed is False:
# Else: the ws is currently in reconnecting phase, you can't do ping or other operation.
# Probably this ws will be closed very soon with ws.is_closed = True
if ws.is_reconneting is False:
if ws.is_reconnecting is False:
ws.ping() # We need ping for keep the connection alive
time.sleep(random.uniform(25, 30))

Expand Down Expand Up @@ -124,38 +127,40 @@ def on_close(ws, close_status_code, close_reason):

@staticmethod
def handle_reconnection(ws):
# Close the current WebSocket.
ws.is_closed = True
ws.keep_running = False
# Reconnect only if ws.forced_close is False (replace the keep_running)

# Set the current socket as reconnecting status
# So the external ping check will be locked
ws.is_reconneting = True

if ws.forced_close is False:
logger.info(
f"#{ws.index} - Reconnecting to Twitch PubSub server in ~60 seconds"
)
time.sleep(30)

while internet_connection_available() is False:
random_sleep = random.randint(1, 3)
logger.warning(
f"#{ws.index} - No internet connection available! Retry after {random_sleep}m"
# Reconnect only if ws.is_reconnecting is False to prevent more than 1 ws from being created
if ws.is_reconnecting is False:
# Close the current WebSocket.
ws.is_closed = True
ws.keep_running = False
# Reconnect only if ws.forced_close is False (replace the keep_running)

# Set the current socket as reconnecting status
# So the external ping check will be locked
ws.is_reconnecting = True

if ws.forced_close is False:
logger.info(
f"#{ws.index} - Reconnecting to Twitch PubSub server in ~60 seconds"
)
time.sleep(random_sleep * 60)
time.sleep(30)

while internet_connection_available() is False:
random_sleep = random.randint(1, 3)
logger.warning(
f"#{ws.index} - No internet connection available! Retry after {random_sleep}m"
)
time.sleep(random_sleep * 60)

# Why not create a new ws on the same array index? Let's try.
self = ws.parent_pool
# Create a new connection.
self.ws[ws.index] = self.__new(ws.index)
# Why not create a new ws on the same array index? Let's try.
self = ws.parent_pool
# Create a new connection.
self.ws[ws.index] = self.__new(ws.index)

self.__start(ws.index) # Start a new thread.
time.sleep(30)
self.__start(ws.index) # Start a new thread.
time.sleep(30)

for topic in ws.topics:
self.__submit(ws.index, topic)
for topic in ws.topics:
self.__submit(ws.index, topic)

@staticmethod
def on_message(ws, message):
Expand All @@ -179,8 +184,7 @@ def on_message(ws, message):
ws.last_message_timestamp = message.timestamp
ws.last_message_type_channel = message.identifier

streamer_index = get_streamer_index(
ws.streamers, message.channel_id)
streamer_index = get_streamer_index(ws.streamers, message.channel_id)
if streamer_index != -1:
try:
if message.topic == "community-points-user-v1":
Expand Down Expand Up @@ -239,14 +243,12 @@ def on_message(ws, message):
message.message["raid"]["id"],
message.message["raid"]["target_login"],
)
ws.twitch.update_raid(
ws.streamers[streamer_index], raid)
ws.twitch.update_raid(ws.streamers[streamer_index], raid)

elif message.topic == "community-moments-channel-v1":
if message.type == "active":
ws.twitch.claim_moment(
ws.streamers[streamer_index],
message.data["moment_id"]
ws.streamers[streamer_index], message.data["moment_id"]
)

elif message.topic == "predictions-channel-v1":
Expand Down Expand Up @@ -381,7 +383,9 @@ def on_message(ws, message):
if event_prediction.result["type"] != "LOSE":
# Analytics switch
if Settings.enable_analytics is True:
ws.streamers[streamer_index].persistent_annotations(
ws.streamers[
streamer_index
].persistent_annotations(
event_prediction.result["type"],
f"{ws.events_predictions[event_id].title}",
)
Expand All @@ -400,8 +404,7 @@ def on_message(ws, message):
)

elif response["type"] == "RESPONSE" and len(response.get("error", "")) > 0:
raise RuntimeError(
f"Error while trying to listen for a topic: {response}")
raise RuntimeError(f"Error while trying to listen for a topic: {response}")

elif response["type"] == "RECONNECT":
logger.info(f"#{ws.index} - Reconnection required")
Expand Down

0 comments on commit 4fd9125

Please sign in to comment.