Merge branch 'timeouts'
This fixes rq#12.
nvie committed Mar 16, 2012
2 parents f47398a + 3c05f20 commit df2b283
Showing 9 changed files with 173 additions and 23 deletions.
11 changes: 9 additions & 2 deletions rq/job.py
@@ -75,6 +75,7 @@ def __init__(self, id=None):
self.ended_at = None
self._result = None
self.exc_info = None
self.timeout = None


# Data access
@@ -143,10 +144,10 @@ def refresh(self): # noqa
"""
key = self.key
properties = ['data', 'created_at', 'origin', 'description',
'enqueued_at', 'ended_at', 'result', 'exc_info']
'enqueued_at', 'ended_at', 'result', 'exc_info', 'timeout']
data, created_at, origin, description, \
enqueued_at, ended_at, result, \
exc_info = conn.hmget(key, properties)
exc_info, timeout = conn.hmget(key, properties)
if data is None:
raise NoSuchJobError('No such job: %s' % (key,))

@@ -164,6 +165,10 @@ def to_date(date_str):
self.ended_at = to_date(ended_at)
self._result = result
self.exc_info = exc_info
if timeout is None:
self.timeout = None
else:
self.timeout = int(timeout)

def save(self):
"""Persists the current job instance to its corresponding Redis key."""
@@ -186,6 +191,8 @@ def save(self):
obj['result'] = self._result
if self.exc_info is not None:
obj['exc_info'] = self.exc_info
if self.timeout is not None:
obj['timeout'] = self.timeout

conn.hmset(key, obj)

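
Taken together, the job.py changes let `timeout` round-trip through the job's Redis hash like any other property: `save()` writes it only when set, and `refresh()` converts it back from the raw string. A minimal sketch of that round trip, assuming a plain redis-py connection and a hypothetical job key (the real key prefix lives elsewhere in rq):

```python
import redis

conn = redis.Redis()
key = 'rq:job:example-id'  # hypothetical key, for illustration only

# save(): 'timeout' is written only when it is not None
obj = {'description': 'some_calculation(3, 4, z=2)'}
timeout = 60
if timeout is not None:
    obj['timeout'] = timeout
conn.hmset(key, obj)

# refresh(): hmget() returns raw strings (or None for missing fields),
# hence the explicit None check before the int() conversion
raw_timeout, = conn.hmget(key, ['timeout'])
timeout = None if raw_timeout is None else int(raw_timeout)
assert timeout == 60
```
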
21 changes: 18 additions & 3 deletions rq/queue.py
@@ -32,10 +32,11 @@ def from_queue_key(cls, queue_key):
name = queue_key[len(prefix):]
return Queue(name)

def __init__(self, name='default'):
def __init__(self, name='default', default_timeout=None):
prefix = self.redis_queue_namespace_prefix
self.name = name
self._key = '%s%s' % (prefix, name)
self._default_timeout = default_timeout

@property
def key(self):
@@ -99,24 +100,38 @@ def enqueue(self, f, *args, **kwargs):
Expects the function to call, along with the arguments and keyword
arguments.
The special keyword `timeout` is reserved for `enqueue()` itself and
it won't be passed to the actual job function.
"""
if f.__module__ == '__main__':
raise ValueError(
'Functions from the __main__ module cannot be processed '
'by workers.')

timeout = kwargs.pop('timeout', None)
job = Job.create(f, *args, **kwargs)
return self.enqueue_job(job)
return self.enqueue_job(job, timeout=timeout)

def enqueue_job(self, job, set_meta_data=True):
def enqueue_job(self, job, timeout=None, set_meta_data=True):
"""Enqueues a job for delayed execution.
When the `timeout` argument is given, it overrides the default
timeout value of 180 seconds. `timeout` may be either a string or
an integer.
If the `set_meta_data` argument is `True` (default), it will update
the properties `origin` and `enqueued_at`.
"""
if set_meta_data:
job.origin = self.name
job.enqueued_at = times.now()

if timeout:
job.timeout = timeout # _timeout_in_seconds(timeout)
else:
job.timeout = 180 # default

job.save()
self.push_job_id(job.id)
return job
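
In practice the reserved keyword never reaches the job function: `enqueue()` pops it off `kwargs` before the `Job` is created. A usage sketch, reusing the `some_calculation` fixture from this commit's tests:

```python
from rq import Queue
from tests.fixtures import some_calculation

q = Queue()

# some_calculation() only ever sees x=3, y=4, z=2; 'timeout' is
# consumed by enqueue() itself.
job = q.enqueue(some_calculation, 3, 4, z=2, timeout=60)
assert job.timeout == 60

# Without an explicit value, enqueue_job() falls back to 180 seconds.
job = q.enqueue(some_calculation, 3, 4, z=2)
assert job.timeout == 180
```

Note that the string form promised by the docstring is not wired up yet: the `_timeout_in_seconds()` helper is still commented out above, so a value like `'3m'` would be stored verbatim rather than converted to seconds.
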
51 changes: 51 additions & 0 deletions rq/timeouts.py
@@ -0,0 +1,51 @@
import signal


class JobTimeoutException(Exception):
    """Raised when a job takes longer to complete than the allowed maximum
    timeout value.
    """
    pass


class death_pentalty_after(object):
    def __init__(self, timeout):
        self._timeout = timeout

    def __enter__(self):
        self.setup_death_penalty()

    def __exit__(self, type, value, traceback):
        # Always cancel immediately, since we're done
        try:
            self.cancel_death_penalty()
        except JobTimeoutException:
            # Weird case: we're done with the `with` body, but the alarm
            # fired just now. We can safely ignore this situation and
            # consider the body done.
            pass

        # __exit__ may return True to suppress further exception handling.
        # We don't want to suppress any exceptions here, since all errors
        # should just pass through, with JobTimeoutException propagating
        # normally to the invoking context.
        return False

    def handle_death_penalty(self, signum, frame):
        raise JobTimeoutException('Job exceeded maximum timeout '
                                  'value (%d seconds).' % self._timeout)

    def setup_death_penalty(self):
        """Sets up an alarm signal and a signal handler that raises
        a JobTimeoutException after the timeout amount (expressed in
        seconds).
        """
        signal.signal(signal.SIGALRM, self.handle_death_penalty)
        signal.alarm(self._timeout)

    def cancel_death_penalty(self):
        """Removes the death penalty alarm and puts the system back into
        default signal handling.
        """
        signal.alarm(0)
        signal.signal(signal.SIGALRM, signal.SIG_DFL)
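
Standalone, the context manager behaves like this (a sketch; since it relies on SIGALRM it is Unix-only and must run on the main thread, which is exactly the situation inside the forked work horse):

```python
import time

from rq.timeouts import JobTimeoutException, death_pentalty_after

try:
    with death_pentalty_after(1):
        time.sleep(3)  # stands in for a long-running job body
except JobTimeoutException as e:
    print(e)  # Job exceeded maximum timeout value (1 seconds).
```
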
35 changes: 27 additions & 8 deletions rq/worker.py
@@ -1,4 +1,3 @@
import sys
import os
import errno
import random
@@ -17,6 +16,7 @@
from .proxy import conn
from .utils import make_colorizer
from .exceptions import NoQueueError, UnpickleError
from .timeouts import death_pentalty_after

green = make_colorizer('darkgreen')
yellow = make_colorizer('darkyellow')
@@ -295,14 +295,14 @@ def work(self, burst=False): # noqa
return did_perform_work

def fork_and_perform_job(self, job):
"""Spawns a work horse to perform the actual work and passes it a job.
The worker will wait for the work horse and make sure it executes
within the given timeout bounds, or will end the work horse with
SIGALRM.
"""
child_pid = os.fork()
if child_pid == 0:
self._is_horse = True
random.seed()
self.log = Logger('horse')

success = self.perform_job(job)
sys.exit(int(not success))
self.main_work_horse(job)
else:
self._horse_pid = child_pid
self.procline('Forked %d at %d' % (child_pid, time.time()))
@@ -320,12 +320,31 @@ def fork_and_perform_job(self, job):
if e.errno != errno.EINTR:
raise

def main_work_horse(self, job):
"""This is the entry point of the newly spawned work horse."""
# After fork()'ing, always ensure we are generating random sequences
# that are different from the worker.
random.seed()
self._is_horse = True
self.log = Logger('horse')

success = self.perform_job(job)

# os._exit() is the way to exit from child processes after a fork(),
# in contrast to the regular sys.exit()
os._exit(int(not success))

def perform_job(self, job):
"""Performs the actual work of a job. Will/should only be called
inside the work horse's process.
"""
self.procline('Processing %s from %s since %s' % (
job.func.__name__,
job.origin, time.time()))

try:
rv = job.perform()
with death_pentalty_after(job.timeout or 180):
rv = job.perform()
except Exception as e:
fq = self.failed_queue
self.log.exception(red(str(e)))
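
Extracting `main_work_horse()` makes the fork protocol easier to see: the parent waits for the child, while the child does the work and leaves through `os._exit()` so that no inherited cleanup handlers run in both processes. A stripped-down sketch of the same control flow, with a hypothetical `perform` callable standing in for `perform_job()`:

```python
import os
import random


def fork_and_perform(perform):
    child_pid = os.fork()
    if child_pid == 0:
        # Work horse: re-seed so the child's random sequence diverges
        # from the parent's, do the work, then exit immediately.
        random.seed()
        success = perform()
        os._exit(int(not success))  # bypasses atexit/finally machinery
    else:
        # Worker: block until the horse exits, then map its exit status
        # back to a boolean.
        _, status = os.waitpid(child_pid, 0)
        return os.WEXITSTATUS(status) == 0
```
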
5 changes: 5 additions & 0 deletions run_tests
@@ -11,6 +11,11 @@ else
safe_rg=cat
fi

export ONLY_RUN_FAST_TESTS=1
if [ "$1" == '-f' ]; then # Poor man's argparse
unset ONLY_RUN_FAST_TESTS
fi

if check_redis_running; then
/usr/bin/env python -m unittest discover -v -s tests $@ 2>&1 | egrep -v '^test_' | $safe_rg
else
17 changes: 15 additions & 2 deletions tests/__init__.py
@@ -3,6 +3,7 @@
from logbook import NullHandler
from rq import conn


def find_empty_redis_database():
"""Tries to connect to a random Redis database (starting from 4), and
will use/connect it when no keys are in there.
@@ -15,6 +16,18 @@ def find_empty_redis_database():
assert False, 'No empty Redis database found to run tests in.'


def slow(f):
    import os
    from functools import wraps

    @wraps(f)
    def _inner(*args, **kwargs):
        # Only run the slow test body when ONLY_RUN_FAST_TESTS is not set
        if not os.environ.get('ONLY_RUN_FAST_TESTS'):
            f(*args, **kwargs)

    return _inner


class RQTestCase(unittest.TestCase):
"""Base class to inherit test cases from for RQ.
@@ -52,5 +65,5 @@ def tearDownClass(cls):

# Pop the connection to Redis
testconn = conn.pop()
assert testconn == cls.testconn, 'Wow, something really nasty happened to the Redis connection stack. Check your setup.'

assert testconn == cls.testconn, 'Wow, something really nasty ' \
'happened to the Redis connection stack. Check your setup.'
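
Combined with the run_tests change above, the decorator gives a crude fast/slow split: a plain `run_tests` exports `ONLY_RUN_FAST_TESTS=1`, turning decorated test bodies into no-ops, while `run_tests -f` unsets the variable so everything runs. A usage sketch (hypothetical test case and path):

```python
from tests import RQTestCase, slow
from tests.fixtures import create_file_after_timeout


class SlowWorkerTests(RQTestCase):
    @slow
    def test_takes_a_while(self):
        create_file_after_timeout('/tmp/.rq_example_sentinel', 2)

# run_tests      -> ONLY_RUN_FAST_TESTS=1, the body above is skipped
# run_tests -f   -> variable unset, the slow body actually runs
```

One caveat: a skipped body still reports as a passing test, since `_inner` simply returns without calling `f`; unittest's `skipUnless` would mark it as skipped instead.
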
9 changes: 9 additions & 0 deletions tests/fixtures.py
@@ -2,31 +2,40 @@
This file contains all jobs that are used in tests. Each of these test
fixtures has slightly different characteristics.
"""
import time


def say_hello(name=None):
"""A job with a single argument and a return value."""
if name is None:
name = 'Stranger'
return 'Hi there, %s!' % (name,)


def do_nothing():
"""The best job in the world."""
pass


def div_by_zero(x):
"""Prepare for a division-by-zero exception."""
return x / 0


def some_calculation(x, y, z=1):
"""Some arbitrary calculation with three numbers. Choose z smartly if you
want a division by zero exception.
"""
return x * y / z


def create_file(path):
"""Creates a file at the given path. Actually, leaves evidence that the
job ran."""
with open(path, 'w') as f:
f.write('Just a sentinel.')


def create_file_after_timeout(path, timeout):
time.sleep(timeout)
create_file(path)
2 changes: 1 addition & 1 deletion tests/test_job.py
@@ -110,6 +110,7 @@ def test_persistence_of_typical_jobs(self):
['created_at', 'data', 'description'])

def test_store_then_fetch(self):
"""Store, then fetch."""
job = Job.create(some_calculation, 3, 4, z=2)
job.save()

@@ -149,4 +150,3 @@ def test_fetching_unreadable_data(self):
self.testconn.hset(job.key, 'data', unimportable_data)
with self.assertRaises(UnpickleError):
job.refresh()

45 changes: 38 additions & 7 deletions tests/test_worker.py
@@ -1,6 +1,7 @@
import os
from tests import RQTestCase
from tests.fixtures import say_hello, div_by_zero, do_nothing, create_file
from tests import RQTestCase, slow
from tests.fixtures import say_hello, div_by_zero, do_nothing, create_file, \
create_file_after_timeout
from tests.helpers import strip_milliseconds
from rq import Queue, Worker, Job

@@ -16,10 +17,12 @@ def test_work_and_quit(self):
"""Worker processes work, then quits."""
fooq, barq = Queue('foo'), Queue('bar')
w = Worker([fooq, barq])
self.assertEquals(w.work(burst=True), False, 'Did not expect any work on the queue.')
self.assertEquals(w.work(burst=True), False,
'Did not expect any work on the queue.')

fooq.enqueue(say_hello, name='Frank')
self.assertEquals(w.work(burst=True), True, 'Expected at least some work done.')
self.assertEquals(w.work(burst=True), True,
'Expected at least some work done.')

def test_work_is_unreadable(self):
"""Unreadable jobs are put on the failed queue."""
@@ -78,13 +81,13 @@ def test_work_fails(self):
job = Job.fetch(job.id)
self.assertEquals(job.origin, q.name)

# should be the original enqueued_at date, not the date of enqueueing to
# the failed queue
# Should be the original enqueued_at date, not the date of enqueueing
# to the failed queue
self.assertEquals(job.enqueued_at, enqueued_at_date)
self.assertIsNotNone(job.exc_info) # should contain exc_info


def test_cancelled_jobs_arent_executed(self):
def test_cancelled_jobs_arent_executed(self): # noqa
"""Cancelling jobs."""

SENTINEL_FILE = '/tmp/rq-tests.txt'
@@ -129,3 +132,31 @@ def test_cleaning_up_of_jobs(self):
# results are immediately removed
assert self.testconn.ttl(job_with_rv.key) > 0
assert self.testconn.exists(job_without_rv.key) == False


@slow # noqa
def test_timeouts(self):
"""Worker kills jobs after timeout."""
sentinel_file = '/tmp/.rq_sentinel'

q = Queue()
w = Worker([q])

# Put it on the queue with a timeout value
res = q.enqueue(
create_file_after_timeout, sentinel_file, 4,
timeout=1)

try:
os.unlink(sentinel_file)
except OSError as e:
    # Only ignore "No such file or directory" (errno 2); anything else
    # is a real error and should propagate
    if e.errno != 2:
        raise

self.assertEquals(os.path.exists(sentinel_file), False)
w.work(burst=True)
self.assertEquals(os.path.exists(sentinel_file), False)

# TODO: Having to do the manual refresh() here is really ugly!
res.refresh()
self.assertIn('JobTimeoutException', res.exc_info)
