Coverage for flask/src/rasp_shutter/control/scheduler.py: 34%
289 statements
« prev ^ index » next coverage.py v7.9.1, created at 2025-08-23 19:38 +0900
« prev ^ index » next coverage.py v7.9.1, created at 2025-08-23 19:38 +0900
1#!/usr/bin/env python3
2import datetime
3import enum
4import logging
5import os
6import pathlib
7import re
8import threading
9import time
10import traceback
12import my_lib.footprint
13import my_lib.serializer
14import my_lib.webapp.config
15import my_lib.webapp.log
16import rasp_shutter.control.config
17import rasp_shutter.control.webapi.control
18import rasp_shutter.control.webapi.sensor
19import schedule
@enum.unique
class BRIGHTNESS_STATE(enum.IntEnum):  # noqa: N801
    """Brightness classification returned by ``check_brightness``."""

    # Readings are on the dark side of the configured thresholds.
    DARK = 0
    # Readings are on the bright side of the configured thresholds.
    BRIGHT = 1
    # The lux or solar radiation reading was flagged invalid.
    UNKNOWN = 2
# Number of attempts exec_shutter_control() makes before reporting a failure.
RETRY_COUNT: int = 3

# Lock used around schedule file load/store; created by init().
schedule_lock = None
# NOTE(review): not read by the functions visible in this file, which go
# through get_schedule_data()/set_schedule_data() instead; kept in case
# other modules still reference it.
schedule_data = None
# Set by term(); schedule_worker() leaves its loop once this is set.
should_terminate = threading.Event()

# Per-worker state keyed by the PYTEST_XDIST_WORKER id ("main" when unset),
# so parallel pytest-xdist workers do not share scheduler state.
_scheduler_instances: dict = {}
_schedule_data_instances: dict = {}
_auto_control_events: dict = {}
def get_scheduler():
    """Return the scheduler that belongs to the current pytest-xdist worker.

    The worker id is read from ``PYTEST_XDIST_WORKER`` and falls back to
    ``"main"``. A ``schedule.Scheduler`` is created on first use and the same
    instance is returned for every later call with that worker id.
    """
    worker_id = os.environ.get("PYTEST_XDIST_WORKER", "main")

    try:
        return _scheduler_instances[worker_id]
    except KeyError:
        # First request for this worker id: create and remember its scheduler.
        scheduler = schedule.Scheduler()
        _scheduler_instances[worker_id] = scheduler
        return scheduler
def get_auto_control_event():
    """Return the worker-specific auto-control event used for test synchronisation.

    The event is keyed by ``PYTEST_XDIST_WORKER`` (``"main"`` when unset) and
    is created lazily on first access.
    """
    worker_id = os.environ.get("PYTEST_XDIST_WORKER", "main")

    event = _auto_control_events.get(worker_id)
    if event is None:
        event = threading.Event()
        _auto_control_events[worker_id] = event

    return event
def _signal_auto_control_completed():
    """Signal that one auto-control cycle has finished (test support)."""
    # Outside of a pytest run (PYTEST_CURRENT_TEST unset) nothing is signalled.
    if not os.environ.get("PYTEST_CURRENT_TEST"):
        return

    get_auto_control_event().set()
def wait_for_auto_control_completion(timeout=5.0):
    """Wait for the next auto-control cycle to finish (test support).

    Args:
        timeout: Maximum time in seconds to wait for the event.

    Returns:
        ``True`` immediately when ``PYTEST_CURRENT_TEST`` is not set; otherwise
        the result of ``Event.wait`` (``True`` if signalled before the timeout).
    """
    if os.environ.get("PYTEST_CURRENT_TEST"):
        event = get_auto_control_event()
        # Clear before waiting so only a cycle completed from now on counts.
        event.clear()
        return event.wait(timeout)

    return True
