Coverage for src/unit_cooler/controller/engine.py: 100%
58 statements
« prev ^ index » next coverage.py v7.9.1, created at 2025-06-28 08:08 +0000
« prev ^ index » next coverage.py v7.9.1, created at 2025-06-28 08:08 +0000
1#!/usr/bin/env python3
2"""
3屋外の気象情報とエアコンの稼働状況に基づき、冷却モードを決定します。
5Usage:
6 engine.py [-c CONFIG] [-D]
8Options:
9 -c CONFIG : CONFIG を設定ファイルとして読み込んで実行します。[default: config.yaml]
10 -D : デバッグモードで動作します。
11"""
13import copy
14import logging
16import my_lib.notify.slack
18import unit_cooler.controller.message
19import unit_cooler.controller.sensor
20import unit_cooler.util
22# 最低でもこの時間は ON にする (テスト時含む)
23ON_SEC_MIN = 5
24# 最低でもこの時間は OFF にする (テスト時含む)
25OFF_SEC_MIN = 5
28def dummy_cooling_mode():
29 import random
31 current_mode = dummy_cooling_mode.prev_mode
32 max_mode = len(unit_cooler.controller.message.CONTROL_MESSAGE_LIST) - 1
34 # 60%の確率で現状維持、40%の確率で変更
35 if random.random() < 0.6: # noqa: S311
36 cooling_mode = current_mode
37 elif current_mode == 1:
38 # モード1の場合、特別な処理
39 if random.random() < 0.1: # noqa: S311 # 10%の確率で0へ
40 cooling_mode = 0
41 elif random.random() < 0.5: # noqa: S311 # 残り90%のうち50%で+1
42 cooling_mode = min(current_mode + 1, max_mode)
43 else: # 残り90%のうち50%で現状維持
44 cooling_mode = current_mode
45 elif current_mode == 0:
46 # モード0の場合、+1のみ
47 cooling_mode = 1
48 elif current_mode == max_mode:
49 # 最大モードの場合、-1のみ
50 cooling_mode = current_mode - 1
51 else:
52 # その他の場合、50%で+1、50%で-1
53 cooling_mode = current_mode + 1 if random.random() < 0.5 else current_mode - 1 # noqa: S311
55 dummy_cooling_mode.prev_mode = cooling_mode
57 logging.info("cooling_mode: %d (prev: %d)", cooling_mode, current_mode)
59 return {"cooling_mode": cooling_mode}
62dummy_cooling_mode.prev_mode = 0
65def judge_cooling_mode(config):
66 logging.info("Judge cooling mode")
68 sense_data = unit_cooler.controller.sensor.get_sense_data(config)
70 try:
71 cooler_activity = unit_cooler.controller.sensor.get_cooler_activity(sense_data)
72 except RuntimeError as e:
73 unit_cooler.util.notify_error(config, e.args[0])
74 cooler_activity = {"status": 0, "message": None}
76 if cooler_activity["status"] == 0:
77 outdoor_status = {"status": None, "message": None}
78 cooling_mode = 0
79 else:
80 outdoor_status = unit_cooler.controller.sensor.get_outdoor_status(sense_data)
81 cooling_mode = max(cooler_activity["status"] + outdoor_status["status"], 0)
83 if cooler_activity["message"] is not None:
84 logging.info(cooler_activity["message"])
85 if outdoor_status["message"] is not None:
86 logging.info(outdoor_status["message"])
88 logging.info(
89 "cooling_mode: %d (cooler_status: %s, outdoor_status: %s)",
90 cooling_mode,
91 cooler_activity["status"],
92 outdoor_status["status"],
93 )
95 return {
96 "cooling_mode": cooling_mode,
97 "cooler_status": cooler_activity,
98 "outdoor_status": outdoor_status,
99 "sense_data": sense_data,
100 }
103def gen_control_msg(config, dummy_mode=False, speedup=1):
104 mode = dummy_cooling_mode() if dummy_mode else judge_cooling_mode(config)
105 mode_index = min(mode["cooling_mode"], len(unit_cooler.controller.message.CONTROL_MESSAGE_LIST) - 1)
106 control_msg = copy.deepcopy(unit_cooler.controller.message.CONTROL_MESSAGE_LIST[mode_index])
108 # NOTE: 参考として、どのモードかも通知する
109 control_msg["mode_index"] = mode_index
111 if dummy_mode:
112 control_msg["duty"]["on_sec"] = max(control_msg["duty"]["on_sec"] / speedup, ON_SEC_MIN)
113 control_msg["duty"]["off_sec"] = max(control_msg["duty"]["off_sec"] / speedup, OFF_SEC_MIN)
115 logging.info(control_msg)
117 return control_msg
120if __name__ == "__main__":
121 # TEST Code
122 import docopt
123 import my_lib.config
124 import my_lib.logger
125 import my_lib.pretty
127 args = docopt.docopt(__doc__)
129 config_file = args["-c"]
130 debug_mode = args["-D"]
132 my_lib.logger.init("test", level=logging.DEBUG if debug_mode else logging.INFO)
134 config = my_lib.config.load(config_file)
136 logging.info(my_lib.pretty.format(gen_control_msg(config)))