diff --git a/tests/test_watchdog.py b/tests/test_watchdog.py index 7b22de1600..e3f67df1c6 100644 --- a/tests/test_watchdog.py +++ b/tests/test_watchdog.py @@ -2,7 +2,7 @@ import subprocess import os import sys -from random import random +from random import random, randrange import urllib as url import time sys.path.append(os.getcwd()) @@ -59,12 +59,26 @@ def test_watchdog(self): duration = int(time.time() - start) self.assertEquals(duration, 4) + # Too much memory used, killed by Watchdog + start = time.time() + p = subprocess.Popen(["python", "tests/test_watchdog.py", "memory"]) + p.wait() + duration = int(time.time() - start) + # process should be killed well before the restart interval of 30. + assert duration < 20 + class MockTxManager(object): def flush(self): "Pretend to flush for a long time" time.sleep(5) sys.exit(0) +class MemoryHogTxManager(object): + def flush(self): + rand_data = [] + while True: + rand_data.append('%030x' % randrange(256**15)) + class PseudoAgent(object): """Same logic as the agent, simplified""" def busy_run(self): @@ -100,6 +114,12 @@ def fast_tornado(self): a._tr_manager = MockTxManager() a.run() + def use_lots_of_memory(self): + a = Application(12345, {}) + a._watchdog = Watchdog(30, 50) + a._tr_manager = MemoryHogTxManager() + a.run() + if __name__ == "__main__": if sys.argv[1] == "busy": a = PseudoAgent() @@ -119,3 +139,6 @@ def fast_tornado(self): elif sys.argv[1] == "test": t = TestWatchdog() t.runTest() + elif sys.argv[1] == "memory": + a = PseudoAgent() + a.use_lots_of_memory() diff --git a/util.py b/util.py index 0ca8061d06..1708cc7cc6 100644 --- a/util.py +++ b/util.py @@ -1,5 +1,6 @@ import os import platform +import resource import signal import socket import subprocess @@ -186,16 +187,21 @@ def _get_hostname_unix(): class Watchdog(object): """Simple signal-based watchdog that will scuttle the current process - if it has not been reset every N seconds. 
+ if it has not been reset every N seconds, or if the process exceeds + a specified memory threshold. Can only be invoked once per process, so don't use with multiple threads. If you instantiate more than one, you're also asking for trouble. """ - def __init__(self, duration): + def __init__(self, duration, max_mem_mb = 2000): """Set the duration """ self._duration = int(duration) signal.signal(signal.SIGALRM, Watchdog.self_destruct) + # cap memory usage + self._max_mem_kb = 1024 * max_mem_mb + max_mem_bytes = 1024 * self._max_mem_kb + resource.setrlimit(resource.RLIMIT_AS, (max_mem_bytes, max_mem_bytes)) @staticmethod def self_destruct(signum, frame): @@ -208,6 +214,11 @@ def self_destruct(signum, frame): def reset(self): + # self destruct if using too much memory, as tornado will swallow MemoryErrors + mem_usage_kb = int(os.popen('ps -p %d -o %s | tail -1' % (os.getpid(), 'rss')).read()) + if mem_usage_kb > (0.95 * self._max_mem_kb): + Watchdog.self_destruct(signal.SIGKILL, sys._getframe(0)) + log.debug("Resetting watchdog for %d" % self._duration) signal.alarm(self._duration)