Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[process.py] Handle scenario where system clock rolls backward #1047

Merged
merged 5 commits into from
Mar 13, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions supervisor/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,28 @@ def _spawn_as_child(self, filename, argv):
options.write(2, "supervisor: child process was not spawned\n")
options._exit(127) # exit process with code for spawn failure

def _check_and_adjust_for_system_clock_rollback(self, test_time):
"""
Check if system clock has rolled backward beyond test_time. If so, set
affected timestamps to test_time.
"""
if self.state == ProcessStates.STARTING:
if test_time < self.laststart:
self.laststart = test_time;
if self.delay > 0 and test_time < (self.delay - self.config.startsecs):
self.delay = test_time + self.config.startsecs
elif self.state == ProcessStates.RUNNING:
if test_time > self.laststart and test_time < (self.laststart + self.config.startsecs):
self.laststart = test_time - self.config.startsecs
elif self.state == ProcessStates.STOPPING:
if test_time < self.laststopreport:
self.laststopreport = test_time;
if self.delay > 0 and test_time < (self.delay - self.config.stopwaitsecs):
self.delay = test_time + self.config.stopwaitsecs
elif self.state == ProcessStates.BACKOFF:
if self.delay > 0 and test_time < (self.delay - self.backoff):
self.delay = test_time + self.backoff

def stop(self):
""" Administrative stop """
self.administrative_stop = True
Expand All @@ -365,6 +387,9 @@ def stop_report(self):
""" Log a 'waiting for x to stop' message with throttling. """
if self.state == ProcessStates.STOPPING:
now = time.time()

self._check_and_adjust_for_system_clock_rollback(now)

if now > (self.laststopreport + 2): # every 2 seconds
self.config.options.logger.info(
'waiting for %s to stop' % as_string(self.config.name))
Expand Down Expand Up @@ -497,6 +522,9 @@ def finish(self, pid, sts):
es, msg = decode_wait_status(sts)

now = time.time()

self._check_and_adjust_for_system_clock_rollback(now)

self.laststop = now
processname = as_string(self.config.name)

Expand Down Expand Up @@ -604,6 +632,8 @@ def transition(self):
now = time.time()
state = self.state

self._check_and_adjust_for_system_clock_rollback(now)

logger = self.config.options.logger

if self.config.options.mood > SupervisorStates.RESTARTING:
Expand Down Expand Up @@ -836,6 +866,12 @@ def transition(self):
if dispatch_capable:
if self.dispatch_throttle:
now = time.time()

if now < self.last_dispatch:
# The system clock appears to have moved backward
# Reset self.last_dispatch accordingly
self.last_dispatch = now;

if now - self.last_dispatch < self.dispatch_throttle:
return
self.dispatch()
Expand Down
183 changes: 183 additions & 0 deletions supervisor/tests/test_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -738,6 +738,40 @@ def test_stop_report_logs_throttled_by_laststopreport(self):
instance.stop_report()
self.assertEqual(len(options.logger.data), 1) # throttled

def test_stop_report_laststopreport_in_future(self):
future_time = time.time() + 3600 # 1 hour into the future
options = DummyOptions()
config = DummyPConfig(options, 'test', '/test')
instance = self._makeOne(config)
instance.pid = 11
dispatcher = DummyDispatcher(writable=True)
instance.dispatchers = {'foo':dispatcher}
from supervisor.states import ProcessStates
instance.state = ProcessStates.STOPPING
instance.laststopreport = future_time

# This iteration of stop_report() should reset instance.laststopreport
# to the current time
instance.stop_report()

# No logging should have taken place
self.assertEqual(len(options.logger.data), 0)

# Ensure instance.laststopreport has rolled backward
self.assertTrue(instance.laststopreport < future_time)

# Sleep for 2 seconds
time.sleep(2)

# This iteration of stop_report() should actaully trigger the report
instance.stop_report()

self.assertEqual(len(options.logger.data), 1)
self.assertEqual(options.logger.data[0], 'waiting for test to stop')
self.assertNotEqual(instance.laststopreport, 0)
instance.stop_report()
self.assertEqual(len(options.logger.data), 1) # throttled

def test_give_up(self):
options = DummyOptions()
config = DummyPConfig(options, 'test', '/test')
Expand Down Expand Up @@ -1106,6 +1140,43 @@ def test_finish_starting_state_exited_too_quickly(self):
self.assertEqual(event.__class__, events.ProcessStateBackoffEvent)
self.assertEqual(event.from_state, ProcessStates.STARTING)

# This tests the case where the process has stayed alive longer than
# startsecs (i.e., long enough to enter the RUNNING state), however the
# system clock has since rolled backward such that the current time is
# greater than laststart but less than startsecs.
def test_finish_running_state_exited_too_quickly_due_to_clock_rollback(self):
options = DummyOptions()
config = DummyPConfig(options, 'notthere', '/notthere',
stdout_logfile='/tmp/foo', startsecs=10)
instance = self._makeOne(config)
instance.config.options.pidhistory[123] = instance
pipes = {'stdout':'','stderr':''}
instance.pipes = pipes
instance.config.exitcodes =[-1]
instance.laststart = time.time()
from supervisor.states import ProcessStates
from supervisor import events
instance.state = ProcessStates.RUNNING
L = []
events.subscribe(events.ProcessStateEvent, lambda x: L.append(x))
instance.pid = 123
instance.finish(123, 1)
self.assertFalse(instance.killing)
self.assertEqual(instance.pid, 0)
self.assertEqual(options.parent_pipes_closed, pipes)
self.assertEqual(instance.pipes, {})
self.assertEqual(instance.dispatchers, {})
self.assertEqual(options.logger.data[0],
'exited: notthere (terminated by SIGHUP; expected)')
self.assertEqual(instance.exitstatus, -1)
self.assertEqual(len(L), 1)
event = L[0]
self.assertEqual(event.__class__,
events.ProcessStateExitedEvent)
self.assertEqual(event.expected, True)
self.assertEqual(event.extra_values, [('expected', True), ('pid', 123)])
self.assertEqual(event.from_state, ProcessStates.RUNNING)

