Coverage for flask/src/rasp_water/scheduler.py: 100%
128 statements
« prev ^ index » next coverage.py v7.6.8, created at 2024-11-24 13:56 +0900
« prev ^ index » next coverage.py v7.6.8, created at 2024-11-24 13:56 +0900
1#!/usr/bin/env python3
2import datetime
3import logging
4import pathlib
5import re
6import threading
7import time
8import traceback
10import my_lib.footprint
11import my_lib.serializer
12import my_lib.webapp.config
13import my_lib.webapp.log
14import rasp_water.webapp_valve
15import schedule
# Number of attempts valve_auto_control() makes before reporting a failure.
RETRY_COUNT = 3

# Lock serializing access to the schedule file.  It is created eagerly so that
# schedule_load() / schedule_store() work even when init() has not been called
# yet (with ``None`` here, ``with schedule_lock`` raised and every load/save
# was reported as a failure).  init() still replaces it with a fresh lock.
schedule_lock = threading.Lock()
# Set this event to make schedule_worker() leave its loop.
should_terminate = threading.Event()
def init():
    """(Re)create the module-level lock that guards schedule file access.

    Rebinds the global ``schedule_lock`` to a brand-new ``threading.Lock``.
    """
    global schedule_lock  # noqa: PLW0603

    schedule_lock = threading.Lock()
def valve_auto_control_impl(config, period):
    """Make a single attempt to open the valve for a scheduled run.

    Args:
        config: Application configuration, forwarded unchanged to
            ``rasp_water.webapp_valve.set_valve_state``.
        period: Multiplied by 60 before being passed on (presumably minutes
            converted to seconds -- confirm against ``set_valve_state``).

    Returns:
        True when the valve call returned without raising, False when any
        exception was raised (the exception is logged, not propagated).
    """
    try:
        # NOTE: Call the function directly rather than going through the web
        # API; an HTTP request would be troublesome once authentication is
        # enabled on the web side.
        rasp_water.webapp_valve.set_valve_state(config, 1, period * 60, True, "scheduler")
    except Exception:
        logging.exception("Failed to control valve automatically")
        return False

    return True
def valve_auto_control(config, period):
    """Open the valve for a scheduled run, retrying on failure.

    Calls ``valve_auto_control_impl`` up to ``RETRY_COUNT`` times and stops at
    the first successful attempt.

    Args:
        config: Application configuration, forwarded to each attempt.
        period: Watering period, forwarded to each attempt.

    Returns:
        True as soon as one attempt succeeds; False after every attempt has
        failed (a failure message is then written to the web application log).
    """
    logging.info("Starts automatic control of the valve")

    # The generator is lazy, so any() stops issuing attempts after the first
    # success.
    attempts = (valve_auto_control_impl(config, period) for _ in range(RETRY_COUNT))
    if any(attempts):
        return True

    my_lib.webapp.log.info("😵 水やりの自動実行に失敗しました。")
    return False
