Coverage for src/rasp_aqua/scheduler.py: 99%
61 statements
« prev ^ index » next coverage.py v7.6.8, created at 2024-11-24 19:24 +0900
« prev ^ index » next coverage.py v7.6.8, created at 2024-11-24 19:24 +0900
1#!/usr/bin/env python3
2"""
3関数を指定時刻に呼び出します.
5Usage:
6 scheduler.py [-c CONFIG]
8Options:
9 -c CONFIG : CONFIG を設定ファイルとして読み込んで実行します.[default: config.yaml]
10"""
12import datetime
13import logging
14import pathlib
15import threading
16import time
17import traceback
19import my_lib.footprint
20import pytz
21import schedule
23worker = None
24should_terminate = threading.Event()
25executed_job = False
27timezone = None
30def init(timezone_, queue, liveness_file, check_interval_sec):
31 global worker # noqa: PLW0603
32 global timezone # noqa: PLW0603
34 timezone = timezone_
36 worker = threading.Thread(
37 target=schedule_worker,
38 args=(queue, liveness_file, check_interval_sec),
39 )
41 worker.start()
44def schedule_task(*args, **kwargs):
45 global executed_job # noqa: PLW0603
46 global timezone
48 func = args[0]
50 logging.info(
51 "Now is %s",
52 datetime.datetime.now(tz=datetime.timezone(datetime.timedelta(hours=timezone["offset"]))).strftime(
53 "%Y-%m-%d %H:%M"
54 ),
55 )
56 logging.info(
57 "Execute %s (%s:%s)", kwargs["name"], pathlib.Path(func.__code__.co_filename).name, func.__name__
58 )
60 func(*args[1:])
62 executed_job = True
65def schedule_status():
66 global timezone
68 for job in sorted(schedule.get_jobs(), key=lambda job: job.next_run):
69 logging.info("Schedule run of %-7s: %s", job.job_func.keywords["name"], job.next_run)
71 idle_sec = schedule.idle_seconds()
72 if idle_sec is not None: 72 ↛ exitline 72 didn't return from function 'schedule_status' because the condition on line 72 was always true
73 hours, remainder = divmod(idle_sec, 3600)
74 minutes, seconds = divmod(remainder, 60)
76 logging.info(
77 "Now is %s, time to next jobs is %d hour(s) %d minute(s) %d second(s)",
78 datetime.datetime.now(
79 tz=datetime.timezone(datetime.timedelta(hours=timezone["offset"]))
80 ).strftime("%Y-%m-%d %H:%M"),
81 hours,
82 minutes,
83 seconds,
84 )
87def set_schedule(schedule_data):
88 global timezone
90 schedule.clear()
92 for entry in schedule_data:
93 args = (entry["func"],) + entry["args"]
94 schedule.every().day.at(entry["time"], pytz.timezone(timezone["zone"])).do(
95 schedule_task, *args, name=entry["name"]
96 )
99def schedule_worker(queue, liveness_file, check_interval_sec):
100 global should_terminate
101 global executed_job # noqa: PLW0603
103 logging.info("Start schedule worker")
105 i = 0
106 while True:
107 if should_terminate.is_set():
108 schedule.clear()
109 break
110 try:
111 time_start = time.time()
112 while not queue.empty():
113 logging.debug("Regist scheduled job")
114 schedule_data = queue.get()
115 set_schedule(schedule_data)
116 schedule_status()
118 schedule.run_pending()
120 if executed_job:
121 schedule_status()
122 executed_job = False
124 sleep_sec = max(check_interval_sec - (time.time() - time_start), 0.1)
125 logging.debug("Sleep %.1f sec...", sleep_sec)
126 time.sleep(sleep_sec)
127 except OverflowError: # pragma: no cover
128 # NOTE: テストする際,freezer 使って日付をいじるとこの例外が発生する
129 logging.debug(traceback.format_exc())
131 # NOTE: 10秒以上経過していたら,liveness を更新する
132 if (check_interval_sec >= 10) or (i % (10 / check_interval_sec) == 0):
133 my_lib.footprint.update(liveness_file)
135 i += 1
137 logging.info("Terminate schedule worker")