Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tests are added #18

Merged
merged 2 commits into from
Aug 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@


<p align="center" width="100%">
<img width="80%" src="https://github.com/msinamsina/automail/blob/main/docs/_static/automail-logo.png" >
<img width="80%" src="https://github.com/msinamsina/automail/blob/main/docs/_static/automail-logo.png?raw=true" >
</p>


Expand Down
152 changes: 150 additions & 2 deletions automail/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
from automail.storage import get_session, create_tables
import time
import os
from pathlib import Path
from automail.utils import get_config_dict, read_config_file



app = typer.Typer()
Expand Down Expand Up @@ -34,14 +37,47 @@ def main(
@app.command()
def init(
db_path: str = typer.Option(
'./',
'./automail-workspace',
"--db-path",
"-db",
prompt="Where do you want to initialize the automail project?",
),
smtp_server: str = typer.Option(
'smtp.gmail.com',
"--smtp-server",
"-ss",
prompt="What is your smtp server?",
help="Your smtp server.",
),
smtp_port: int = typer.Option(
465,
"--smtp-port",
"-sp",
prompt="What is your smtp port?",
help="Your smtp port.",
),
email: str = typer.Option(
'',
"--email",
"-e",
help="Your email address.",
),
password: str = typer.Option(
'',
"--password",
"-p",
help="Your email password.",
hide_input=True,
),
is_test: bool = typer.Option(
False,
"--test",
"-t",
help="Test output.",
),
) -> None:
"""Initialize the mail database."""
from automail.storage import util
from automail import utils
from automail import __app_name__

if not shutil.os.path.exists(db_path):
Expand All @@ -64,12 +100,124 @@ def init(
session, engin = get_session()
create_tables(engin)
typer.echo("Done!")
# create the config file
typer.echo("Creating config file...")
utils.create_config_file(
smtp_server=smtp_server, smtp_port=smtp_port, password=password, sender_email=email, is_test=is_test
)
os.chdir("../")

session.close()
engin.dispose()
return


@app.command()
def register(
email: str = typer.Option(
get_config_dict().get("user", None),
"--email",
"-e",
prompt="What is your email address?",
help="Your email address.",
),
contact_list: Path = typer.Argument(
...,
exists=True,
help="The path to your contact list.",
),
title: str = typer.Option(
"Contact List",
"--title",
"-T",
prompt="What is the title of your contact list?",
help="The title of your contact list.",
),
custom_pdf: bool = typer.Option(
False,
"--custom-pdf",
"-CA",
help="Convert your contact list to pdf.",
),
attachment: Path = typer.Option(
None,
"--attachment",
"-a",
help="The path to the attachment file.",
),
custom_pdf_dir: Path = typer.Option(
None,
"--custom-pdf-dir",
"-cpd",
help="The path to the custom pdf directory.",
),
subject: str = typer.Option(
'None',
"--subject",
"-s",
prompt="What is the subject of your email?",
help="The subject of your email.",
),
template: Optional[Path] = typer.Option(
None,
"--template",
"-t",
# exists=True,
# file_okay= True,
help="The body of your email.",
),

) -> None:
"""Register your email account."""
if os.path.exists('./mail.db') and os.path.exists('./config.cfg'):
from automail.storage import util
from automail import utils

if template is None:
tmp = typer.confirm("Do you want to use template?", default=False)
if tmp:
while True:
template = typer.prompt("Please enter the path to your template file.", type=Path)
if os.path.isfile(template) and (template.suffix == '.txt' or template.suffix == '.html'):
template = str(template)
break
else:
typer.secho(f"File {template} does not exist. or the file type is not supported. "
f"the file type should be .txt or .html", fg=typer.colors.RED)
continue
print(title)
if not os.path.exists(title):
shutil.os.mkdir(title)
else:
typer.secho(f"Directory {title} already exists. Please choose another title.", fg=typer.colors.RED)
raise typer.Exit(code=0)

shutil.copy(contact_list, title)
contact_list = os.path.join(title, os.path.basename(contact_list))

if attachment:
shutil.copy(attachment, title)
attachment = os.path.join(title, os.path.basename(attachment))
if template:
shutil.copy(template, title)
template = os.path.join(title, os.path.basename(template))

typer.echo("Registering your email account...")
util.register_new_process(
email=email,
contact_list=contact_list,
title=title,
custom_pdf=custom_pdf,
attachment=attachment,
custom_pdf_dir=custom_pdf_dir,
subject=subject,
template=template,
)

else:
typer.secho("Please initialize the automail project first.", fg=typer.colors.RED)
raise typer.Exit(code=0)


if __name__ == "__main__":
app(prog_name=__app_name__)
28 changes: 5 additions & 23 deletions automail/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,30 +85,12 @@ def registration(username, contacts, name="", cpdf=False, attachment="", pdf_dir
This function will register a new process plus records for each contact in the contacts csv file.

"""
session, engin = get_session()
create_tables(engin)

process = Process(title=name, subject=args['subject'], sender=username,
temp_file=args['template'], release_date=datetime.datetime.date(datetime.datetime.now()))
session.add(process)
session.commit()
from automail.storage import register_new_process

print(f"ID:{process.id} => Registering user {username} with contacts {contacts}")
contact_df = pd.read_csv(contacts)
for index, row in contact_df.iterrows():
filename = None
if attachment:
filename = attachment

if cpdf:
filename = os.path.join(pdf_dir, str(row['cpdf']) + '.pdf')
record = Record(receiver=row['email'], data=row.to_json(), process_id=process.id, attachment_path=filename)
session.add(record)
session.commit()
print(f"ID:{record.id} => Registering record for {row['email']}")

session.close()
engin.dispose()
register_new_process(title=name, email=username, contact_list=contacts, custom_pdf=cpdf, attachment=attachment,
custom_pdf_dir=pdf_dir, subject=args.get('subject', ""), template=args.get('template', ""))
print("Process registered successfully!")
print("You can start the process with 'automail start <process_id>' command")


def run(pid, resume=True):
Expand Down
67 changes: 65 additions & 2 deletions automail/storage/util.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from sqlalchemy.orm import sessionmaker, declarative_base
from sqlalchemy import create_engine

import os
import datetime
import pandas as pd

Base = declarative_base()

Expand Down Expand Up @@ -44,4 +46,65 @@ def get_session():
# create a Session
session = sessionmaker(bind=engine)
session = session()
return session, engine
return session, engine


def register_new_process(title, subject, email, template, contact_list, custom_pdf, attachment, custom_pdf_dir):
"""register a new process in database

Parameters
----------
title: str
title of the process
subject: str
subject of the process
email: str
email of the sender
template: str
path to the template
contact_list: str
path to the contact list
custom_pdf: bool
if True, convert the contact list to pdf
attachment: str
path to the attachment
custom_pdf_dir: str
path to the custom pdf

Returns
-------
None

Examples
--------
>>> register_new_process(title="test", subject="test", email="test", template="test",\
contact_list="test", custom_pdf=False, attachment=None, custom_pdf_dir=None)
"""
from automail.storage import Record, Process

session, engin = get_session()
create_tables(engin)

process = Process(title=title, subject=subject, sender=email,
temp_file=template, release_date=datetime.datetime.date(datetime.datetime.now()))

session.add(process)
session.commit()

print(f"ID:{process.id} => Registering user {email} with contacts {contact_list}")
contact_df = pd.read_csv(contact_list)
for index, row in contact_df.iterrows():
filename = None
if attachment:
filename = attachment.strip()

if custom_pdf:
filename = os.path.join(custom_pdf_dir, str(row['cpdf']) + '.pdf')
email = row['email'].strip()
record = Record(receiver=email, data=row.to_json(), process_id=process.id, attachment_path=filename)
session.add(record)
session.commit()
print(f"ID:{record.id} => Registering record for {email}")

session.close()
engin.dispose()
Loading