from __future__ import absolute_import import sys import time import celery.utils.timer2 as timer2 from celery.tests.case import Case, Mock, patch from kombu.tests.case import redirect_stdouts class test_Entry(Case): def test_call(self): scratch = [None] def timed(x, y, moo='foo'): scratch[0] = (x, y, moo) tref = timer2.Entry(timed, (4, 4), {'moo': 'baz'}) tref() self.assertTupleEqual(scratch[0], (4, 4, 'baz')) def test_cancel(self): tref = timer2.Entry(lambda x: x, (1, ), {}) tref.cancel() self.assertTrue(tref.cancelled) def test_repr(self): tref = timer2.Entry(lambda x: x(1, ), {}) self.assertTrue(repr(tref)) class test_Schedule(Case): def test_supports_Timer_interface(self): x = timer2.Schedule() x.stop() tref = Mock() x.cancel(tref) tref.cancel.assert_called_with() self.assertIs(x.schedule, x) def test_handle_error(self): from datetime import datetime scratch = [None] def on_error(exc_info): scratch[0] = exc_info s = timer2.Schedule(on_error=on_error) with patch('kombu.async.timer.to_timestamp') as tot: tot.side_effect = OverflowError() s.enter_at(timer2.Entry(lambda: None, (), {}), eta=datetime.now()) s.enter_at(timer2.Entry(lambda: None, (), {}), eta=None) s.on_error = None with self.assertRaises(OverflowError): s.enter_at(timer2.Entry(lambda: None, (), {}), eta=datetime.now()) exc = scratch[0] self.assertIsInstance(exc, OverflowError) class test_Timer(Case): def test_enter_after(self): t = timer2.Timer() try: done = [False] def set_done(): done[0] = True t.call_after(0.3, set_done) mss = 0 while not done[0]: if mss >= 2.0: raise Exception('test timed out') time.sleep(0.1) mss += 0.1 finally: t.stop() def test_exit_after(self): t = timer2.Timer() t.call_after = Mock() t.exit_after(0.3, priority=10) t.call_after.assert_called_with(0.3, sys.exit, 10) def test_ensure_started_not_started(self): t = timer2.Timer() t.running = True t.start = Mock() t.ensure_started() self.assertFalse(t.start.called) def test_call_repeatedly(self): t = timer2.Timer() try: t.schedule.enter_after = Mock() myfun = Mock() myfun.__name__ = 'myfun' t.call_repeatedly(0.03, myfun) self.assertEqual(t.schedule.enter_after.call_count, 1) args1, _ = t.schedule.enter_after.call_args_list[0] sec1, tref1, _ = args1 self.assertEqual(sec1, 0.03) tref1() self.assertEqual(t.schedule.enter_after.call_count, 2) args2, _ = t.schedule.enter_after.call_args_list[1] sec2, tref2, _ = args2 self.assertEqual(sec2, 0.03) tref2.cancelled = True tref2() self.assertEqual(t.schedule.enter_after.call_count, 2) finally: t.stop() @patch('kombu.async.timer.logger') def test_apply_entry_error_handled(self, logger): t = timer2.Timer() t.schedule.on_error = None fun = Mock() fun.side_effect = ValueError() t.schedule.apply_entry(fun) self.assertTrue(logger.error.called) @redirect_stdouts def test_apply_entry_error_not_handled(self, stdout, stderr): t = timer2.Timer() t.schedule.on_error = Mock() fun = Mock() fun.side_effect = ValueError() t.schedule.apply_entry(fun) fun.assert_called_with() self.assertFalse(stderr.getvalue()) @patch('os._exit') def test_thread_crash(self, _exit): t = timer2.Timer() t._next_entry = Mock() t._next_entry.side_effect = OSError(131) t.run() _exit.assert_called_with(1) def test_gc_race_lost(self): t = timer2.Timer() t._is_stopped.set = Mock() t._is_stopped.set.side_effect = TypeError() t._is_shutdown.set() t.run() t._is_stopped.set.assert_called_with() def test_to_timestamp(self): self.assertIs(timer2.to_timestamp(3.13), 3.13) def test_test_enter(self): t = timer2.Timer() t._do_enter = Mock() e = Mock() t.enter(e, 13, 0) t._do_enter.assert_called_with('enter_at', e, 13, priority=0) def test_test_enter_after(self): t = timer2.Timer() t._do_enter = Mock() t.enter_after() t._do_enter.assert_called_with('enter_after') def test_cancel(self): t = timer2.Timer() tref = Mock() t.cancel(tref) tref.cancel.assert_called_with()