From 1cbfbd0159bfa8c6bb6e60004dbbb0bc8da17ea0 Mon Sep 17 00:00:00 2001 From: Ukenn Date: Tue, 19 Dec 2023 10:13:05 +0900 Subject: [PATCH] =?UTF-8?q?=E6=A0=BC=E5=BC=8F=E5=8C=96=20Bangumi=20API=20?= =?UTF-8?q?=E7=B1=BB=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- utils/api/bangumi.py | 364 +++++++++++++++++++++++-------------------- 1 file changed, 198 insertions(+), 166 deletions(-) diff --git a/utils/api/bangumi.py b/utils/api/bangumi.py index e343fcb..4370e5f 100644 --- a/utils/api/bangumi.py +++ b/utils/api/bangumi.py @@ -27,9 +27,9 @@ def __init__(self, app_id: str, app_secret: str, redirect_uri: str, nsfw_token: self.app_secret = app_secret self.redirect_uri = redirect_uri self.nsfw_token = nsfw_token # 携带登陆密钥才能获取 NSFW 数据 - self.headers = { - "User-Agent": "Ukenn/BangumiBot (https://github.com/Ukenn2112/BangumiTelegramBot)" - }, + self.headers = ( + {"User-Agent": "Ukenn/BangumiBot (https://github.com/Ukenn2112/BangumiTelegramBot)"}, + ) self.s = aiohttp.ClientSession( headers=self.headers[0], timeout=aiohttp.ClientTimeout(total=10), @@ -38,35 +38,39 @@ def __init__(self, app_id: str, app_secret: str, redirect_uri: str, nsfw_token: def web_authorization_captcha(self): """获取验证码图片 :return (图片 Base64, RequestsCookieJar)""" - set_cookie = requests.get("https://bgm.tv/login", headers=self.headers[0], timeout=10).cookies + set_cookie = requests.get( + "https://bgm.tv/login", headers=self.headers[0], timeout=10 + ).cookies now = datetime.datetime.now() with requests.get( - f"https://bgm.tv/signup/captcha?{int(now.timestamp() * 1000)}1", - headers=self.headers[0], - cookies=set_cookie, + f"https://bgm.tv/signup/captcha?{int(now.timestamp() * 1000)}1", + headers=self.headers[0], + cookies=set_cookie, ) as resp: return (base64.b64encode(resp.content).decode('utf-8'), set_cookie) - def web_authorization_login(self, cookies: str, email: str, password: str, captcha_challenge_field: str): + def web_authorization_login( + self, cookies: str, email: str, password: str, captcha_challenge_field: str + ): """Web 登录 :return (是否成功 bool, 错误信息/RequestsCookieJar)""" with requests.post( - "https://bgm.tv/FollowTheRabbit", - headers={ - **self.headers[0], - "Cookie": cookies, - "Content-Type": "application/x-www-form-urlencoded", - }, - data={ - "email": email, - "password": password, - "captcha_challenge_field": captcha_challenge_field, - "loginsubmit": "登录", - "referer": "https://bgm.tv/FollowTheRabbit", - "dreferer": "https://bgm.tv/FollowTheRabbit", - }, - allow_redirects=False, - timeout=10, + "https://bgm.tv/FollowTheRabbit", + headers={ + **self.headers[0], + "Cookie": cookies, + "Content-Type": "application/x-www-form-urlencoded", + }, + data={ + "email": email, + "password": password, + "captcha_challenge_field": captcha_challenge_field, + "loginsubmit": "登录", + "referer": "https://bgm.tv/FollowTheRabbit", + "dreferer": "https://bgm.tv/FollowTheRabbit", + }, + allow_redirects=False, + timeout=10, ) as resp: if resp.status_code == 200: resp.encoding = "utf-8" @@ -95,24 +99,29 @@ def web_authorization_oauth(self, cookies: str): ) html_data = HTML(get_data.text) formhash = html_data.xpath("//input[@name='formhash']/@value") - if not formhash: return None - return requests.post( - "https://bgm.tv/oauth/authorize", - headers={ - **self.headers[0], - "Cookie": cookies, - "Content-Type": "application/x-www-form-urlencoded", - }, - data={ - "client_id": self.app_id, - "formhash": formhash[0], - "redirect_uri": None, - "submit": "授权", - }, - params=params, - timeout=10, - allow_redirects=False - ).headers.get('Location').split("code=")[-1] + if not formhash: + return None + return ( + requests.post( + "https://bgm.tv/oauth/authorize", + headers={ + **self.headers[0], + "Cookie": cookies, + "Content-Type": "application/x-www-form-urlencoded", + }, + data={ + "client_id": self.app_id, + "formhash": formhash[0], + "redirect_uri": None, + "submit": "授权", + }, + params=params, + timeout=10, + allow_redirects=False, + ) + .headers.get('Location') + .split("code=")[-1] + ) # OAuth https://github.com/bangumi/api/blob/master/docs-raw/How-to-Auth.md def oauth_authorization_code(self, code: str) -> dict: @@ -126,30 +135,30 @@ def oauth_authorization_code(self, code: str) -> dict: "user_id": xxxxxx bgm用户uid }""" with requests.post( - "https://bgm.tv/oauth/access_token", - headers=self.headers[0], - data={ - "client_id": self.app_id, - "client_secret": self.app_secret, - "grant_type": "authorization_code", - "code": code, - "redirect_uri": self.redirect_uri - }, - timeout=10 + "https://bgm.tv/oauth/access_token", + headers=self.headers[0], + data={ + "client_id": self.app_id, + "client_secret": self.app_secret, + "grant_type": "authorization_code", + "code": code, + "redirect_uri": self.redirect_uri, + }, + timeout=10, ) as resp: return resp.json() async def oauth_refresh_token(self, refresh_token) -> dict: """刷新 access_token""" async with self.s.post( - "https://bgm.tv/oauth/access_token", - data={ - "client_id": self.app_id, - "client_secret": self.app_secret, - "grant_type": "refresh_token", - "refresh_token": refresh_token, - "redirect_uri": self.redirect_uri - } + "https://bgm.tv/oauth/access_token", + data={ + "client_id": self.app_id, + "client_secret": self.app_secret, + "grant_type": "refresh_token", + "refresh_token": refresh_token, + "redirect_uri": self.redirect_uri, + }, ) as resp: return await resp.json() @@ -157,13 +166,12 @@ async def oauth_refresh_token(self, refresh_token) -> dict: async def get_me_info(self, access_token) -> dict: """ 获取当前 Access Token 对应的用户信息 - + Docs: https://bangumi.github.io/api/#/%E7%94%A8%E6%88%B7/getMyself - + access_token: Access Token""" async with self.s.get( - f"{self.api_url}/v0/me", - headers={"Authorization": f"Bearer {access_token}"} + f"{self.api_url}/v0/me", headers={"Authorization": f"Bearer {access_token}"} ) as resp: return await resp.json() @@ -176,7 +184,7 @@ async def get_user_info(self, bgm_id) -> dict: :param bgm_id: BGM ID 或 用户名 可反查""" async with self.s.get( - f"{self.api_url}/user/{bgm_id}", + f"{self.api_url}/user/{bgm_id}", ) as resp: return await resp.json() @@ -189,17 +197,18 @@ async def get_user_collections_status(self, username) -> Union[list, None]: :param username: 用户名 可 UID""" async with self.s.get( - f"{self.api_url}/user/{username}/collections/status", - params={ - "app_id": self.app_id, - } + f"{self.api_url}/user/{username}/collections/status", + params={ + "app_id": self.app_id, + }, ) as resp: if resp.status == 404: return None return await resp.json() - async def get_user_subject_collections(self, username, access_token=None, subject_type=2, collection_type=3, - limit=30, offset=0) -> Union[dict, None]: + async def get_user_subject_collections( + self, username, access_token=None, subject_type=2, collection_type=3, limit=30, offset=0 + ) -> Union[dict, None]: """ 获取用户的条目收藏 @@ -212,20 +221,22 @@ async def get_user_subject_collections(self, username, access_token=None, subjec :param imit: 返回条目数量, 默认 30, 最大 100 :param offset: 返回条目偏移, 默认 0""" async with self.s.get( - f"{self.api_url}/v0/users/{username}/collections", - headers={"Authorization": f"Bearer {access_token}"} if access_token else None, - params={ - "subject_type": subject_type, - "type": collection_type, - "limit": limit, - "offset": offset - } + f"{self.api_url}/v0/users/{username}/collections", + headers={"Authorization": f"Bearer {access_token}"} if access_token else None, + params={ + "subject_type": subject_type, + "type": collection_type, + "limit": limit, + "offset": offset, + }, ) as resp: if resp.status == 404: return None return await resp.json() - async def get_user_subject_collection(self, username, subject_id, access_token=None) -> Union[dict, None]: + async def get_user_subject_collection( + self, username, subject_id, access_token=None + ) -> Union[dict, None]: """ 获取用户对应条目收藏 没有收藏则返回 None @@ -235,15 +246,23 @@ async def get_user_subject_collection(self, username, subject_id, access_token=N :param subject_id: 条目 ID :param access_token: Access Token (可选) 查看私有收藏则需要""" async with self.s.get( - f"{self.api_url}/v0/users/{username}/collections/{subject_id}", - headers={"Authorization": f"Bearer {access_token}"} if access_token else None, + f"{self.api_url}/v0/users/{username}/collections/{subject_id}", + headers={"Authorization": f"Bearer {access_token}"} if access_token else None, ) as resp: if resp.status == 404: return None return await resp.json() - async def post_user_subject_collection(self, access_token, subject_id, collection_type: int = 3, rate: int = None, - comment: str = None, private: bool = None, tags: str = None) -> None: + async def post_user_subject_collection( + self, + access_token, + subject_id, + collection_type: int = 3, + rate: int = None, + comment: str = None, + private: bool = None, + tags: str = None, + ) -> None: """ 添加用户条目收藏 (旧 API) @@ -271,12 +290,21 @@ async def post_user_subject_collection(self, access_token, subject_id, collectio return await self.s.post( f"{self.api_url}/collection/{subject_id}/update", headers={"Authorization": f"Bearer {access_token}"}, - data=send_data + data=send_data, ) - async def patch_user_subject_collection(self, access_token, subject_id, collection_type: int = None, - rate: int = None, ep_status: int = None, vol_status: int = None, - comment: str = None, private: bool = None, tags: list[str] = None) -> None: + async def patch_user_subject_collection( + self, + access_token, + subject_id, + collection_type: int = None, + rate: int = None, + ep_status: int = None, + vol_status: int = None, + comment: str = None, + private: bool = None, + tags: list[str] = None, + ) -> None: """ 修改用户条目收藏 @@ -309,11 +337,12 @@ async def patch_user_subject_collection(self, access_token, subject_id, collecti return await self.s.patch( f"{self.api_url}/v0/users/-/collections/{subject_id}", headers={"Authorization": f"Bearer {access_token}"}, - json=send_data + json=send_data, ) - async def get_user_episode_collections(self, access_token, subject_id, offset=0, limit=100, episode_type=0) -> \ - Union[dict, None]: + async def get_user_episode_collections( + self, access_token, subject_id, offset=0, limit=100, episode_type=0 + ) -> Union[dict, None]: """ 获取用户条目章节收藏 @@ -325,13 +354,9 @@ async def get_user_episode_collections(self, access_token, subject_id, offset=0, :param limit: 返回条目数量, 默认 100, 最大 100 :param episode_type: 集数类型, 0: 本篇 (默认), 1: 特别篇, 2: OP, 3: ED, 4, 预告/宣传/广告, 5: MAD, 6: 其他""" async with self.s.get( - f"{self.api_url}/v0/users/-/collections/{subject_id}/episodes", - headers={"Authorization": f"Bearer {access_token}"}, - params={ - "offset": offset, - "limit": limit, - "episode_type": episode_type - } + f"{self.api_url}/v0/users/-/collections/{subject_id}/episodes", + headers={"Authorization": f"Bearer {access_token}"}, + params={"offset": offset, "limit": limit, "episode_type": episode_type}, ) as resp: if resp.status == 404: return None @@ -346,15 +371,16 @@ async def get_user_episode_collection(self, access_token, episode_id) -> Union[d :param access_token: Access Token :param episode_id: 章节 ID""" async with self.s.get( - f"{self.api_url}/v0/users/-/collections/-/episodes/{episode_id}", - headers={"Authorization": f"Bearer {access_token}"}, + f"{self.api_url}/v0/users/-/collections/-/episodes/{episode_id}", + headers={"Authorization": f"Bearer {access_token}"}, ) as resp: if resp.status == 404: return None return await resp.json() - async def patch_uesr_episode_collection(self, access_token, subject_id, episodes_id: list[int], - status: int = 2) -> None: + async def patch_uesr_episode_collection( + self, access_token, subject_id, episodes_id: list[int], status: int = 2 + ) -> None: """ 修改用户条目章节收藏 @@ -367,10 +393,7 @@ async def patch_uesr_episode_collection(self, access_token, subject_id, episodes return await self.s.patch( f"{self.api_url}/v0/users/-/collections/{subject_id}/episodes", headers={"Authorization": f"Bearer {access_token}"}, - json={ - "episode_id": episodes_id, - "type": status - } + json={"episode_id": episodes_id, "type": status}, ) async def put_user_episode_collection(self, access_token, episode_id, status=2) -> None: @@ -385,9 +408,7 @@ async def put_user_episode_collection(self, access_token, episode_id, status=2) return await self.s.put( f"{self.api_url}/v0/users/-/collections/-/episodes/{episode_id}", headers={"Authorization": f"Bearer {access_token}"}, - json={ - "type": status - } + json={"type": status}, ) def post_episode_reply(self, cookie: str, episode_id: int, reply_text: str) -> None: @@ -403,7 +424,7 @@ def post_episode_reply(self, cookie: str, episode_id: int, reply_text: str) -> N **self.headers[0], "Cookie": cookie, }, - timeout=10 + timeout=10, ) html_data = HTML(get_data.text) formhash = html_data.xpath('//input[@name="formhash"]/@value')[0] @@ -423,7 +444,7 @@ def post_episode_reply(self, cookie: str, episode_id: int, reply_text: str) -> N "lastview": lastview, "submit": "submit", }, - timeout=10 + timeout=10, ) # 条目 @@ -433,9 +454,7 @@ async def get_calendar(self) -> list: 每日放送 Docs: https://bangumi.github.io/api/#/%E6%9D%A1%E7%9B%AE/getCalendar""" - async with self.s.get( - f"{self.api_url}/calendar" - ) as resp: + async with self.s.get(f"{self.api_url}/calendar") as resp: week_data = await resp.json() onair_data = get_onair_data() if onair_data is None: @@ -457,26 +476,34 @@ async def get_calendar(self) -> list: check = True if ontime := odata["timeCN"]: subject["_air_time"] = ontime - subject["air_weekday"] = odata["weekDayCN"] if odata["weekDayCN"] != 0 else 7 + subject["air_weekday"] = ( + odata["weekDayCN"] if odata["weekDayCN"] != 0 else 7 + ) output[subject["air_weekday"] - 1]["items"].append(subject) elif sites := odata.get("sites"): if begin := sites[0].get("begin"): begin = datetime.datetime.fromisoformat( - begin.replace("Z", "+00:00")) + datetime.timedelta(hours=8) + begin.replace("Z", "+00:00") + ) + datetime.timedelta(hours=8) subject["_air_time"] = begin.strftime("%H%M") weekday = int(begin.strftime("%w")) subject["air_weekday"] = weekday if weekday != 0 else 7 output[subject["air_weekday"] - 1]["items"].append(subject) elif ontime := odata["timeJP"]: subject["_air_time"] = ontime - subject["air_weekday"] = odata["weekDayJP"] if odata["weekDayJP"] != 0 else 7 + subject["air_weekday"] = ( + odata["weekDayJP"] if odata["weekDayJP"] != 0 else 7 + ) output[subject["air_weekday"] - 1]["items"].append(subject) elif ontime := odata["timeJP"]: subject["_air_time"] = ontime - subject["air_weekday"] = odata["weekDayJP"] if odata["weekDayJP"] != 0 else 7 + subject["air_weekday"] = ( + odata["weekDayJP"] if odata["weekDayJP"] != 0 else 7 + ) output[subject["air_weekday"] - 1]["items"].append(subject) break - if not check: output[subject["air_weekday"] - 1]["items"].append(subject) + if not check: + output[subject["air_weekday"] - 1]["items"].append(subject) return output @cache_data @@ -506,13 +533,12 @@ async def get_subject_core(self, subject_id, access_token) -> dict: retry = False access_token = None async with self.s.get( - f"{self.api_url}/v0/subjects/{subject_id}", - headers={"Authorization": f"Bearer {access_token}"} if access_token else None, + f"{self.api_url}/v0/subjects/{subject_id}", + headers={"Authorization": f"Bearer {access_token}"} if access_token else None, ) as resp: if resp.status != 200: if access_token != self.nsfw_token and access_token is not None: # 失败 且为用户Token - logging.warning( - f"获取条目信息失败, 可能用户密钥过期或不支持, 去除密钥重新请求\n\n{await resp.json()}") + logging.warning(f"获取条目信息失败, 可能用户密钥过期或不支持, 去除密钥重新请求\n\n{await resp.json()}") return await self.get_subject_core(subject_id, None) elif retry: # 失败 且为长期密钥 logging.warning(f"获取条目信息失败, 可能长期密钥已过期, 使用空token重试\n\n{await resp.json()}") @@ -534,8 +560,8 @@ async def get_subject_persons(self, subject_id, access_token: str = None) -> lis if not access_token: access_token = self.nsfw_token async with self.s.get( - f"{self.api_url}/v0/subjects/{subject_id}/persons", - headers={"Authorization": f"Bearer {access_token}"} if access_token else None, + f"{self.api_url}/v0/subjects/{subject_id}/persons", + headers={"Authorization": f"Bearer {access_token}"} if access_token else None, ) as resp: return await resp.json() @@ -551,8 +577,8 @@ async def get_subject_characters(self, subject_id, access_token: str = None) -> if not access_token: access_token = self.nsfw_token async with self.s.get( - f"{self.api_url}/v0/subjects/{subject_id}/characters", - headers={"Authorization": f"Bearer {access_token}"} if access_token else None, + f"{self.api_url}/v0/subjects/{subject_id}/characters", + headers={"Authorization": f"Bearer {access_token}"} if access_token else None, ) as resp: return await resp.json() @@ -568,14 +594,16 @@ async def get_subject_related(self, subject_id, access_token: str = None) -> lis if not access_token: access_token = self.nsfw_token async with self.s.get( - f"{self.api_url}/v0/subjects/{subject_id}/subjects", - headers={"Authorization": f"Bearer {access_token}"} if access_token else None, + f"{self.api_url}/v0/subjects/{subject_id}/subjects", + headers={"Authorization": f"Bearer {access_token}"} if access_token else None, ) as resp: return await resp.json() # 章节 @cache_data - async def get_episodes(self, subject_id, episode_type=0, limit=100, offset=100, access_token: str = None) -> dict: + async def get_episodes( + self, subject_id, episode_type=0, limit=100, offset=100, access_token: str = None + ) -> dict: """ 获取条目章节信息 @@ -589,14 +617,14 @@ async def get_episodes(self, subject_id, episode_type=0, limit=100, offset=100, if not access_token: access_token = self.nsfw_token async with self.s.get( - f"{self.api_url}/v0/episodes", - headers={"Authorization": f"Bearer {access_token}"} if access_token else None, - params={ - "subject_id": subject_id, - "type": episode_type, - "limit": limit, - "offset": offset, - } + f"{self.api_url}/v0/episodes", + headers={"Authorization": f"Bearer {access_token}"} if access_token else None, + params={ + "subject_id": subject_id, + "type": episode_type, + "limit": limit, + "offset": offset, + }, ) as resp: return await resp.json() @@ -612,8 +640,8 @@ async def get_episode(self, episode_id, access_token: str = None) -> dict: if not access_token: access_token = self.nsfw_token async with self.s.get( - f"{self.api_url}/v0/episodes/{episode_id}", - headers={"Authorization": f"Bearer {access_token}"} if access_token else None, + f"{self.api_url}/v0/episodes/{episode_id}", + headers={"Authorization": f"Bearer {access_token}"} if access_token else None, ) as resp: return await resp.json() @@ -627,7 +655,7 @@ async def get_person(self, person_id) -> dict: :param person_id: 人物 ID""" async with self.s.get( - f"{self.api_url}/v0/persons/{person_id}", + f"{self.api_url}/v0/persons/{person_id}", ) as resp: return await resp.json() @@ -640,7 +668,7 @@ async def get_person_subjects(self, person_id) -> list: :param person_id: 人物 ID""" async with self.s.get( - f"{self.api_url}/v0/persons/{person_id}/subjects", + f"{self.api_url}/v0/persons/{person_id}/subjects", ) as resp: return await resp.json() @@ -653,7 +681,7 @@ async def get_person_characters(self, person_id) -> list: :param person_id: 人物 ID""" async with self.s.get( - f"{self.api_url}/v0/persons/{person_id}/characters", + f"{self.api_url}/v0/persons/{person_id}/characters", ) as resp: return await resp.json() @@ -667,7 +695,7 @@ async def get_character(self, character_id) -> dict: :param character_id: 角色 ID""" async with self.s.get( - f"{self.api_url}/v0/characters/{character_id}", + f"{self.api_url}/v0/characters/{character_id}", ) as resp: return await resp.json() @@ -680,7 +708,7 @@ async def get_character_subjects(self, character_id) -> list: :param character_id: 角色 ID""" async with self.s.get( - f"{self.api_url}/v0/characters/{character_id}/subjects", + f"{self.api_url}/v0/characters/{character_id}/subjects", ) as resp: return await resp.json() @@ -693,15 +721,20 @@ async def get_character_persons(self, character_id) -> list: :param character_id: 角色 ID""" async with self.s.get( - f"{self.api_url}/v0/characters/{character_id}/persons", + f"{self.api_url}/v0/characters/{character_id}/persons", ) as resp: return await resp.json() # 搜索 @cache_data - async def search_subjects(self, keywords, subject_type: int = None, - response_group: Literal["small", "medium", "large"] = "small", start=0, - max_results=10) -> list: + async def search_subjects( + self, + keywords, + subject_type: int = None, + response_group: Literal["small", "medium", "large"] = "small", + start=0, + max_results=10, + ) -> list: """ 条目搜索 (Old API) 由于新 API 暂时为试验性可能变动较大, 故暂时使用旧 API @@ -719,10 +752,7 @@ async def search_subjects(self, keywords, subject_type: int = None, } if subject_type: params["type"] = subject_type - async with self.s.get( - f"{self.api_url}/search/subject/{keywords}", - params=params - ) as resp: + async with self.s.get(f"{self.api_url}/search/subject/{keywords}", params=params) as resp: try: return await resp.json() except: @@ -737,11 +767,11 @@ async def search_mono(self, keywords, page=1, cat: Literal["all", "crt", "prsn"] :param page: 页码, 默认 1 :param cat: 搜索类型, all: 全部, crt: 角色, prsn: 人物, 默认 all""" async with self.s.get( - f"http://bgm.tv/mono_search/{keywords}", - params={ - "cat": cat, - "page": page, - } + f"http://bgm.tv/mono_search/{keywords}", + params={ + "cat": cat, + "page": page, + }, ) as resp: html_data = HTML(await resp.text()) error = html_data.xpath("//*[@id='colunmNotice']/div/p[1]") @@ -773,12 +803,14 @@ async def search_mono(self, keywords, page=1, cat: Literal["all", "crt", "prsn"] elif mono_data.startswith("/person/"): mono_id = int(mono_data[8:]) mono_type = "person" # 人物 - list_data.append({ - "id": mono_id, - "type": mono_type, - "name": name, - "name_cn": name_cn, - "img_url": img_url, - "info": info, - }) + list_data.append( + { + "id": mono_id, + "type": mono_type, + "name": name, + "name_cn": name_cn, + "img_url": img_url, + "info": info, + } + ) return {"error": None, "list": list_data}