def schedule_validate(schedule_data):  # noqa: C901, PLR0911
    """Check that ``schedule_data`` has the shape the scheduler expects.

    A valid schedule is a sequence of exactly two dict entries, each with:

    * ``is_active``: ``bool``
    * ``time``: ``str`` in ``HH:MM`` (or ``HH:MM:SS``) form with in-range
      hour/minute/second values
    * ``period``: ``int`` (``bool`` is rejected)
    * ``wday``: sequence of exactly seven ``bool`` flags

    Malformed input of any kind yields ``False`` (with a warning logged)
    rather than an exception.

    Args:
        schedule_data: Candidate schedule, e.g. as loaded from disk.

    Returns:
        True when the data is well-formed, False otherwise.
    """
    if not isinstance(schedule_data, (list, tuple)):
        logging.warning("Type of schedule is invalid: %s", type(schedule_data))
        return False
    if len(schedule_data) != 2:
        logging.warning("Count of entry is Invalid: %d", len(schedule_data))
        return False

    # Anchored via fullmatch() and range-checked, so values such as "99:99" or
    # "x12:34y" are rejected here instead of blowing up later when the job is
    # registered.  Optional seconds are kept for backward compatibility.
    time_pattern = re.compile(r"(?:[01][0-9]|2[0-3]):[0-5][0-9](?::[0-5][0-9])?")

    for entry in schedule_data:
        if not isinstance(entry, dict):
            logging.warning("Type of entry is invalid: %s", type(entry))
            return False
        for key in ("is_active", "time", "period", "wday"):
            if key not in entry:
                logging.warning("Does not contain %s", key)
                return False
        if type(entry["is_active"]) is not bool:
            logging.warning("Type of is_active is invalid: %s", type(entry["is_active"]))
            return False
        if not isinstance(entry["time"], str) or time_pattern.fullmatch(entry["time"]) is None:
            logging.warning("Format of time is invalid: %s", entry["time"])
            return False
        # Exact type check on purpose: bool is a subclass of int and must not
        # be accepted as a period.
        if type(entry["period"]) is not int:
            logging.warning("Type of period is invalid: %s", type(entry["period"]))
            return False
        if not isinstance(entry["wday"], (list, tuple)):
            logging.warning("Type of wday is invalid: %s", type(entry["wday"]))
            return False
        if len(entry["wday"]) != 7:
            logging.warning("Count of wday is Invalid: %d", len(entry["wday"]))
            return False
        for i, wday_flag in enumerate(entry["wday"]):
            if type(wday_flag) is not bool:
                logging.warning("Type of wday[%d] is Invalid: %s", i, type(wday_flag))
                return False
    return True
def schedule_store(schedule_data):
    """Persist the schedule settings to the schedule file.

    The write happens while holding ``schedule_lock``.  Any exception is
    logged and reported to the web application log; nothing is raised to the
    caller.

    Args:
        schedule_data: Schedule to serialize.
    """
    try:
        with schedule_lock:
            my_lib.serializer.store(
                my_lib.webapp.config.SCHEDULE_FILE_PATH,
                schedule_data,
            )
    except Exception:
        logging.exception("Failed to save schedule settings.")
        my_lib.webapp.log.error("😵 スケジュール設定の保存に失敗しました。")
def gen_schedule_default():
    """Build the default schedule: two inactive entries.

    Each entry is an independent dict with its own ``wday`` list, so callers
    may edit one entry without silently changing the other (``[entry] * 2``
    would yield two references to the same object).

    Returns:
        A list of two entries, each inactive, at "00:00", with period 1 and
        all seven weekday flags enabled.
    """
    return [
        {
            "is_active": False,
            "time": "00:00",
            "period": 1,
            "wday": [True] * 7,
        }
        for _ in range(2)
    ]
def schedule_load():
    """Load the schedule settings, falling back to the defaults.

    The schedule file is read while holding ``schedule_lock`` and the result
    is checked with ``schedule_validate``.

    Returns:
        The loaded schedule when it passes validation; otherwise (validation
        failed or an exception was raised while loading) the default schedule
        from ``gen_schedule_default``.  Exceptions are logged and reported to
        the web application log rather than propagated.
    """
    fallback = gen_schedule_default()

    try:
        with schedule_lock:
            loaded = my_lib.serializer.load(my_lib.webapp.config.SCHEDULE_FILE_PATH, fallback)
            if schedule_validate(loaded):
                return loaded
    except Exception:
        logging.exception("Failed to load schedule settings.")
        my_lib.webapp.log.error("😵 スケジュール設定の読み出しに失敗しました。")

    return fallback