def get_schedule_data():
    """Return the schedule data stored for the current pytest-xdist worker.

    A ``None`` placeholder is registered (and returned) when nothing has been
    stored for this worker id yet.
    """
    worker_id = os.environ.get("PYTEST_XDIST_WORKER", "main")

    return _schedule_data_instances.setdefault(worker_id, None)
def set_schedule_data(data):
    """Store ``data`` as the schedule data of the current pytest-xdist worker.

    The worker id comes from ``PYTEST_XDIST_WORKER`` (``"main"`` when unset).
    """
    _schedule_data_instances[os.environ.get("PYTEST_XDIST_WORKER", "main")] = data
def init():
    """Create a fresh schedule lock and reset the termination request."""
    global schedule_lock  # noqa: PLW0603

    # should_terminate is only mutated in place, so no global statement is needed.
    should_terminate.clear()
    schedule_lock = threading.Lock()
def term():
    """Request termination; schedule_worker() exits once it sees the flag."""
    should_terminate.set()
def brightness_text(sense_data, cur_schedule_data):
    """Describe each sensor reading relative to its threshold.

    Args:
        sense_data: Mapping with ``solar_rad``, ``lux`` and ``altitude``
            entries, each providing a numeric ``"value"``.
        cur_schedule_data: Mapping providing the numeric threshold for the
            same three sensor names.

    Returns:
        A comma-separated string such as
        ``"lux: current 10.0 < threshold 20.0"`` for every sensor.
    """
    parts = []
    for sensor in ("solar_rad", "lux", "altitude"):
        current = sense_data[sensor]["value"]
        threshold = cur_schedule_data[sensor]

        if current > threshold:
            cmp = ">"
        elif current < threshold:
            cmp = "<"
        else:
            cmp = "="

        parts.append(f"{sensor}: current {current:.1f} {cmp} threshold {threshold:.1f}")

    return ", ".join(parts)
def check_brightness(sense_data, action):
    """Classify the current brightness against the thresholds for ``action``.

    Args:
        sense_data: Sensor readings; ``lux`` and ``solar_rad`` carry a
            ``"valid"`` flag, and all three sensors carry a ``"value"``.
        action: ``"close"`` selects the darkness test; any other value is
            treated as the "open" (brightness) test and is also used as the
            key into the schedule data.

    Returns:
        ``BRIGHTNESS_STATE.UNKNOWN`` when lux or solar radiation is invalid,
        otherwise ``DARK`` or ``BRIGHT``.
    """
    if not (sense_data["lux"]["valid"] and sense_data["solar_rad"]["valid"]):
        return BRIGHTNESS_STATE.UNKNOWN

    limit = get_schedule_data()[action]

    # Comparisons are wrapped in helpers so they stay lazily evaluated,
    # exactly like the short-circuiting boolean expressions they replace.
    def below(sensor):
        return sense_data[sensor]["value"] < limit[sensor]

    def above(sensor):
        return sense_data[sensor]["value"] > limit[sensor]

    if action == "close":
        # Dark when both light sensors are low, or when the altitude is low.
        if not ((below("lux") and below("solar_rad")) or below("altitude")):
            return BRIGHTNESS_STATE.BRIGHT
        logging.info("Getting darker %s", brightness_text(sense_data, limit))
        return BRIGHTNESS_STATE.DARK

    # Bright only when every reading exceeds its threshold.
    if above("lux") and above("solar_rad") and above("altitude"):
        logging.info("Getting brighter %s", brightness_text(sense_data, limit))
        return BRIGHTNESS_STATE.BRIGHT
    return BRIGHTNESS_STATE.DARK
def exec_shutter_control_impl(config, state, mode, sense_data, user):
    """Drive every configured shutter once.

    Returns:
        ``True`` when ``set_shutter_state`` completed, ``False`` when any
        exception was raised (the exception and traceback are logged as
        warnings).
    """
    try:
        # NOTE: Going through the Web API would be a problem once
        # authentication is enabled, so the function is called directly.
        every_shutter = list(range(len(config["shutter"])))
        rasp_shutter.control.webapi.control.set_shutter_state(
            config, every_shutter, state, mode, sense_data, user
        )
    except Exception as e:
        logging.warning(e)
        logging.warning(traceback.format_exc())
        return False
    else:
        return True
