Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

resolve #67, add async client #73

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 51 additions & 0 deletions examples/async/filter-search.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import asyncio
from innertube.asyncio import InnerTube

PARAMS_TYPE_VIDEO = "EgIQAQ%3D%3D"
PARAMS_TYPE_CHANNEL = "EgIQAg%3D%3D"
PARAMS_TYPE_PLAYLIST = "EgIQAw%3D%3D"
PARAMS_TYPE_FILM = "EgIQBA%3D%3D"


async def main() -> None:
async with InnerTube("WEB", "2.20230920.00.00") as client:
data = await client.search("arctic monkeys", params=PARAMS_TYPE_PLAYLIST)

items = data["contents"]["twoColumnSearchResultsRenderer"]["primaryContents"][
"sectionListRenderer"
]["contents"][0]["itemSectionRenderer"]["contents"]

for item in items:
playlist = item["playlistRenderer"]

playlist_id = playlist["playlistId"]
playlist_title = playlist["title"]["simpleText"]
playlist_video_count = playlist["videoCount"]

print(f"[{playlist_id}] {playlist_title} ({playlist_video_count} videos)")

await asyncio.sleep(1)

# Alternative usage
client = InnerTube("WEB", "2.20230920.00.00")
data = await client.search("arctic monkeys", params=PARAMS_TYPE_PLAYLIST)

items = data["contents"]["twoColumnSearchResultsRenderer"]["primaryContents"][
"sectionListRenderer"
]["contents"][0]["itemSectionRenderer"]["contents"]

for item in items:
playlist = item["playlistRenderer"]

playlist_id = playlist["playlistId"]
playlist_title = playlist["title"]["simpleText"]
playlist_video_count = playlist["videoCount"]

print(f"[{playlist_id}] {playlist_title} ({playlist_video_count} videos)")

# use this method for python versions 3.8>= <=3.10
# https://github.com/encode/httpx/issues/914
await client.close()


asyncio.run(main())
1 change: 1 addition & 0 deletions innertube/asyncio/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from .clients import InnerTube
71 changes: 71 additions & 0 deletions innertube/asyncio/adaptor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
from typing import Optional

from httpx import AsyncClient, Request, Response

from innertube import api
from innertube.config import config
from innertube.errors import RequestError, ResponseError
from innertube.models import ClientContext


class AsyncInnerTubeAdaptor:
context: ClientContext
session: AsyncClient

def __init__(
self, context: ClientContext, session: Optional[AsyncClient] = None
) -> None:
self.context = context
self.session = session or AsyncClient(base_url=config.base_url)

def __repr__(self) -> str:
return f"{type(self).__name__}(context={self.context!r})"

def _build_request(
self, endpoint: str, params: Optional[dict] = None, body: Optional[dict] = None
) -> Request:
return self.session.build_request(
"POST",
endpoint,
params=self.context.params().update(params or {}),
json=api.contextualise(self.context, body or {}),
headers=self.context.headers(),
)

async def _request(
self, endpoint: str, params: Optional[dict] = None, body: Optional[dict] = None
) -> Response:
r = self._build_request(endpoint, params=params, body=body)
return await self.session.send(r)

async def dispatch(
self, endpoint: str, params: Optional[dict] = None, body: Optional[dict] = None
) -> dict:
response: Response = await self._request(endpoint, params=params, body=body)

content_type: Optional[str] = response.headers.get("Content-Type")

if content_type is not None:
if not content_type.lower().startswith("application/json"):
raise ResponseError(f"Expected JSON response, got {content_type!r}")

response_data: dict = response.json()

visitor_data: Optional[str] = response_data.get("responseContext", {}).get(
"visitorData"
)

if visitor_data is not None:
self.session.headers["X-Goog-Visitor-Id"] = visitor_data

error: Optional[dict] = response_data.get("error")

if error is not None:
raise RequestError(api.error(error))

return response_data

