Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add timeout to media controller #831

Merged
merged 1 commit into from
Feb 13, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 51 additions & 22 deletions pychromecast/controllers/media.py
Original file line number Diff line number Diff line change
Expand Up @@ -608,7 +608,9 @@ def update_status(self, *, callback_function: CallbackType | None = None) -> Non
{MESSAGE_TYPE: TYPE_GET_STATUS}, callback_function=callback_function
)

def _send_command(self, command: dict) -> None:
def _send_command(
self, command: dict, callback_function: CallbackType | None
) -> None:
"""Send a command to the Chromecast on media channel."""
if self.status is None or self.status.media_session_id is None:
self.logger.warning(
Expand All @@ -618,7 +620,9 @@ def _send_command(self, command: dict) -> None:

command["mediaSessionId"] = self.status.media_session_id

self.send_message(command, inc_session_id=True)
self.send_message(
command, callback_function=callback_function, inc_session_id=True
)

@property
def is_playing(self) -> bool:
Expand Down Expand Up @@ -655,55 +659,80 @@ def thumbnail(self) -> str | None:

return images[0].url if images else None

def play(self) -> None:
def play(self, timeout: float = 10.0) -> None:
"""Send the PLAY command."""
self._send_command({MESSAGE_TYPE: TYPE_PLAY})
response_handler = WaitResponse(timeout)
self._send_command({MESSAGE_TYPE: TYPE_PLAY}, response_handler.callback)
response_handler.wait_response()

def pause(self) -> None:
def pause(self, timeout: float = 10.0) -> None:
"""Send the PAUSE command."""
self._send_command({MESSAGE_TYPE: TYPE_PAUSE})
response_handler = WaitResponse(timeout)
self._send_command({MESSAGE_TYPE: TYPE_PAUSE}, response_handler.callback)
response_handler.wait_response()

def stop(self) -> None:
def stop(self, timeout: float = 10.0) -> None:
"""Send the STOP command."""
self._send_command({MESSAGE_TYPE: TYPE_STOP})
response_handler = WaitResponse(timeout)
self._send_command({MESSAGE_TYPE: TYPE_STOP}, response_handler.callback)
response_handler.wait_response()

def rewind(self) -> None:
def rewind(self, timeout: float = 10.0) -> None:
"""Starts playing the media from the beginning."""
self.seek(0)
self.seek(0, timeout)

def skip(self) -> None:
def skip(self, timeout: float = 10.0) -> None:
"""Skips rest of the media. Values less then -5 behaved flaky."""
if not self.status.duration or self.status.duration < 5:
return
self.seek(int(self.status.duration) - 5)
self.seek(int(self.status.duration) - 5, timeout)

def seek(self, position: float) -> None:
def seek(self, position: float, timeout: float = 10.0) -> None:
"""Seek the media to a specific location."""
response_handler = WaitResponse(timeout)
self._send_command(
{
MESSAGE_TYPE: TYPE_SEEK,
"currentTime": position,
"resumeState": "PLAYBACK_START",
}
},
response_handler.callback,
)
response_handler.wait_response()

def queue_next(self) -> None:
def queue_next(self, timeout: float = 10.0) -> None:
"""Send the QUEUE_NEXT command."""
self._send_command({MESSAGE_TYPE: TYPE_QUEUE_UPDATE, "jump": 1})
response_handler = WaitResponse(timeout)
self._send_command(
{MESSAGE_TYPE: TYPE_QUEUE_UPDATE, "jump": 1}, response_handler.callback
)
response_handler.wait_response()

def queue_prev(self) -> None:
def queue_prev(self, timeout: float = 10.0) -> None:
"""Send the QUEUE_PREV command."""
self._send_command({MESSAGE_TYPE: TYPE_QUEUE_UPDATE, "jump": -1})
response_handler = WaitResponse(timeout)
self._send_command(
{MESSAGE_TYPE: TYPE_QUEUE_UPDATE, "jump": -1}, response_handler.callback
)
response_handler.wait_response()

def enable_subtitle(self, track_id: int) -> None:
def enable_subtitle(self, track_id: int, timeout: float = 10.0) -> None:
"""Enable specific text track."""
response_handler = WaitResponse(timeout)
self._send_command(
{MESSAGE_TYPE: TYPE_EDIT_TRACKS_INFO, "activeTrackIds": [track_id]}
{MESSAGE_TYPE: TYPE_EDIT_TRACKS_INFO, "activeTrackIds": [track_id]},
response_handler.callback,
)
response_handler.wait_response()

def disable_subtitle(self) -> None:
def disable_subtitle(self, timeout: float = 10.0) -> None:
"""Disable subtitle."""
self._send_command({MESSAGE_TYPE: TYPE_EDIT_TRACKS_INFO, "activeTrackIds": []})
response_handler = WaitResponse(timeout)
self._send_command(
{MESSAGE_TYPE: TYPE_EDIT_TRACKS_INFO, "activeTrackIds": []},
response_handler.callback,
)
response_handler.wait_response()

def block_until_active(self, timeout: float | None = None) -> None:
"""
Expand Down