STOP: first draft of queuing blazegraph uploads jobs together
kevinkle committed Nov 15, 2017
1 parent 897776f commit eba8b04
Showing 3 changed files with 50 additions and 5 deletions.
24 changes: 21 additions & 3 deletions app/modules/blazeUploader/upload_graph.py
@@ -1,7 +1,25 @@
import requests
import os
import redis
from rq import Queue

import config
from functools import wraps

def queue_upload(func):
    '''
    Decorator that enqueues an upload job for the graph returned by the
    decorated function.
    '''
    redis_url = config.REDIS_URL
    redis_conn = redis.from_url(redis_url)
    upload_q = Queue('blazegraph_uploads', connection=redis_conn,
                     default_timeout=config.DEFAULT_TIMEOUT)
    @wraps(func)
    def func_wrapper(*args, **kwargs):
        # Run the decorated func. and assign the returned graph.
        # The general use case is to decorate datastruct_savvy().
        graph = func(*args, **kwargs)
        # Pass the function and its argument separately so the worker,
        # not this process, runs upload_graph(graph).
        job_upload = upload_q.enqueue(upload_graph, graph)
        # While we return the job id as good practice, it isn't used atm.
        return job_upload.get_id()
    return func_wrapper

def upload_graph(graph):
"""
Expand All @@ -20,7 +38,7 @@ def upload_graph(graph):
Prints out the response object from Blazegraph
"""
blazegraph_url = config.database['blazegraph_url']

    data = graph.serialize(format="turtle")

    headers = {'Content-Type': 'application/x-turtle'}
@@ -34,7 +52,7 @@ def upload_graph(graph):
    )
    return request.content


# This is currently only used for Phylotyper.
def upload_turtle(turtlefile):
"""
    Uploads raw data onto Blazegraph. To ensure that Blazegraph interprets
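For reference, a minimal sketch of how the job id returned by func_wrapper could be checked later. Job.fetch() and get_status() are standard RQ APIs; check_upload_status is a hypothetical helper, not part of this commit, and it assumes config exposes the same REDIS_URL used above.

import redis
from rq.job import Job

import config  # assumed to expose REDIS_URL, as in upload_graph.py

def check_upload_status(job_id):
    """Hypothetical helper: inspect a queued Blazegraph upload by its RQ job id."""
    redis_conn = redis.from_url(config.REDIS_URL)
    # Job.fetch() reloads the job's state from Redis by id.
    job = Job.fetch(job_id, connection=redis_conn)
    # get_status() returns e.g. 'queued', 'started', 'finished', or 'failed';
    # job.result holds upload_graph's return value once the job has finished.
    return job.get_status(), job.result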
5 changes: 3 additions & 2 deletions app/modules/turtleGrapher/datastruct_savvy.py
@@ -2,7 +2,7 @@
from rdflib import BNode, Literal, Graph
from modules.turtleGrapher.turtle_utils import generate_uri as gu, generate_hash, link_uris
from modules.turtleGrapher.turtle_grapher import generate_graph
from modules.blazeUploader.upload_graph import upload_graph
from modules.blazeUploader.upload_graph import queue_upload
from modules.PanPredic.pan_utils import contig_name_parse
# working with Serotype, Antimicrobial Resistance, & Virulence Factor data
# structures
@@ -185,11 +185,12 @@ def generate_datastruct(query_file, id_file, pickled_dictionary):

    return graph

@queue_upload
def datastruct_savvy(query_file, id_file, pickled_dictionary):
    """
    Note: we work with base graphs (those generated solely from the fasta file)
    and result graphs (those generated from analysis modules such as RGI/ECtyper)
    separately - they are only linked once uploaded to Blazegraph.
    :param query_file:
    :param id_file:
    :param pickled_dictionary:
    :return: the generated rdflib Graph; the queue_upload decorator enqueues its upload
    """
    graph = generate_datastruct(query_file, id_file, pickled_dictionary)
    return graph
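A hypothetical call site illustrating the effect of this change: because datastruct_savvy is now decorated with @queue_upload, the graph is built locally but the upload is handed off to the 'blazegraph_uploads' queue, and the caller receives an RQ job id rather than Blazegraph's HTTP response. The file paths below are placeholders.

from modules.turtleGrapher.datastruct_savvy import datastruct_savvy

# Placeholder arguments; real callers pass the paths produced earlier in the pipeline.
job_id = datastruct_savvy('sample.fasta', 'sample_id.txt', 'results.pickle')
print(job_id)  # e.g. a UUID string identifying the queued upload job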
26 changes: 26 additions & 0 deletions app/supervisord-rq-blazegraph.conf
@@ -31,3 +31,29 @@ stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
stderr_logfile=/dev/stderr
stderr_logfile_maxbytes=0

[program:rqworkerblazegraphuploads]
environment=PATH='/opt/conda/envs/backend/bin'
command=/opt/conda/envs/backend/bin/rq worker -c config blazegraph_uploads
process_name=%(program_name)s-%(process_num)s

; Limited to a single worker so Blazegraph uploads are processed one at a time
numprocs=1

; This is the directory from which RQ is run. Be sure to point this to the
; directory from which your source code is importable.
directory=/app

; RQ requires the TERM signal to perform a warm shutdown. If RQ does not die
; within 10 seconds, supervisor will forcefully kill it
stopsignal=TERM

; These are up to you
autostart=true
autorestart=true

; redirect stdout and stderr for docker logs
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
stderr_logfile=/dev/stderr
stderr_logfile_maxbytes=0
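The worker command above (rq worker -c config blazegraph_uploads) imports its settings from the same config module that upload_graph.py uses. A minimal sketch of that module, under the assumption that it is importable from /app; only the setting names (REDIS_URL, DEFAULT_TIMEOUT, database['blazegraph_url']) come from the code in this commit, the values are illustrative.

# config.py (sketch) - values are illustrative placeholders.
REDIS_URL = 'redis://localhost:6379/0'

# Per-job timeout (seconds) passed as default_timeout when queue_upload() builds the queue.
DEFAULT_TIMEOUT = 600

database = {
    'blazegraph_url': 'http://localhost:8080/blazegraph/namespace/kb/sparql',
}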
