
v0.1.2-pre1: HF downloads implemented, need testing
christianazinn committed Apr 15, 2024
1 parent e73c01a commit 12d56b2
Showing 5 changed files with 49 additions and 52 deletions.
1 change: 1 addition & 0 deletions Homepage.py
@@ -1,4 +1,5 @@
# main.py
# Version 0.1.2-pre1: Convert_Safetensors done; still need to test Hugging_Face_Downloader.py with the new queuer.
import streamlit as st
from st_pages import Page, Section, show_pages, add_indentation

10 changes: 9 additions & 1 deletion TODO.md
@@ -18,4 +18,12 @@ TODO thread safety? threading.Thread(target=process_queue, daemon=True).start()

TODO follow page FILESTATUSes and also rework the scripts in the main directory
TODO rewrite the little page blurbs on all pages
TODO TEST TEST TEST!!!
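A minimal sketch of the daemon-worker pattern the thread-safety TODO above refers to, assuming a single shared queue drained by one background thread (illustrative, not this commit's code):

import queue, subprocess, threading

job_queue = queue.Queue()

def process_queue():
    # run queued commands one at a time so subprocesses never overlap
    while True:
        command = job_queue.get()  # blocks until a job arrives
        try:
            subprocess.run(command, check=True)
        except subprocess.CalledProcessError as e:
            print(f"Job failed: {e}")
        finally:
            job_queue.task_done()

threading.Thread(target=process_queue, daemon=True).start()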


# TODO NEXT TESTS
test Hugging_Face_Downloader.py
test queueing downloaded files (a test sketch follows after this list)
test the actual file downloads
test whether it works fine mixing conversion and download jobs
test cancelling file downloads midway
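A hedged sketch of the queueing test with pytest, assuming the page module imports cleanly despite Streamlit's import-time side effects and that `get_scheduler` can be patched on it (the module path, fixtures, and assertions are all assumptions):

from unittest.mock import MagicMock, patch

from pages.Hugging_Face_Downloader import trigger_command

def test_trigger_command_queues_one_job_per_file(tmp_path, monkeypatch):
    monkeypatch.chdir(tmp_path)  # keep the mkdir side effect out of the repo
    fake_scheduler = MagicMock()
    file_links = {
        "config.json": "https://huggingface.co/org/model/resolve/main/config.json",
        "model.safetensors": "https://huggingface.co/org/model/resolve/main/model.safetensors",
    }
    with patch("pages.Hugging_Face_Downloader.get_scheduler", return_value=fake_scheduler):
        message = trigger_command(file_links, "org/model")
    # one download job should be queued per file in the dict
    assert fake_scheduler.add_job.call_count == len(file_links)
    assert message == "Download tasks have been queued."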
6 changes: 3 additions & 3 deletions pages/Convert_Safetensors.py
@@ -8,6 +8,7 @@

# FUNCTIONS ---------------------------------------------------------------------------------

# Trigger the conversion commands
def trigger_command(model_folder, options):
if not any(options.values()):
return "Error: No quantization type selected."
@@ -16,7 +17,7 @@ def trigger_command(model_folder, options):
queue_command(model_folder, option.lower())
return "Commands queued. They will run sequentially."

# TODO fix this up and add a different command for each outtype
# Schedule the conversion command
def queue_command(model_folder, out_type):
base_dir = Path("llama.cpp/models")
input_dir = base_dir / model_folder
@@ -35,8 +36,7 @@ def queue_command(model_folder, out_type):
]

# add to scheduler
schedule = get_scheduler()
schedule.add_job(command)
get_scheduler().add_job(command)
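
# Illustrative only, not part of this commit: the elided `command` above plausibly
# wraps llama.cpp's conversion script; the script name, flags, and output path below
# are all assumptions.
#   command = [
#       "python", "llama.cpp/convert.py", str(input_dir),
#       "--outtype", out_type,
#   ]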


# UI CODE ---------------------------------------------------------------------------------
60 changes: 12 additions & 48 deletions pages/Hugging_Face_Downloader.py
@@ -1,66 +1,35 @@
# FILESTATUS: needs to be migrated to the New Way of Doing Things
# IMPORTS ---------------------------------------------------------------------------------
import os, subprocess, requests, streamlit as st
from apscheduler.schedulers.background import BackgroundScheduler
import requests, streamlit as st
from pathlib import Path
from st_pages import add_indentation
from util.scheduler import *

# FUNCTIONS ---------------------------------------------------------------------------------

# Initialize APScheduler
scheduler = BackgroundScheduler()
scheduler.start()

# Global variables to keep track of download tasks and downloaded files
scheduled_jobs = []
downloaded_files = []

def download_file_task(file_url, download_path, filename):
global downloaded_files
file_path = download_path / filename
# write the download task to the queue
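# aria2c flags below: up to 16 connections per server, 8-way split with pieces of at
# least 25M, saving to directory -d as filename -o, overwriting and resuming partials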
def queue_command(file_url, download_path, filename):
command = [
"aria2c", file_url,
"--max-connection-per-server=16", "--split=8", "--min-split-size=25M", "--allow-overwrite=true",
"-d", str(download_path), "-o", filename,
"--continue=true"
]
try:
subprocess.run(command, check=True)
downloaded_files.append(str(file_path))
except subprocess.CalledProcessError as e:
print(f"Error downloading {filename}: {str(e)}")
get_scheduler().add_job(command)

def queue_download(file_links_dict, model_name):
global scheduled_jobs
# queues a download task for each file in the file_links_dict
def trigger_command(file_links_dict, model_name):
folder_name = model_name.split("/")[-1]
current_dir = Path(__file__).parent
download_path = current_dir.parent / f"llama.cpp/models/{folder_name}"
download_path = Path("llama.cpp/models") / folder_name
download_path.mkdir(parents=True, exist_ok=True)

for file_name, file_url in file_links_dict.items():
filename = Path(file_name).name
job = scheduler.add_job(download_file_task, args=[file_url, download_path, filename])
scheduled_jobs.append(job)
queue_command(file_url, download_path, filename)

return "Download tasks have been queued."

def cancel_downloads():
global scheduled_jobs, downloaded_files
for job in scheduled_jobs:
job.remove()
scheduled_jobs.clear()

for file_path in downloaded_files:
if os.path.exists(file_path):
os.remove(file_path)
downloaded_files.clear()

return "All queued downloads have been cancelled and files removed."

def construct_hf_repo_url(model_name):
base_url = "https://huggingface.co/api/models/"
return f"{base_url}{model_name}/tree/main"

# get the files from the Hugging Face repo - kept basically the same implementation as in the original
def get_files_from_repo(url, repo_name):
try:
response = requests.get(url)
@@ -82,7 +51,6 @@ def get_files_from_repo(url, repo_name):
return {}, {}
except Exception as e:
return {}, {}
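
# Illustrative sketch, not this commit's code: the tree endpoint returns a JSON list
# of entries with "type", "path", and "size"; a minimal parser might look like the
# following (the size formatting and resolve-URL template are assumptions).
def sketch_get_files_from_repo(url, repo_name):
    files, links = {}, {}
    entries = requests.get(url, timeout=30).json()
    for entry in (entries if isinstance(entries, list) else []):
        if entry.get("type") == "file":
            size_mb = entry.get("size", 0) / (1024 * 1024)
            files[entry["path"]] = f"{size_mb:.1f} MB"
            links[entry["path"]] = f"https://huggingface.co/{repo_name}/resolve/main/{entry['path']}"
    return files, links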


# UI CODE ---------------------------------------------------------------------------------

@@ -92,7 +60,7 @@ def get_files_from_repo(url, repo_name):

model_name = st.text_input("Download PyTorch models from Huggingface", "Use the HuggingfaceUsername/Modelname")
if st.button("Get File List"):
_, file_links = get_files_from_repo(construct_hf_repo_url(model_name), model_name)
_, file_links = get_files_from_repo(f"https://huggingface.co/api/models/{model_name}/tree/main", model_name)
if file_links:
st.session_state['file_links_dict'] = file_links
files_info = "\n".join(f"{name}, Size: {size}" for name, size in file_links.items())
@@ -104,15 +72,11 @@ def get_files_from_repo(url, repo_name):

if st.button("Download Files"):
if 'file_links_dict' in st.session_state and st.session_state['file_links_dict']:
queue_message = queue_download(st.session_state['file_links_dict'], model_name)
queue_message = trigger_command(st.session_state['file_links_dict'], model_name)
st.text(queue_message)
else:
st.error("No files to download. Please get the file list first.")

if st.button("Stop Downloads"):
cancel_message = cancel_downloads()
st.text(cancel_message)

with st.expander("How to Download Model Files from Hugging Face", expanded=False):
st.markdown("""
**How to Download Model Files from Hugging Face**
24 changes: 24 additions & 0 deletions util/scheduler.py
@@ -6,6 +6,7 @@

# FUNCTIONS ---------------------------------------------------------------------------------

# TODO perhaps make download tasks independent of other tasks? but then you run into thread safety issues I think
# the scheduler singleton class (do i need st.cache_resource here?)
@st.cache_resource
class Scheduler:
@@ -32,6 +33,8 @@ def __init__(self):
self.job = None
self.command = ""
self.lastLog = ""
# implementation of a way to delete partially downloaded files
self.downloaded_files = []

# toggle active status (i.e. pause/unpause)
def toggle(self):
@@ -112,15 +115,28 @@ def run_next_job(self):
# if there is a job, run it
if self.command:
try:

# if the job is a download task, add the file to the list of partially downloaded files
if self.command[0] == "aria2c":
download_path = self.command[-4]
filename = self.command[-2]
file_path = os.path.join(download_path, filename)
self.downloaded_files.append(file_path)
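# NOTE: the positional lookups above assume the exact argv layout built by the
# downloader's queue_command ("-d", <dir>, "-o", <name>, "--continue=true");
# reordering those aria2c flags would silently break this bookkeeping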

self.job = subprocess.run(self.command, check=True)

# log the job as completed if it works
with open(self.outPath, "a") as f:
self.log = f"Task executed successfully at {self.time()}: {self.command}\n"
f.write(self.log)

# log errors
except subprocess.CalledProcessError as e:
self.mostRecentError = f"Error in task execution: {e}"
self.active = False

self.clear_downloaded_files()

# log the job as failed
with open(self.outPath, "a") as f:
self.log = f"Error in task execution for task {self.command} at {self.time()}: {e}\n"
@@ -137,6 +153,7 @@ def run_next_job(self):
# optional argument to retain the job in the queue or to remove it and log it
def terminate(self, requeue=False):
self.job.terminate()
self.clear_downloaded_files()
# log the job as terminated if not requeue
if not requeue:
with open(self.outPath, "a") as f:
@@ -150,6 +167,13 @@ def time(self):
def time(self):
return datetime.now().strftime("%Y-%m-%d %H:%M:%S")

# clear all partially downloaded files - only required for download tasks
def clear_downloaded_files(self):
for file_path in self.downloaded_files:
if os.path.exists(file_path):
os.remove(file_path)
self.downloaded_files.clear()

# accessor for the scheduler singleton
@st.cache_resource
def get_scheduler():
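A usage sketch, assuming only what the pages above already do: every page retrieves the cached singleton, so jobs queued from different pages run sequentially on one worker.

from util.scheduler import get_scheduler

# argv-style commands are queued and executed one at a time
get_scheduler().add_job(["echo", "hello from the queue"])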
