Coverage for src/actuator.py: 100%
57 statements
« prev ^ index » next coverage.py v7.9.1, created at 2025-06-28 08:08 +0000
« prev ^ index » next coverage.py v7.9.1, created at 2025-06-28 08:08 +0000
1#!/usr/bin/env python3
2"""
3電磁弁を制御してエアコン室外機の冷却を行います。
5Usage:
6 actuator.py [-c CONFIG] [-s CONTROL_HOST] [-p PUB_PORT] [-n COUNT] [-d] [-t SPEEDUP] [-D]
8Options:
9 -c CONFIG : CONFIG を設定ファイルとして読み込んで実行します。 [default: config.yaml]
10 -s CONTROL_HOST : コントローラのホスト名を指定します。 [default: localhost]
11 -p PUB_PORT : ZeroMQ の Pub サーバーを動作させるポートを指定します。 [default: 2222]
12 -n COUNT : n 回制御メッセージを受信したら終了します。0 は制限なし。 [default: 0]
13 -d : ダミーモードで実行します。
14 -t SPEEDUP : 時短モード。演算間隔を SPEEDUP 分の一にします。 [default: 1]
15 -D : デバッグモードで動作します。
16"""
18import concurrent.futures
19import logging
20import multiprocessing
21import os
22import signal
23import time
25SCHEMA_CONFIG = "config.schema"
28def sig_handler(num, frame): # noqa: ARG001
29 import unit_cooler.actuator.worker
31 logging.warning("Receive signal %d", num)
33 if num == signal.SIGTERM:
34 unit_cooler.actuator.worker.term()
37def wait_before_start(config):
38 for i in range(config["actuator"]["control"]["interval_sec"]):
39 logging.info(
40 "Wait for the old Pod to finish (%3d / %3d)", i + 1, config["actuator"]["control"]["interval_sec"]
41 )
42 time.sleep(1)
45def start(config, arg):
46 global should_terminate # noqa: PLW0603
47 global log_server_handle # noqa: PLW0603
49 setting = {
50 "control_host": "localhost",
51 "pub_port": 2222,
52 "log_port": 5001,
53 "dummy_mode": False,
54 "speedup": 1,
55 "msg_count": 0,
56 "debug_mode": False,
57 }
58 setting.update(arg)
60 should_terminate = False
62 logging.info("Using ZMQ server of %s:%d", setting["control_host"], setting["pub_port"])
64 # NOTE: オプションでダミーモードが指定された場合、環境変数もそれに揃えておく
65 if setting["dummy_mode"]:
66 logging.warning("Set dummy mode")
67 os.environ["DUMMY_MODE"] = "true"
69 manager = multiprocessing.Manager()
70 message_queue = manager.Queue()
71 event_queue = manager.Queue()
73 # NOTE: Blueprint のパス指定を YAML で行いたいので、my_lib.webapp の import 順を制御
74 import unit_cooler.actuator.log_server
76 log_server_handle = unit_cooler.actuator.log_server.start(config, event_queue, setting["log_port"])
78 if not setting["dummy_mode"] and (os.environ.get("TEST", "false") != "true"):
79 # NOTE: 動作開始前に待つ。これを行わないと、複数の Pod が電磁弁を制御することに
80 # なり、電磁弁の故障を誤判定する可能性がある。
81 wait_before_start(config)
83 import unit_cooler.actuator.work_log
84 import unit_cooler.actuator.worker
86 unit_cooler.actuator.work_log.init(config, event_queue)
88 logging.info("Initialize valve")
89 unit_cooler.actuator.valve.init(config["actuator"]["control"]["valve"]["pin_no"])
90 unit_cooler.actuator.monitor.init(config["actuator"]["control"]["valve"]["pin_no"])
92 executor = concurrent.futures.ThreadPoolExecutor()
94 thread_list = unit_cooler.actuator.worker.start(
95 executor, unit_cooler.actuator.worker.get_worker_def(config, message_queue, setting)
96 )
98 signal.signal(signal.SIGTERM, sig_handler)
100 return (executor, thread_list, log_server_handle)
103def wait_and_term(executor, thread_list, log_server_handle, terminate=True):
104 global should_terminate # noqa: PLW0603
106 import unit_cooler.actuator.log_server
107 import unit_cooler.actuator.work_log
109 should_terminate = terminate
111 ret = 0
112 for thread_info in thread_list:
113 logging.info("Wait %s finish", thread_info["name"])
115 if thread_info["future"].result() != 0:
116 logging.error("Error occurred in %s", thread_info["name"])
117 ret = -1
119 logging.info("Shutdown executor")
120 executor.shutdown(wait=True)
122 unit_cooler.actuator.log_server.term(log_server_handle)
123 unit_cooler.actuator.work_log.term()
125 logging.warning("Terminate unit_cooler")
127 return ret
130######################################################################
131if __name__ == "__main__":
132 import pathlib
133 import sys
135 import docopt
136 import my_lib.config
137 import my_lib.logger
139 args = docopt.docopt(__doc__)
141 config_file = args["-c"]
142 control_host = os.environ.get("HEMS_CONTROL_HOST", args["-s"])
143 pub_port = int(os.environ.get("HEMS_PUB_PORT", args["-p"]))
144 dummy_mode = os.environ.get("DUMMY_MODE", args["-d"])
145 speedup = int(args["-t"])
146 msg_count = int(args["-n"])
147 debug_mode = args["-D"]
149 my_lib.logger.init("hems.unit_cooler", level=logging.DEBUG if debug_mode else logging.INFO)
151 config = my_lib.config.load(config_file, pathlib.Path(SCHEMA_CONFIG))
153 sys.exit(
154 wait_and_term(
155 *start(
156 config,
157 {
158 "control_host": control_host,
159 "pub_port": pub_port,
160 "log_port": config["actuator"]["log_server"]["webapp"]["port"],
161 "dummy_mode": dummy_mode,
162 "speedup": speedup,
163 "msg_count": msg_count,
164 "debug_mode": debug_mode,
165 },
166 ),
167 terminate=False,
168 )
169 )