# needed for python versions 3.8>= <=3.10 issue from httpx
# https://github.com/encode/httpx/issues/914
async def close(self) -> None:
await self.session.aclose()
208 changes: 208 additions & 0 deletions innertube/asyncio/clients.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
import dataclasses
from typing import List, Optional, Coroutine

import httpx
import mediate
from httpx._types import ProxiesTypes

from innertube import api, utils
from .adaptor import AsyncInnerTubeAdaptor
from innertube.config import config
from innertube.enums import Endpoint
from innertube.models import ClientContext, Locale
from innertube.protocols import AsyncAdaptor


@dataclasses.dataclass
class AsyncClient:
adaptor: AsyncAdaptor

middleware: mediate.Middleware = dataclasses.field(
default_factory=mediate.Middleware, repr=False, init=False
)

def __call__(
self,
endpoint: str,
params: Optional[dict] = None,
body: Optional[dict] = None,
) -> Coroutine:
@self.middleware.bind
async def process(data: Coroutine, /) -> Coroutine:
_data = await data
_data.pop("responseContext")
return _data

response: Coroutine = process(
self.adaptor.dispatch(endpoint, params=params, body=body)
)

return response


@dataclasses.dataclass(init=False)
class InnerTube(AsyncClient):
def __init__(
self,
client_name: str,
client_version: Optional[str] = None,
*,
api_key: Optional[str] = None,
user_agent: Optional[str] = None,
referer: Optional[str] = None,
locale: Optional[Locale] = None,
auto: bool = True,
proxies: Optional[ProxiesTypes] = None,
) -> None:
if client_name is None:
raise ValueError("Precondition failed: Missing client name")

kwargs: dict = utils.filter(
dict(
client_name=client_name,
client_version=client_version,
api_key=api_key,
user_agent=user_agent,
referer=referer,
locale=locale,
)
)

context: ClientContext

auto_context: Optional[ClientContext]
if auto and (auto_context := api.get_context(client_name)):
context = dataclasses.replace(auto_context, **kwargs)
else:
if client_version is None:
raise ValueError("Precondition failed: Missing client version")

context = ClientContext(**kwargs)

super().__init__(
adaptor=AsyncInnerTubeAdaptor(
context=context,
session=httpx.AsyncClient(base_url=config.base_url, proxies=proxies),
),
)

async def __aenter__(self):
return self

async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.close()

async def config(self) -> dict:
return await self(Endpoint.CONFIG)

async def guide(self) -> dict:
return await self(Endpoint.GUIDE)

async def player(self, video_id: str) -> dict:
return await self(
Endpoint.PLAYER,
body=dict(
videoId=video_id,
),
)

async def browse(
self,
browse_id: Optional[str] = None,
*,
params: Optional[str] = None,
continuation: Optional[str] = None,
) -> dict:
return await self(
Endpoint.BROWSE,
body=utils.filter(
dict(
browseId=browse_id,
params=params,
continuation=continuation,
)
),
)

async def search(
self,
query: Optional[str] = None,
*,
params: Optional[str] = None,
continuation: Optional[str] = None,
) -> dict:
return await self(
Endpoint.SEARCH,
body=utils.filter(
dict(
query=query or "",
params=params,
continuation=continuation,
)
),
)

async def next(
self,
video_id: Optional[str] = None,
playlist_id: Optional[str] = None,
*,
params: Optional[str] = None,
index: Optional[int] = None,
continuation: Optional[str] = None,
) -> dict:
return await self(
Endpoint.NEXT,
body=utils.filter(
dict(
params=params,
playlistId=playlist_id,
videoId=video_id,
playlistIndex=index,
continuation=continuation,
)
),
)

async def get_transcript(
self,
params: str,
) -> dict:
return await self(
Endpoint.GET_TRANSCRIPT,
body=utils.filter(
dict(
params=params,
)
),
)

async def music_get_search_suggestions(
self,
input: Optional[None] = None,
) -> dict:
return await self(
Endpoint.MUSIC_GET_SEARCH_SUGGESTIONS,
body=dict(
input=input or "",
),
)