def test_finish_running_state_laststart_in_future(self):
options = DummyOptions()
config = DummyPConfig(options, 'notthere', '/notthere',
Expand Down Expand Up @@ -1403,6 +1474,92 @@ def test_transition_starting_to_running(self):
event = L[0]
self.assertEqual(event.__class__, events.ProcessStateRunningEvent)

def test_transition_starting_to_running_laststart_in_future(self):
from supervisor import events
L = []
events.subscribe(events.ProcessStateEvent, lambda x: L.append(x))
from supervisor.states import ProcessStates

future_time = time.time() + 3600 # 1 hour into the future
options = DummyOptions()
test_startsecs = 2

# this should go from STARTING to RUNNING via transition()
pconfig = DummyPConfig(options, 'process', 'process','/bin/process',
startsecs=test_startsecs)
process = self._makeOne(pconfig)
process.backoff = 1
process.delay = 1
process.system_stop = False
process.laststart = future_time
process.pid = 1
process.stdout_buffer = 'abc'
process.stderr_buffer = 'def'
process.state = ProcessStates.STARTING

# This iteration of transition() should reset process.laststart
# to the current time
process.transition()

# Process state should still be STARTING
self.assertEqual(process.state, ProcessStates.STARTING)

# Ensure process.laststart has rolled backward
self.assertTrue(process.laststart < future_time)

# Sleep for (startsecs + 1)
time.sleep(test_startsecs + 1)

# This iteration of transition() should actaully trigger the state
# transition to RUNNING
process.transition()

# this implies RUNNING
self.assertEqual(process.backoff, 0)
self.assertEqual(process.delay, 0)
self.assertFalse(process.system_stop)
self.assertEqual(process.state, ProcessStates.RUNNING)
self.assertEqual(options.logger.data[0],
'success: process entered RUNNING state, process has '
'stayed up for > than {} seconds (startsecs)'.format(test_startsecs))
self.assertEqual(len(L), 1)
event = L[0]
self.assertEqual(event.__class__, events.ProcessStateRunningEvent)

def test_transition_backoff_to_starting_delay_in_future(self):
from supervisor import events
L = []
events.subscribe(events.ProcessStateEvent, lambda x: L.append(x))
from supervisor.states import ProcessStates, SupervisorStates

future_time = time.time() + 3600 # 1 hour into the future
options = DummyOptions()

pconfig = DummyPConfig(options, 'process', 'process','/bin/process')
process = self._makeOne(pconfig)
process.laststart = 1
process.delay = future_time
process.backoff = 0
process.state = ProcessStates.BACKOFF

# This iteration of transition() should reset process.delay
# to the current time
process.transition()

# Process state should still be BACKOFF
self.assertEqual(process.state, ProcessStates.BACKOFF)

# Ensure process.delay has rolled backward
self.assertTrue(process.delay < future_time)

# This iteration of transition() should actaully trigger the state
# transition to STARTING
process.transition()

self.assertEqual(process.state, ProcessStates.STARTING)
self.assertEqual(len(L), 1)
self.assertEqual(L[0].__class__, events.ProcessStateStartingEvent)

def test_transition_backoff_to_fatal(self):
from supervisor import events
L = []
Expand Down Expand Up @@ -2034,6 +2191,32 @@ class DummyGroup:
self.assertEqual(process1.listener_state, EventListenerStates.BUSY)
self.assertEqual(process1.event, event)

def test_transition_event_proc_running_with_dispatch_throttle_last_dispatch_in_future(self):
future_time = time.time() + 3600 # 1 hour into the future
options = DummyOptions()
from supervisor.states import ProcessStates
pconfig1 = DummyPConfig(options, 'process1', 'process1','/bin/process1')
process1 = DummyProcess(pconfig1, state=ProcessStates.RUNNING)
gconfig = DummyPGroupConfig(options, pconfigs=[pconfig1])
pool = self._makeOne(gconfig)
pool.dispatch_throttle = 5
pool.last_dispatch = future_time
pool.processes = {'process1': process1}
event = DummyEvent()
from supervisor.states import EventListenerStates
process1.listener_state = EventListenerStates.READY
class DummyGroup:
config = gconfig
process1.group = DummyGroup
pool._acceptEvent(event)
pool.transition()

self.assertEqual(process1.transitioned, True)
self.assertEqual(pool.event_buffer, [event]) # not popped

# Ensure pool.last_dispatch has been rolled backward
self.assertTrue(pool.last_dispatch < future_time)

def test__dispatchEvent_notready(self):
options = DummyOptions()
from supervisor.states import ProcessStates
Expand Down