diff --git a/InstaTweet/Twitter API Template.txt b/InstaTweet/Twitter API Template.txt deleted file mode 100644 index 2ed3f09..0000000 --- a/InstaTweet/Twitter API Template.txt +++ /dev/null @@ -1,7 +0,0 @@ -{ - "Consumer Key": null, - "Consumer Secret": null, - "Access Token": null, - "Token Secret": null -} - diff --git a/InstaTweet/__init__.py b/InstaTweet/__init__.py index f03be53..5548544 100644 --- a/InstaTweet/__init__.py +++ b/InstaTweet/__init__.py @@ -1 +1,11 @@ -from InstaTweet.core import InstaTweet +# Helpers +from . import utils +from .db import DBConnection +# API Interaction/Wrapper Classes +from .instapost import InstaPost +from .instauser import InstaUser +from .instaclient import InstaClient +from .tweetclient import TweetClient +# User Interface Classes +from .profile import Profile +from .instatweet import InstaTweet diff --git a/InstaTweet/core/__init__.py b/InstaTweet/core/__init__.py deleted file mode 100644 index b623626..0000000 --- a/InstaTweet/core/__init__.py +++ /dev/null @@ -1,5 +0,0 @@ -from .instapost import InstaPost -from .instauser import InstaUser -from .instaclient import InstaClient -from .tweetclient import TweetClient -from .instatweet import InstaTweet diff --git a/InstaTweet/core/instaclient.py b/InstaTweet/core/instaclient.py deleted file mode 100644 index d568d7b..0000000 --- a/InstaTweet/core/instaclient.py +++ /dev/null @@ -1,77 +0,0 @@ -import requests -from . import InstaPost, InstaUser -from InstaTweet.utils import get_filepath - - -class InstaClient(object): - """ - Instagram client to scrape and download posts - """ - - def __init__(self, profile: dict): - self.session_id = profile['session_id'] - self.user_map = profile['user_map'] - self.user_agent = profile['user_agent'] - - def request(self, url): - return requests.get(url, headers=self.headers, cookies=self.cookies) - - def check_posts(self, username, amount=12): - print('Checking posts for @' + username) - user = self.get_user(username) - scraped_posts = self.user_map[username]['scraped'] - - if scraped_posts: - # Return a list of {amount} InstaPost objects, sorted from oldest to newest (oldest is tweeted first). - posts = [post for post in user.posts if post.id not in scraped_posts][:amount] - return sorted(posts, key=lambda p: p.timestamp) - else: - # By default, newly added users have an empty scraped post list and are only initialized on the first run. - scraped_posts.extend([post.id for post in user.posts]) - print(f'Initialized User: @{username}') - return None # No posts to tweet - - def get_user(self, username): - response = self.request(f'https://www.instagram.com/{username}/?__a=1') - if not response.ok: - raise Exception(response.json()) - else: - return InstaUser(response.json()) - - def download_post(self, post: InstaPost, filepath=None): - response = self.request(post.media_url) - if not response.ok: - raise RuntimeError(f'Failed to download post {post.permalink} by {post.owner["username"]}') - - if filepath is None: - filetype = '.mp4' if post.is_video else '.jpg' - filepath = get_filepath(post.id, filetype=filetype) - with open(filepath, 'wb') as f: - f.write(response.content) - - post.file_path = filepath - print(f'Downloaded post {post.id} by {post.owner["username"]} from {post.permalink}') - - # Properties so that changes to sessionid/useragent will be reflected - @property - def cookies(self): - return { - 'sessionid': self.session_id - } - - # TODO See if any other cookies/headers should be included - @property - def headers(self): - return { - 'User-Agent': self.user_agent - } - - @property - def session_id(self): - return self._session_id - - @session_id.setter - def session_id(self, session_id: str): - if not isinstance(session_id, str): - raise ValueError('Session ID cookie must be of type str') - self._session_id = session_id diff --git a/InstaTweet/core/instapost.py b/InstaTweet/core/instapost.py deleted file mode 100644 index 97159c1..0000000 --- a/InstaTweet/core/instapost.py +++ /dev/null @@ -1,56 +0,0 @@ -from datetime import datetime - - -class InstaPost: - - def __init__(self, post_data): - """Convenience wrapper for Instagram media data""" - self.json = post_data - self.id = post_data['id'] # Something's wrong if this raises an error - self.is_video = self.json.get('is_video', False) - self.video_url = self.json.get('video_url', '') - self.dimensions = self.json.get('dimensions', {}) - - def __str__(self): - return f'Post {self.id} by @{self.owner["username"]} on {self.timestamp}' - - @property - def owner(self): - if owner := self.json.get('owner', self.json.get('user', {})): - return owner - return dict.fromkeys(['id', 'username']) - - @property - def is_carousel(self): - return self.json.get('media_type') == 8 - - @property - def shortcode(self): - return self.json.get('shortcode', self.json.get('code', '')) - - @property - def permalink(self): - return f'https://www.instagram.com/p/{self.shortcode}' - - @property - def thumbnail_url(self): - return self.json.get('display_url', - self.json.get('thumbnail_src', - self.json.get('thumbnail_resources', - [{}])[-1].get('src', ''))) - - @property - def timestamp(self): - if timestamp := self.json.get('taken_at_timestamp', self.json.get('taken_at', '')): - return datetime.utcfromtimestamp(timestamp) - return '' - - @property - def media_url(self): - return self.video_url if self.is_video else self.thumbnail_url - - @property - def caption(self): - if caption_node := self.json.get('edge_media_to_caption', {}).get('edges', [{}])[0]: - return caption_node.get('node', {}).get('text', '') - return '' diff --git a/InstaTweet/core/instatweet.py b/InstaTweet/core/instatweet.py deleted file mode 100644 index af43be0..0000000 --- a/InstaTweet/core/instatweet.py +++ /dev/null @@ -1,257 +0,0 @@ -import os -import json -import copy -import time -from tqdm import tqdm -from collections.abc import Iterable -from InstaTweet.utils import get_agent, get_root, get_filepath -from . import InstaClient, TweetClient - -DEFAULT_USER_MAPPING = {'hashtags': [], 'scraped': [], 'tweets': []} - - -class InstaTweet: - - def __init__(self, **kwargs): - self.profile_name = kwargs.get('profile', 'default') - - if not self.is_default: - self.load_profile(self.profile_name) - - else: - self.session_id = kwargs.get('session_id', '') - self.twitter_keys = kwargs.get('twitter_keys', None) - self.user_agent = kwargs.get('user_agent', get_agent()) - self.user_map = kwargs.get('user_map', {}) - print('Using default profile.') - - @classmethod - def load(cls, profile_name: str): - return cls(profile=profile_name) - - def start(self): - self.validate() - insta = InstaClient(self.config) - oauth = TweetClient.oauth(self.twitter_keys) - - for user, mapping in self.user_map.items(): - new_posts = insta.check_posts(user) - if not new_posts: - print(f'No new posts to tweet for @{user}') - continue - - print(f'There are {len(new_posts)} posts to tweet for @{user}') - for post in new_posts: - insta.download_post(post) - tweet = TweetClient(post, oauth, hashtags=mapping['hashtags']) - tweet.send() - - mapping['scraped'] += [post.id] - mapping['tweets'] += [post.tweet] - if self.profile_exists(): - self.save_profile(alert=False) - - print(f'Finished insta-tweeting for @{user}', sep='\n') - - print(f'All users have been insta-tweeted') - if self.profile_exists(): - self.save_profile() - - def loop(self, delay): - while True: - try: - self.start() - with tqdm(total=delay) as pbar: - pbar.set_description(f'Waiting {delay} seconds before rechecking') - for i in range(delay): - time.sleep(1) - pbar.update(1) - - except KeyboardInterrupt: - print('Quitting InstaTweet...') - break - - def add_users(self, users, scrape_only=True): - """ - Add users to scrape and auto-tweet. Can be provided as a single username, an iterable containing usernames, or a full user_map dictionary. - By default, new users will be scraped and any post after this point will be tweeted. - Use scrape_only=False to immediately scrape AND tweet the user's most recent posts (12 by default). - """ - user_map = self.user_map - - if isinstance(users, str): - user_map.setdefault(users, copy.deepcopy(DEFAULT_USER_MAPPING)) - if not scrape_only: - # Tweets are sent only when the user's scraped list is not empty - user_map[users]['scraped'].append('-1') - - elif isinstance(users, Iterable): - for user in users: - self.add_users(user, scrape_only=scrape_only) - - elif isinstance(users, dict): - for user in users: - u = users[user] - if u.keys() != DEFAULT_USER_MAPPING.keys(): - raise KeyError('Invalid user map keys for user ' + user) - if not all(isinstance(val, list) for val in u.values()): - raise TypeError('Invalid user map value types for user ' + user + - '\nProvided values: ' + u.values() + - '\n All values should be of type list') - user_map.update(users) - - else: - raise ValueError('Invalid type provided for parameter "users"') - - if self.profile_exists(): - self.save_profile(alert=False) - - def add_hashtags(self, user, hashtags): - if isinstance(hashtags, str): - self.user_map[user]['hashtags'].append(hashtags) - else: - for hashtag in hashtags: - if hashtag not in self.user_map[user]['hashtags']: - self.user_map[user]['hashtags'].append(hashtag) - - if self.profile_exists(): - self.save_profile(alert=False) - - def validate(self): - if not self.session_id: - raise AttributeError('Instagram sessionid cookie is required to scrape_only.') - - if missing_keys := [key for key in TweetClient.DEFAULT_KEYS if key not in self.twitter_keys]: - raise KeyError(f''' - Invalid Twitter API Keys Provided - Missing Keys: {missing_keys}''') - - if not all(self.twitter_keys.values()): - twitter_file = get_filepath('Twitter API Template') - if not os.path.exists(twitter_file): - raise ValueError(f''' - Values missing for Twitter API Keys. - Missing Values For: {[key for key, value in self.twitter_keys.items() if not value]} - - Default API Key file "{twitter_file}" is also missing. - ''') - - try: - # Setter will raise error if keys are invalid - self.twitter_keys = self.load_data(twitter_file) - except KeyError as e: - raise e - - if not self.user_map: - raise AttributeError('You must add at least one Instagram user to auto-tweet from') - - @property - def session_id(self): - return self._session_id - - @session_id.setter - def session_id(self, session_id: str): - """If an existing profile is currently active, it will be updated when setting a new session_id""" - if not isinstance(session_id, str): - raise ValueError('Session ID cookie must be of type str') - self._session_id = session_id - - if session_id: - if self.profile_exists(): - self.save_profile(alert=False) - - @property - def twitter_keys(self): - return self._twitter_keys - - @twitter_keys.setter - def twitter_keys(self, keys: dict): - default = TweetClient.DEFAULT_KEYS - - if isinstance(keys, dict): - default.update(keys) - if not all(default.values()): - raise KeyError('No value provided for the following Twitter API Keys:' + - f'{[key for key in default if not default[key]]}') - self._twitter_keys = keys - if self.profile_exists(): - self.save_profile(alert=False) - - elif keys is None: - # Default init value - self._twitter_keys = default - - else: - raise TypeError(f'\n\n' - f'Twitter API Keys should be passed as a dictionary.\n' - f'See {get_filepath("Twitter API Template")} for expected format\n' - f'Expected:\n' - f'{json.dumps(TweetClient.DEFAULT_KEYS, indent=4)}') - - def load_profile(self, profile_name: str): - if profile_path := self.profile_exists(profile_name): - profile = self.load_data(profile_path) - self.user_agent = profile['user_agent'] - self._session_id = profile['session_id'] - self._twitter_keys = profile['twitter_keys'] - self.user_map = profile['user_map'] - self.profile_name = profile['profile'] - print(f'Loaded profile "{self.profile_name}"') - else: - raise FileNotFoundError('No profile with that name was found') - - def save_profile(self, profile_name: str = None, alert: bool = True): - """Update currently loaded profile, or save a new one. Name only required for new profiles.""" - profiles_dir = os.path.join(get_root(), 'profiles') - if not os.path.exists(profiles_dir): - os.mkdir(profiles_dir) - if profile_name: - self.profile_name = profile_name - # Allows a loaded profile to be saved without specifying profile name - if not self.is_default: - self._save_data(self.config, os.path.join('profiles', self.profile_name)) - if alert: - print(f'Saved profile "{self.profile_name}"') - # If currently using default profile, must supply a profile name - else: - raise AttributeError('No profile currently loaded. Must provide a profile name') - - def profile_exists(self, profile_name=None): - """ - Checks if the settings file for a profile exists and returns the path if True. - Called by any method that changes the state of a profile to ensure these methods only update settings files and never create them. - """ - if profile_name is None: - profile_name = self.profile_name - profile_path = get_filepath(os.path.join('profiles', profile_name)) - - return profile_path if os.path.exists(profile_path) else False - - @property - def is_default(self): - """Check if default profile is being used. Used in initial save/load of profile""" - return self.profile_name == 'default' - - @property - def config(self): - return { - 'profile': self.profile_name, - 'session_id': self.session_id, - 'user_agent': self.user_agent, - 'twitter_keys': self.twitter_keys, - 'user_map': self.user_map - } - - @staticmethod - def load_data(filepath): - with open(filepath, 'r') as data_in: - return json.load(data_in) - - def _save_data(self, data, filename): - filepath = get_filepath(filename) - with open(filepath, 'w') as data_out: - json.dump(data, data_out, indent=4) - - def _config(self): - for k, v in self.config.items(): - print(f'{k} : {v}') diff --git a/InstaTweet/core/tweetclient.py b/InstaTweet/core/tweetclient.py deleted file mode 100644 index a0fed4e..0000000 --- a/InstaTweet/core/tweetclient.py +++ /dev/null @@ -1,185 +0,0 @@ -import os -import sys -import time -import random -import requests -import mimetypes -from PIL import Image -from requests_oauthlib import OAuth1 -from moviepy.video.fx.crop import crop -from moviepy.video.io.VideoFileClip import VideoFileClip - -MEDIA_ENDPOINT_URL = 'https://upload.twitter.com/1.1/media/upload.json' -POST_TWEET_URL = 'https://api.twitter.com/1.1/statuses/update.json' - - -class TweetClient(object): - - DEFAULT_KEYS = { - 'Consumer Key': None, - 'Consumer Secret': None, - 'Access Token': None, - 'Token Secret': None - } - - @staticmethod - def oauth(keys): - return OAuth1( - keys['Consumer Key'], - client_secret=keys['Consumer Secret'], - resource_owner_key=keys['Access Token'], - resource_owner_secret=keys['Token Secret'] - ) - - def __init__(self, post, auth, hashtags=None): - self.post = post - self.auth = auth - self.hashtags = hashtags - self.video_path = post.file_path - # For twitter media upload - self.media_id = None - self.processing_info = None - - @property - def total_bytes(self): - return os.path.getsize(self.video_path) - - def send(self): - if self.post.is_video: - print(f'Cropping video {self.post.id}') - self.crop_video() - - self._media_upload_init() - self._media_upload_append() - self._media_upload_finalize() - self.post.tweet = self._post_tweet() - print(f'Tweet sent for post {self.post.id}') - os.remove(self.video_path) - - def crop_video(self): - clip = VideoFileClip(self.video_path) - bbox = self._get_bbox(clip) - - if bbox: - new_path = self.video_path.replace('.mp4', '_cropped.mp4') - with crop(clip, x1=bbox[0], y1=bbox[1], x2=bbox[2], y2=bbox[3]) as cropped_clip: - cropped_clip.write_videofile(new_path, audio_codec='aac', logger=None) - - os.remove(self.video_path) # Delete uncropped video - self.video_path = new_path - - clip.close() - - def _get_bbox(self, clip): - frame_path = self.video_path.replace('.mp4', '_frame.png') - clip.save_frame(frame_path) - img = Image.open(frame_path) - bbox = img.getbbox() - img.close() - os.remove(frame_path) - return bbox - - def _media_upload_init(self): - request_data = { - 'command': 'INIT', - 'media_type': mimetypes.guess_type(self.video_path)[0], - 'total_bytes': self.total_bytes, - 'media_category': 'TWEET_VIDEO' if self.post.is_video else 'TWEET_IMAGE' - } - - r = requests.post(MEDIA_ENDPOINT_URL, data=request_data, auth=self.auth) - if r.ok: - self.media_id = r.json()['media_id'] - else: - print('Failed to initialize Twitter media upload.', r.status_code, r.reason, sep='\n') - sys.exit(0) - - def _media_upload_append(self): - segment_id = 0 - bytes_sent = 0 - file = open(self.video_path, 'rb') - - while bytes_sent < self.total_bytes: - chunk = file.read(4 * 1024 * 1024) - request_data = { - 'command': 'APPEND', - 'media_id': self.media_id, - 'segment_index': segment_id - } - files = {'media': chunk} - - r = requests.post(MEDIA_ENDPOINT_URL, data=request_data, files=files, auth=self.auth) - if r.status_code < 200 or r.status_code > 299: - print(r.status_code) - print(r.text) - sys.exit(0) - - segment_id += 1 - bytes_sent = file.tell() - - print(f"Twitter media upload for post {self.post.id} complete") - - def _media_upload_finalize(self): - request_data = { - 'command': 'FINALIZE', - 'media_id': self.media_id - } - - r = requests.post(MEDIA_ENDPOINT_URL, data=request_data, auth=self.auth) - if not r.ok: - print(r.json()) - sys.exit(0) - - self.processing_info = r.json().get('processing_info', None) - self._check_status() - - def _check_status(self): - if not self.processing_info: - return - - state = self.processing_info['state'] - if state == u'succeeded': - return - if state == u'failed': - print(self.processing_info) - sys.exit(0) - - wait = self.processing_info['check_after_secs'] - time.sleep(wait) - - request_params = { - 'command': 'STATUS', - 'media_id': self.media_id - } - - r = requests.get(MEDIA_ENDPOINT_URL, params=request_params, auth=self.auth) - self.processing_info = r.json().get('processing_info', None) - self._check_status() - - def _post_tweet(self): - request_data = { - 'status': self._build_tweet(), - 'media_ids': self.media_id - } - - r = requests.post(POST_TWEET_URL, data=request_data, auth=self.auth) - if r.ok: - return r.json() - print(r.json()) - - def _build_tweet(self): - link = self.post.permalink - caption = self.post.caption.strip().replace('@', '@/') # Avoid tagging randos on Twitter - characters = 295 - - if self.hashtags: - random_hashtags = random.sample(self.hashtags, random.choice(min([4, 5], [len(self.hashtags)] * 2))) - hashtags = ' '.join(f'#{hashtag}' for hashtag in random_hashtags) - characters -= (len(hashtags + link) + 3) # For 3 newlines -> caption \n hashtags \n\n link - tweet = '\n'.join((caption[:characters], hashtags, '', link)) - - else: - characters -= (len(link) + 2) # For 2 newlines -> caption \n\n link - tweet = '\n'.join((caption[:characters], '', link)) - - return tweet diff --git a/InstaTweet/db.py b/InstaTweet/db.py new file mode 100644 index 0000000..1418f38 --- /dev/null +++ b/InstaTweet/db.py @@ -0,0 +1,123 @@ +from __future__ import annotations +import os +import pickle +import InstaTweet + +from sqlalchemy import create_engine, Column, String, LargeBinary +from sqlalchemy.orm import sessionmaker, scoped_session, Query +from sqlalchemy.ext.declarative import declarative_base + + +DATABASE_URL = os.getenv('DATABASE_URL', '').replace('postgres://', 'postgresql://', 1) +Base = declarative_base() + + +class Profiles(Base): + """Database table used to store :class:`~.Profile` settings + + When a :class:`~.Profile` calls :meth:`~.Profile.save` and has :attr:`~.Profile.local` ``= False``, its + :attr:`~.Profile.name` will be used as the primary key to either insert or update a table row + + * Currently the table only has fields for the :attr:`~.Profile.name` and pickle bytes (from :meth:`~.to_pickle`) + """ + __tablename__ = 'profiles' + name = Column(String, primary_key=True) + config = Column(LargeBinary) + + def __repr__(self): + return "".format(self.name) + + +class DBConnection: + + """Database Connection class with context management ooh wow + + Uses ``SQLAlchemy`` to connect and interact with the database specified in the ``DATABASE_URL`` environment variable + + **Sample Usage** + + >>> def poop_check(): + >>> with DBConnection() as db: + >>> if db.query_profile(name="POOP").first(): + >>> raise FileExistsError('DELETE THIS NEPHEW......') + >>> else: + >>> print("Congrats, you're normal") + """ + + SESSION = None + ENGINE = None + + def __enter__(self): + if not DATABASE_URL: + raise EnvironmentError('Must set the DATABASE_URL environment variable') + + if not self.ENGINE: + engine = create_engine(DATABASE_URL, echo=False) + Base.metadata.create_all(engine) + DBConnection.ENGINE = engine + + if not self.SESSION: + self.connect() + + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + DBConnection.SESSION = None + + @staticmethod + def connect() -> None: + """Creates a database session and assigns it to the :attr:`~SESSION`""" + DBConnection.SESSION = scoped_session(sessionmaker(bind=DBConnection.ENGINE)) + + def query_profile(self, name: str) -> Query: + """Queries the database for a :class:`~.Profile` by its name + + :param name: the profile name (ie. the :attr:`.Profile.name`) + :returns: the :class:`~sqlalchemy.orm.Query` NOT the :class:`~.Profile` + """ + return self.SESSION.query(Profiles).filter_by(name=name) + + def load_profile(self, name: str) -> InstaTweet.Profile: + """Loads a profile from the database by name + + :param name: the profile name (ie. the :attr:`.Profile.name`) + :raises LookupError: if the database has no profile saved with the specified name + """ + if profile := self.query_profile(name).first(): + return pickle.loads(profile.config) + else: + raise LookupError(f"No database profile found with the name {name}") + + def save_profile(self, profile: InstaTweet.Profile, alert: bool = True) -> bool: + """Saves a :class:`~.Profile` to the database by either updating an existing row or inserting a new one + + :param profile: the :class:`~.Profile` to save + :param alert: if ``True``, will print a message upon successfully saving + """ + if (db_profile := self.query_profile(profile.name)).first(): + db_profile.update({'config': profile.to_pickle()}) + else: + new_profile = Profiles(name=profile.name, config=profile.to_pickle()) + self.SESSION.add(new_profile) + + self.SESSION.commit() + + if alert: + print(f"Saved Database Profile: {profile.name}") + return True + + def delete_profile(self, name: str, alert: bool = True) -> bool: + """Deletes a :class:`~.Profile` from the database by name + + :param name: the profile name (ie. the :attr:`.Profile.name`) + :param alert: if ``True``, will print a message upon successfully deleting + """ + if not (profile := self.query_profile(name).first()): + raise LookupError(f"No database profile found with the name {name}") + + profile.delete() + self.SESSION.commit() + + if alert: + print(f'Deleted Database Profile: {name}') + return True diff --git a/InstaTweet/examples/create_template.py b/InstaTweet/examples/create_template.py deleted file mode 100644 index abb0fec..0000000 --- a/InstaTweet/examples/create_template.py +++ /dev/null @@ -1,27 +0,0 @@ -from InstaTweet import InstaTweet -import json - -""" - This example creates a template profile, which can later be loaded, saved under a new name, and further modified. - Profile attributes can be set at the time of object initialization or later on via InstaTweet.attribute = value -""" - -session_id = 'string' # The sessionid cookie is obtained by logging into Instagram from browser -twitter_keys = { # You must have Twitter API keys with access to Standard v1.1 endpoints - 'Consumer Key': 'string', - 'Consumer Secret': 'string', - 'Access Token': 'string', - 'Token Secret': 'string' -} - -it = InstaTweet(session_id=session_id, twitter_keys=twitter_keys) -it.save_profile('My Template') - -print('Profile Settings:', json.dumps(it.config, indent=4), sep='\n') - - -def create_template(template_name, session_id=session_id, twitter_keys=twitter_keys): - """Function to Create a Template Profile""" - it = InstaTweet(session_id=session_id, twitter_keys=twitter_keys) - it.save_profile(template_name) - print('Profile Settings:', json.dumps(it.config, indent=4), sep='\n') diff --git a/InstaTweet/examples/run_profile.py b/InstaTweet/examples/run_profile.py deleted file mode 100644 index aa91a17..0000000 --- a/InstaTweet/examples/run_profile.py +++ /dev/null @@ -1,14 +0,0 @@ -from InstaTweet import InstaTweet - -profiles = ['profile1'] - - -def run(profile_name): - """Use this to run fully configured profiles""" - it = InstaTweet.load(profile_name) - it.start() - - -if __name__ == '__main__': - for profile in profiles: - run(profile) diff --git a/InstaTweet/examples/use_template.py b/InstaTweet/examples/use_template.py deleted file mode 100644 index 5808352..0000000 --- a/InstaTweet/examples/use_template.py +++ /dev/null @@ -1,27 +0,0 @@ -from InstaTweet import InstaTweet -import json - -""" -Let's build a new profile based off a template profile (see create_template.py) -We'll add two users to monitor, as well as hashtags to include when composing tweets -""" -template_name, profile_name = '', '' -it = InstaTweet.load(template_name) -it.save_profile(profile_name) - -""" -The add_users() method accepts users in the form of a single string, an iterable of strings, or an entire user_map -Specify scrape_only=False if you'd like to tweet all of the most recent posts when running the first time (rather than just scraping them). -""" -it.add_users('td_canada') -it.add_users('korn_official', scrape_only=False) - -hashtags = ['TDKorn', 'TD', 'Korn'] # General hashtags for all users -for user in it.user_map: - it.add_hashtags(user, hashtags) - -# User-specific hashtags -it.add_hashtags('td_canada', ['finance', 'banking', 'corporate']) -it.add_hashtags('korn_official', ['KornBand', 'metal']) -it.save_profile() -print('User Map:', json.dumps(it.user_map, indent=4), sep='\n') diff --git a/InstaTweet/instaclient.py b/InstaTweet/instaclient.py new file mode 100644 index 0000000..b055cbb --- /dev/null +++ b/InstaTweet/instaclient.py @@ -0,0 +1,93 @@ +import os +import requests +from json.decoder import JSONDecodeError +from . import InstaUser, InstaPost, utils + + +class InstaClient: + + """Minimalistic class for scraping/downloading Instagram user/media data""" + + DOWNLOAD_DIR = os.path.abspath('downloads') + + def __init__(self, session_id: str, user_agent: str = None, proxies: dict = None): + """Initialize an :class:`~InstaClient` with an Instagram sessionid cookie (at minimum) + + :param session_id: valid Instagram sessionid cookie from a browser + :param user_agent: user agent to use in requests made by the class + :param proxies: proxies to use in requests made by the class + """ + if not isinstance(session_id, str): + raise TypeError('session_id must be a string') + + self.session_id = session_id + self.user_agent = user_agent if user_agent else utils.get_agent() + self.proxies = proxies + + if not os.path.exists(InstaClient.DOWNLOAD_DIR): + os.mkdir(InstaClient.DOWNLOAD_DIR) + + def request(self, url: str) -> requests.Response: + """Sends a request using the :attr:`cookies`, :attr:`headers`, and :attr:`proxies` + + :param url: the Instagram URL to send the request to + """ + return requests.get( + url, + headers=self.headers, + cookies=self.cookies, + proxies=self.proxies + ) + + def get_user(self, username: str) -> InstaUser: + """Scrapes an Instagram user's profile and wraps the response + + :param username: the username of the IG user to scrape (without the @) + :return: an :class:`~.InstaUser` object, which wraps the response data + """ + response = self.request(f'https://www.instagram.com/{username}/?__a=1&__d=dis') + if response.ok: + try: + return InstaUser(response.json()) + except JSONDecodeError as e: + msg = f'Unable to scrape Instagram user @{username} - endpoint potentially deprecated?' + raise RuntimeError(msg) from e + else: + try: + error = response.json() + except JSONDecodeError: + error = response.reason + raise RuntimeError( + 'Failed to scrape Instagram user @{u}\nResponse: [{code}] -- {e}'.format( + u=username, code=response.status_code, e=error + ) + ) + + def download_post(self, post: InstaPost, filepath: str = None) -> bool: + """Downloads the media from an Instagram post + + :param post: the :class:`~.InstaPost` of the post to download + :param filepath: the path to save the downloaded media; if ``None``, saves to the :attr:`~DOWNLOAD_DIR` + """ + response = self.request(post.media_url) + if not response.ok: + print(f'Failed to download post {post.permalink} by {post.owner["username"]}') + return False + + filepath = filepath if filepath else os.path.join(self.DOWNLOAD_DIR, post.filename) + with open(filepath, 'wb') as f: + f.write(response.content) + + print(f'Downloaded post {post.permalink} by {post.owner["username"]} to {filepath}') + post.filepath = filepath + return True + + @property + def headers(self) -> dict: + """Headers to use in :meth:`~.request`""" + return {'User-Agent': self.user_agent, } + + @property + def cookies(self) -> dict: + """Cookies to use in :meth:`~.request`""" + return {'sessionid': self.session_id, } diff --git a/InstaTweet/instapost.py b/InstaTweet/instapost.py new file mode 100644 index 0000000..ba07553 --- /dev/null +++ b/InstaTweet/instapost.py @@ -0,0 +1,93 @@ +import os +from datetime import datetime +from tweepy.models import Status + + +class InstaPost: + + """Minimalistic API response wrapper for an Instagram post""" + + def __init__(self, post_data: dict): + """Initialize an :class:`~InstaPost` + + :param post_data: the JSON response data of an Instagram post -- found within auser's profile data + """ + self.json = post_data + self.id = post_data['id'] # Something's wrong if this raises an error + self.is_video = post_data.get('is_video', False) + self.video_url = post_data.get('video_url', '') + self.dimensions = post_data.get('dimensions', {}) + # Attributes set by other classes + self.filepath = '' # Set by InstaClient when downloaded + self.tweet_data = None # Set by TweetClient when tweeted + + def __str__(self): + return f'Post {self.id} by @{self.owner["username"]} on {self.timestamp}' + + @property + def filename(self) -> str: + """Default filepath basename to use when downloading the post (:attr:`~id` + :attr:`~filetype`)""" + return self.id + self.filetype + + @property + def filetype(self) -> str: + """Filetype of the post, based on the value of :attr:`~is_video`""" + return 'mp4' if self.is_video else 'jpg' + + @property + def is_downloaded(self) -> bool: + """Checks if the post has been downloaded yet (``filepath`` attribute is set by :class:`~.InstaClient`)""" + return os.path.exists(self.filepath) + + @property + def owner(self): + if owner := self.json.get('owner', self.json.get('user', {})): + return owner + return dict.fromkeys(['id', 'username']) + + @property + def is_carousel(self): + return self.json.get('media_type') == 8 + + @property + def shortcode(self): + return self.json.get('shortcode', self.json.get('code', '')) + + @property + def permalink(self): + return f'https://www.instagram.com/p/{self.shortcode}' + + @property + def thumbnail_url(self): + return self.json.get('display_url', + self.json.get('thumbnail_src', + self.json.get('thumbnail_resources', + [{}])[-1].get('src', ''))) + + @property + def timestamp(self): + if timestamp := self.json.get('taken_at_timestamp', self.json.get('taken_at', '')): + return datetime.utcfromtimestamp(timestamp) + return '' + + @property + def media_url(self): + return self.video_url if self.is_video else self.thumbnail_url + + @property + def caption(self): + if caption_edge := self.json.get('edge_media_to_caption', {}).get('edges', []): + return caption_edge[0].get('node', {}).get('text', '') + return '' + + def add_tweet_data(self, tweet: Status) -> bool: + """Used by :class:`~.TweetClient` to add minimal tweet data after the post has been tweeted + + :param tweet: a :class:`~tweepy.models.Status` object from a successfully sent tweet + """ + self.tweet_data = { + 'link': tweet.entities['urls'][0]['url'], + 'created_at': str(tweet.created_at), + 'text': tweet.text + } + return True diff --git a/InstaTweet/instatweet.py b/InstaTweet/instatweet.py new file mode 100644 index 0000000..1328733 --- /dev/null +++ b/InstaTweet/instatweet.py @@ -0,0 +1,133 @@ +from typing import Optional, List +from . import utils, TweetClient, InstaClient, InstaPost, Profile + + +class InstaTweet: + + """Uses the settings from a Profile to do the actual InstaTweeting + + You might be wondering, what's InstaTweeting? According to TDK Dictionary: + + **InstaTweet** (`verb`): + To scrape an Instagram account -> download & tweet any new content -> update and save the loaded :class:`~.Profile` + + **Example Sentence** + "Oh, you lost 700 Twitter followers after you shared your IG post? Well maybe if people actually saw the + picture and not just the caption your tweet would've been less creepy. You should've InstaTweeted it. + + """ + + def __init__(self, profile: Profile): + """Initializes InstaTweet using a fully configured :class:`~.Profile` + + The :class:`Profile` will be used to initialize an :class:`~.InstaClient` and :class:`~.TweetClient` + + :Note: + Profile settings will only be validated when calling :meth:`~.start` + + :param profile: the profile to use for InstaTweeting + """ + self.profile = profile + self.proxies = self.get_proxies() + self.insta = self.get_insta_client() + self.twitter = self.get_tweet_client() + + @classmethod + def load(cls, profile_name: str, local: bool = True) -> "InstaTweet": + """Loads a profile by name + + :param profile_name: profile name + :param local: whether the profile is saved locally (True) or remotely on a SQLAlchemy-supported database + + """ + return cls(profile=Profile.load(name=profile_name, local=local)) + + def get_proxies(self) -> Optional[dict]: + """Retrieve proxies using the loaded :class:`~.Profile` settings""" + return utils.get_proxies( + env_key=self.profile.proxy_key + ) + + def get_insta_client(self) -> InstaClient: + """Initializes an :class:`~.InstaClient` using the loaded :class:`~.Profile` settings""" + return InstaClient( + session_id=self.profile.session_id, + user_agent=self.profile.user_agent, + proxies=self.proxies + ) + + def get_tweet_client(self) -> TweetClient: + """Initializes an :class:`~.TweetClient` using the loaded :class:`~.Profile` settings""" + return TweetClient( + profile=self.profile, + proxies=self.proxies + ) + + def start(self) -> None: + """InstaTweets all users in the :class:`~.Profile`'s user map + + Each user will have their profile scraped, and their posts will be compared to their "scraped" list to determine + if any are new. If there's new posts, the content from them will be downloaded and tweeted + + **Notes** + * The :class:`~.Profile` is only saved upon successfully downloading and tweeting a post + - This allows any failed attempts to be retried in the next call to :meth:`~start` + * Error handling/printing is done by :meth:`~.download_post` and :meth:`~.send_tweet` + """ + profile = self.profile + profile.validate() + + print(f'Starting InstaTweet for Profile: {profile.name}') + + for user in profile.user_map: + new_posts = self.get_new_posts(user) + if not new_posts: + print(f'No posts to tweet for @{user}') + continue + + print(f'There are {len(new_posts)} posts to tweet for @{user}') + + for post in new_posts: + self.insta.download_post(post) + if not post.is_downloaded: + continue + + tweeted = self.twitter.send_tweet(post, hashtags=profile.get_hashtags_for(user)) + if not tweeted: + continue + + profile.get_scraped_from(user).append(post.id) + profile.get_tweets_for(user).append(post.tweet_data) + + if profile.exists: + profile.save(alert=False) + + print(f'Finished insta-tweeting for @{user}') + + print(f'All users have been insta-tweeted') + + def get_new_posts(self, username) -> Optional[List[InstaPost]]: + """Scrapes recent posts from an Instagram user and returns all posts that haven't been tweeted yet + + **NOTE:** If a user's ``scraped`` list is empty, no posts will be returned. + + Instead, the user is "initialized" as follows: + * Their ``scraped`` list will be populated with the ID's from the most recent posts + * These IDs are then used in future calls to the method to determine which posts to tweet + + :param username: the IG username to scrape posts from + :return: a list of posts that haven't been tweeted yet, or nothing at all (if user is only initialized) + + """ + print(f'Checking posts from @{username}') + scraped_posts = self.profile.get_scraped_from(username) + user = self.insta.get_user(username) + + if scraped_posts: + new_posts = [post for post in user.posts if post.id not in scraped_posts] + return sorted(new_posts, key=lambda post: post.timestamp) + else: + scraped_posts.extend(post.id for post in user.posts) + print(f'Initialized User: @{username}') + return None + diff --git a/InstaTweet/core/instauser.py b/InstaTweet/instauser.py similarity index 55% rename from InstaTweet/core/instauser.py rename to InstaTweet/instauser.py index 16e4fed..14cbd68 100644 --- a/InstaTweet/core/instauser.py +++ b/InstaTweet/instauser.py @@ -1,20 +1,20 @@ from . import InstaPost -class InstaUser(object): - """Convenience wrapper for Instagram profile API response""" +class InstaUser: + """Minimalistic API response wrapper for an Instagram profile""" - def __init__(self, user_json): - self.json = user_json + def __init__(self, data: dict): + """Initialize an :class:`InstaUser` + + :param data: the API response JSON to use as source data + """ + self.json = data @property def user_data(self): return self.json.get('graphql', {}).get('user') - @property - def id(self): - return int(self.user_data.get('id')) - @property def media_data(self): return self.user_data.get('edge_owner_to_timeline_media', {'edges': []}) @@ -22,3 +22,7 @@ def media_data(self): @property def posts(self): return [InstaPost(media['node']) for media in self.media_data['edges']] + + @property + def id(self): + return int(self.user_data.get('id', -1)) \ No newline at end of file diff --git a/InstaTweet/profile.py b/InstaTweet/profile.py new file mode 100644 index 0000000..a23463c --- /dev/null +++ b/InstaTweet/profile.py @@ -0,0 +1,326 @@ +from __future__ import annotations + +import os +import copy +import json +import pickle + +from typing import Iterable +from . import utils, TweetClient, DBConnection + + +class Profile: + + USER_MAPPING = {'hashtags': [], 'scraped': [], 'tweets': []} + LOCAL_DIR = os.path.join(utils.get_root(), 'profiles') + + def __init__(self, name: str = 'default', local: bool = True, **kwargs): + """Create a new :class:`Profile` + + A :class:`Profile` contains a ``user_map`` and all API access settings associated with it + + ... + + The ``user_map`` is a mapping of added Instagram usernames to their associated :attr:`USER_MAPPING` + + * The mapping includes a list of hashtags, scraped posts, and sent tweets + * Methods exist to access and modify these lists for a particular user + * Mainly used to help compose tweets and detect when posts are new + + ... + + :param name: unique profile name + :param local: indicates if profile is being saved locally or on a remote database + :param kwargs: see below + + :Keyword Arguments: + * *session_id* (``str``) -- + Instagram ``sessionid`` cookie, obtained by logging in through browser + * *twitter_keys* (``dict``) -- + Twitter API Keys with v1.1 endpoint access + * See :attr:`~InstaTweet.tweetclient.TweetClient.DEFAULT_KEYS` for a template + * *user_agent* (``str``) -- Optional + The user agent to use for requests; scrapes the newest Chrome agent if not provided + * *proxy_key* (``str``) -- Optional + Name of environment variable to retrieve proxies from + * *user_map* (``dict``) -- Optional + A dict of Instagram users and their associated :attr:`~.USER_MAPPING` + + :Note: + A name is not necessary to create and *InstaTweet* a profile, but it's required to :meth:`~.save` it + + """ + self.local = local + self.name = name # Will raise Exception if name is already used + + self.session_id = kwargs.get('session_id', '') + self.twitter_keys = kwargs.get('twitter_keys', TweetClient.DEFAULT_KEYS) + self.user_agent = kwargs.get('user_agent', utils.get_agent()) + self.proxy_key = kwargs.get('proxy_key', None) + self.user_map = kwargs.get('user_map', {}) + + @classmethod + def load(cls, name: str, local: bool = True) -> Profile: + """Loads an existing profile from a locally saved pickle file or remotely stored pickle byte string + + :param name: the name of the :class:`Profile` to load + :param local: whether the profile is saved locally (default, ``True``) or remotely on a database + If saved remotely, the ``DATABASE_URL`` environment variable must be configured + """ + if not cls.profile_exists(name, local): + raise LookupError( + f'No {"local" if local else "database"} profile found with the name "{name}"' + ) + if local: + with open(cls.get_local_path(name), 'rb') as f: + return pickle.load(f) + else: + with DBConnection() as db: + return db.load_profile(name) + + @classmethod + def from_json(cls, json_str: str) -> Profile: + """Creates a profile from a JSON formatted string of config settings""" + return cls.from_dict(json.loads(json_str)) + + @classmethod + def from_dict(cls, d: dict) -> Profile: + """Creates a profile from a dictionary of config settings""" + return cls(**d) + + @staticmethod + def profile_exists(name: str, local: bool = True) -> bool: + """Check if a profile with the given name and location (local/remote) already exists""" + if local: + return os.path.exists(Profile.get_local_path(name)) + else: + with DBConnection() as db: + return bool(db.query_profile(name).first()) + + @staticmethod + def get_local_path(name: str) -> str: + """Returns filepath of where a local profile would be saved""" + return os.path.join(Profile.LOCAL_DIR, name) + '.pickle' + + def add_users(self, users: Iterable, send_tweet: bool = False): + """Add Instagram user(s) to the :attr:`~.user_map` for subsequent monitoring + + By default, newly added users will not have their posts tweeted the first time they are scraped - + the IDs of their recent posts are stored, and any new posts from that point forward will be tweeted + + You can override this by setting ``send_tweet=True``, which will immediately scrape AND tweet the recent posts + + :param users: Instagram username(s) to automatically scrape and tweet content from + :param send_tweet: choose if tweets should be sent on the first scrape, or only for new posts going forward + """ + if not isinstance(users, Iterable): + raise TypeError(f'Invalid type provided. `users` must be an Iterable') + if isinstance(users, str): + users = [users] + + for user in users: + mapping = copy.deepcopy(Profile.USER_MAPPING) + self.user_map.setdefault(user, mapping) + + if send_tweet: # Non-empty scraped list will trigger Tweets to send + self.get_scraped_from(user).append(-1) + + print(f'Added Instagram user @{user} to the user map') + + if self.exists: + self._save_profile(alert=False) + + def add_hashtags(self, user: str, hashtags: Iterable): + """Add hashtag(s) to a user in the :attr:`~.user_map`, which will be randomly chosen from when composing Tweets + + :param user: the user in the user map to add hashtags to + :param hashtags: hashtags to choose from and include in any Tweets where content comes from this user + """ + if not isinstance(hashtags, Iterable): + raise TypeError("Hashtags must be provided as a string or iterable of strings") + if isinstance(hashtags, str): + hashtags = [hashtags] + + tags = self.get_hashtags_for(user) # Retrieve the current hashtag list + tags.extend(set(hashtags) - set(tags)) # Add new ones (case-sensitive) + + if self.exists: + self._save_profile(alert=False) + print(f'Added hashtags for @{user}') + + def save(self, name: str = None, alert: bool = True) -> bool: + """Pickles and saves the :class:`Profile` using the specified or currently set name. + + :param name: name to save the :class:`Profile` under; replaces the current :attr:`~.name` + :param alert: set to ``True`` to print a message upon successful save + """ + if name: + self.name = name + if self.is_default: # Profile name wasn't specified and wasn't previously set + raise AttributeError('Profile name is required to save the profile') + else: + return self._save_profile(alert=alert) + + def _save_profile(self, alert: bool = True) -> bool: + """Internal function to save the profile, based on the value of :attr:`~.local`""" + if self.local: + with open(self.profile_path, 'wb') as f: + pickle.dump(self, f) + if alert: + print(f'Saved Local Profile {self.name}') + return True + else: + with DBConnection() as db: + return db.save_profile(profile=self, alert=alert) + + def validate(self) -> None: + """Checks to see if the Profile is fully configured for InstaTweeting + + :raises ValueError: if the :attr:`~.session_id`, :attr:`~.twitter_keys`, or :attr:`~.user_map` are invalid + """ + if not self.session_id: + raise ValueError('Instagram sessionid cookie is required to scrape posts') + + if bad_keys := [key for key, value in self.twitter_keys.items() if value == 'string']: + raise ValueError(f'Values not set for the following Twitter keys: {bad_keys}') + + if not self.user_map: + raise ValueError('You must add at least one Instagram user to auto-tweet from') + + def to_pickle(self) -> bytes: + """Serializes profile to a pickled byte string""" + return pickle.dumps(self) + + def to_json(self) -> str: + """Serializes profile to a JSON formatted string""" + return json.dumps(self.to_dict()) + + def to_dict(self) -> dict: + """Serializes profile to a dict""" + return self.config + + def view_config(self): + """Prints the :attr:`~.config` dict to make it legible""" + for k, v in self.config.items(): + print(f'{k} : {v}') + + @property + def config(self) -> dict: + """Returns a dictionary containing important configuration settings""" + return { + 'name': self.name, + 'local': self.local, + 'session_id': self.session_id, + 'twitter_keys': self.twitter_keys, + 'user_agent': self.user_agent, + 'proxy_key': self.proxy_key, + 'user_map': self.user_map, + } + + @property + def exists(self) -> bool: + """Returns True if a local save file or database record exists for the currently set profile name""" + return self.profile_exists(name=self.name, local=self.local) + + @property + def is_default(self) -> bool: + """Check if profile :attr:`~.name` is set or not""" + return self.name == 'default' + + @property + def profile_path(self) -> str: + """If :attr:`~.local` is ``True``, returns the file path for where this profile would be/is saved""" + if self.local and not self.is_default: + return Profile.get_local_path(self.name) + return '' + + def get_user(self, user: str) -> dict: + """Returns the specified user's dict entry in the :attr:`user_map`""" + return self.user_map[user] + + def get_scraped_from(self, user: str) -> list: + """Returns a list of posts that have been scraped from the specified user""" + return self.user_map[user]['scraped'] + + def get_tweets_for(self, user: str) -> list: + """Returns a list of tweets that use the specified user's scraped content""" + return self.user_map[user]['tweets'] + + def get_hashtags_for(self, user: str) -> list: + """Returns the hashtag list for the specified user""" + return self.user_map[user]['hashtags'] + + @property + def local(self) -> bool: + """Indicates if profile is being saved locally (``True``) or on a remote database (``False``)""" + return self._local + + @local.setter + def local(self, local: bool): + if local: + if not os.path.exists(self.LOCAL_DIR): + os.mkdir(self.LOCAL_DIR) + + self._local = local + + @property + def name(self) -> str: + """The profile name""" + return self._name + + @name.setter + def name(self, profile_name): + """Sets the profile name, if a profile with that name doesn't already exist locally/remotely""" + if profile_name != 'default' and self.profile_exists(profile_name, local=self.local): + if self.local: + raise FileExistsError( + 'Local save file already exists for profile named "{}"\n'.format(profile_name) + + 'Please choose another name, load the profile, or delete the file.') + else: + raise ResourceWarning( + 'Database record already exists for profile named "{}"\n'.format(profile_name) + + 'Please choose another name or use InstaTweet.db to load/delete the profile' + ) + self._name = profile_name + + @property + def session_id(self) -> str: + """Instagram ``sessionid`` cookie, obtained by logging in through a browser + + :Tip: If you log into your account with a browser you don't use, the session cookie will last longer + """ + return self._session_id + + @session_id.setter + def session_id(self, session_id: str): + if not isinstance(session_id, str): + raise TypeError( + f'Session ID cookie must be of type {str}' + ) + self._session_id = session_id + if self.exists: + self._save_profile(alert=False) + + @property + def twitter_keys(self) -> dict: + """Twitter developer API keys with v1.1 endpoint access. See :attr:`~.DEFAULT_KEYS`""" + return self._twitter_keys + + @twitter_keys.setter + def twitter_keys(self, api_keys: dict): + if not isinstance(api_keys, dict): + raise TypeError( + f'Twitter Keys must be of type {dict}' + ) + if missing_keys := [key for key in TweetClient.DEFAULT_KEYS if key not in api_keys]: + raise KeyError( + f'Missing Twitter Keys: {missing_keys}' + ) + for key in TweetClient.DEFAULT_KEYS: + if not bool(api_keys[key]): + raise ValueError( + f'Missing Value for Twitter Key: {key}' + ) + self._twitter_keys = api_keys + if self.exists: + self._save_profile(alert=False) diff --git a/InstaTweet/profiles/Example Template.txt b/InstaTweet/profiles/Example Template.txt deleted file mode 100644 index 6cf83c7..0000000 --- a/InstaTweet/profiles/Example Template.txt +++ /dev/null @@ -1,11 +0,0 @@ -{ - "session_id": "string", - "user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.110 Safari/537.36", - "twitter_keys": { - "Consumer Key": "string", - "Consumer Secret": "string", - "Access Token": "string", - "Token Secret": "string" - }, - "user_map": {} -} \ No newline at end of file diff --git a/InstaTweet/profiles/My Template.txt b/InstaTweet/profiles/My Template.txt deleted file mode 100644 index 8fbc4a5..0000000 --- a/InstaTweet/profiles/My Template.txt +++ /dev/null @@ -1,12 +0,0 @@ -{ - "profile": "My Template", - "session_id": "session_id", - "user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/101.0.4951.41 Safari/537.36", - "twitter_keys": { - "Consumer Key": "key", - "Consumer Secret": "secret", - "Access Token": "token", - "Token Secret": "secret" - }, - "user_map": {} -} \ No newline at end of file diff --git a/InstaTweet/tweetclient.py b/InstaTweet/tweetclient.py new file mode 100644 index 0000000..de12f0e --- /dev/null +++ b/InstaTweet/tweetclient.py @@ -0,0 +1,147 @@ +from __future__ import annotations +import os +import random +import tweepy +import InstaTweet + +from . import InstaPost +from typing import Union, Optional +from tweepy.errors import TweepyException + + +class TweetClient: + + MAX_HASHTAGS = 5 + DEFAULT_KEYS = { + 'Consumer Key': 'string', + 'Consumer Secret': 'string', + 'Access Token': 'string', + 'Token Secret': 'string' + } + + def __init__(self, profile: InstaTweet.Profile, proxies: dict = None): + """Initialize TweetClient using a profile + + Basically just a wrapper for tweepy. It uses the settings of a profile to initialize the API and send tweets + + :param profile: the profile to use when initializing a :class:`tweepy.API` object + :param proxies: optional proxies to use when making API requests + """ + self.profile = profile + self.proxies = proxies + self.api = self.get_api() + + def get_api(self) -> tweepy.API: + """Initializes a :class:`tweepy.API` object using the API keys of the loaded :class:`~.Profile`""" + return tweepy.API( + auth=self.get_oauth(self.profile.twitter_keys), + user_agent=self.profile.user_agent, + proxy=self.proxies + ) + + @staticmethod + def get_oauth(api_keys: dict) -> tweepy.OAuth1UserHandler: + """Initializes and returns an ``OAuth1UserHandler`` object from tweepy using the specified API keys + + :param api_keys: Twitter developer API keys with v1.1 endpoint access + """ + if missing_keys := [key for key in TweetClient.DEFAULT_KEYS if key not in api_keys]: + raise KeyError( + f"Missing the following Twitter Keys: {missing_keys}" + ) + if bad_keys := [key for key in TweetClient.DEFAULT_KEYS if not api_keys[key] or api_keys[key] == 'string']: + raise ValueError( + f"Invalid values for the following Twitter keys: {bad_keys}" + ) + return tweepy.OAuth1UserHandler( + consumer_key=api_keys['Consumer Key'], + consumer_secret=api_keys['Consumer Secret'], + access_token=api_keys['Access Token'], + access_token_secret=api_keys['Token Secret'] + ) + + def send_tweet(self, post: InstaPost, hashtags: Optional[list[str]] = None) -> bool: + """Composes and sends a Tweet using an already-downloaded Instagram post + + :param post: the post to tweet; uses the :attr:`~.InstaPost.filepath` as media file source + :param hashtags: a list of hashtags, from the :attr:`~.user_map` + If non-empty, a few will randomly be chosen to include in the tweet + """ + if not post.filepath or not os.path.exists(post.filepath): + raise FileNotFoundError('Post must be downloaded first') + + if not (uploaded := self.upload_media(post)): + return False + + try: + tweet = self.api.update_status( + status=self.build_tweet(post, hashtags), + media_ids=[str(uploaded.media_id)], + ) + print(f'Sent tweet for {post}') + return post.add_tweet_data(tweet) + + except TweepyException as e: + print('Failed to send tweet for {}:\nResponse: {}'.format(post, e)) + return False + + def upload_media(self, post: InstaPost) -> Union[tweepy.Media, bool]: + """Uploads the media from an already-downloaded Instagram post to Twitter + + :param post: the Instagram post to use as the media source + :return: the response from the Twitter API (if upload was successful) or ``False`` + """ + media = self.api.media_upload( + filename=post.filepath, + media_category='TWEET_VIDEO' if post.is_video else 'TWEET_IMAGE', + wait_for_async_finalize=True, + chunked=True + ) + if media.processing_info['state'] != 'succeeded': + print(f'Failed to upload media to Twitter for {post}') + return False + else: + print(f'Successfully uploaded media to Twitter for {post}') + return media + + def build_tweet(self, post: InstaPost, hashtags: Optional[list[str]] = None) -> str: + """Uses an :class:`~.InstaPost` to build the body text of a tweet + + :param post: the post that's being tweeted; the caption and link are used + :param hashtags: optional list of hashtags to randomly pick from and include + :return: the text to use for the tweet + """ + tags = self.pick_hashtags(hashtags) + caption = post.caption.strip().replace('@', '@/') # Avoid tagging randos on Twitter + characters = 280 - len(tags) - len(post.permalink) - 2 + tweet = "{text}\n{hashtags}\n{link}".format( + text=caption[:characters], + hashtags=tags, + link=post.permalink + ) + return tweet + + @staticmethod + def pick_hashtags(hashtags: list[str]) -> str: + """Randomly picks hashtags from the provided list and returns them as a single string + + The number of hashtags chosen will either be 1 less than the length of the list (to avoid using the same tags + in every tweet), or the value of :attr:`~.MAX_HASHTAGS`, whichever is smaller + + :param hashtags: a list of hashtags to randomly choose from + + :Example: + >>> TweetClient.pick_hashtags(['cat','dog','woof']) + "#woof #cat\\n" + + :Note: A newline is added to help with formatting & character counting in :meth:`~.build_tweet` + """ + if not hashtags: + return '' + if not isinstance(hashtags, list): + raise TypeError('Provide a list of hashtags') + + num_hashtags = min(len(hashtags) - 1, TweetClient.MAX_HASHTAGS) # Pick at most MAX_HASHTAGS + random_hashtags = random.sample(hashtags, max(1, num_hashtags)) # Pick at least 1 + + return ' '.join(f'#{hashtag}' for hashtag in random_hashtags) + '\n' diff --git a/InstaTweet/utils.py b/InstaTweet/utils.py index a463e8d..83062f6 100644 --- a/InstaTweet/utils.py +++ b/InstaTweet/utils.py @@ -1,24 +1,43 @@ import os import requests from pathlib import Path -from bs4 import BeautifulSoup as bs +from typing import Optional -BACKUP_AGENT = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/101.0.4951.41 Safari/537.36' +AGENTS = ['Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/102.0.5005.63 Safari/537.36'] -def get_agent(): - r = requests.get('https://www.whatismybrowser.com/guides/the-latest-user-agent/chrome') - if r.ok: - soup = bs(r.text, 'html.parser') - if soup_agents := soup.find_all('span', {'class': 'code'}): - return soup_agents[0].text - # If function fails to scrape, will use hardcoded user agent - return BACKUP_AGENT +def get_agents() -> list: + """Scrapes a list of user agents. Returns a default list if the scrape fails.""" + if (response := requests.get('https://www.whatismybrowser.com/guides/the-latest-user-agent/chrome')).ok: + section = response.text.split('