async def music_get_queue(
self,
*,
video_ids: Optional[List[str]] = None,
playlist_id: Optional[str] = None,
) -> dict:
return await self(
Endpoint.MUSIC_GET_QUEUE,
body=utils.filter(
dict(
playlistId=playlist_id,
videoIds=video_ids or (None,),
)
),
)

async def close(self) -> None:
await self.adaptor.close()
24 changes: 6 additions & 18 deletions innertube/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,30 +8,18 @@
REFERER_YOUTUBE_ANALYTICS: str = "https://analytics.youtube.com/"
REFERER_GOOGLE_ASSISTANT: str = "https://assistant.google.com/"

USER_AGENT_WEB: str = (
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/74.0.3729.157 Safari/537.36"
)
USER_AGENT_ANDROID: str = (
"Mozilla/5.0 (Linux; Android 6.0; Nexus 5 Build/MRA58N) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/65.0.3325.181 Mobile Safari/537.36"
)
USER_AGENT_IOS: str = (
"Mozilla/5.0 (iPhone; CPU iPhone OS 15_4_1 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) FxiOS/98.2 Mobile/15E148 Safari/605.1.15"
)
USER_AGENT_WEB: str = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/74.0.3729.157 Safari/537.36"
USER_AGENT_ANDROID: str = "Mozilla/5.0 (Linux; Android 6.0; Nexus 5 Build/MRA58N) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/65.0.3325.181 Mobile Safari/537.36"
USER_AGENT_IOS: str = "Mozilla/5.0 (iPhone; CPU iPhone OS 15_4_1 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) FxiOS/98.2 Mobile/15E148 Safari/605.1.15"
USER_AGENT_TV_HTML5: str = (
"Mozilla/5.0 (PlayStation 4 5.55) AppleWebKit/601.2 (KHTML, like Gecko)"
)
USER_AGENT_TV_APPLE: str = (
"AppleCoreMedia/1.0.0.12B466 (Apple TV; U; CPU OS 8_1_3 like Mac OS X; en_us)"
)
USER_AGENT_TV_ANDROID: str = (
"Mozilla/5.0 (Linux; Android 5.1.1; AFTT Build/LVY48F; wv) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/49.0.2623.10"
)
USER_AGENT_XBOX_ONE: str = (
"Mozilla/5.0 (Windows NT 10.0; Win64; x64; Xbox; Xbox One) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/46.0.2486.0 Safari/537.36 Edge/13.10553"
)
USER_AGENT_GOOGLE_ASSISTANT: str = (
"Mozilla/5.0 (Linux; Android 11; Pixel 2; DuplexWeb-Google/1.0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/86.0.4240.193 Mobile Safari/537.36"
)
USER_AGENT_TV_ANDROID: str = "Mozilla/5.0 (Linux; Android 5.1.1; AFTT Build/LVY48F; wv) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/49.0.2623.10"
USER_AGENT_XBOX_ONE: str = "Mozilla/5.0 (Windows NT 10.0; Win64; x64; Xbox; Xbox One) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/46.0.2486.0 Safari/537.36 Edge/13.10553"
USER_AGENT_GOOGLE_ASSISTANT: str = "Mozilla/5.0 (Linux; Android 11; Pixel 2; DuplexWeb-Google/1.0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/86.0.4240.193 Mobile Safari/537.36"

config: Config = Config(
base_url="https://youtubei.googleapis.com/youtubei/v1/",
Expand Down
15 changes: 15 additions & 0 deletions innertube/protocols.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,18 @@ def dispatch(
body: Optional[dict] = None
) -> dict:
raise NotImplementedError


@runtime_checkable
class AsyncAdaptor(Protocol):
async def dispatch(
self,
endpoint: str,
*,
params: Optional[dict] = None,
body: Optional[dict] = None
) -> dict:
raise NotImplementedError

async def close(self) -> None:
raise NotImplementedError
Loading