def set_schedule(config, schedule_data):
    """Replace all registered jobs with the ones described by ``schedule_data``.

    Every existing job is cleared first.  For each active entry, one weekly
    job running ``valve_auto_control(config, entry["period"])`` is registered
    per enabled weekday at ``entry["time"]``.

    Args:
        config: Application configuration, handed to ``valve_auto_control``.
        schedule_data: Iterable of schedule entries with ``is_active``,
            ``time``, ``period`` and ``wday`` keys.

    Returns:
        Value of ``schedule.idle_seconds()`` after registration; ``None`` when
        it reports no value (e.g. nothing was scheduled).
    """
    schedule.clear()

    # Position in this tuple == index into entry["wday"]; index 0 is Sunday.
    weekday_attrs = (
        "sunday",
        "monday",
        "tuesday",
        "wednesday",
        "thursday",
        "friday",
        "saturday",
    )

    for entry in schedule_data:
        if not entry["is_active"]:
            continue

        for index, weekday_attr in enumerate(weekday_attrs):
            if not entry["wday"][index]:
                continue
            weekly_job = getattr(schedule.every(), weekday_attr)
            weekly_job.at(entry["time"], my_lib.webapp.config.TIMEZONE_PYTZ).do(
                valve_auto_control, config, entry["period"]
            )

    for job in schedule.get_jobs():
        logging.info("Next run: %s", job.next_run)

    idle_sec = schedule.idle_seconds()
    if idle_sec is None:
        return idle_sec

    hours, remainder = divmod(idle_sec, 3600)
    minutes, seconds = divmod(remainder, 60)

    local_tz = datetime.timezone(datetime.timedelta(hours=my_lib.webapp.config.TIMEZONE_OFFSET))
    now_text = datetime.datetime.now(tz=local_tz).strftime("%Y-%m-%d %H:%M")

    logging.info(
        "Now is %s, time to next jobs is %d hour(s) %d minute(s) %d second(s)",
        now_text,
        hours,
        minutes,
        seconds,
    )

    return idle_sec
def schedule_worker(config, queue):
    """Run the scheduler loop until ``should_terminate`` is set.

    On start the stored schedule is loaded and registered.  Each iteration
    then:

    1. applies and persists a new schedule if one is waiting in ``queue``,
    2. runs pending jobs,
    3. sleeps for a short interval,
    4. periodically updates the liveness file.

    When ``should_terminate`` is set, all jobs are cleared and the function
    returns.

    Args:
        config: Application configuration; ``config["liveness"]["file"]
            ["scheduler"]`` is the path of the liveness file.
        queue: Queue delivering replacement schedule data.
    """
    sleep_sec = 0.25
    # Number of loop iterations between liveness updates (10 seconds' worth
    # of sleeping).
    liveness_interval = 10 / sleep_sec

    liveness_file = pathlib.Path(config["liveness"]["file"]["scheduler"])

    logging.info("Load schedule")
    set_schedule(config, schedule_load())

    logging.info("Start schedule worker")

    tick = 0
    while not should_terminate.is_set():
        try:
            if not queue.empty():
                new_schedule = queue.get()
                set_schedule(config, new_schedule)
                schedule_store(new_schedule)

            schedule.run_pending()
            logging.debug("Sleep %.1f sec...", sleep_sec)
            time.sleep(sleep_sec)
        except OverflowError:  # pragma: no cover
            # NOTE: Raised during tests when freezer is used to manipulate
            # the date.
            logging.debug(traceback.format_exc())

        if tick % liveness_interval == 0:
            my_lib.footprint.update(liveness_file)

        tick += 1

    # Termination was requested: drop every registered job before leaving.
    schedule.clear()

    logging.info("Terminate schedule worker")
if __name__ == "__main__":
    # Ad-hoc manual harness: starts schedule_worker() on a one-thread pool,
    # queues a schedule entry and waits for the worker to return.
    #
    # NOTE(review): this harness looks out of sync with the rest of the
    # module and probably does not run as-is -- confirm before relying on it:
    #   * schedule_worker() is defined as (config, queue) but is submitted
    #     here with only (queue,).
    #   * set_schedule() reads entry["is_active"], entry["wday"] and
    #     entry["period"] and always schedules valve_auto_control; the entry
    #     queued below only carries "time" and "func".
    import datetime
    from multiprocessing import Queue
    from multiprocessing.pool import ThreadPool

    import logger
    import my_lib.webapp.config

    logger.init("test", level=logging.DEBUG)

    def test_func():
        # Intended job body: log a marker, then ask the worker loop to stop.
        logging.info("TEST")

        should_terminate.set()

    queue = Queue()

    pool = ThreadPool(processes=1)
    result = pool.apply_async(schedule_worker, (queue,))

    # Target time five seconds from now, formatted as HH:MM.
    exec_time = datetime.datetime.now(my_lib.webapp.config.TIMEZONE) + datetime.timedelta(seconds=5)
    queue.put([{"time": exec_time.strftime("%H:%M"), "func": test_func}])

    # NOTE: Wait for the worker to finish.
    result.get()