-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
9 changed files
with
304 additions
and
287 deletions.
There are no files selected for viewing
File renamed without changes.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,82 @@ | ||
import requests | ||
import base64 | ||
import json | ||
import logging | ||
|
||
class GithubClient:
    """Read and write single files in one repository via the GitHub contents API."""

    # Seconds to wait on GitHub before giving up, so a stalled connection
    # cannot hang the caller forever.
    REQUEST_TIMEOUT = 30

    def __init__(self, token, github_owner, github_repo):
        """Remember the credentials and repository used for every request.

        Args:
            token: Access token sent in the ``Authorization`` header.
            github_owner: Owner segment of the repository URL.
            github_repo: Repository-name segment of the repository URL.
        """
        self.access_token = token
        self.github_owner = github_owner
        self.github_repo = github_repo

    def _contents_url(self, filename):
        """Return the contents-API URL for ``filename`` in the configured repo."""
        return f'https://api.github.com/repos/{self.github_owner}/{self.github_repo}/contents/{filename}'

    def _headers(self):
        """Return the authorization header shared by all requests."""
        return {'Authorization': f'token {self.access_token}'}

    def get_file(self, filename) -> 'str | None':
        """Fetch ``filename`` and return its content decoded as UTF-8 text.

        Returns:
            The file content, or ``None`` (after logging the response body)
            when GitHub answers with anything other than 200.
        """
        response = requests.get(
            self._contents_url(filename),
            headers=self._headers(),
            timeout=self.REQUEST_TIMEOUT,
        )

        if response.status_code != 200:
            # Use a %s placeholder: passing the value without one makes
            # logging fail to format the record.
            logging.error('Error getting github file: %s', response.text)
            return None

        # The API delivers the file body base64-encoded in the 'content' field.
        content_bytes = base64.b64decode(response.json()['content'])
        return content_bytes.decode('utf-8')

    def write_file(self, filename, content, commit_msg):
        """Commit ``content`` (a string) to ``filename`` with ``commit_msg``.

        The current blob SHA is looked up first because it is required to
        update an existing file. When the lookup answers 404 the SHA is
        omitted, which makes the same PUT create the file instead.
        Failures are logged; nothing is returned.
        """
        url = self._contents_url(filename)
        headers = self._headers()

        data = {
            'message': commit_msg,
            'content': base64.b64encode(content.encode('utf-8')).decode('utf-8'),
        }

        lookup = requests.get(url, headers=headers, timeout=self.REQUEST_TIMEOUT)
        if lookup.status_code == 200:
            data['sha'] = lookup.json()['sha']  # required to update an existing file
        elif lookup.status_code != 404:
            logging.error('Error updating github file: %s', lookup.text)
            return

        response = requests.put(url, headers=headers, json=data, timeout=self.REQUEST_TIMEOUT)

        # 200 = existing file updated, 201 = file created.
        if response.status_code in (200, 201):
            logging.info('Github file updated successfully')
        else:
            logging.error('Error updating github file: %s', response.text)

    # Not used but keeping to remember that you may need to handle creating a file that does not exist yet
    def create_file(self, filename, new):
        """Create ``filename`` containing ``new`` serialized as indented JSON.

        Success is a 201 response; any other status is logged as an error.
        """
        new_content_str = json.dumps(new, indent=4)
        new_content_base64 = base64.b64encode(new_content_str.encode('utf-8')).decode('utf-8')

        data = {
            'message': 'Create JSON file',
            'content': new_content_base64,
        }

        response = requests.put(
            self._contents_url(filename),
            headers=self._headers(),
            json=data,
            timeout=self.REQUEST_TIMEOUT,
        )

        if response.status_code == 201:
            logging.info('Github file created successfully')
        else:
            logging.error('Error creating github file: %s', response.text)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,16 @@ | ||
import openai | ||
import logging | ||
|
||
class GptClient:
    """Small wrapper around the ``openai`` chat-completion call."""

    def __init__(self, openai_api_key):
        """Set ``openai_api_key`` as the API key on the ``openai`` module."""
        openai.api_key = openai_api_key

    def chat_completion(self, messages):
        """Send ``messages`` to the model and return the first choice's text.

        Args:
            messages: Passed through unchanged as the ``messages`` argument
                of ``openai.ChatCompletion.create``.

        Returns:
            The ``message.content`` of the first entry in ``choices``.
        """
        completion = openai.ChatCompletion.create(
            model="gpt-4o-mini",
            messages=messages,
        )
        first_choice = completion.choices[0]
        return first_choice.message.content