def exec_shutter_control(config, state, mode, sense_data, user):
    """Control the shutters, trying up to ``RETRY_COUNT`` times.

    Returns:
        ``True`` as soon as one attempt succeeds; ``False`` after every
        attempt failed (a failure message is then posted to the web app log).
    """
    logging.debug("Execute shutter control")

    attempt = 0
    while attempt < RETRY_COUNT:
        if exec_shutter_control_impl(config, state, mode, sense_data, user):
            return True
        logging.debug("Retry")
        attempt += 1

    my_lib.webapp.log.info("😵 シャッターの制御に失敗しました。")
    return False
def shutter_auto_open(config):
    """Open shutters whose scheduled opening was postponed, once it is bright.

    Nothing happens when the "open" schedule is inactive, when no recent
    pending-open footprint exists, or when the shutters were auto-closed
    within ``EXEC_INTERVAL_AUTO_MIN`` minutes.
    """
    logging.debug("try auto open")

    if not get_schedule_data()["open"]["is_active"]:
        logging.debug("inactive")
        return

    pending_age = my_lib.footprint.elapsed(rasp_shutter.control.config.STAT_PENDING_OPEN)
    if pending_age > 6 * 60 * 60:
        # NOTE: Only act when opening was postponed because it was dark.
        logging.debug("NOT pending")
        return

    min_interval = rasp_shutter.control.config.EXEC_INTERVAL_AUTO_MIN * 60
    if my_lib.footprint.elapsed(rasp_shutter.control.config.STAT_AUTO_CLOSE) < min_interval:
        # NOTE: Do nothing when the shutters were auto-closed only recently.
        logging.debug(
            "just closed before %d", my_lib.footprint.elapsed(rasp_shutter.control.config.STAT_AUTO_CLOSE)
        )
        return

    sense_data = rasp_shutter.control.webapi.sensor.get_sensor_data(config)
    if check_brightness(sense_data, "open") != BRIGHTNESS_STATE.BRIGHT:
        logging.debug(
            "Skip pendding open (solar_rad: %.1f W/m^2, lux: %.1f LUX)",
            sense_data["solar_rad"]["value"] if sense_data["solar_rad"]["valid"] else -1,
            sense_data["lux"]["value"] if sense_data["lux"]["valid"] else -1,
        )
        return

    sensor_text = rasp_shutter.control.webapi.control.sensor_text(sense_data)
    my_lib.webapp.log.info(f"📝 暗くて延期されていましたが、明るくなってきたので開けます。{sensor_text}")

    exec_shutter_control(
        config,
        "open",
        rasp_shutter.control.webapi.control.CONTROL_MODE.AUTO,
        sense_data,
        "sensor",
    )
    # Both markers are cleared regardless of the control result.
    my_lib.footprint.clear(rasp_shutter.control.config.STAT_PENDING_OPEN)
    my_lib.footprint.clear(rasp_shutter.control.config.STAT_AUTO_CLOSE)
def conv_schedule_time_to_datetime(schedule_time):
    """Convert an ``"HH:MM"`` schedule time into today's timezone-aware datetime.

    Args:
        schedule_time: Time of day formatted as ``"%H:%M"``.

    Returns:
        A ``datetime.datetime`` on today's date (per ``my_lib.time.now()``)
        carrying the zone returned by ``my_lib.time.get_zoneinfo()``.

    Raises:
        ValueError: If ``schedule_time`` does not match ``"%H:%M"``.
    """
    # Sample the clock once: reading it separately for the date string and
    # for the day override could straddle midnight and yield a mixed date.
    now = my_lib.time.now()

    naive = datetime.datetime.strptime(
        now.strftime("%Y/%m/%d ") + schedule_time,
        "%Y/%m/%d %H:%M",
    )
    return naive.replace(
        tzinfo=my_lib.time.get_zoneinfo(),
        day=now.day,
    )
