Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix for PubSub reconnection bug #313

Merged
merged 1 commit into from
Aug 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ repos:
- id: end-of-file-fixer
- id: check-added-large-files
- repo: https://github.com/pycqa/isort
rev: 5.10.1
rev: 5.12.0
hooks:
- id: isort
files: ^TwitchChannelPointsMiner/
Expand All @@ -16,7 +16,7 @@ repos:
hooks:
- id: black
files: ^TwitchChannelPointsMiner/
- repo: https://gitlab.com/pycqa/flake8
- repo: https://github.com/pycqa/flake8
rev: 3.9.2
hooks:
- id: flake8
Expand Down
38 changes: 15 additions & 23 deletions TwitchChannelPointsMiner/TwitchChannelPointsMiner.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,7 @@ def __init__(
):
# Fixes TypeError: 'NoneType' object is not subscriptable
if not username or username == "your-twitch-username":
logger.error(
"Please edit your runner file (usually run.py) and try again.")
logger.error("Please edit your runner file (usually run.py) and try again.")
logger.error("No username, exiting...")
sys.exit(0)

Expand Down Expand Up @@ -122,7 +121,8 @@ def is_connected():

if enable_analytics is True:
Settings.analytics_path = os.path.join(
Path().absolute(), "analytics", username)
Path().absolute(), "analytics", username
)
Path(Settings.analytics_path).mkdir(parents=True, exist_ok=True)

self.username = username
Expand Down Expand Up @@ -161,17 +161,16 @@ def is_connected():
current_version, github_version = check_versions()

logger.info(
f"Twitch Channel Points Miner v2-{current_version} (fork by rdavydov)")
logger.info(
"https://github.com/rdavydov/Twitch-Channel-Points-Miner-v2")
f"Twitch Channel Points Miner v2-{current_version} (fork by rdavydov)"
)
logger.info("https://github.com/rdavydov/Twitch-Channel-Points-Miner-v2")

if github_version == "0.0.0":
logger.error(
"Unable to detect if you have the latest version of this script"
)
elif current_version != github_version:
logger.info(
f"You are running the version {current_version} of this script")
logger.info(f"You are running the version {current_version} of this script")
logger.info(f"The latest version on GitHub is: {github_version}")

