Coverage for src/unit_cooler/controller/engine.py: 98%
62 statements
« prev ^ index » next coverage.py v7.9.1, created at 2025-07-23 14:35 +0000
« prev ^ index » next coverage.py v7.9.1, created at 2025-07-23 14:35 +0000
#!/usr/bin/env python3
# NOTE: The module docstring below is handed to docopt by the ``__main__``
# block, so it is runtime text, not just documentation.  docopt needs at
# least two spaces between an option and its description; with fewer, the
# ``[default: ...]`` value is not recognized and ``-D`` is parsed as an
# option that takes an argument.
"""
屋外の気象情報とエアコンの稼働状況に基づき、冷却モードを決定します。

Usage:
  engine.py [-c CONFIG] [-D]

Options:
  -c CONFIG         : CONFIG を設定ファイルとして読み込んで実行します。[default: config.yaml]
  -D                : デバッグモードで動作します。
"""

13import copy
14import logging
16import my_lib.notify.slack
18import unit_cooler.controller.message
19import unit_cooler.controller.sensor
20import unit_cooler.util
# Keep the output ON for at least this many seconds (also applies during tests).
ON_SEC_MIN = 5
# Keep the output OFF for at least this many seconds (also applies during tests).
OFF_SEC_MIN = 5


def dummy_cooling_mode():
    """Return a pseudo-random cooling mode for dummy operation.

    The mode does a random walk that starts from the value stored in
    ``dummy_cooling_mode.prev_mode``.  The newly chosen mode is written back
    to that attribute, so consecutive calls continue from the previous result.

    Returns:
        dict: ``{"cooling_mode": <int>}``.
    """
    import random

    current_mode = dummy_cooling_mode.prev_mode
    max_mode = len(unit_cooler.controller.message.CONTROL_MESSAGE_LIST) - 1

    # Keep the current mode with 60% probability, change it with 40%.
    if random.random() < 0.6:  # noqa: S311
        cooling_mode = current_mode
    elif current_mode == 1:
        # Mode 1 gets special handling.
        if random.random() < 0.1:  # noqa: S311  # 10%: drop to mode 0
            cooling_mode = 0
        elif random.random() < 0.5:  # noqa: S311  # half of the remaining 90%: +1
            cooling_mode = min(current_mode + 1, max_mode)
        else:  # other half of the remaining 90%: stay
            cooling_mode = current_mode
    elif current_mode == 0:
        # From mode 0 the only move is +1.  Stay at 0 when the message list
        # has no mode 1, so the result never exceeds the highest mode
        # (the mode-1 branch above clamps in the same way).
        cooling_mode = 1 if max_mode >= 1 else 0
    elif current_mode == max_mode:
        # At the highest mode the only move is -1.
        cooling_mode = current_mode - 1
    else:
        # Anywhere else: +1 or -1 with equal probability.
        cooling_mode = current_mode + 1 if random.random() < 0.5 else current_mode - 1  # noqa: S311

    dummy_cooling_mode.prev_mode = cooling_mode

    logging.info("cooling_mode: %d (prev: %d)", cooling_mode, current_mode)

    return {"cooling_mode": cooling_mode}


# Starting point of the random walk.
dummy_cooling_mode.prev_mode = 0