def shutter_auto_close(config):
    """Close the shutters ahead of schedule when it is getting dark.

    The function returns without acting when the "close" schedule is
    inactive, around or before the scheduled open time, while an opening is
    pending, after the scheduled close time, when an auto-close footprint is
    at most 12 hours old, or when any shutter was opened within
    ``EXEC_INTERVAL_AUTO_MIN`` minutes.
    """
    logging.debug("try auto close")

    schedule_data = get_schedule_data()
    if not schedule_data["close"]["is_active"]:
        logging.debug("inactive")
        return

    gap_to_open = abs(my_lib.time.now() - conv_schedule_time_to_datetime(schedule_data["open"]["time"]))
    if gap_to_open < datetime.timedelta(minutes=1):
        # NOTE: Do nothing close to the scheduled open time.
        logging.debug("near open time")
        return

    if (
        my_lib.time.now() <= conv_schedule_time_to_datetime(schedule_data["open"]["time"])
    ) or my_lib.footprint.exists(rasp_shutter.control.config.STAT_PENDING_OPEN):
        # NOTE: Do nothing before the scheduled open time.
        logging.debug("before open time")
        return

    if conv_schedule_time_to_datetime(schedule_data["close"]["time"]) <= my_lib.time.now():
        # NOTE: Do nothing once the scheduled close time has passed.
        logging.debug("after close time")
        return

    if my_lib.footprint.elapsed(rasp_shutter.control.config.STAT_AUTO_CLOSE) <= 12 * 60 * 60:
        # NOTE: Do nothing when already auto-closed within the last 12 hours.
        logging.debug("already close")
        return

    for index, _ in enumerate(config["shutter"]):
        open_stat = rasp_shutter.control.webapi.control.exec_stat_file("open", index)
        if my_lib.footprint.elapsed(open_stat) < rasp_shutter.control.config.EXEC_INTERVAL_AUTO_MIN * 60:
            # NOTE: Do nothing when this shutter was opened only recently.
            logging.debug(
                "just opened before %d sec (%d)",
                my_lib.footprint.elapsed(rasp_shutter.control.webapi.control.exec_stat_file("open", index)),
                index,
            )
            return

    sense_data = rasp_shutter.control.webapi.sensor.get_sensor_data(config)
    if check_brightness(sense_data, "close") != BRIGHTNESS_STATE.DARK:  # pragma: no cover
        # NOTE: There is no "pending close" handling; this only logs the skip.
        logging.debug(
            "Skip pendding close (solar_rad: %.1f W/m^2, lux: %.1f LUX)",
            sense_data["solar_rad"]["value"] if sense_data["solar_rad"]["valid"] else -1,
            sense_data["lux"]["value"] if sense_data["lux"]["valid"] else -1,
        )
        return

    sensor_text = rasp_shutter.control.webapi.control.sensor_text(sense_data)
    my_lib.webapp.log.info(
        f"📝 予定より早いですが、暗くなってきたので閉めます。{sensor_text}",
    )

    exec_shutter_control(
        config,
        "close",
        rasp_shutter.control.webapi.control.CONTROL_MODE.AUTO,
        sense_data,
        "sensor",
    )
    logging.info("Set Auto CLOSE")
    my_lib.footprint.update(rasp_shutter.control.config.STAT_AUTO_CLOSE)

    # NOTE: While it may still get brighter later in the day, arrange for the
    # shutters to be re-opened automatically.
    if 5 < my_lib.time.now().hour < 13:
        logging.info("Set Pending OPEN")
        my_lib.footprint.update(rasp_shutter.control.config.STAT_PENDING_OPEN)