Latest Chrome on Windows 10 User Agents

')[1] + raw_agents = section.split('code\">')[1:] + agents = [agent.split('<')[0] for agent in raw_agents] + for a in agents: + if a not in AGENTS: + AGENTS.append(a) + # If function fails, will still return the hardcoded list + return AGENTS -def get_root(): + +def get_agent(index: int = 0) -> str: + """Returns a single user agent string from the specified index of the AGENTS list""" + return get_agents()[index] # Specify index only if you hardcode more than 1 + + +def get_proxies(env_key) -> Optional[dict]: + """Retrieve proxies from an environment variable""" + if env_key: + return { + "http": os.environ[env_key], + "https": os.environ[env_key] + } + return None + + +def get_root() -> Path: return Path(__file__).parent -def get_filepath(filename, filetype='.txt'): - return os.path.join(get_root(), filename) + filetype +def get_filepath(filename: str, filetype: str = 'txt') -> str: + return os.path.join(get_root(), filename) + '.' + filetype diff --git a/requirements.txt b/requirements.txt index 748906d..b36c25a 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,5 +1,4 @@ -moviepy==1.0.3 -Pillow==9.0.1 -requests==2.27.1 -requests_oauthlib==1.3.0 -tqdm~=4.58.0 \ No newline at end of file +tweepy>=4.10.0 +requests>=2.27.1 +SQLAlchemy>=1.4.36 +psycopg2>=2.9.3 diff --git a/scheduler.py b/scheduler.py new file mode 100644 index 0000000..e7bd203 --- /dev/null +++ b/scheduler.py @@ -0,0 +1,19 @@ +from InstaTweet import InstaTweet + +PROFILES = ['aProfile', 'myProfile'] +LOCAL = True + + +def run(profile_name: str, local: bool = LOCAL): + """Loads and InstaTweets a profile + + :param profile_name: the name of the :class:`~.Profile` + :param local: if the profile is saved locally or in a SQLAlchemy supported database + """ + insta_tweet = InstaTweet.load(profile_name, local=local) + insta_tweet.start() + + +if __name__ == '__main__': + for profile in PROFILES: + run(profile, local=LOCAL)