for sign in [signal.SIGINT, signal.SIGSEGV, signal.SIGTERM]:
Expand All @@ -195,8 +194,7 @@ def analytics(
http_server.name = "Analytics Thread"
http_server.start()
else:
logger.error(
"Can't start analytics(), please set enable_analytics=True")
logger.error("Can't start analytics(), please set enable_analytics=True")

def mine(
self,
Expand Down Expand Up @@ -242,8 +240,7 @@ def run(
streamers_dict[username] = streamer

if followers is True:
followers_array = self.twitch.get_followers(
order=followers_order)
followers_array = self.twitch.get_followers(order=followers_order)
logger.info(
f"Load {len(followers_array)} followers from your profile!",
extra={"emoji": ":clipboard:"},
Expand All @@ -266,8 +263,7 @@ def run(
if isinstance(streamers_dict[username], Streamer) is True
else Streamer(username)
)
streamer.channel_id = self.twitch.get_channel_id(
username)
streamer.channel_id = self.twitch.get_channel_id(username)
streamer.settings = set_default_settings(
streamer.settings, Settings.streamer_settings
)
Expand Down Expand Up @@ -309,8 +305,7 @@ def run(
# If we have at least one streamer with settings = claim_drops True
# Spawn a thread for sync inventory and dashboard
if (
at_least_one_value_in_settings_is(
self.streamers, "claim_drops", True)
at_least_one_value_in_settings_is(self.streamers, "claim_drops", True)
is True
):
self.sync_campaigns_thread = threading.Thread(
Expand Down Expand Up @@ -369,14 +364,12 @@ def run(

if streamer.settings.make_predictions is True:
self.ws_pool.submit(
PubsubTopic("predictions-channel-v1",
streamer=streamer)
PubsubTopic("predictions-channel-v1", streamer=streamer)
)

if streamer.settings.claim_moments is True:
self.ws_pool.submit(
PubsubTopic("community-moments-channel-v1",
streamer=streamer)
PubsubTopic("community-moments-channel-v1", streamer=streamer)
)

refresh_context = time.time()
Expand All @@ -386,15 +379,14 @@ def run(
# Check if is not None because maybe we have already created a new connection on array+1 and now index is None
for index in range(0, len(self.ws_pool.ws)):
if (
self.ws_pool.ws[index].is_reconneting is False
self.ws_pool.ws[index].is_reconnecting is False
and self.ws_pool.ws[index].elapsed_last_ping() > 10
and internet_connection_available() is True
):
logger.info(
f"#{index} - The last PING was sent more than 10 minutes ago. Reconnecting to the WebSocket..."
)
WebSocketsPool.handle_reconnection(
self.ws_pool.ws[index])
WebSocketsPool.handle_reconnection(self.ws_pool.ws[index])

if ((time.time() - refresh_context) // 60) >= 30:
refresh_context = time.time()
Expand Down
2 changes: 1 addition & 1 deletion TwitchChannelPointsMiner/classes/TwitchWebSocket.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ def __init__(self, index, parent_pool, *args, **kw):
self.is_closed = False
self.is_opened = False

self.is_reconneting = False
self.is_reconnecting = False
self.forced_close = False

# Custom attribute
Expand Down
87 changes: 45 additions & 42 deletions TwitchChannelPointsMiner/classes/WebSocketsPool.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,7 @@ def __submit(self, index, topic):
if self.ws[index].is_opened is False:
self.ws[index].pending_topics.append(topic)
else:
self.ws[index].listen(
topic, self.twitch.twitch_login.get_auth_token())
self.ws[index].listen(topic, self.twitch.twitch_login.get_auth_token())

def __new(self, index):
return TwitchWebSocket(
Expand All @@ -70,8 +69,12 @@ def __new(self, index):
def __start(self, index):
if Settings.disable_ssl_cert_verification is True:
import ssl
thread_ws = Thread(target=lambda: self.ws[index].run_forever(
sslopt={"cert_reqs": ssl.CERT_NONE}))

thread_ws = Thread(
target=lambda: self.ws[index].run_forever(
sslopt={"cert_reqs": ssl.CERT_NONE}
)
)
logger.warn("SSL certificate verification is disabled! Be aware!")
else:
thread_ws = Thread(target=lambda: self.ws[index].run_forever())
Expand All @@ -96,7 +99,7 @@ def run():
while ws.is_closed is False:
# Else: the ws is currently in reconnecting phase, you can't do ping or other operation.
# Probably this ws will be closed very soon with ws.is_closed = True
if ws.is_reconneting is False:
if ws.is_reconnecting is False:
ws.ping() # We need ping for keep the connection alive
time.sleep(random.uniform(25, 30))

Expand Down Expand Up @@ -124,38 +127,40 @@ def on_close(ws, close_status_code, close_reason):

@staticmethod
def handle_reconnection(ws):
# Close the current WebSocket.
ws.is_closed = True
ws.keep_running = False
# Reconnect only if ws.forced_close is False (replace the keep_running)

# Set the current socket as reconnecting status
# So the external ping check will be locked
ws.is_reconneting = True

if ws.forced_close is False:
logger.info(
f"#{ws.index} - Reconnecting to Twitch PubSub server in ~60 seconds"
)
time.sleep(30)

while internet_connection_available() is False:
random_sleep = random.randint(1, 3)
logger.warning(
f"#{ws.index} - No internet connection available! Retry after {random_sleep}m"
# Reconnect only if ws.is_reconnecting is False to prevent more than 1 ws from being created
if ws.is_reconnecting is False:
# Close the current WebSocket.
ws.is_closed = True
ws.keep_running = False
# Reconnect only if ws.forced_close is False (replace the keep_running)

# Set the current socket as reconnecting status
# So the external ping check will be locked
ws.is_reconnecting = True

if ws.forced_close is False:
logger.info(
f"#{ws.index} - Reconnecting to Twitch PubSub server in ~60 seconds"
)
time.sleep(random_sleep * 60)
time.sleep(30)

while internet_connection_available() is False:
random_sleep = random.randint(1, 3)
logger.warning(
f"#{ws.index} - No internet connection available! Retry after {random_sleep}m"
)
time.sleep(random_sleep * 60)

# Why not create a new ws on the same array index? Let's try.
self = ws.parent_pool
# Create a new connection.
self.ws[ws.index] = self.__new(ws.index)
# Why not create a new ws on the same array index? Let's try.
self = ws.parent_pool
# Create a new connection.
self.ws[ws.index] = self.__new(ws.index)

self.__start(ws.index) # Start a new thread.
time.sleep(30)
self.__start(ws.index) # Start a new thread.
time.sleep(30)

for topic in ws.topics:
self.__submit(ws.index, topic)
for topic in ws.topics:
self.__submit(ws.index, topic)

@staticmethod
def on_message(ws, message):
Expand All @@ -179,8 +184,7 @@ def on_message(ws, message):
ws.last_message_timestamp = message.timestamp
ws.last_message_type_channel = message.identifier

streamer_index = get_streamer_index(
ws.streamers, message.channel_id)
streamer_index = get_streamer_index(ws.streamers, message.channel_id)
if streamer_index != -1:
try:
if message.topic == "community-points-user-v1":
Expand Down Expand Up @@ -239,14 +243,12 @@ def on_message(ws, message):
message.message["raid"]["id"],
message.message["raid"]["target_login"],
)
ws.twitch.update_raid(
ws.streamers[streamer_index], raid)
ws.twitch.update_raid(ws.streamers[streamer_index], raid)

elif message.topic == "community-moments-channel-v1":
if message.type == "active":
ws.twitch.claim_moment(
ws.streamers[streamer_index],
message.data["moment_id"]
ws.streamers[streamer_index], message.data["moment_id"]
)

elif message.topic == "predictions-channel-v1":
Expand Down Expand Up @@ -381,7 +383,9 @@ def on_message(ws, message):
if event_prediction.result["type"] != "LOSE":
# Analytics switch
if Settings.enable_analytics is True:
ws.streamers[streamer_index].persistent_annotations(
ws.streamers[
streamer_index
].persistent_annotations(
event_prediction.result["type"],
f"{ws.events_predictions[event_id].title}",
)
Expand All @@ -400,8 +404,7 @@ def on_message(ws, message):
)

elif response["type"] == "RESPONSE" and len(response.get("error", "")) > 0:
raise RuntimeError(
f"Error while trying to listen for a topic: {response}")
raise RuntimeError(f"Error while trying to listen for a topic: {response}")

elif response["type"] == "RECONNECT":
logger.info(f"#{ws.index} - Reconnection required")
Expand Down