def shutter_auto_control(config):
    """Run the sensor-driven open/close checks that apply to the current hour."""
    hour = my_lib.time.now().hour

    # NOTE: Which automatic control runs depends on the time of day.
    if 5 < hour < 12:
        shutter_auto_open(config)

    if 5 < hour < 20:
        shutter_auto_close(config)

    # Completion signal used for test synchronisation.
    _signal_auto_control_completed()
def shutter_schedule_control(config, state):
    """Execute the scheduled "open" or "close" action.

    The brightness is evaluated once. With an invalid light sensor reading
    the action is skipped and an error is logged. A scheduled open while it
    is still dark is postponed (a pending-open footprint is written);
    otherwise the shutters are driven in SCHEDULE mode. A scheduled close
    always clears the pending-open footprint first.

    Args:
        config: Application configuration passed on to the sensor and
            control layers.
        state: ``"open"`` or any other value, which is handled as "close".
    """
    logging.info("Execute schedule control")

    sense_data = rasp_shutter.control.webapi.sensor.get_sensor_data(config)
    # Evaluate once: a second check_brightness() call repeats its log line.
    brightness = check_brightness(sense_data, state)

    if brightness == BRIGHTNESS_STATE.UNKNOWN:
        error_sensor = [
            label
            for sensor, label in (("solar_rad", "日射センサ"), ("lux", "照度センサ"))
            if not sense_data[sensor]["valid"]
        ]

        my_lib.webapp.log.error(
            "😵 {error_sensor}の値が不明なので{state}るのを見合わせました。".format(
                error_sensor="と".join(error_sensor),
                state="開け" if state == "open" else "閉め",
            )
        )
        return

    if state == "open" and brightness == BRIGHTNESS_STATE.DARK:
        sensor_text = rasp_shutter.control.webapi.control.sensor_text(sense_data)
        my_lib.webapp.log.info(f"📝 まだ暗いので開けるのを見合わせました。{sensor_text}")

        rasp_shutter.control.webapi.control.cmd_hist_push(
            {
                "cmd": "pending",
                "state": state,
            }
        )

        # NOTE: Record that opening was skipped because it is dark.
        logging.info("Set Pending OPEN")
        my_lib.footprint.update(rasp_shutter.control.config.STAT_PENDING_OPEN)
        return

    if state != "open":
        # A scheduled close cancels any postponed opening.
        my_lib.footprint.clear(rasp_shutter.control.config.STAT_PENDING_OPEN)

    exec_shutter_control(
        config,
        state,
        rasp_shutter.control.webapi.control.CONTROL_MODE.SCHEDULE,
        sense_data,
        "scheduler",
    )
# Whole-string 24-hour "HH:MM" (ASCII digits only, 00:00-23:59).
_SCHEDULE_TIME_RE = re.compile(r"(?:[01][0-9]|2[0-3]):[0-5][0-9]")


