Coverage for src/rasp_aqua/scheduler.py: 99%

61 statements  

« prev     ^ index     » next       coverage.py v7.6.8, created at 2024-11-24 19:24 +0900

1#!/usr/bin/env python3 

2""" 

3関数を指定時刻に呼び出します. 

4 

5Usage: 

6 scheduler.py [-c CONFIG] 

7 

8Options: 

9 -c CONFIG : CONFIG を設定ファイルとして読み込んで実行します.[default: config.yaml] 

10""" 

11 

12import datetime 

13import logging 

14import pathlib 

15import threading 

16import time 

17import traceback 

18 

19import my_lib.footprint 

20import pytz 

21import schedule 

22 

# Timezone settings handed to init(); the functions in this module read the
# "offset" and "zone" keys from it.
timezone = None

# Background thread created and started by init().
worker = None

# Set by schedule_task() after a job ran; the worker loop uses it to decide
# when to log the schedule status again, then resets it.
executed_job = False

# The worker loop exits once this event is set.
should_terminate = threading.Event()

28 

29 

def init(timezone_, queue, liveness_file, check_interval_sec):
    """Store the timezone settings and start the schedule worker thread.

    Args:
        timezone_: Timezone settings saved into the module-level ``timezone``.
        queue: Forwarded to ``schedule_worker``.
        liveness_file: Forwarded to ``schedule_worker``.
        check_interval_sec: Forwarded to ``schedule_worker``.
    """
    global worker, timezone  # noqa: PLW0603

    timezone = timezone_

    worker_args = (queue, liveness_file, check_interval_sec)
    thread = threading.Thread(target=schedule_worker, args=worker_args)

    # Publish the thread object before starting it, as callers may inspect it.
    worker = thread
    thread.start()

42 

43 

def schedule_task(*args, **kwargs):
    """Log and run one scheduled function, then flag that a job was executed.

    Args:
        *args: ``args[0]`` is the callable to run; the remaining positional
            arguments are passed to it.
        **kwargs: Must contain ``name``, which is used only as the log label.
    """
    global executed_job  # noqa: PLW0603

    func = args[0]
    func_args = args[1:]

    # The log timestamp uses a fixed UTC offset taken from timezone["offset"] (hours).
    local_tz = datetime.timezone(datetime.timedelta(hours=timezone["offset"]))
    now_text = datetime.datetime.now(tz=local_tz).strftime("%Y-%m-%d %H:%M")
    logging.info("Now is %s", now_text)

    job_name = kwargs["name"]
    source_file = pathlib.Path(func.__code__.co_filename).name
    logging.info("Execute %s (%s:%s)", job_name, source_file, func.__name__)

    func(*func_args)

    # Only reached when func returned normally.
    executed_job = True

63 

64 

def schedule_status():
    """Log every registered job ordered by next run, and the time until the next one."""
    pending_jobs = list(schedule.get_jobs())
    pending_jobs.sort(key=lambda scheduled: scheduled.next_run)

    for scheduled in pending_jobs:
        logging.info(
            "Schedule run of %-7s: %s", scheduled.job_func.keywords["name"], scheduled.next_run
        )

    remaining_sec = schedule.idle_seconds()
    if remaining_sec is None:
        # Nothing more to report when no idle time is available.
        return

    hours, remainder = divmod(remaining_sec, 3600)
    minutes, seconds = divmod(remainder, 60)

    local_tz = datetime.timezone(datetime.timedelta(hours=timezone["offset"]))
    now_text = datetime.datetime.now(tz=local_tz).strftime("%Y-%m-%d %H:%M")

    logging.info(
        "Now is %s, time to next jobs is %d hour(s) %d minute(s) %d second(s)",
        now_text,
        hours,
        minutes,
        seconds,
    )

85 

86 

def set_schedule(schedule_data):
    """Replace all registered jobs with daily jobs built from ``schedule_data``.

    Args:
        schedule_data: Iterable of entries, each providing ``func``, ``args``
            (concatenated onto a tuple, so it must itself be a tuple),
            ``time`` and ``name``.
    """
    # Drop whatever was registered before; the new data fully replaces it.
    schedule.clear()

    for entry in schedule_data:
        call_args = (entry["func"],) + entry["args"]

        daily_job = schedule.every().day.at(entry["time"], pytz.timezone(timezone["zone"]))
        daily_job.do(schedule_task, *call_args, name=entry["name"])

97 

98 

def _liveness_update_period(check_interval_sec):
    """Return how many loop iterations should pass between liveness updates.

    The result is the smallest whole number of iterations that spans at least
    10 seconds, and is always >= 1.
    """
    if check_interval_sec >= 10:
        return 1

    # The loop never sleeps less than 0.1 sec, so use that as the floor.
    # This also avoids dividing by zero for a zero (or negative) interval.
    effective_interval = max(check_interval_sec, 0.1)

    period = int(10 / effective_interval)
    if period * effective_interval < 10:
        period += 1  # round up so that at least 10 seconds elapse

    return max(period, 1)


def schedule_worker(queue, liveness_file, check_interval_sec):
    """Run the scheduling loop until ``should_terminate`` is set.

    Each iteration registers any schedule data waiting in ``queue``, runs the
    pending jobs, sleeps for the rest of the check interval and periodically
    refreshes the liveness file.

    Args:
        queue: Object providing ``empty()`` and ``get()``; each item is passed
            to ``set_schedule``.
        liveness_file: Passed to ``my_lib.footprint.update``.
        check_interval_sec: Target duration of one loop iteration in seconds.
            The loop always sleeps at least 0.1 sec.
    """
    global executed_job  # noqa: PLW0603

    logging.info("Start schedule worker")

    # An integer period is required here: the previous float modulus
    # (i % (10 / check_interval_sec)) was true only at i == 0 for intervals
    # that do not divide 10 evenly, so liveness was never refreshed again.
    liveness_period = _liveness_update_period(check_interval_sec)

    i = 0
    while not should_terminate.is_set():
        try:
            time_start = time.time()
            while not queue.empty():
                logging.debug("Regist scheduled job")
                schedule_data = queue.get()
                set_schedule(schedule_data)
                schedule_status()

            schedule.run_pending()

            if executed_job:
                schedule_status()
                executed_job = False

            sleep_sec = max(check_interval_sec - (time.time() - time_start), 0.1)
            logging.debug("Sleep %.1f sec...", sleep_sec)
            time.sleep(sleep_sec)
        except OverflowError:  # pragma: no cover
            # NOTE: This exception occurs in tests when the date is manipulated with freezer.
            logging.debug(traceback.format_exc())

        # NOTE: Update liveness once 10 or more seconds' worth of iterations have passed.
        if i % liveness_period == 0:
            my_lib.footprint.update(liveness_file)

        i += 1

    schedule.clear()

    logging.info("Terminate schedule worker")