Coverage for src/actuator.py: 96%
64 statements
« prev ^ index » next coverage.py v7.9.1, created at 2025-07-23 14:35 +0000
« prev ^ index » next coverage.py v7.9.1, created at 2025-07-23 14:35 +0000
1#!/usr/bin/env python3
2"""
3電磁弁を制御してエアコン室外機の冷却を行います。
5Usage:
6 actuator.py [-c CONFIG] [-s CONTROL_HOST] [-p PUB_PORT] [-l LOG_PORT] [-n COUNT] [-d] [-t SPEEDUP] [-D]
8Options:
9 -c CONFIG : CONFIG を設定ファイルとして読み込んで実行します。 [default: config.yaml]
10 -s CONTROL_HOST : コントローラのホスト名を指定します。 [default: localhost]
11 -p PUB_PORT : コントローラの ZeroMQ Pub サーバーのポートを指定します。 [default: 2222]
12 -l LOG_PORT : 動作ログを提供する WEB サーバーのポートを指定します。 [default: 5001]
13 -n COUNT : n 回制御メッセージを受信したら終了します。0 は制限なし。 [default: 0]
14 -d : ダミーモードで実行します。
15 -t SPEEDUP : 時短モード。演算間隔を SPEEDUP 分の一にします。 [default: 1]
16 -D : デバッグモードで動作します。
17"""
19import concurrent.futures
20import logging
21import multiprocessing
22import os
23import signal
24import time
26SCHEMA_CONFIG = "config.schema"
29def sig_handler(num, frame): # noqa: ARG001
30 import unit_cooler.actuator.worker
32 logging.warning("Receive signal %d", num)
34 if num == signal.SIGTERM:
35 unit_cooler.actuator.worker.term()
38def wait_before_start(config):
39 for i in range(config["actuator"]["control"]["interval_sec"]):
40 logging.info(
41 "Wait for the old Pod to finish (%3d / %3d)", i + 1, config["actuator"]["control"]["interval_sec"]
42 )
43 time.sleep(1)
46def start(config, arg):
47 global log_server_handle # noqa: PLW0603
49 setting = {
50 "control_host": "localhost",
51 "pub_port": 2222,
52 "log_port": 5001,
53 "dummy_mode": False,
54 "speedup": 1,
55 "msg_count": 0,
56 "debug_mode": False,
57 }
58 setting.update(arg)
60 logging.info("Using ZMQ server of %s:%d", setting["control_host"], setting["pub_port"])
62 # NOTE: オプションでダミーモードが指定された場合、環境変数もそれに揃えておく
63 if setting["dummy_mode"]:
64 logging.warning("Set dummy mode")
65 os.environ["DUMMY_MODE"] = "true"
67 manager = multiprocessing.Manager()
68 message_queue = manager.Queue()
69 event_queue = manager.Queue()
71 if not setting["dummy_mode"] and (os.environ.get("TEST", "false") != "true"):
72 # NOTE: 動作開始前に待つ。これを行わないと、複数の Pod が電磁弁を制御することに
73 # なり、電磁弁の故障を誤判定する可能性がある。
74 wait_before_start(config)
76 import unit_cooler.actuator.monitor
77 import unit_cooler.actuator.valve
78 import unit_cooler.actuator.work_log
79 import unit_cooler.actuator.worker
81 unit_cooler.actuator.work_log.init(config, event_queue)
83 logging.info("Initialize valve")
84 unit_cooler.actuator.valve.init(config["actuator"]["control"]["valve"]["pin_no"], config)
85 unit_cooler.actuator.monitor.init(config["actuator"]["control"]["valve"]["pin_no"])
87 # NOTE: Blueprint のパス指定を YAML で行いたいので、my_lib.webapp の import 順を制御
88 import unit_cooler.actuator.web_server
90 try:
91 logging.info("Starting web server on port %d", setting["log_port"])
92 log_server_handle = unit_cooler.actuator.web_server.start(config, event_queue, setting["log_port"])
93 logging.info("Web server started successfully")
94 except Exception:
95 logging.exception("Failed to start web server")
96 raise
98 executor = concurrent.futures.ThreadPoolExecutor()
100 thread_list = unit_cooler.actuator.worker.start(
101 executor, unit_cooler.actuator.worker.get_worker_def(config, message_queue, setting)
102 )
104 signal.signal(signal.SIGTERM, sig_handler)
106 return (executor, thread_list, log_server_handle)
109def wait_and_term(executor, thread_list, log_server_handle, terminate=True): # noqa: ARG001
110 import unit_cooler.actuator.web_server
111 import unit_cooler.actuator.work_log
113 ret = 0
114 for thread_info in thread_list:
115 logging.info("Wait %s finish", thread_info["name"])
117 if thread_info["future"].result() != 0:
118 logging.error("Error occurred in %s", thread_info["name"])
119 ret = -1
121 unit_cooler.actuator.worker.term()
123 logging.info("Shutdown executor")
124 executor.shutdown(wait=True)
126 unit_cooler.actuator.web_server.term(log_server_handle)
127 unit_cooler.actuator.work_log.term()
129 logging.warning("Terminate unit_cooler")
131 return ret
134######################################################################
135if __name__ == "__main__":
136 import pathlib
137 import sys
139 import docopt
140 import my_lib.config
141 import my_lib.logger
143 args = docopt.docopt(__doc__)
145 config_file = args["-c"]
146 control_host = os.environ.get("HEMS_CONTROL_HOST", args["-s"])
147 pub_port = int(os.environ.get("HEMS_PUB_PORT", args["-p"]))
148 log_port = int(os.environ.get("HEMS_LOG_PORT", args["-l"]))
149 dummy_mode = os.environ.get("DUMMY_MODE", args["-d"])
150 speedup = int(args["-t"])
151 msg_count = int(args["-n"])
152 debug_mode = args["-D"]
154 my_lib.logger.init("hems.unit_cooler", level=logging.DEBUG if debug_mode else logging.INFO)
156 config = my_lib.config.load(config_file, pathlib.Path(SCHEMA_CONFIG))
157 sys.exit(
158 wait_and_term(
159 *start(
160 config,
161 {
162 "control_host": control_host,
163 "pub_port": pub_port,
164 "log_port": log_port,
165 "dummy_mode": dummy_mode,
166 "speedup": speedup,
167 "msg_count": msg_count,
168 "debug_mode": debug_mode,
169 },
170 ),
171 terminate=False,
172 )
173 )