Coverage for src/unit_cooler/actuator/control.py: 97%
46 statements
« prev ^ index » next coverage.py v7.9.1, created at 2025-06-28 08:08 +0000
« prev ^ index » next coverage.py v7.9.1, created at 2025-06-28 08:08 +0000
1#!/usr/bin/env python3
2import logging
3import os
5import my_lib.time
7import unit_cooler.actuator.valve
8import unit_cooler.const
9import unit_cooler.util
11HAZARD_NOTIFY_INTERVAL_MIN = 30
14def gen_handle(config, message_queue):
15 return {
16 "config": config,
17 "message_queue": message_queue,
18 "receive_time": my_lib.time.now(),
19 "receive_count": 0,
20 }
23def hazard_register(config):
24 my_lib.footprint.update(config["actuator"]["control"]["hazard"]["file"])
27def hazard_clear(config):
28 my_lib.footprint.clear(config["actuator"]["control"]["hazard"]["file"])
31def hazard_notify(config, message):
32 if (
33 my_lib.footprint.elapsed(config["actuator"]["control"]["hazard"]["file"]) / 60
34 > HAZARD_NOTIFY_INTERVAL_MIN
35 ):
36 unit_cooler.actuator.work_log.add(message, unit_cooler.const.LOG_LEVEL.ERROR)
38 hazard_register(config)
40 unit_cooler.actuator.valve.set_state(unit_cooler.const.VALVE_STATE.CLOSE)
43def hazard_check(config):
44 if my_lib.footprint.exists(config["actuator"]["control"]["hazard"]["file"]):
45 hazard_notify(config, "過去に水漏れもしくは電磁弁の故障が検出されているので制御を停止しています。")
46 return True
47 else:
48 return False
51def get_control_message_impl(handle, last_message):
52 if handle["message_queue"].empty():
53 if (my_lib.time.now() - handle["receive_time"]).total_seconds() > handle["config"]["controller"][
54 "interval_sec"
55 ] * 3:
56 unit_cooler.actuator.work_log.add(
57 "冷却モードの指示を受信できません。", unit_cooler.const.LOG_LEVEL.ERROR
58 )
60 return last_message
62 control_message = None
63 while not handle["message_queue"].empty(): 63 ↛ 75line 63 didn't jump to line 75 because the condition on line 63 was always true
64 control_message = handle["message_queue"].get()
66 logging.info("Receive: %s", control_message)
68 handle["receive_time"] = my_lib.time.now()
69 handle["receive_count"] += 1
70 if os.environ.get("TEST", "false") == "true": 70 ↛ 63line 70 didn't jump to line 63 because the condition on line 70 was always true
71 # NOTE: テスト時は、コマンドの数を整合させたいので、
72 # 1 回に1個のコマンドのみ処理する。
73 break
75 if control_message["mode_index"] != last_message["mode_index"]:
76 unit_cooler.actuator.work_log.add(
77 ("冷却モードが変更されました。({before} → {after})").format(
78 before="init" if last_message["mode_index"] == -1 else last_message["mode_index"],
79 after=control_message["mode_index"],
80 )
81 )
83 return control_message
86def get_control_message(handle, last_message):
87 try:
88 return get_control_message_impl(handle, last_message)
89 except OverflowError: # pragma: no cover
90 # NOTE: テストする際、timemachinefreezer 使って日付をいじるとこの例外が発生する
91 logging.exception("Failed to get control message")
92 return last_message
95def execute(config, control_message):
96 if hazard_check(config):
97 control_message = {"mode_index": 0, "state": unit_cooler.const.COOLING_STATE.IDLE}
99 unit_cooler.actuator.valve.set_cooling_state(control_message)