|
||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,145 @@ | ||
import json | ||
import logging | ||
from config import Config | ||
from gpt import GptClient | ||
from spotify import SpotifyClient | ||
from github import GithubClient | ||
|
||
class Podcast198LandService:
    """Sync the podcast's Spotify episode list into two JSON files on GitHub.

    ``raw_episodes.json`` holds the episode list as returned by Spotify and
    ``episodes_by_country.json`` groups episodes under the country code that
    the GPT model assigns to them.
    """

    def __init__(self):
        """Load configuration and build the GitHub, GPT and Spotify clients."""
        config = Config()
        github_owner = 'henrikskog'
        github_repo = '198-land-kart'
        logging.basicConfig(level=logging.INFO)

        self.BY_COUNTRY_PATH = "episodes_by_country.json"
        self.file_path = 'episodes_by_country.json'
        self.RAW_EPISODES_PATH = "raw_episodes.json"
        self.SPOTIFY_SHOW_ID = '7gVC1AP7O35An9TK6l2XpJ'
        self.github_client = GithubClient(config.github_api_key, github_owner, github_repo)
        self.gpt_client = GptClient(config.openai_api_key)
        self.spotify_client = SpotifyClient(config.spotify_client_id, config.spotify_client_secret)

    @staticmethod
    def GPT_PROMPT(episode_name, episode_description):
        """Return the classification prompt for one episode.

        The prompt asks for either ``no`` or ``<country>, <code>`` and ends
        with the given episode title and description.
        """
        return f"""
You will be given an episode of an episode of a norwegian geography podcast.
If the episode is not about a spesific country, please answer "no". If it is, return the name of the country in english followed by its country code according to the A3 spesification.
EXAMPLE 1:
Episode title:
"Ekstramateriale: LIVE fra Akershus festning",
Episode description:
"I august gikk liveshowet 198 Land: Norge spesial av stabelen p\u00e5 Akershus festning i Oslo. Her kan du enten oppleve eller gjenoppleve noen h\u00f8ydepunkter fra kvelden. Produsert av Martin Oftedal, PLAN-B Hosted on Acast. See acast.com/privacy for more information.",
Your answer:
no
EXAMPLE 2:
Episode title:
Chile del 2 med Benedicte Bull
Episode description:
Denne uken blir vi mer kjent med Chile og \u00e5ssen det er der, a? Vi blir kjent med gjennomsnittschileneren og deres rike matkultur, sportshistorikk og litteraturvirksomhet. Og tror du jaggumeg ikke at vi rekker \u00e5 pl\u00f8ye gjennom noen j\u00f8ss og lyttersp\u00f8rsm\u00e5l? Einar fyller den allerede tettpakkede episoden med sine mer eller mindre kvalitetssikrede fakta i tospann med professor, samfunnsviter og tidligere Chilebeboer, Benedicte Bull.Produsert av Martin Oftedal, PLAN-B Hosted on Acast. See acast.com/privacy for more information.
Your answer:
Chile, CHL
Episode title:
{episode_name}
Episode description:
{episode_description}
""".strip()

    def get_198_land_episodes(self):
        """Return every episode of the show from the Spotify client."""
        return self.spotify_client.get_episodes(self.SPOTIFY_SHOW_ID)

    def extract_country(self, episode_name: str, episode_description: str):
        """Ask the GPT client which country an episode is about.

        Returns:
            ``(country, country_code)``, or ``(None, None)`` when the model
            answers "no" or the answer cannot be parsed.
        """
        prompt = self.GPT_PROMPT(episode_name, episode_description)
        gpt_response = self.gpt_client.chat_completion([{"role": "user", "content": prompt}])

        # Tolerate surrounding whitespace, a trailing period and case
        # differences in the model's answer.
        answer = (gpt_response or "").strip()
        if answer.rstrip(".").casefold() == "no":
            return None, None

        try:
            # Split on the last comma so a country name that itself contains
            # a comma still parses.
            country, cc = (part.strip() for part in answer.rsplit(",", 1))
        except ValueError:
            country, cc = "", ""

        if not country or not cc:
            logging.error(f"Got unexpected answer from gpt: {gpt_response} given the prompt: {prompt}")
            return None, None

        return country, cc

    def _load_json_file(self, path):
        """Fetch ``path`` through the GitHub client and parse it as JSON."""
        github_str = self.github_client.get_file(path)
        if github_str is None:
            # get_file returns None on failure; fail with a clear message
            # instead of the TypeError json.loads(None) would raise.
            raise RuntimeError(f"Could not fetch {path} from github")
        return json.loads(github_str)

    def get_raw_episodes_file(self):
        """Return the parsed content of the stored raw-episodes file."""
        return self._load_json_file(self.RAW_EPISODES_PATH)

    def get_episodes_file_by_country(self):
        """Return the parsed content of the stored by-country file."""
        return self._load_json_file(self.BY_COUNTRY_PATH)

    def raw_episodes_to_by_country(self, new_episodes: list) -> 'dict | None':
        """Classify ``new_episodes`` and merge them into the by-country mapping.

        Episodes for which no country can be extracted are skipped.

        Returns:
            The updated mapping of country code to a list of
            ``{"country": ..., "ep": ...}`` entries, or ``None`` as soon as
            an episode name is already present under its country code.
        """
        by_country = self.get_episodes_file_by_country()

        for episode in new_episodes:
            country, cc = self.extract_country(episode["name"], episode["description"])

            if country is None or cc is None:
                logging.info(f"Could not extract country from episode {episode['name']}")
                continue

            logging.info(f"Episode {episode['name']} got classified as {country} ({cc})")

            entries = by_country.setdefault(cc, [])
            if any(entry["ep"]["name"] == episode["name"] for entry in entries):
                logging.warning(f"Episode {episode['name']} already exists in list. Exiting.")
                return None

            entries.append({"country": country, "ep": episode})

        return by_country

    def process_new_episodes(self, all_episodes: list):
        """Work out which episodes are new and classify them.

        ``all_episodes`` is expected newest first, so the new episodes are
        the leading entries beyond the stored count.

        Returns:
            The updated by-country mapping, or ``None`` when there is
            nothing new or a duplicate was found.
        """
        logging.info("Checking for new episodes...")

        # ordered by date, newest first
        stored_episodes = self.get_raw_episodes_file()

        new_count = len(all_episodes) - len(stored_episodes)
        # A negative count (fewer episodes upstream than stored) must not be
        # used as a slice bound: it would mark almost every episode as new.
        if new_count <= 0:
            logging.info("No new episodes found.")
            return None

        new_episodes = all_episodes[:new_count]

        logging.info(f"Found {len(new_episodes)} new episodes.\n" + "\n".join([f"- {e['name']}" for e in new_episodes]))

        by_country = self.raw_episodes_to_by_country(new_episodes)

        if by_country is None:  # Meaning we found a duplicate
            logging.info("Duplicate found. Exiting and not writing to github.")
            return None

        return by_country

    def update_github_workflow(self):
        """Fetch all episodes and, if any are new, commit both JSON files."""
        all_episodes = self.get_198_land_episodes()
        episodes_by_country = self.process_new_episodes(all_episodes)

        if episodes_by_country is None:
            # Nothing new or a duplicate: writing now would overwrite the
            # by-country file with the JSON literal "null".
            return

        commit_msg = "Automatic update of json file with new podcast episode!"
        self.github_client.write_file(self.RAW_EPISODES_PATH, json.dumps(all_episodes, indent=4), commit_msg)
        self.github_client.write_file(self.BY_COUNTRY_PATH, json.dumps(episodes_by_country, indent=4), commit_msg)
