Coverage for flask/src/rasp_water/control/scheduler.py: 99%
149 statements
« prev ^ index » next coverage.py v7.9.1, created at 2025-07-04 12:06 +0900
« prev ^ index » next coverage.py v7.9.1, created at 2025-07-04 12:06 +0900
1#!/usr/bin/env python3
2import datetime
3import logging
4import os
5import re
6import threading
7import time
8import traceback
10import my_lib.footprint
11import my_lib.serializer
12import my_lib.webapp.config
13import my_lib.webapp.log
14import rasp_water.control.webapi.valve
15import rasp_water.control.webapi.test.time
16import schedule
18RETRY_COUNT = 3
20schedule_lock = None
21should_terminate = threading.Event()
23# Worker-specific scheduler instance for pytest-xdist parallel execution
24_scheduler_instances = {}
27def get_scheduler():
28 """Get worker-specific scheduler instance for pytest-xdist parallel execution"""
29 worker_id = os.environ.get("PYTEST_XDIST_WORKER", "main")
31 if worker_id not in _scheduler_instances:
32 # Create a new scheduler instance for this worker
33 _scheduler_instances[worker_id] = schedule.Scheduler()
35 return _scheduler_instances[worker_id]
37def init():
38 global schedule_lock # noqa: PLW0603
39 global should_terminate
41 schedule_lock = threading.Lock()
42 should_terminate.clear()
45def term():
46 global should_terminate
48 should_terminate.set()
51def valve_auto_control_impl(config, period):
52 try:
53 # NOTE: Web 経由だと認証つけた場合に困るので、直接関数を呼ぶ
54 rasp_water.control.webapi.valve.set_valve_state(config, 1, period * 60, True, "scheduler")
55 return True
57 # logging.debug("Request scheduled execution to {url}".format(url=url))
58 # res = requests.post(
59 # url, params={"cmd": 1, "state": 1, "period": period * 60, "auto": True}
60 # )
61 # logging.debug(res.text)
62 # return res.status_code == 200
63 except Exception as e:
64 logging.exception("Failed to control valve automatically: %s", str(e))
65 import traceback
66 logging.error("Full traceback: %s", traceback.format_exc())
68 return False
71def valve_auto_control(config, period):
72 logging.info("Starts automatic control of the valve")
74 for _ in range(RETRY_COUNT):
75 if valve_auto_control_impl(config, period):
76 return True
78 my_lib.webapp.log.info("😵 水やりの自動実行に失敗しました。")
79 return False
82def schedule_validate(schedule_data): # noqa: C901, PLR0911
83 if len(schedule_data) != 2:
84 logging.warning("Count of entry is Invalid: %d", len(schedule_data))
85 return False
87 for entry in schedule_data:
88 for key in ["is_active", "time", "period", "wday"]:
89 if key not in entry:
90 logging.warning("Does not contain %s", key)
91 return False
92 if type(entry["is_active"]) is not bool:
93 logging.warning("Type of is_active is invalid: %s", type(entry["is_active"]))
94 return False
95 if not re.compile(r"\d{2}:\d{2}").search(entry["time"]):
96 logging.warning("Format of time is invalid: %s", entry["time"])
97 return False
98 if type(entry["period"]) is not int:
99 logging.warning("Type of period is invalid: %s", type(entry["period"]))
100 return False
101 if len(entry["wday"]) != 7:
102 logging.warning("Count of wday is Invalid: %d", len(entry["wday"]))
103 return False
104 for i, wday_flag in enumerate(entry["wday"]):
105 if type(wday_flag) is not bool:
106 logging.warning("Type of wday[%d] is Invalid: %s", i, type(entry["wday"][i]))
107 return False
108 return True
111def schedule_store(schedule_data):
112 global schedule_lock
113 try:
114 with schedule_lock:
115 my_lib.serializer.store(my_lib.webapp.config.SCHEDULE_FILE_PATH, schedule_data)
116 except Exception:
117 logging.exception("Failed to save schedule settings.")
118 my_lib.webapp.log.error("😵 スケジュール設定の保存に失敗しました。")
121def gen_schedule_default():
122 return [
123 {
124 "is_active": False,
125 "time": "00:00",
126 "period": 1,
127 "wday": [True] * 7,
128 }
129 ] * 2
132def schedule_load():
133 global schedule_lock
135 schedule_default = gen_schedule_default()
137 try:
138 with schedule_lock:
139 schedule_data = my_lib.serializer.load(my_lib.webapp.config.SCHEDULE_FILE_PATH, schedule_default)
140 if schedule_validate(schedule_data):
141 return schedule_data
142 except Exception:
143 logging.exception("Failed to load schedule settings.")
144 my_lib.webapp.log.error("😵 スケジュール設定の読み出しに失敗しました。")
146 return schedule_default
149def set_schedule(config, schedule_data): # noqa: C901
150 scheduler = get_scheduler()
151 scheduler.clear()
153 # DUMMY_MODEでは、scheduleライブラリの時刻関数をmy_lib.time.now()に設定
154 if os.environ.get("DUMMY_MODE", "false") == "true": 154 ↛ 160line 154 didn't jump to line 160 because the condition on line 154 was always true
155 scheduler._time_func = my_lib.time.now
157 # DUMMY_MODEでは、スケジュールライブラリのモッキングはしない
158 # 代わりに、テスト用APIでモックした時刻を利用
160 for entry in schedule_data:
161 if not entry["is_active"]:
162 continue
164 if entry["wday"][0]:
165 scheduler.every().sunday.at(entry["time"], my_lib.time.get_pytz()).do(
166 valve_auto_control, config, entry["period"]
167 )
168 if entry["wday"][1]:
169 scheduler.every().monday.at(entry["time"], my_lib.time.get_pytz()).do(
170 valve_auto_control, config, entry["period"]
171 )
172 if entry["wday"][2]:
173 scheduler.every().tuesday.at(entry["time"], my_lib.time.get_pytz()).do(
174 valve_auto_control, config, entry["period"]
175 )
176 if entry["wday"][3]:
177 scheduler.every().wednesday.at(entry["time"], my_lib.time.get_pytz()).do(
178 valve_auto_control, config, entry["period"]
179 )
180 if entry["wday"][4]:
181 scheduler.every().thursday.at(entry["time"], my_lib.time.get_pytz()).do(
182 valve_auto_control, config, entry["period"]
183 )
184 if entry["wday"][5]:
185 scheduler.every().friday.at(entry["time"], my_lib.time.get_pytz()).do(
186 valve_auto_control, config, entry["period"]
187 )
188 if entry["wday"][6]:
189 scheduler.every().saturday.at(entry["time"], my_lib.time.get_pytz()).do(
190 valve_auto_control, config, entry["period"]
191 )
193 for job in scheduler.get_jobs():
194 logging.info("Next run: %s", job.next_run)
196 idle_sec = scheduler.idle_seconds
197 if idle_sec is not None:
198 hours, remainder = divmod(idle_sec, 3600)
199 minutes, seconds = divmod(remainder, 60)
201 logging.info(
202 "Now is %s, time to next jobs is %d hour(s) %d minute(s) %d second(s)",
203 my_lib.time.now().strftime("%Y-%m-%d %H:%M"),
204 hours,
205 minutes,
206 seconds,
207 )
209 return idle_sec
212def schedule_worker(config, queue):
213 global should_terminate
215 sleep_sec = 0.25
216 scheduler = get_scheduler()
218 logging.info("Load schedule")
219 set_schedule(config, schedule_load())
221 logging.info("Start schedule worker")
223 i = 0
224 while True:
225 if should_terminate.is_set():
226 scheduler.clear()
227 break
228 try:
229 if not queue.empty():
230 schedule_data = queue.get()
231 set_schedule(config, schedule_data)
233 # デバッグ: スケジューラーの実行前に時刻とジョブ状態をログ出力
234 current_time = my_lib.time.now()
235 pending_jobs = len(scheduler.jobs)
236 if pending_jobs > 0:
237 next_run = scheduler.next_run
238 logging.debug("Scheduler check - Current time (my_lib): %s, System time: %s, Pending jobs: %d, Next run: %s",
239 current_time, datetime.datetime.now(), pending_jobs, next_run)
241 # DUMMY_MODEでは、scheduleライブラリの内部時刻をmy_lib.time.now()に強制同期
242 if os.environ.get("DUMMY_MODE", "false") == "true":
243 scheduler._time_func = my_lib.time.now
245 scheduler.run_pending()
246 logging.debug("Sleep %.1f sec...", sleep_sec)
247 time.sleep(sleep_sec)
248 except OverflowError: # pragma: no cover
249 # NOTE: テストする際、freezer 使って日付をいじるとこの例外が発生する
250 logging.debug(traceback.format_exc())
252 if i % (10 / sleep_sec) == 0:
253 my_lib.footprint.update(config["liveness"]["file"]["scheduler"])
255 i += 1
257 logging.info("Terminate schedule worker")
260if __name__ == "__main__":
261 import datetime
262 import multiprocessing
263 import multiprocessing.pool
265 import logger
266 import my_lib.webapp.config
268 logger.init("test", level=logging.DEBUG)
270 def test_func():
271 logging.info("TEST")
273 should_terminate.set()
275 queue = multiprocessing.Queue()
277 pool = multiprocessing.pool.ThreadPool(processes=1)
278 result = pool.apply_async(schedule_worker, (queue,))
280 exec_time = my_lib.time.now() + datetime.timedelta(seconds=5)
281 queue.put([{"time": exec_time.strftime("%H:%M"), "func": test_func}])
283 # NOTE: 終了するのを待つ
284 result.get()