Merge branch 'master' into dev-usfd-changes
Includes USFD-submitted pull requests digitalmethodsinitiative#368 and digitalmethodsinitiative#371
ianroberts committed Aug 22, 2023
2 parents 39304a6 + b3f0097 commit 70b9bd5
Showing 33 changed files with 592 additions and 405 deletions.
2 changes: 2 additions & 0 deletions backend/lib/processor.py
@@ -308,6 +308,8 @@ def remove_files(self):
# Remove the results file that was created
if self.dataset.get_results_path().exists():
self.dataset.get_results_path().unlink()
if self.dataset.get_results_folder_path().exists():
shutil.rmtree(self.dataset.get_results_folder_path())

# Remove any staging areas with temporary data
self.dataset.remove_staging_areas()
11 changes: 6 additions & 5 deletions backend/workers/datasource_metrics.py
@@ -58,6 +58,9 @@ def work(self):
if not datasource:
continue

# Database IDs may be different from the Datasource ID (e.g. the datasource "4chan" became "fourchan" but the database ID remained "4chan")
database_db_id = datasource.prefix if hasattr(datasource, "prefix") else datasource_id

is_local = True if hasattr(datasource, "is_local") and datasource.is_local else False
is_static = True if hasattr(datasource, "is_static") and datasource.is_static else False

@@ -85,7 +88,7 @@ def work(self):
# -------------------------

# Get the name of the posts table for this datasource
posts_table = datasource_id if "posts_" + datasource_id not in all_tables else "posts_" + datasource_id
posts_table = datasource_id if "posts_" + database_db_id not in all_tables else "posts_" + database_db_id

# Count and update for every board individually
for board in boards:
@@ -104,8 +107,7 @@ def work(self):
# If the datasource is dynamic, we also only update days
# that haven't been added yet - these are heavy queries.
if not is_static:

days_added = self.db.fetchall("SELECT date FROM metrics WHERE datasource = '%s' AND board = '%s' AND metric = 'posts_per_day';" % (datasource_id, board))
days_added = self.db.fetchall("SELECT date FROM metrics WHERE datasource = '%s' AND board = '%s' AND metric = 'posts_per_day';" % (database_db_id, board))

if days_added:

@@ -130,8 +132,7 @@ def work(self):
FROM %s
WHERE %s AND %s
GROUP BY metric, datasource, board, date;
""" % (datasource_id, posts_table, board_sql, time_sql)

""" % (database_db_id, posts_table, board_sql, time_sql)
# Add to metrics table
rows = [dict(row) for row in self.db.fetchall(query)]

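As an aside to the hunk above: the prefix fallback and table lookup are easier to follow outside the diff. A minimal sketch, assuming a hypothetical datasource class and example table names (none of these names come from the commit itself):

# Illustrative only: how the database ID falls back to the datasource ID.
class FourchanDataSource:            # hypothetical datasource class
    prefix = "4chan"                 # database ID kept for backwards compatibility

datasource_id = "fourchan"           # ID under which the datasource is registered
datasource = FourchanDataSource()

# Same fallback as in work() above
database_db_id = datasource.prefix if hasattr(datasource, "prefix") else datasource_id

all_tables = ["posts_4chan", "posts_telegram"]   # example list of database tables
posts_table = datasource_id if "posts_" + database_db_id not in all_tables else "posts_" + database_db_id

print(database_db_id, posts_table)   # -> 4chan posts_4chan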
2 changes: 1 addition & 1 deletion backend/workers/expire_items.py
@@ -97,7 +97,7 @@ def expire_users(self):
self.log.info(f"User {username} expired - deleting user and datasets")
user.delete()
else:
warning_notification = f"WARNING: This account will be deleted at <time datetime=\"{expires_at.strftime('%C')}\">{expires_at.strftime('%Y-%m-%d %H:%M')}</time>. Make sure to back up your data before then."
warning_notification = f"WARNING: This account will be deleted at <time datetime=\"{expires_at.strftime('%C')}\">{expires_at.strftime('%-d %B %Y %H:%M')}</time>. Make sure to back up your data before then."
user.add_notification(warning_notification)

def expire_notifications(self):
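For reference, a small sketch of what the reformatted expiry timestamp looks like to the user; the datetime value is hypothetical, and note that the %-d directive is glibc-specific (it is not available on Windows):

from datetime import datetime

expires_at = datetime(2023, 8, 22, 14, 30)       # hypothetical expiry time
print(expires_at.strftime('%Y-%m-%d %H:%M'))     # old display: 2023-08-22 14:30
print(expires_at.strftime('%-d %B %Y %H:%M'))    # new display: 22 August 2023 14:30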
12 changes: 12 additions & 0 deletions common/lib/dataset.py
@@ -183,6 +183,16 @@ def get_results_path(self):
"""
return self.folder.joinpath(self.data["result_file"])

def get_results_folder_path(self):
"""
Get path to folder containing accompanying results
Returns a path that may not yet be created
:return Path: A path to the results folder
"""
return self.folder.joinpath("folder_" + self.key)

def get_log_path(self):
"""
Get path to dataset log file
@@ -569,6 +579,8 @@ def delete(self, commit=True):
self.get_results_path().unlink()
if self.get_results_path().with_suffix(".log").exists():
self.get_results_path().with_suffix(".log").unlink()
if self.get_results_folder_path().exists():
shutil.rmtree(self.get_results_folder_path())
except FileNotFoundError:
# already deleted, apparently
pass
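A minimal sketch of the results-folder convention introduced above and the matching cleanup, assuming a dataset folder of /data and a dataset key of "abc123" (both are illustrative values, not taken from the commit):

import shutil
from pathlib import Path

folder = Path("/data")                               # hypothetical dataset folder
key = "abc123"                                       # hypothetical dataset key

# Same naming scheme as get_results_folder_path()
results_folder = folder.joinpath("folder_" + key)
results_folder.mkdir(parents=True, exist_ok=True)    # a processor may create it for extra result files

# Cleanup mirrors remove_files() and delete(): only remove the folder if it exists
if results_folder.exists():
    shutil.rmtree(results_folder)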
135 changes: 100 additions & 35 deletions common/lib/dmi_service_manager.py
@@ -22,6 +22,12 @@ class DmiServiceManagerException(Exception):
"""
pass

class DsmOutOfMemory(DmiServiceManagerException):
"""
Raised when the DMI Service Manager server runs out of (GPU) memory.
"""
pass


class DmiServiceManager:
"""
@@ -42,6 +48,21 @@ def __init__(self, processor):
self.path_to_files = None
self.path_to_results = None

def check_gpu_memory_available(self, service_endpoint):
"""
Returns a tuple: True if the server has GPU memory available (False otherwise), along with the
JSON response from the server containing the memory information.
"""
api_endpoint = self.server_address + "check_gpu_mem/" + service_endpoint
resp = requests.get(api_endpoint, timeout=30)
if resp.status_code == 200:
return True, resp.json()
elif resp.status_code in [400, 404, 500, 503]:
return False, resp.json()
else:
self.processor.log.warning("Unknown response from DMI Service Manager: %s" % resp.text)
return False, None

def process_files(self, input_file_dir, filenames, output_file_dir, server_file_collection_name, server_results_folder_name):
"""
Process files according to DMI Service Manager local or remote settings
@@ -71,7 +92,7 @@ def process_files(self, input_file_dir, filenames, output_file_dir, server_file_

def check_progress(self):
if self.local_or_remote == "local":
current_completed = self.count_local_files(self.path_to_results)
current_completed = self.count_local_files(self.processor.config.get("PATH_DATA").joinpath(self.path_to_results))
elif self.local_or_remote == "remote":
existing_files = self.request_folder_files(self.server_file_collection_name)
current_completed = len(existing_files.get(self.server_results_folder_name, []))
@@ -80,13 +101,16 @@ def check_progress(self):

if current_completed != self.processed_files:
self.processor.dataset.update_status(
f"Collected text from {current_completed} of {self.num_files_to_process} files")
f"Processed {current_completed} of {self.num_files_to_process} files")
self.processor.dataset.update_progress(current_completed / self.num_files_to_process)
self.processed_files = current_completed

def send_request_and_wait_for_results(self, service_endpoint, data, wait_period=60):
def send_request_and_wait_for_results(self, service_endpoint, data, wait_period=60, check_process=True):
"""
Send request and wait for results to be ready.
check_process assumes a one-to-one ratio of input files to output files: it counts the number of
files in the output folder and compares it to the number of input files. If this is not the case,
set check_process to False.
"""
if self.local_or_remote == "local":
service_endpoint += "_local"
@@ -103,7 +127,11 @@ def send_request_and_wait_for_results(self, service_endpoint, data, wait_period=
else:
try:
resp_json = resp.json()
raise DmiServiceManagerException(f"DMI Service Manager error: {str(resp.status_code)}: {str(resp_json)}")
if resp.status_code == 400 and 'key' in resp_json and 'error' in resp_json and resp_json['error'] == f"future_key {resp_json['key']} already exists":
# Request already exists
results_url = api_endpoint + "?key=" + resp_json['key']
else:
raise DmiServiceManagerException(f"DMI Service Manager error: {str(resp.status_code)}: {str(resp_json)}")
except JSONDecodeError:
# Unexpected Error
raise DmiServiceManagerException(f"DMI Service Manager error: {str(resp.status_code)}: {str(resp.text)}")
@@ -125,7 +153,8 @@ def send_request_and_wait_for_results(self, service_endpoint, data, wait_period=
if (time.time() - check_time) > wait_period:
check_time = time.time()
# Update progress
self.check_progress()
if check_process:
self.check_progress()

result = requests.get(results_url, timeout=30)
if 'status' in result.json().keys() and result.json()['status'] == 'running':
@@ -136,6 +165,17 @@ def send_request_and_wait_for_results(self, service_endpoint, data, wait_period=
self.processor.dataset.update_status(f"Completed {service_endpoint}!")
success = True
break

elif 'returncode' in result.json().keys() and int(result.json()['returncode']) == 1:
# Error
if 'error' in result.json().keys():
error = result.json()['error']
if "CUDA error: out of memory" in error:
raise DmiServiceManagerException("DMI Service Manager server ran out of memory; try reducing the number of files processed at once or waiting until the server is less busy.")
else:
raise DmiServiceManagerException(f"Error {service_endpoint}: " + error)
else:
raise DmiServiceManagerException(f"Error {service_endpoint}: " + str(result.json()))
else:
# Something botched
raise DmiServiceManagerException(f"Error {service_endpoint}: " + str(result.json()))
@@ -147,22 +187,32 @@ def process_results(self, local_output_dir):
# Output files are already in local directory
pass
elif self.local_or_remote == "remote":
# Update list of result files
existing_files = self.request_folder_files(self.server_file_collection_name)
result_files = existing_files.get(self.server_results_folder_name, [])

self.download_results(result_files, self.server_file_collection_name, self.server_results_folder_name, local_output_dir)
results_path = os.path.join(self.server_file_collection_name, self.server_results_folder_name)
self.processor.dataset.log(f"Downloading results from {results_path}...")
# Collect result filenames from server
result_files = self.request_folder_files(results_path)
for path, files in result_files.items():
if path == '.':
self.download_results(files, results_path, local_output_dir)
else:
Path(os.path.join(local_output_dir, path)).mkdir(exist_ok=True, parents=True)
self.download_results(files, os.path.join(results_path, path), local_output_dir.joinpath(path))

def request_folder_files(self, folder_name):
"""
Request files from a folder on the DMI Service Manager server.
"""
filename_url = f"{self.server_address}list_filenames?folder_name={folder_name}"
filename_url = f"{self.server_address}list_filenames/{folder_name}"
filename_response = requests.get(filename_url, timeout=30)

# Check if 4CAT has access to the DMI Service Manager server
if filename_response.status_code == 403:
raise DmiServiceManagerException("403: 4CAT does not have permission to use the DMI Service Manager server")
elif filename_response.status_code in [400, 405]:
raise DmiServiceManagerException(f"400: DMI Service Manager server {filename_response.json()['reason']}")
elif filename_response.status_code == 404:
# Folder not found; no files
return {}

return filename_response.json()

@@ -187,44 +237,58 @@ def send_files(self, file_collection_name, results_name, files_to_upload, dir_wi
# Check if files have already been sent
self.processor.dataset.update_status("Connecting to DMI Service Manager...")
existing_files = self.request_folder_files(file_collection_name)
uploaded_files = existing_files.get('files', [])
uploaded_files = existing_files.get('4cat_uploads', [])
if len(uploaded_files) > 0:
self.processor.dataset.update_status("Found %i files previously uploaded" % (len(uploaded_files)))

# Compare files with previously uploaded
to_upload_filenames = [filename for filename in files_to_upload if filename not in uploaded_files]
total_files_to_upload = len(to_upload_filenames)

if len(to_upload_filenames) > 0 or results_name not in existing_files:
# TODO: perhaps upload one at a time?
if total_files_to_upload > 0 or results_name not in existing_files:
api_upload_endpoint = f"{self.server_address}send_files"

# Create a blank file to upload into results folder
empty_placeholder = f"4CAT_{results_name}_blank.txt"
with open(dir_with_files.joinpath(empty_placeholder), 'w') as file:
file.write('')

self.processor.dataset.update_status(f"Uploading {len(to_upload_filenames)} files")
response = requests.post(api_upload_endpoint,
files=[('files', open(dir_with_files.joinpath(file), 'rb')) for file in
to_upload_filenames] + [
(results_name, open(dir_with_files.joinpath(empty_placeholder), 'rb'))],
data=data, timeout=120)

if response.status_code == 200:
self.processor.dataset.update_status(f"Files uploaded: {len(to_upload_filenames)}")
elif response.status_code == 403:
raise DmiServiceManagerException("403: 4CAT does not have permission to use the DMI Service Manager server")
elif response.status_code == 405:
raise DmiServiceManagerException("405: Method not allowed; check DMI Service Manager server address (perhaps http is being used instead of https)")
else:
self.processor.dataset.update_status(f"Unable to upload {len(to_upload_filenames)} files!")

server_path_to_files = Path(file_collection_name).joinpath("files")
self.processor.dataset.update_status(f"Uploading {total_files_to_upload} files")
files_uploaded = 0
while to_upload_filenames:
# Upload files
if files_uploaded == 0:
upload_file = empty_placeholder
# Upload a blank file to create the results folder
response = requests.post(api_upload_endpoint,
files=[(results_name, open(dir_with_files.joinpath(empty_placeholder), 'rb'))],
data=data, timeout=120)
else:
upload_file = to_upload_filenames.pop()
response = requests.post(api_upload_endpoint,
files=[('4cat_uploads', open(dir_with_files.joinpath(upload_file), 'rb'))],
data=data, timeout=120)

if response.status_code == 200:
files_uploaded += 1
if files_uploaded % 1000 == 0:
self.processor.dataset.update_status(f"Uploaded {files_uploaded} of {total_files_to_upload} files!")
self.processor.dataset.update_progress(files_uploaded / total_files_to_upload)
elif response.status_code == 403:
raise DmiServiceManagerException("403: 4CAT does not have permission to use the DMI Service Manager server")
elif response.status_code == 405:
raise DmiServiceManagerException("405: Method not allowed; check DMI Service Manager server address (perhaps http is being used instead of https)")
else:
self.processor.dataset.log(f"Unable to upload file ({response.status_code - response.reason}): {upload_file}")

self.processor.dataset.update_status(f"Uploaded {files_uploaded} files!")

server_path_to_files = Path(file_collection_name).joinpath("4cat_uploads")
server_path_to_results = Path(file_collection_name).joinpath(results_name)

return server_path_to_files, server_path_to_results

def download_results(self, filenames_to_download, file_collection_name, folder_name, local_output_dir):
def download_results(self, filenames_to_download, folder_name, local_output_dir):
"""
Download results from the DMI Service Manager server.
@@ -235,10 +299,11 @@ def download_results(self, filenames_to_download, file_collection_name, folder_n
:param Dataset dataset: Dataset object for status updates
"""
# Download the result files
api_upload_endpoint = f"{self.server_address}uploads/"
api_upload_endpoint = f"{self.server_address}download/"
self.processor.dataset.update_status(f"Downloading {len(filenames_to_download)} from {folder_name}...")
for filename in filenames_to_download:
file_response = requests.get(api_upload_endpoint + f"{file_collection_name}/{folder_name}/{filename}", timeout=30)
self.processor.dataset.update_status(f"Downloading {filename}...")
file_response = requests.get(api_upload_endpoint + f"{folder_name}/{filename}", timeout=30)

with open(local_output_dir.joinpath(filename), 'wb') as file:
file.write(file_response.content)

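Taken as a whole, the reworked DmiServiceManager flow is easier to see in one place than across hunks. Below is a hedged sketch of how a processor might drive it end to end; the "whisper" endpoint, the directory names, the data payload and the run_whisper helper are all assumptions for illustration and do not appear in this commit:

# A sketch under the assumptions named above; not part of the diff.
# Module path follows this commit's file layout (common/lib/dmi_service_manager.py).
from common.lib.dmi_service_manager import DmiServiceManager, DsmOutOfMemory

def run_whisper(processor, staging_area, output_dir, filenames):
    dsm = DmiServiceManager(processor)

    # 1. Optionally check whether the server has GPU memory to spare before queueing work
    gpu_available, gpu_info = dsm.check_gpu_memory_available("whisper")
    if not gpu_available:
        raise DsmOutOfMemory(f"DMI Service Manager has no GPU memory available: {gpu_info}")

    # 2. Stage the input files (kept locally or uploaded to the server, depending on configuration)
    dsm.process_files(staging_area, filenames, output_dir,
                      server_file_collection_name=processor.dataset.key,
                      server_results_folder_name="results")

    # 3. Start the job and poll until it finishes; set check_process=False when output
    #    files do not map one-to-one onto input files
    data = {"args": ["--output_dir", str(dsm.path_to_results)]}   # payload format is endpoint-specific
    dsm.send_request_and_wait_for_results("whisper", data, wait_period=30, check_process=True)

    # 4. Copy results back into the local output directory (a no-op when running locally)
    dsm.process_results(output_dir)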
2 changes: 1 addition & 1 deletion datasources/eightchan/README.md
@@ -1,4 +1,4 @@
# 8chan data source for 4CAT

The 8chan data source works much the same as the 4chan data source. Please
refer to `/datasources/fourchan/README.md` for more information.
refer to the [installation instructions for local data sources](https://github.com/digitalmethodsinitiative/4cat/wiki/Enabling-local-data-sources) and the `/datasources/fourchan/README.md` for more information.
4 changes: 2 additions & 2 deletions datasources/eightkun/README.md
@@ -1,9 +1,9 @@
# 8kun data source for 4CAT

The 8kun data source works much the same as the 4chan data source. Please
refer to `/datasources/fourchan/README.md` for more information.
refer to the [installation instructions for local data sources](https://github.com/digitalmethodsinitiative/4cat/wiki/Enabling-local-data-sources) and `/datasources/fourchan/README.md` for more information.

It is virtually identical to the 8chan data source also provided. However,
This data source is virtually identical to the 8chan data source. However,
since 8kun is distinct from 8chan and has a new owner, it serves as a
separate data source to allow for changes to the platform without impacting
existing 8chan archives.