Coverage for src/unit_cooler/actuator/control.py: 94%
61 statements
« prev ^ index » next coverage.py v7.9.1, created at 2025-07-23 14:35 +0000
« prev ^ index » next coverage.py v7.9.1, created at 2025-07-23 14:35 +0000
1#!/usr/bin/env python3
2import logging
3import os
5import my_lib.time
7import unit_cooler.actuator.valve
8import unit_cooler.const
9import unit_cooler.util
10from unit_cooler.metrics import get_metrics_collector
12HAZARD_NOTIFY_INTERVAL_MIN = 30
15def gen_handle(config, message_queue):
16 return {
17 "config": config,
18 "message_queue": message_queue,
19 "receive_time": my_lib.time.now(),
20 "receive_count": 0,
21 }
24def hazard_register(config):
25 my_lib.footprint.update(config["actuator"]["control"]["hazard"]["file"])
28def hazard_clear(config):
29 my_lib.footprint.clear(config["actuator"]["control"]["hazard"]["file"])
32def hazard_notify(config, message):
33 if (
34 my_lib.footprint.elapsed(config["actuator"]["control"]["hazard"]["file"]) / 60
35 > HAZARD_NOTIFY_INTERVAL_MIN
36 ):
37 unit_cooler.actuator.work_log.add(message, unit_cooler.const.LOG_LEVEL.ERROR)
39 hazard_register(config)
41 unit_cooler.actuator.valve.set_state(unit_cooler.const.VALVE_STATE.CLOSE)
44def hazard_check(config):
45 if my_lib.footprint.exists(config["actuator"]["control"]["hazard"]["file"]):
46 hazard_notify(config, "過去に水漏れもしくは電磁弁の故障が検出されているので制御を停止しています。")
47 return True
48 else:
49 return False
52def get_control_message_impl(handle, last_message):
53 if handle["message_queue"].empty():
54 if (my_lib.time.now() - handle["receive_time"]).total_seconds() > handle["config"]["controller"][
55 "interval_sec"
56 ] * 3:
57 unit_cooler.actuator.work_log.add(
58 "冷却モードの指示を受信できません。", unit_cooler.const.LOG_LEVEL.ERROR
59 )
61 return last_message
63 control_message = None
64 while not handle["message_queue"].empty(): 64 ↛ 76line 64 didn't jump to line 76 because the condition on line 64 was always true
65 control_message = handle["message_queue"].get()
67 logging.info("Receive: %s", control_message)
69 handle["receive_time"] = my_lib.time.now()
70 handle["receive_count"] += 1
71 if os.environ.get("TEST", "false") == "true": 71 ↛ 64line 71 didn't jump to line 64 because the condition on line 71 was always true
72 # NOTE: テスト時は、コマンドの数を整合させたいので、
73 # 1 回に1個のコマンドのみ処理する。
74 break
76 if control_message["mode_index"] != last_message["mode_index"]:
77 unit_cooler.actuator.work_log.add(
78 ("冷却モードが変更されました。({before} → {after})").format(
79 before="init" if last_message["mode_index"] == -1 else last_message["mode_index"],
80 after=control_message["mode_index"],
81 )
82 )
84 return control_message
87def get_control_message(handle, last_message):
88 try:
89 return get_control_message_impl(handle, last_message)
90 except OverflowError: # pragma: no cover
91 # NOTE: テストする際、timemachinefreezer 使って日付をいじるとこの例外が発生する
92 logging.exception("Failed to get control message")
93 return last_message
96def execute(config, control_message):
97 if hazard_check(config):
98 control_message = {"mode_index": 0, "state": unit_cooler.const.COOLING_STATE.IDLE}
100 # メトリクス収集
101 try:
102 metrics_db_path = config["actuator"]["metrics"]["data"]
103 metrics_collector = get_metrics_collector(metrics_db_path)
105 # 冷却モードの記録
106 cooling_mode = control_message.get("mode_index", 0)
107 metrics_collector.update_cooling_mode(cooling_mode)
109 # Duty比の記録(control_messageに含まれている場合)
110 if "duty" in control_message:
111 duty_info = control_message["duty"]
112 if duty_info.get("enable", False):
113 on_time = duty_info.get("on_sec", 0)
114 total_time = on_time + duty_info.get("off_sec", 0)
115 if total_time > 0: 115 ↛ 120line 115 didn't jump to line 120 because the condition on line 115 was always true
116 metrics_collector.update_duty_ratio(on_time, total_time)
117 except Exception:
118 logging.exception("Failed to collect metrics data")
120 unit_cooler.actuator.valve.set_cooling_state(control_message)