|
||
if __name__ == "__main__":
    # Script entry point: build the service and run one update pass.
    Podcast198LandService().update_github_workflow()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,52 @@ | ||
import requests | ||
|
||
AUTH_URL = 'https://accounts.spotify.com/api/token'


class SpotifyClient:
    """Spotify Web API client that authenticates with client credentials."""

    # Seconds to wait on Spotify before giving up, so a stalled connection
    # cannot hang the caller forever.
    REQUEST_TIMEOUT = 30

    def __init__(self, client_id, client_secret):
        """Store the credentials and immediately request an access token."""
        self.client_id = client_id
        self.client_secret = client_secret
        self.access_token = self.get_auth_token()

    def get_auth_token(self):
        """Request an access token using the client-credentials grant.

        Returns:
            The ``access_token`` field of the token response.

        Raises:
            requests.HTTPError: If the token endpoint answers with an error
                status (instead of a ``KeyError`` on the missing field).
        """
        auth_response = requests.post(
            AUTH_URL,
            {
                'grant_type': 'client_credentials',
                'client_id': self.client_id,
                'client_secret': self.client_secret,
            },
            timeout=self.REQUEST_TIMEOUT,
        )
        auth_response.raise_for_status()

        return auth_response.json()['access_token']

    def get_episodes(self, show_id):
        """Return all episodes of ``show_id``, following pagination.

        Pages are requested 50 at a time for market ``NO`` until the
        response carries no ``next`` link.

        Raises:
            requests.HTTPError: If any page request answers with an error
                status (instead of a ``KeyError`` on the missing ``items``).
        """
        headers = {
            'Authorization': 'Bearer {token}'.format(token=self.access_token)
        }

        BASE_URL = 'https://api.spotify.com/v1/'
        limit = 50  # Maximum allowed by Spotify API
        offset = 0
        episodes = []

        while True:
            response = requests.get(
                BASE_URL + f'shows/{show_id}/episodes',
                headers=headers,
                params={
                    'limit': limit,
                    'offset': offset,
                    'market': 'NO',
                },
                timeout=self.REQUEST_TIMEOUT,
            )
            response.raise_for_status()

            response_data = response.json()
            episodes.extend(response_data['items'])

            # A falsy 'next' means this was the last page.
            if not response_data['next']:
                break
            offset += limit

        return episodes
Oops, something went wrong.