def judge_cooling_mode(config, sense_data):
    """Decide the cooling mode from the given sensor data.

    The cooler activity is evaluated first.  When its status is 0 the cooling
    mode is 0 and the outdoor status is not evaluated.  Otherwise the cooling
    mode is the sum of the cooler status and the outdoor status, with a lower
    bound of 0.

    Args:
        config: Configuration object; only passed on to
            ``unit_cooler.util.notify_error`` when evaluating the cooler
            activity fails.
        sense_data: Sensor data passed to the ``unit_cooler.controller.sensor``
            helpers and echoed back in the result.

    Returns:
        dict: ``cooling_mode`` (int), ``cooler_status`` and ``outdoor_status``
        (the ``{"status": ..., "message": ...}`` dicts used for the decision)
        and ``sense_data`` (the input, unchanged).
    """
    logging.info("Judge cooling mode")

    try:
        cooler_activity = unit_cooler.controller.sensor.get_cooler_activity(sense_data)
    except RuntimeError as e:
        # A RuntimeError raised without arguments has an empty ``args`` tuple;
        # indexing it would raise IndexError here and hide the real failure.
        error_message = e.args[0] if e.args else repr(e)
        unit_cooler.util.notify_error(config, error_message)
        # Treat a failed evaluation as "cooler not active".
        cooler_activity = {"status": 0, "message": None}

    if cooler_activity["status"] == 0:
        outdoor_status = {"status": None, "message": None}
        cooling_mode = 0
    else:
        outdoor_status = unit_cooler.controller.sensor.get_outdoor_status(sense_data)
        cooling_mode = max(cooler_activity["status"] + outdoor_status["status"], 0)

    if cooler_activity["message"] is not None:
        logging.info(cooler_activity["message"])
    if outdoor_status["message"] is not None:
        logging.info(outdoor_status["message"])

    logging.info(
        "cooling_mode: %d (cooler_status: %s, outdoor_status: %s)",
        cooling_mode,
        cooler_activity["status"],
        outdoor_status["status"],
    )

    return {
        "cooling_mode": cooling_mode,
        "cooler_status": cooler_activity,
        "outdoor_status": outdoor_status,
        "sense_data": sense_data,
    }


def gen_control_msg(config, dummy_mode=False, speedup=1):
    """Build the control message for the current cooling mode.

    Args:
        config: Configuration object passed to the sensor and judgement
            helpers (unused in dummy mode).
        dummy_mode: When true, no sensors are read; the mode comes from
            ``dummy_cooling_mode()`` and ``sense_data`` is an empty dict.
        speedup: Divisor applied to the duty ``on_sec`` / ``off_sec`` values.
            Only used in dummy mode; the results never go below
            ``ON_SEC_MIN`` / ``OFF_SEC_MIN``.

    Returns:
        A deep copy of the selected ``CONTROL_MESSAGE_LIST`` entry with
        ``mode_index`` and ``sense_data`` added.
    """
    if dummy_mode:
        sense_data = {}
        mode = dummy_cooling_mode()
    else:
        sense_data = unit_cooler.controller.sensor.get_sense_data(config)
        mode = judge_cooling_mode(config, sense_data)

    # Clamp to the valid index range.  The lower bound matters because a
    # negative index would silently select an entry from the END of the list.
    max_index = len(unit_cooler.controller.message.CONTROL_MESSAGE_LIST) - 1
    mode_index = max(min(mode["cooling_mode"], max_index), 0)

    # Deep copy so the shared message template is never modified.
    control_msg = copy.deepcopy(unit_cooler.controller.message.CONTROL_MESSAGE_LIST[mode_index])

    # NOTE: Also report which mode was selected, for reference.
    control_msg["mode_index"] = mode_index
    # NOTE: Send the sensor data along as well, for metrics.
    control_msg["sense_data"] = sense_data

    if dummy_mode:
        control_msg["duty"]["on_sec"] = max(control_msg["duty"]["on_sec"] / speedup, ON_SEC_MIN)
        control_msg["duty"]["off_sec"] = max(control_msg["duty"]["off_sec"] / speedup, OFF_SEC_MIN)

    logging.info(control_msg)

    return control_msg


if __name__ == "__main__":
    # Manual test entry point: parse the command line using the module
    # docstring, load the configuration, and log one generated control message.
    import docopt
    import my_lib.config
    import my_lib.logger
    import my_lib.pretty

    args = docopt.docopt(__doc__)

    config_file = args["-c"]
    debug_mode = args["-D"]

    my_lib.logger.init("test", level=logging.DEBUG if debug_mode else logging.INFO)

    config = my_lib.config.load(config_file)

    logging.info(my_lib.pretty.format(gen_control_msg(config)))