diff --git a/supervisor/process.py b/supervisor/process.py index 9133ad767..2f2483a6f 100644 --- a/supervisor/process.py +++ b/supervisor/process.py @@ -355,6 +355,28 @@ def _spawn_as_child(self, filename, argv): options.write(2, "supervisor: child process was not spawned\n") options._exit(127) # exit process with code for spawn failure + def _check_and_adjust_for_system_clock_rollback(self, test_time): + """ + Check if system clock has rolled backward beyond test_time. If so, set + affected timestamps to test_time. + """ + if self.state == ProcessStates.STARTING: + if test_time < self.laststart: + self.laststart = test_time; + if self.delay > 0 and test_time < (self.delay - self.config.startsecs): + self.delay = test_time + self.config.startsecs + elif self.state == ProcessStates.RUNNING: + if test_time > self.laststart and test_time < (self.laststart + self.config.startsecs): + self.laststart = test_time - self.config.startsecs + elif self.state == ProcessStates.STOPPING: + if test_time < self.laststopreport: + self.laststopreport = test_time; + if self.delay > 0 and test_time < (self.delay - self.config.stopwaitsecs): + self.delay = test_time + self.config.stopwaitsecs + elif self.state == ProcessStates.BACKOFF: + if self.delay > 0 and test_time < (self.delay - self.backoff): + self.delay = test_time + self.backoff + def stop(self): """ Administrative stop """ self.administrative_stop = True @@ -365,6 +387,9 @@ def stop_report(self): """ Log a 'waiting for x to stop' message with throttling. """ if self.state == ProcessStates.STOPPING: now = time.time() + + self._check_and_adjust_for_system_clock_rollback(now) + if now > (self.laststopreport + 2): # every 2 seconds self.config.options.logger.info( 'waiting for %s to stop' % as_string(self.config.name)) @@ -497,6 +522,9 @@ def finish(self, pid, sts): es, msg = decode_wait_status(sts) now = time.time() + + self._check_and_adjust_for_system_clock_rollback(now) + self.laststop = now processname = as_string(self.config.name) @@ -604,6 +632,8 @@ def transition(self): now = time.time() state = self.state + self._check_and_adjust_for_system_clock_rollback(now) + logger = self.config.options.logger if self.config.options.mood > SupervisorStates.RESTARTING: @@ -836,6 +866,12 @@ def transition(self): if dispatch_capable: if self.dispatch_throttle: now = time.time() + + if now < self.last_dispatch: + # The system clock appears to have moved backward + # Reset self.last_dispatch accordingly + self.last_dispatch = now; + if now - self.last_dispatch < self.dispatch_throttle: return self.dispatch() diff --git a/supervisor/tests/test_process.py b/supervisor/tests/test_process.py index 7144b4c7d..6c592be52 100644 --- a/supervisor/tests/test_process.py +++ b/supervisor/tests/test_process.py @@ -738,6 +738,40 @@ def test_stop_report_logs_throttled_by_laststopreport(self): instance.stop_report() self.assertEqual(len(options.logger.data), 1) # throttled + def test_stop_report_laststopreport_in_future(self): + future_time = time.time() + 3600 # 1 hour into the future + options = DummyOptions() + config = DummyPConfig(options, 'test', '/test') + instance = self._makeOne(config) + instance.pid = 11 + dispatcher = DummyDispatcher(writable=True) + instance.dispatchers = {'foo':dispatcher} + from supervisor.states import ProcessStates + instance.state = ProcessStates.STOPPING + instance.laststopreport = future_time + + # This iteration of stop_report() should reset instance.laststopreport + # to the current time + instance.stop_report() + + # No logging should have taken place + self.assertEqual(len(options.logger.data), 0) + + # Ensure instance.laststopreport has rolled backward + self.assertTrue(instance.laststopreport < future_time) + + # Sleep for 2 seconds + time.sleep(2) + + # This iteration of stop_report() should actaully trigger the report + instance.stop_report() + + self.assertEqual(len(options.logger.data), 1) + self.assertEqual(options.logger.data[0], 'waiting for test to stop') + self.assertNotEqual(instance.laststopreport, 0) + instance.stop_report() + self.assertEqual(len(options.logger.data), 1) # throttled + def test_give_up(self): options = DummyOptions() config = DummyPConfig(options, 'test', '/test') @@ -1106,6 +1140,43 @@ def test_finish_starting_state_exited_too_quickly(self): self.assertEqual(event.__class__, events.ProcessStateBackoffEvent) self.assertEqual(event.from_state, ProcessStates.STARTING) + # This tests the case where the process has stayed alive longer than + # startsecs (i.e., long enough to enter the RUNNING state), however the + # system clock has since rolled backward such that the current time is + # greater than laststart but less than startsecs. + def test_finish_running_state_exited_too_quickly_due_to_clock_rollback(self): + options = DummyOptions() + config = DummyPConfig(options, 'notthere', '/notthere', + stdout_logfile='/tmp/foo', startsecs=10) + instance = self._makeOne(config) + instance.config.options.pidhistory[123] = instance + pipes = {'stdout':'','stderr':''} + instance.pipes = pipes + instance.config.exitcodes =[-1] + instance.laststart = time.time() + from supervisor.states import ProcessStates + from supervisor import events + instance.state = ProcessStates.RUNNING + L = [] + events.subscribe(events.ProcessStateEvent, lambda x: L.append(x)) + instance.pid = 123 + instance.finish(123, 1) + self.assertFalse(instance.killing) + self.assertEqual(instance.pid, 0) + self.assertEqual(options.parent_pipes_closed, pipes) + self.assertEqual(instance.pipes, {}) + self.assertEqual(instance.dispatchers, {}) + self.assertEqual(options.logger.data[0], + 'exited: notthere (terminated by SIGHUP; expected)') + self.assertEqual(instance.exitstatus, -1) + self.assertEqual(len(L), 1) + event = L[0] + self.assertEqual(event.__class__, + events.ProcessStateExitedEvent) + self.assertEqual(event.expected, True) + self.assertEqual(event.extra_values, [('expected', True), ('pid', 123)]) + self.assertEqual(event.from_state, ProcessStates.RUNNING) + def test_finish_running_state_laststart_in_future(self): options = DummyOptions() config = DummyPConfig(options, 'notthere', '/notthere', @@ -1403,6 +1474,92 @@ def test_transition_starting_to_running(self): event = L[0] self.assertEqual(event.__class__, events.ProcessStateRunningEvent) + def test_transition_starting_to_running_laststart_in_future(self): + from supervisor import events + L = [] + events.subscribe(events.ProcessStateEvent, lambda x: L.append(x)) + from supervisor.states import ProcessStates + + future_time = time.time() + 3600 # 1 hour into the future + options = DummyOptions() + test_startsecs = 2 + + # this should go from STARTING to RUNNING via transition() + pconfig = DummyPConfig(options, 'process', 'process','/bin/process', + startsecs=test_startsecs) + process = self._makeOne(pconfig) + process.backoff = 1 + process.delay = 1 + process.system_stop = False + process.laststart = future_time + process.pid = 1 + process.stdout_buffer = 'abc' + process.stderr_buffer = 'def' + process.state = ProcessStates.STARTING + + # This iteration of transition() should reset process.laststart + # to the current time + process.transition() + + # Process state should still be STARTING + self.assertEqual(process.state, ProcessStates.STARTING) + + # Ensure process.laststart has rolled backward + self.assertTrue(process.laststart < future_time) + + # Sleep for (startsecs + 1) + time.sleep(test_startsecs + 1) + + # This iteration of transition() should actaully trigger the state + # transition to RUNNING + process.transition() + + # this implies RUNNING + self.assertEqual(process.backoff, 0) + self.assertEqual(process.delay, 0) + self.assertFalse(process.system_stop) + self.assertEqual(process.state, ProcessStates.RUNNING) + self.assertEqual(options.logger.data[0], + 'success: process entered RUNNING state, process has ' + 'stayed up for > than {} seconds (startsecs)'.format(test_startsecs)) + self.assertEqual(len(L), 1) + event = L[0] + self.assertEqual(event.__class__, events.ProcessStateRunningEvent) + + def test_transition_backoff_to_starting_delay_in_future(self): + from supervisor import events + L = [] + events.subscribe(events.ProcessStateEvent, lambda x: L.append(x)) + from supervisor.states import ProcessStates, SupervisorStates + + future_time = time.time() + 3600 # 1 hour into the future + options = DummyOptions() + + pconfig = DummyPConfig(options, 'process', 'process','/bin/process') + process = self._makeOne(pconfig) + process.laststart = 1 + process.delay = future_time + process.backoff = 0 + process.state = ProcessStates.BACKOFF + + # This iteration of transition() should reset process.delay + # to the current time + process.transition() + + # Process state should still be BACKOFF + self.assertEqual(process.state, ProcessStates.BACKOFF) + + # Ensure process.delay has rolled backward + self.assertTrue(process.delay < future_time) + + # This iteration of transition() should actaully trigger the state + # transition to STARTING + process.transition() + + self.assertEqual(process.state, ProcessStates.STARTING) + self.assertEqual(len(L), 1) + self.assertEqual(L[0].__class__, events.ProcessStateStartingEvent) + def test_transition_backoff_to_fatal(self): from supervisor import events L = [] @@ -2034,6 +2191,32 @@ class DummyGroup: self.assertEqual(process1.listener_state, EventListenerStates.BUSY) self.assertEqual(process1.event, event) + def test_transition_event_proc_running_with_dispatch_throttle_last_dispatch_in_future(self): + future_time = time.time() + 3600 # 1 hour into the future + options = DummyOptions() + from supervisor.states import ProcessStates + pconfig1 = DummyPConfig(options, 'process1', 'process1','/bin/process1') + process1 = DummyProcess(pconfig1, state=ProcessStates.RUNNING) + gconfig = DummyPGroupConfig(options, pconfigs=[pconfig1]) + pool = self._makeOne(gconfig) + pool.dispatch_throttle = 5 + pool.last_dispatch = future_time + pool.processes = {'process1': process1} + event = DummyEvent() + from supervisor.states import EventListenerStates + process1.listener_state = EventListenerStates.READY + class DummyGroup: + config = gconfig + process1.group = DummyGroup + pool._acceptEvent(event) + pool.transition() + + self.assertEqual(process1.transitioned, True) + self.assertEqual(pool.event_buffer, [event]) # not popped + + # Ensure pool.last_dispatch has been rolled backward + self.assertTrue(pool.last_dispatch < future_time) + def test__dispatchEvent_notready(self): options = DummyOptions() from supervisor.states import ProcessStates