def schedule_validate(schedule_data):  # noqa: C901, PLR0911
    """Check that ``schedule_data`` is a well-formed open/close schedule.

    A valid schedule is a dict with exactly the entries ``"open"`` and
    ``"close"``. Each entry is a dict providing ``is_active`` (bool),
    ``time`` (``"HH:MM"``, 00:00-23:59), ``wday`` (seven bools) and the
    integer thresholds ``solar_rad``, ``lux`` and ``altitude``.

    Args:
        schedule_data: Candidate schedule, typically deserialised user input.

    Returns:
        ``True`` when the data is valid; ``False`` otherwise. The reason for
        a rejection is logged as a warning; malformed input never raises.
    """
    if not isinstance(schedule_data, dict):
        logging.warning("Type of schedule data is invalid: %s", type(schedule_data))
        return False
    if len(schedule_data) != 2:
        logging.warning("Count of entry is Invalid: %d", len(schedule_data))
        return False
    if set(schedule_data) != {"open", "close"}:
        # Consumers index the schedule by "open"/"close"; other names would
        # only fail later with a KeyError.
        logging.warning("Name of entry is invalid: %s", sorted(map(str, schedule_data)))
        return False

    for entry in schedule_data.values():
        if not isinstance(entry, dict):
            logging.warning("Type of entry is invalid: %s", type(entry))
            return False
        for key in ["is_active", "time", "wday", "solar_rad", "lux", "altitude"]:
            if key not in entry:
                logging.warning("Does not contain %s", key)
                return False
        if type(entry["is_active"]) is not bool:
            logging.warning("Type of is_active is invalid: %s", type(entry["is_active"]))
            return False
        # Exact type checks on purpose: isinstance() would let bool pass as int.
        for key in ("lux", "altitude", "solar_rad"):
            if type(entry[key]) is not int:
                logging.warning("Type of %s is invalid: %s", key, type(entry[key]))
                return False
        # fullmatch() rejects surrounding junk ("x08:00y") and out-of-range
        # values ("99:99") that an unanchored \d{2}:\d{2} search accepts.
        if not (isinstance(entry["time"], str) and _SCHEDULE_TIME_RE.fullmatch(entry["time"])):
            logging.warning("Format of time is invalid: %s", entry["time"])
            return False
        if not isinstance(entry["wday"], (list, tuple)):
            logging.warning("Type of wday is invalid: %s", type(entry["wday"]))
            return False
        if len(entry["wday"]) != 7:
            logging.warning("Count of wday is Invalid: %d", len(entry["wday"]))
            return False
        for i, wday_flag in enumerate(entry["wday"]):
            if type(wday_flag) is not bool:
                logging.warning("Type of wday[%d] is Invalid: %s", i, type(wday_flag))
                return False
    return True
def schedule_store(schedule_data):
    """Persist ``schedule_data`` to the schedule file while holding the lock.

    Any failure is logged (with traceback) and reported to the web app log
    instead of being raised to the caller.
    """
    # schedule_lock is only read here, so no global statement is required.
    try:
        with schedule_lock:
            my_lib.serializer.store(my_lib.webapp.config.SCHEDULE_FILE_PATH, schedule_data)
    except Exception:
        logging.exception("Failed to save schedule settings.")
        my_lib.webapp.log.error("😵 スケジュール設定の保存に失敗しました。")
def gen_schedule_default():
    """Build the default schedule (inactive open at 08:00, close at 17:00).

    Returns:
        A dict with ``"open"`` and ``"close"`` entries. Each entry owns its
        own ``wday`` list, so mutating one entry never affects the other.
    """

    def make_entry(time, solar_rad, lux):
        # Build every entry from scratch: merging a shared template with `|`
        # is a shallow copy and would alias the mutable "wday" list.
        return {
            "is_active": False,
            "time": time,
            "solar_rad": solar_rad,
            "lux": lux,
            "altitude": 0,
            "wday": [True] * 7,
        }

    return {
        "open": make_entry("08:00", 150, 1000),
        "close": make_entry("17:00", 80, 1200),
    }
def schedule_load():
    """Load the stored schedule, falling back to the default schedule.

    Returns:
        The data read from the schedule file when it passes
        ``schedule_validate``; otherwise (invalid data or any exception while
        loading) the result of ``gen_schedule_default()``.
    """
    fallback = gen_schedule_default()

    # schedule_lock is only read here, so no global statement is required.
    try:
        with schedule_lock:
            loaded = my_lib.serializer.load(my_lib.webapp.config.SCHEDULE_FILE_PATH, fallback)
            if schedule_validate(loaded):
                return loaded
    except Exception:
        logging.exception("Failed to load schedule settings.")
        my_lib.webapp.log.error("😵 スケジュール設定の読み出しに失敗しました。")

    return fallback
