Coverage for flask/src/rasp_water/scheduler.py: 99%
137 statements
« prev ^ index » next coverage.py v7.9.1, created at 2025-06-28 13:51 +0900
« prev ^ index » next coverage.py v7.9.1, created at 2025-06-28 13:51 +0900
1#!/usr/bin/env python3
2import datetime
3import logging
4import os
5import re
6import threading
7import time
8import traceback
10import my_lib.footprint
11import my_lib.serializer
12import my_lib.webapp.config
13import my_lib.webapp.log
14import rasp_water.webapp_valve
15import schedule
17RETRY_COUNT = 3
19schedule_lock = None
20should_terminate = threading.Event()
22# Worker-specific scheduler instance for pytest-xdist parallel execution
23_scheduler_instances = {}
26def get_scheduler():
27 """Get worker-specific scheduler instance for pytest-xdist parallel execution"""
28 worker_id = os.environ.get("PYTEST_XDIST_WORKER", "main")
30 if worker_id not in _scheduler_instances:
31 # Create a new scheduler instance for this worker
32 _scheduler_instances[worker_id] = schedule.Scheduler()
34 return _scheduler_instances[worker_id]
37def init():
38 global schedule_lock # noqa: PLW0603
39 global should_terminate
41 schedule_lock = threading.Lock()
42 should_terminate.clear()
45def term():
46 global should_terminate
48 should_terminate.set()
51def valve_auto_control_impl(config, period):
52 try:
53 # NOTE: Web 経由だと認証つけた場合に困るので、直接関数を呼ぶ
54 rasp_water.webapp_valve.set_valve_state(config, 1, period * 60, True, "scheduler")
55 return True
57 # logging.debug("Request scheduled execution to {url}".format(url=url))
58 # res = requests.post(
59 # url, params={"cmd": 1, "state": 1, "period": period * 60, "auto": True}
60 # )
61 # logging.debug(res.text)
62 # return res.status_code == 200
63 except Exception:
64 logging.exception("Failed to control valve automatically")
66 return False
69def valve_auto_control(config, period):
70 logging.info("Starts automatic control of the valve")
72 for _ in range(RETRY_COUNT):
73 if valve_auto_control_impl(config, period):
74 return True
76 my_lib.webapp.log.info("😵 水やりの自動実行に失敗しました。")
77 return False
80def schedule_validate(schedule_data): # noqa: C901, PLR0911
81 if len(schedule_data) != 2:
82 logging.warning("Count of entry is Invalid: %d", len(schedule_data))
83 return False
85 for entry in schedule_data:
86 for key in ["is_active", "time", "period", "wday"]:
87 if key not in entry:
88 logging.warning("Does not contain %s", key)
89 return False
90 if type(entry["is_active"]) is not bool:
91 logging.warning("Type of is_active is invalid: %s", type(entry["is_active"]))
92 return False
93 if not re.compile(r"\d{2}:\d{2}").search(entry["time"]):
94 logging.warning("Format of time is invalid: %s", entry["time"])
95 return False
96 if type(entry["period"]) is not int:
97 logging.warning("Type of period is invalid: %s", type(entry["period"]))
98 return False
99 if len(entry["wday"]) != 7:
100 logging.warning("Count of wday is Invalid: %d", len(entry["wday"]))
101 return False
102 for i, wday_flag in enumerate(entry["wday"]):
103 if type(wday_flag) is not bool:
104 logging.warning("Type of wday[%d] is Invalid: %s", i, type(entry["wday"][i]))
105 return False
106 return True
109def schedule_store(schedule_data):
110 global schedule_lock
111 try:
112 with schedule_lock:
113 my_lib.serializer.store(my_lib.webapp.config.SCHEDULE_FILE_PATH, schedule_data)
114 except Exception:
115 logging.exception("Failed to save schedule settings.")
116 my_lib.webapp.log.error("😵 スケジュール設定の保存に失敗しました。")
119def gen_schedule_default():
120 return [
121 {
122 "is_active": False,
123 "time": "00:00",
124 "period": 1,
125 "wday": [True] * 7,
126 }
127 ] * 2
130def schedule_load():
131 global schedule_lock
133 schedule_default = gen_schedule_default()
135 try:
136 with schedule_lock:
137 schedule_data = my_lib.serializer.load(my_lib.webapp.config.SCHEDULE_FILE_PATH, schedule_default)
138 if schedule_validate(schedule_data):
139 return schedule_data
140 except Exception:
141 logging.exception("Failed to load schedule settings.")
142 my_lib.webapp.log.error("😵 スケジュール設定の読み出しに失敗しました。")
144 return schedule_default
147def set_schedule(config, schedule_data): # noqa: C901
148 scheduler = get_scheduler()
149 scheduler.clear()
151 for entry in schedule_data:
152 if not entry["is_active"]:
153 continue
155 if entry["wday"][0]:
156 scheduler.every().sunday.at(entry["time"], my_lib.time.get_pytz()).do(
157 valve_auto_control, config, entry["period"]
158 )
159 if entry["wday"][1]:
160 scheduler.every().monday.at(entry["time"], my_lib.time.get_pytz()).do(
161 valve_auto_control, config, entry["period"]
162 )
163 if entry["wday"][2]:
164 scheduler.every().tuesday.at(entry["time"], my_lib.time.get_pytz()).do(
165 valve_auto_control, config, entry["period"]
166 )
167 if entry["wday"][3]:
168 scheduler.every().wednesday.at(entry["time"], my_lib.time.get_pytz()).do(
169 valve_auto_control, config, entry["period"]
170 )
171 if entry["wday"][4]:
172 scheduler.every().thursday.at(entry["time"], my_lib.time.get_pytz()).do(
173 valve_auto_control, config, entry["period"]
174 )
175 if entry["wday"][5]:
176 scheduler.every().friday.at(entry["time"], my_lib.time.get_pytz()).do(
177 valve_auto_control, config, entry["period"]
178 )
179 if entry["wday"][6]:
180 scheduler.every().saturday.at(entry["time"], my_lib.time.get_pytz()).do(
181 valve_auto_control, config, entry["period"]
182 )
184 for job in scheduler.get_jobs():
185 logging.info("Next run: %s", job.next_run)
187 idle_sec = scheduler.idle_seconds
188 if idle_sec is not None:
189 hours, remainder = divmod(idle_sec, 3600)
190 minutes, seconds = divmod(remainder, 60)
192 logging.info(
193 "Now is %s, time to next jobs is %d hour(s) %d minute(s) %d second(s)",
194 my_lib.time.now().strftime("%Y-%m-%d %H:%M"),
195 hours,
196 minutes,
197 seconds,
198 )
200 return idle_sec
203def schedule_worker(config, queue):
204 global should_terminate
206 sleep_sec = 0.25
207 scheduler = get_scheduler()
209 logging.info("Load schedule")
210 set_schedule(config, schedule_load())
212 logging.info("Start schedule worker")
214 i = 0
215 while True:
216 if should_terminate.is_set():
217 scheduler.clear()
218 break
219 try:
220 if not queue.empty():
221 schedule_data = queue.get()
222 set_schedule(config, schedule_data)
224 scheduler.run_pending()
225 logging.debug("Sleep %.1f sec...", sleep_sec)
226 time.sleep(sleep_sec)
227 except OverflowError: # pragma: no cover
228 # NOTE: テストする際、freezer 使って日付をいじるとこの例外が発生する
229 logging.debug(traceback.format_exc())
231 if i % (10 / sleep_sec) == 0:
232 my_lib.footprint.update(config["liveness"]["file"]["scheduler"])
234 i += 1
236 logging.info("Terminate schedule worker")
239if __name__ == "__main__":
240 import datetime
241 import multiprocessing
242 import multiprocessing.pool
244 import logger
245 import my_lib.webapp.config
247 logger.init("test", level=logging.DEBUG)
249 def test_func():
250 logging.info("TEST")
252 should_terminate.set()
254 queue = multiprocessing.Queue()
256 pool = multiprocessing.pool.ThreadPool(processes=1)
257 result = pool.apply_async(schedule_worker, (queue,))
259 exec_time = my_lib.time.now() + datetime.timedelta(seconds=5)
260 queue.put([{"time": exec_time.strftime("%H:%M"), "func": test_func}])
262 # NOTE: 終了するのを待つ
263 result.get()