Coverage for src / rasp_shutter / control / scheduler.py: 90%
367 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-02-13 00:10 +0900
« prev ^ index » next coverage.py v7.13.1, created at 2026-02-13 00:10 +0900
1#!/usr/bin/env python3
2import datetime
3import enum
4import logging
5import re
6import threading
7import time
8import traceback
9from typing import Any
11import my_lib.footprint
12import my_lib.pytest_util
13import my_lib.serializer
14import my_lib.time
15import my_lib.webapp.config
16import my_lib.webapp.log
17import schedule
19import rasp_shutter.config
20import rasp_shutter.control.config
21import rasp_shutter.control.webapi.control
22import rasp_shutter.control.webapi.sensor
23import rasp_shutter.type_defs
24import rasp_shutter.util
27class BRIGHTNESS_STATE(enum.IntEnum):
28 DARK = 0
29 BRIGHT = 1
30 UNKNOWN = 2
33RETRY_COUNT = 3
35# schedule ライブラリの曜日メソッド名(日曜始まり、wday[0]=日曜 に対応)
36WEEKDAY_METHODS = ["sunday", "monday", "tuesday", "wednesday", "thursday", "friday", "saturday"]
38# 時刻フォーマット検証用パターン(HH:MM形式)
39SCHEDULE_TIME_PATTERN = re.compile(r"\d{2}:\d{2}")
41should_terminate = threading.Event()
43# Worker-specific instances for pytest-xdist parallel execution
44_scheduler_instances: dict[str, schedule.Scheduler] = {}
45_schedule_data_instances: dict[str, rasp_shutter.type_defs.ScheduleData | None] = {}
46_schedule_lock_instances: dict[str, threading.Lock] = {}
47_auto_control_events: dict[str, threading.Event] = {}
49# ワーカー固有のループシーケンス番号(テスト同期用)
50_loop_sequence: dict[str, int] = {}
51_loop_condition: dict[str, threading.Condition] = {}
52_loop_condition_lock = threading.Lock()
55def get_scheduler() -> schedule.Scheduler:
56 """Get worker-specific scheduler instance for pytest-xdist parallel execution"""
57 worker_id = my_lib.pytest_util.get_worker_id()
59 if worker_id not in _scheduler_instances:
60 # Create a new scheduler instance for this worker
61 _scheduler_instances[worker_id] = schedule.Scheduler()
63 return _scheduler_instances[worker_id]
66def get_schedule_lock() -> threading.Lock:
67 """Get worker-specific schedule lock for pytest-xdist parallel execution"""
68 worker_id = my_lib.pytest_util.get_worker_id()
70 if worker_id not in _schedule_lock_instances:
71 _schedule_lock_instances[worker_id] = threading.Lock()
73 return _schedule_lock_instances[worker_id]
76def clear_scheduler_jobs() -> None:
77 """スケジューラのジョブとスケジュールデータをクリア(テスト用)
79 テスト間でスケジューラの状態が干渉しないようにするため、
80 各テスト開始前に呼び出す。
81 """
82 worker_id = my_lib.pytest_util.get_worker_id()
84 # スケジューラインスタンスのジョブをクリア
85 if worker_id in _scheduler_instances: 85 ↛ 91line 85 didn't jump to line 91 because the condition on line 85 was always true
86 scheduler = _scheduler_instances[worker_id]
87 scheduler.clear()
88 logging.debug("Cleared scheduler jobs for worker %s", worker_id)
90 # スケジュールデータをクリア
91 if worker_id in _schedule_data_instances: 91 ↛ exitline 91 didn't return from function 'clear_scheduler_jobs' because the condition on line 91 was always true
92 _schedule_data_instances[worker_id] = None
93 logging.debug("Cleared schedule data for worker %s", worker_id)
96def reset_loop_sequence() -> None:
97 """ループシーケンス番号をリセット(テスト用)
99 テスト間でシーケンス番号が累積しないようにリセットする。
100 """
101 worker_id = my_lib.pytest_util.get_worker_id()
102 if worker_id in _loop_sequence:
103 _loop_sequence[worker_id] = 0
104 logging.debug("Reset loop sequence for worker %s", worker_id)
107def get_auto_control_event():
108 """テスト同期用のワーカー固有自動制御イベントを取得"""
109 worker_id = my_lib.pytest_util.get_worker_id()
111 if worker_id not in _auto_control_events:
112 _auto_control_events[worker_id] = threading.Event()
114 return _auto_control_events[worker_id]
117def _signal_auto_control_completed():
118 """自動制御サイクルの完了をシグナル(テスト用)"""
119 # テスト環境でのみイベントを設定
120 if rasp_shutter.util.is_pytest_running(): 120 ↛ exitline 120 didn't return from function '_signal_auto_control_completed' because the condition on line 120 was always true
121 event = get_auto_control_event()
122 event.set()
125def wait_for_auto_control_completion(timeout=5.0):
126 """自動制御の完了を待機(テスト用)"""
127 if not rasp_shutter.util.is_pytest_running():
128 return True
130 event = get_auto_control_event()
131 event.clear() # 待機前にクリア
132 return event.wait(timeout)
135def _get_loop_condition() -> threading.Condition:
136 """ループ完了通知用のConditionを取得(スレッドセーフ)"""
137 worker_id = my_lib.pytest_util.get_worker_id()
138 with _loop_condition_lock:
139 if worker_id not in _loop_condition:
140 _loop_condition[worker_id] = threading.Condition()
141 return _loop_condition[worker_id]
144def get_loop_sequence() -> int:
145 """現在のループシーケンス番号を取得"""
146 worker_id = my_lib.pytest_util.get_worker_id()
147 return _loop_sequence.get(worker_id, 0)
150def _increment_loop_sequence() -> None:
151 """ループシーケンス番号をインクリメントして通知"""
152 worker_id = my_lib.pytest_util.get_worker_id()
153 condition = _get_loop_condition()
154 with condition:
155 _loop_sequence[worker_id] = _loop_sequence.get(worker_id, 0) + 1
156 condition.notify_all()
159def wait_for_loop_after(sequence: int, timeout: float = 10.0) -> bool:
160 """指定シーケンス番号より大きくなるまで待機
162 Args:
163 sequence: 待機開始時のシーケンス番号
164 timeout: タイムアウト秒数
166 Returns:
167 成功したら True、タイムアウトしたら False
168 """
169 # NOTE: threading.Condition を使用して効率的に待機する。
170 # _increment_loop_sequence() が notify_all() を呼ぶので、
171 # シーケンス番号がインクリメントされた時点で即座に待機が終了する。
172 # time_machine の影響を避けるため、time.perf_counter() でタイムアウトをチェックし、
173 # Condition.wait() は短い間隔で呼び出す。
174 condition = _get_loop_condition()
175 start = time.perf_counter() # time_machineの影響を受けない
176 poll_interval = 0.1 # 100ms間隔でCondition.waitを呼び出す
178 with condition:
179 while time.perf_counter() - start < timeout:
180 if get_loop_sequence() > sequence:
181 return True
182 # NOTE: Condition.wait() を使用することで、notify_all() 呼び出し時に
183 # 即座に起床する。poll_interval は time_machine の影響を受ける可能性があるが、
184 # 外側の time.perf_counter() チェックでタイムアウトを正確に管理する。
185 condition.wait(timeout=poll_interval)
187 return get_loop_sequence() > sequence
190def get_schedule_data() -> rasp_shutter.type_defs.ScheduleData | None:
191 """Get worker-specific schedule data for pytest-xdist parallel execution"""
192 worker_id = my_lib.pytest_util.get_worker_id()
194 if worker_id not in _schedule_data_instances:
195 _schedule_data_instances[worker_id] = None
197 return _schedule_data_instances[worker_id]
200def set_schedule_data(data: rasp_shutter.type_defs.ScheduleData | dict[str, Any] | None) -> None:
201 """Set worker-specific schedule data for pytest-xdist parallel execution"""
202 worker_id = my_lib.pytest_util.get_worker_id()
203 _schedule_data_instances[worker_id] = data # type: ignore[assignment]
206def init() -> None:
207 global should_terminate
209 # ワーカー固有のロックを初期化
210 get_schedule_lock()
211 should_terminate.clear()
214def term():
215 global should_terminate
217 should_terminate.set()
220def brightness_text(
221 sense_data: rasp_shutter.type_defs.SensorData,
222 cur_schedule_data: rasp_shutter.type_defs.ScheduleEntry | dict[str, Any],
223) -> str:
224 # TypedDictへの動的アクセスを避けるため、dictに展開
225 schedule_dict: dict[str, Any] = {**cur_schedule_data}
227 def sensor_text(sensor: str) -> str:
228 sensor_value = getattr(sense_data, sensor)
229 current = sensor_value.value
230 threshold = schedule_dict[sensor]
231 if current > threshold:
232 cmp = ">"
233 elif current < threshold:
234 cmp = "<"
235 else:
236 cmp = "="
237 return f"{sensor}: current {current:.1f} {cmp} threshold {threshold:.1f}"
239 text = [sensor_text(sensor) for sensor in ["solar_rad", "lux", "altitude"]]
241 return ", ".join(text)
244def check_brightness(sense_data: rasp_shutter.type_defs.SensorData, action: str) -> BRIGHTNESS_STATE:
245 if not sense_data.lux.valid or not sense_data.solar_rad.valid:
246 return BRIGHTNESS_STATE.UNKNOWN
248 schedule_data = get_schedule_data()
249 if schedule_data is None: 249 ↛ 251line 249 didn't jump to line 251 because the condition on line 249 was never true
250 # テスト間のクリア中は不明として扱う
251 return BRIGHTNESS_STATE.UNKNOWN
253 # validがTrueの場合、valueはNoneではない
254 lux_value = sense_data.lux.value
255 solar_rad_value = sense_data.solar_rad.value
256 altitude_value = sense_data.altitude.value
257 assert lux_value is not None and solar_rad_value is not None # noqa: S101
258 assert altitude_value is not None # noqa: S101
260 if action == "close":
261 close_data = schedule_data["close"]
262 if (
263 lux_value < close_data["lux"]
264 or solar_rad_value < close_data["solar_rad"]
265 or altitude_value < close_data["altitude"]
266 ):
267 logging.info("Getting darker %s", brightness_text(sense_data, close_data))
268 return BRIGHTNESS_STATE.DARK
269 else:
270 return BRIGHTNESS_STATE.BRIGHT
271 else:
272 open_data = schedule_data["open"]
273 if (
274 lux_value > open_data["lux"]
275 and solar_rad_value > open_data["solar_rad"]
276 and altitude_value > open_data["altitude"]
277 ):
278 logging.info("Getting brighter %s", brightness_text(sense_data, open_data))
279 return BRIGHTNESS_STATE.BRIGHT
280 else:
281 return BRIGHTNESS_STATE.DARK
284def exec_shutter_control_impl(
285 config: rasp_shutter.config.AppConfig,
286 state: str,
287 mode: rasp_shutter.control.webapi.control.CONTROL_MODE,
288 sense_data: rasp_shutter.type_defs.SensorData,
289 user: str,
290) -> bool:
291 try:
292 # NOTE: Web 経由だと認証つけた場合に困るので、直接関数を呼ぶ
293 rasp_shutter.control.webapi.control.set_shutter_state(
294 config, list(range(len(config.shutter))), state, mode, sense_data, user
295 )
296 return True
297 except Exception:
298 logging.exception("Failed to control shutter")
300 return False
303def exec_shutter_control(
304 config: rasp_shutter.config.AppConfig,
305 state: str,
306 mode: rasp_shutter.control.webapi.control.CONTROL_MODE,
307 sense_data: rasp_shutter.type_defs.SensorData,
308 user: str,
309) -> bool:
310 logging.debug("Execute shutter control")
312 for _ in range(RETRY_COUNT):
313 if exec_shutter_control_impl(config, state, mode, sense_data, user):
314 return True
315 logging.debug("Retry")
317 my_lib.webapp.log.info("😵 シャッターの制御に失敗しました。")
318 return False
321def shutter_auto_open(config: rasp_shutter.config.AppConfig) -> None:
322 logging.debug("try auto open")
324 schedule_data = get_schedule_data()
325 if schedule_data is None: 325 ↛ 327line 325 didn't jump to line 327 because the condition on line 325 was never true
326 # テスト間のクリア中は何もしない
327 logging.debug("Schedule data not set, skipping auto open")
328 return
329 if not schedule_data["open"]["is_active"]:
330 logging.debug("inactive")
331 return
333 elapsed_pending_open = my_lib.footprint.elapsed(rasp_shutter.control.config.STAT_PENDING_OPEN.to_path())
334 if elapsed_pending_open > rasp_shutter.control.config.ELAPSED_PENDING_OPEN_MAX_SEC:
335 # NOTE: 暗くて開けるのを延期されている場合以外は処理を行わない。
336 logging.debug("NOT pending")
337 return
339 elapsed_auto_close = my_lib.footprint.elapsed(rasp_shutter.control.config.STAT_AUTO_CLOSE.to_path())
340 if elapsed_auto_close < rasp_shutter.control.config.EXEC_INTERVAL_AUTO_MIN * 60: 340 ↛ 342line 340 didn't jump to line 342 because the condition on line 340 was never true
341 # NOTE: 自動で閉めてから時間が経っていない場合は、処理を行わない。
342 logging.debug("just closed before %d", elapsed_auto_close)
343 return
345 sense_data = rasp_shutter.control.webapi.sensor.get_sensor_data(config)
346 if check_brightness(sense_data, "open") == BRIGHTNESS_STATE.BRIGHT:
347 sensor_text = rasp_shutter.control.webapi.control.sensor_text(sense_data)
348 my_lib.webapp.log.info(f"🌅 暗くて延期されていましたが、明るくなってきたので開けます。{sensor_text}")
350 exec_shutter_control(
351 config,
352 "open",
353 rasp_shutter.control.webapi.control.CONTROL_MODE.AUTO,
354 sense_data,
355 "sensor",
356 )
357 my_lib.footprint.clear(rasp_shutter.control.config.STAT_PENDING_OPEN.to_path())
358 my_lib.footprint.clear(rasp_shutter.control.config.STAT_AUTO_CLOSE.to_path())
359 else:
360 logging.debug(
361 "Skip pendding open (solar_rad: %.1f W/m^2, lux: %.1f LUX)",
362 sense_data.solar_rad.value if sense_data.solar_rad.valid else -1,
363 sense_data.lux.value if sense_data.lux.valid else -1,
364 )
367def conv_schedule_time_to_datetime(schedule_time: str) -> datetime.datetime:
368 now = my_lib.time.now()
369 time_obj = datetime.datetime.strptime(schedule_time, "%H:%M").time()
370 return datetime.datetime.combine(now.date(), time_obj, tzinfo=my_lib.time.get_zoneinfo())
373def shutter_auto_close(config: rasp_shutter.config.AppConfig) -> None:
374 logging.debug("try auto close")
376 schedule_data = get_schedule_data()
377 if schedule_data is None: 377 ↛ 379line 377 didn't jump to line 379 because the condition on line 377 was never true
378 # テスト間のクリア中は何もしない
379 logging.debug("Schedule data not set, skipping auto close")
380 return
381 if not schedule_data["close"]["is_active"]:
382 logging.debug("inactive")
383 return
384 elif abs(
385 my_lib.time.now() - conv_schedule_time_to_datetime(schedule_data["open"]["time"])
386 ) < datetime.timedelta(minutes=1):
387 # NOTE: 開ける時刻付近の場合は処理しない
388 logging.debug("near open time")
389 return
390 elif (
391 my_lib.time.now() <= conv_schedule_time_to_datetime(schedule_data["open"]["time"])
392 ) or my_lib.footprint.exists(rasp_shutter.control.config.STAT_PENDING_OPEN.to_path()):
393 # NOTE: 開ける時刻よりも早い場合は処理しない
394 logging.debug("before open time")
395 return
396 elif conv_schedule_time_to_datetime(schedule_data["close"]["time"]) <= my_lib.time.now():
397 # NOTE: スケジュールで閉めていた場合は処理しない
398 logging.debug("after close time")
399 return
400 elif (
401 my_lib.footprint.elapsed(rasp_shutter.control.config.STAT_AUTO_CLOSE.to_path())
402 <= rasp_shutter.control.config.ELAPSED_AUTO_CLOSE_MAX_SEC
403 ):
404 # NOTE: 12時間以内に自動で閉めていた場合は処理しない
405 logging.debug("already close")
406 return
408 for index in range(len(config.shutter)):
409 elapsed_open = my_lib.footprint.elapsed(
410 rasp_shutter.control.webapi.control.exec_stat_file("open", index)
411 )
412 if elapsed_open < rasp_shutter.control.config.EXEC_INTERVAL_AUTO_MIN * 60:
413 # NOTE: 自動で開けてから時間が経っていない場合は、処理を行わない。
414 logging.debug("just opened before %d sec (%d)", elapsed_open, index)
415 return
417 sense_data = rasp_shutter.control.webapi.sensor.get_sensor_data(config)
418 if check_brightness(sense_data, "close") == BRIGHTNESS_STATE.DARK:
419 sensor_text = rasp_shutter.control.webapi.control.sensor_text(sense_data)
420 my_lib.webapp.log.info(
421 f"🌇 予定より早いですが、暗くなってきたので閉めます。{sensor_text}",
422 )
424 exec_shutter_control(
425 config,
426 "close",
427 rasp_shutter.control.webapi.control.CONTROL_MODE.AUTO,
428 sense_data,
429 "sensor",
430 )
431 logging.info("Set Auto CLOSE")
432 my_lib.footprint.update(rasp_shutter.control.config.STAT_AUTO_CLOSE.to_path())
434 # NOTE: まだ明るくなる可能性がある時間帯の場合、再度自動的に開けるようにする
435 hour = my_lib.time.now().hour
436 if (
437 hour > rasp_shutter.control.config.HOUR_MORNING_START
438 and hour < rasp_shutter.control.config.HOUR_PENDING_OPEN_END
439 ):
440 logging.info("Set Pending OPEN")
441 my_lib.footprint.update(rasp_shutter.control.config.STAT_PENDING_OPEN.to_path())
443 else: # pragma: no cover
444 # NOTE: pending close の制御は無いのでここには来ない。
445 logging.debug(
446 "Skip pendding close (solar_rad: %.1f W/m^2, lux: %.1f LUX)",
447 sense_data.solar_rad.value if sense_data.solar_rad.valid else -1,
448 sense_data.lux.value if sense_data.lux.valid else -1,
449 )
452def shutter_auto_control(config: rasp_shutter.config.AppConfig) -> None:
453 hour = my_lib.time.now().hour
454 cfg = rasp_shutter.control.config
456 # NOTE: 時間帯によって自動制御の内容を分ける
457 if hour > cfg.HOUR_MORNING_START and hour < cfg.HOUR_AUTO_OPEN_END:
458 shutter_auto_open(config)
460 if hour > cfg.HOUR_MORNING_START and hour < cfg.HOUR_AUTO_CLOSE_END:
461 shutter_auto_close(config)
463 # テスト同期用の完了シグナル
464 _signal_auto_control_completed()
467def shutter_schedule_control(config: rasp_shutter.config.AppConfig, state: str) -> None:
468 logging.info("Execute schedule control")
470 sense_data = rasp_shutter.control.webapi.sensor.get_sensor_data(config)
472 if check_brightness(sense_data, state) == BRIGHTNESS_STATE.UNKNOWN:
473 error_sensor = []
475 if not sense_data.solar_rad.valid:
476 error_sensor.append("日射センサ")
477 if not sense_data.lux.valid:
478 error_sensor.append("照度センサ")
480 error_sensor_text = "と".join(error_sensor)
481 state_text = rasp_shutter.type_defs.state_to_action_text(state)
482 my_lib.webapp.log.error(f"😵 {error_sensor_text}の値が不明なので{state_text}るのを見合わせました。")
483 _signal_auto_control_completed()
484 return
486 if state == "open":
487 if check_brightness(sense_data, state) == BRIGHTNESS_STATE.DARK: 487 ↛ 503line 487 didn't jump to line 503 because the condition on line 487 was always true
488 sensor_text = rasp_shutter.control.webapi.control.sensor_text(sense_data)
489 my_lib.webapp.log.info(f"📝 まだ暗いので開けるのを見合わせました。{sensor_text}")
491 rasp_shutter.control.webapi.control.cmd_hist_push(
492 {
493 "cmd": "pending",
494 "state": state,
495 }
496 )
498 # NOTE: 暗いので開けれなかったことを通知
499 logging.info("Set Pending OPEN")
500 my_lib.footprint.update(rasp_shutter.control.config.STAT_PENDING_OPEN.to_path())
501 else:
502 # NOTE: ここにきたときのみ、スケジュールに従って開ける
503 exec_shutter_control(
504 config,
505 state,
506 rasp_shutter.control.webapi.control.CONTROL_MODE.SCHEDULE,
507 sense_data,
508 "scheduler",
509 )
510 else:
511 my_lib.footprint.clear(rasp_shutter.control.config.STAT_PENDING_OPEN.to_path())
512 exec_shutter_control(
513 config,
514 state,
515 rasp_shutter.control.webapi.control.CONTROL_MODE.SCHEDULE,
516 sense_data,
517 "scheduler",
518 )
520 # テスト同期用の完了シグナル
521 _signal_auto_control_completed()
524SCHEDULE_FIELD_TYPES: dict[str, type] = {
525 "is_active": bool,
526 "lux": int,
527 "altitude": int,
528 "solar_rad": int,
529}
532def schedule_validate(schedule_data: dict) -> bool:
533 if len(schedule_data) != 2:
534 logging.warning("Count of entry is Invalid: %d", len(schedule_data))
535 return False
537 for entry in schedule_data.values():
538 for key in ["is_active", "time", "wday", "solar_rad", "lux", "altitude"]:
539 if key not in entry:
540 logging.warning("Does not contain %s", key)
541 return False
543 # 辞書ベースのループで型チェック
544 for field, expected_type in SCHEDULE_FIELD_TYPES.items():
545 if not isinstance(entry.get(field), expected_type):
546 logging.warning("Type of %s is invalid: %s", field, type(entry.get(field)))
547 return False
549 if not SCHEDULE_TIME_PATTERN.search(entry["time"]):
550 logging.warning("Format of time is invalid: %s", entry["time"])
551 return False
552 if len(entry["wday"]) != 7:
553 logging.warning("Count of wday is Invalid: %d", len(entry["wday"]))
554 return False
555 for i, wday_flag in enumerate(entry["wday"]):
556 if not isinstance(wday_flag, bool):
557 logging.warning("Type of wday[%d] is Invalid: %s", i, type(entry["wday"][i]))
558 return False
559 return True
562def schedule_store(schedule_data: dict) -> None:
563 schedule_path = my_lib.webapp.config.SCHEDULE_FILE_PATH
564 assert schedule_path is not None, "SCHEDULE_FILE_PATH not configured" # noqa: S101
566 try:
567 with get_schedule_lock():
568 my_lib.serializer.store(schedule_path, schedule_data)
569 except Exception:
570 logging.exception("Failed to save schedule settings.")
571 my_lib.webapp.log.error("😵 スケジュール設定の保存に失敗しました。")
574def gen_schedule_default():
575 _cfg = rasp_shutter.control.config
576 schedule_data = {
577 "is_active": False,
578 "time": "00:00",
579 "solar_rad": 0,
580 "lux": 0,
581 "altitude": 0,
582 "wday": [True] * 7,
583 }
585 return {
586 "open": schedule_data
587 | {
588 "time": _cfg.DEFAULT_OPEN_TIME,
589 "solar_rad": _cfg.DEFAULT_OPEN_SOLAR_RAD,
590 "lux": _cfg.DEFAULT_OPEN_LUX,
591 },
592 "close": schedule_data
593 | {
594 "time": _cfg.DEFAULT_CLOSE_TIME,
595 "solar_rad": _cfg.DEFAULT_CLOSE_SOLAR_RAD,
596 "lux": _cfg.DEFAULT_CLOSE_LUX,
597 },
598 }
601def schedule_load() -> dict:
602 schedule_default = gen_schedule_default()
603 schedule_path = my_lib.webapp.config.SCHEDULE_FILE_PATH
604 assert schedule_path is not None, "SCHEDULE_FILE_PATH not configured" # noqa: S101
606 try:
607 with get_schedule_lock():
608 schedule_data = my_lib.serializer.load(schedule_path, schedule_default)
609 if schedule_validate(schedule_data):
610 return schedule_data
611 except Exception:
612 logging.exception("Failed to load schedule settings.")
613 my_lib.webapp.log.error("😵 スケジュール設定の読み出しに失敗しました。")
615 return schedule_default
618def set_schedule(config: rasp_shutter.config.AppConfig, schedule_data: dict) -> None:
619 scheduler = get_scheduler()
620 scheduler.clear()
622 for state, entry in schedule_data.items():
623 if not entry["is_active"]:
624 continue
626 for i, wday_method_name in enumerate(WEEKDAY_METHODS):
627 if entry["wday"][i]:
628 wday_method = getattr(scheduler.every(), wday_method_name)
629 wday_method.at(entry["time"], my_lib.time.get_pytz()).do(
630 shutter_schedule_control, config, state
631 )
633 for job in scheduler.get_jobs():
634 logging.info("Next run: %s", job.next_run)
636 idle_sec = scheduler.idle_seconds
637 if idle_sec is not None:
638 hours, remainder = divmod(idle_sec, 3600)
639 minutes, seconds = divmod(remainder, 60)
641 logging.info(
642 "Now is %s, time to next jobs is %d hour(s) %d minute(s) %d second(s)",
643 my_lib.time.now().strftime("%Y-%m-%d %H:%M"),
644 hours,
645 minutes,
646 seconds,
647 )
649 scheduler.every(1).seconds.do(shutter_auto_control, config)
652def schedule_worker(config: rasp_shutter.config.AppConfig, queue) -> None:
653 global should_terminate
655 # DUMMY_MODEではより短い間隔でループして、テストの応答性を向上
656 # 本番環境では0.5秒、テスト環境では0.1秒
657 sleep_sec = 0.1 if rasp_shutter.util.is_dummy_mode() else 0.5
658 scheduler = get_scheduler()
660 liveness_file = config.liveness.file.scheduler
662 logging.info("Load schedule")
663 schedule_data = schedule_load()
664 set_schedule_data(schedule_data)
666 set_schedule(config, schedule_data)
668 logging.info("Start schedule worker")
670 i = 0
671 while True:
672 if should_terminate.is_set():
673 scheduler.clear()
674 break
676 run_pending_elapsed = 0.0
677 try:
678 loop_start = time.perf_counter()
680 if not queue.empty():
681 schedule_data = queue.get()
682 set_schedule_data(schedule_data)
683 set_schedule(config, schedule_data)
684 schedule_store(schedule_data)
686 idle_sec = scheduler.idle_seconds # noqa: F841
688 run_pending_start = time.perf_counter()
689 scheduler.run_pending()
690 run_pending_elapsed = time.perf_counter() - run_pending_start
692 time.sleep(sleep_sec)
694 loop_elapsed = time.perf_counter() - loop_start
696 # 1秒以上かかった場合は警告ログを出力
697 if loop_elapsed > 1.0: 697 ↛ 698line 697 didn't jump to line 698 because the condition on line 697 was never true
698 logging.warning(
699 "Scheduler loop took %.2fs (run_pending: %.2fs, sequence: %d)",
700 loop_elapsed,
701 run_pending_elapsed,
702 get_loop_sequence(),
703 )
704 except OverflowError: # pragma: no cover
705 # NOTE: テストする際、freezer 使って日付をいじるとこの例外が発生する
706 logging.debug(traceback.format_exc())
707 except Exception: # pragma: no cover
708 # NOTE: その他の例外(ログシステムのBrokenPipeError、IOErrorなど)が発生しても
709 # スケジューラループを継続する。例外でループが停止すると、テストの同期が
710 # 取れなくなり、タイムアウトエラーが発生する。
711 logging.warning("Exception in scheduler loop, continuing: %s", traceback.format_exc())
712 finally:
713 # NOTE: 例外が発生してもシーケンス番号を更新する。
714 # テスト同期で使用されるため、ループが動いていることを常に示す必要がある。
715 _increment_loop_sequence()
717 if i % (10 / sleep_sec) == 0:
718 my_lib.footprint.update(liveness_file)
720 i += 1
722 logging.info("Terminate schedule worker")
725if __name__ == "__main__":
726 import multiprocessing
727 import multiprocessing.pool
729 import my_lib.config
730 import my_lib.logger
732 my_lib.logger.init("test", level=logging.DEBUG)
734 def test_func():
735 logging.info("TEST")
737 should_terminate.set()
739 config = my_lib.config.load()
740 queue: multiprocessing.Queue[dict] = multiprocessing.Queue()
742 init()
744 pool = multiprocessing.pool.ThreadPool(processes=1)
745 result = pool.apply_async(schedule_worker, (config, queue))
747 exec_time = my_lib.time.now() + datetime.timedelta(seconds=5)
748 queue.put(
749 {
750 "open": {
751 "time": exec_time.strftime("%H:%M"),
752 "is_active": True,
753 "wday": [True] * 7,
754 "solar_rad": 0,
755 "lux": 0,
756 "altitude": 0,
757 "func": test_func,
758 }
759 }
760 )
762 # NOTE: 終了するのを待つ
763 result.get()