def set_schedule(config, schedule_data):
    """Rebuild all scheduler jobs from ``schedule_data``.

    Existing jobs are cleared. For every active entry one weekly job per
    enabled weekday is registered to call ``shutter_schedule_control`` at the
    entry's time, and a job running ``shutter_auto_control`` every second is
    added last.

    Args:
        config: Application configuration handed to the scheduled callbacks.
        schedule_data: Mapping of state name to schedule entry; each entry's
            ``wday`` list is indexed starting with Sunday.
    """
    scheduler = get_scheduler()
    scheduler.clear()

    # Attribute names on a schedule job, ordered like entry["wday"].
    weekday_names = ("sunday", "monday", "tuesday", "wednesday", "thursday", "friday", "saturday")

    for state, entry in schedule_data.items():
        if not entry["is_active"]:
            continue

        for index, weekday in enumerate(weekday_names):
            if entry["wday"][index]:
                getattr(scheduler.every(), weekday).at(entry["time"], my_lib.time.get_pytz()).do(
                    shutter_schedule_control, config, state
                )

    for job in scheduler.get_jobs():
        logging.info("Next run: %s", job.next_run)

    idle_sec = scheduler.idle_seconds
    if idle_sec is not None:
        hours, remainder = divmod(idle_sec, 3600)
        minutes, seconds = divmod(remainder, 60)

        logging.info(
            "Now is %s, time to next jobs is %d hour(s) %d minute(s) %d second(s)",
            my_lib.time.now().strftime("%Y-%m-%d %H:%M"),
            hours,
            minutes,
            seconds,
        )

    # Sensor-driven control is polled every second, independent of the schedule.
    scheduler.every(1).seconds.do(shutter_auto_control, config)
def schedule_worker(config, queue):
    """Run the scheduler loop until termination is requested.

    The stored schedule is loaded and installed first. Each iteration applies
    (and persists) a new schedule if one is waiting in ``queue``, runs pending
    jobs, sleeps, and periodically refreshes the liveness file.

    Args:
        config: Application configuration; ``config["liveness"]["file"]["scheduler"]``
            names the liveness file.
        queue: Queue delivering replacement schedule data.
    """
    sleep_sec = 0.5
    # Refresh the liveness file once per this many iterations (~10 s of sleep).
    liveness_period = 10 / sleep_sec
    scheduler = get_scheduler()

    liveness_file = pathlib.Path(config["liveness"]["file"]["scheduler"])

    logging.info("Load schedule")
    schedule_data = schedule_load()
    set_schedule_data(schedule_data)

    set_schedule(config, schedule_data)

    logging.info("Start schedule worker")

    i = 0
    while not should_terminate.is_set():
        try:
            if not queue.empty():
                schedule_data = queue.get()
                set_schedule_data(schedule_data)
                set_schedule(config, schedule_data)
                schedule_store(schedule_data)

            scheduler.run_pending()

            logging.debug("Sleep %.1f sec...", sleep_sec)
            time.sleep(sleep_sec)
        except OverflowError:  # pragma: no cover
            # NOTE: Raised in tests when the date is manipulated with a time freezer.
            logging.debug(traceback.format_exc())

        if i % liveness_period == 0:
            my_lib.footprint.update(liveness_file)

        i += 1

    scheduler.clear()
    logging.info("Terminate schedule worker")
if __name__ == "__main__":
    # Manual smoke run: start the worker in a thread and push one schedule.
    import multiprocessing
    import multiprocessing.pool

    import my_lib.config
    import my_lib.logger

    my_lib.logger.init("test", level=logging.DEBUG)

    def test_func():
        logging.info("TEST")

        should_terminate.set()

    config = my_lib.config.load()
    queue = multiprocessing.Queue()

    init()

    pool = multiprocessing.pool.ThreadPool(processes=1)
    result = pool.apply_async(schedule_worker, (config, queue))

    exec_time = my_lib.time.now() + datetime.timedelta(seconds=5)
    open_entry = {
        "time": exec_time.strftime("%H:%M"),
        "is_active": True,
        "wday": [True] * 7,
        "solar_rad": 0,
        "lux": 0,
        "altitude": 0,
        "func": test_func,
    }
    queue.put({"open": open_entry})

    # NOTE: Wait for the worker to finish.
    result.get()