Coverage for src/unit_cooler/actuator/worker.py: 98%
114 statements
« prev ^ index » next coverage.py v7.9.1, created at 2025-06-28 11:52 +0000
« prev ^ index » next coverage.py v7.9.1, created at 2025-06-28 11:52 +0000
1#!/usr/bin/env python3
2"""
3アクチュエータで動作するワーカです。
5Usage:
6 worker.py [-c CONFIG] [-s CONTROL_HOST] [-p PUB_PORT] [-n COUNT] [-t SPEEDUP] [-D]
8Options:
9 -c CONFIG : CONFIG を設定ファイルとして読み込んで実行します。 [default: config.yaml]
10 -s CONTROL_HOST : コントローラのホスト名を指定します。 [default: localhost]
11 -p PUB_PORT : ZeroMQ の Pub サーバーを動作させるポートを指定します。 [default: 2222]
12 -n COUNT : n 回制御メッセージを受信したら終了します。0 は制限なし。 [default: 1]
13 -t SPEEDUP : 時短モード。演算間隔を SPEEDUP 分の一にします。 [default: 1]
14 -D : デバッグモードで動作します。
15"""
17import concurrent.futures
18import logging
19import pathlib
20import threading
21import time
22import traceback
24import my_lib.footprint
26import unit_cooler.actuator.control
27import unit_cooler.actuator.monitor
28import unit_cooler.const
29import unit_cooler.pubsub.subscribe
30import unit_cooler.util
32# スレッドローカル変数として定義
33thread_local = threading.local()
36def get_last_control_message():
37 """スレッドローカルなlast_control_messageを取得"""
38 if not hasattr(thread_local, "last_control_message"):
39 thread_local.last_control_message = {"mode_index": -1, "state": unit_cooler.const.COOLING_STATE.IDLE}
40 return thread_local.last_control_message
43def set_last_control_message(message):
44 """スレッドローカルなlast_control_messageを設定"""
45 thread_local.last_control_message = message
48should_terminate = threading.Event()
51def queue_put(message_queue, message, liveness_file):
52 message["state"] = unit_cooler.const.COOLING_STATE(message["state"])
54 logging.info("Receive message: %s", message)
56 message_queue.put(message)
57 my_lib.footprint.update(liveness_file)
60def sleep_until_next_iter(start_time, interval_sec):
61 sleep_sec = max(interval_sec - (time.time() - start_time), 0.5)
62 logging.debug("Seep %.1f sec...", sleep_sec)
63 time.sleep(sleep_sec)
66# NOTE: コントローラから制御指示を受け取ってキューに積むワーカ
67def subscribe_worker(config, control_host, pub_port, message_queue, liveness_file, msg_count=0): # noqa: PLR0913
68 logging.info("Start actuator subscribe worker (%s:%d)", control_host, pub_port)
69 ret = 0
70 try:
71 unit_cooler.pubsub.subscribe.start_client(
72 control_host,
73 pub_port,
74 lambda message: queue_put(message_queue, message, liveness_file),
75 msg_count,
76 )
77 except Exception:
78 logging.exception("Failed to receive control message")
79 unit_cooler.util.notify_error(config, traceback.format_exc())
80 ret = -1
82 logging.warning("Stop subscribe worker")
83 return ret
86# NOTE: バルブの状態をモニタするワーカ
87def monitor_worker(config, liveness_file, dummy_mode=False, speedup=1, msg_count=0):
88 global should_terminate
90 logging.info("Start monitor worker")
92 interval_sec = config["actuator"]["monitor"]["interval_sec"] / speedup
93 try:
94 handle = unit_cooler.actuator.monitor.gen_handle(config, interval_sec)
95 except Exception:
96 logging.exception("Failed to create handle")
98 unit_cooler.actuator.work_log.add(
99 "流量のロギングを開始できません。", unit_cooler.const.LOG_LEVEL.ERROR
100 )
101 return -1
103 i = 0
104 ret = 0
105 try:
106 while True:
107 start_time = time.time()
109 need_logging = (i % handle["log_period"]) == 0
110 i += 1
112 mist_condition = unit_cooler.actuator.monitor.get_mist_condition()
113 unit_cooler.actuator.monitor.check(handle, mist_condition, need_logging)
114 unit_cooler.actuator.monitor.send_mist_condition(
115 handle, mist_condition, get_last_control_message(), dummy_mode
116 )
118 my_lib.footprint.update(liveness_file)
120 if should_terminate.is_set():
121 logging.info("Terminate monitor worker")
122 break
124 if msg_count != 0: 124 ↛ 133line 124 didn't jump to line 133 because the condition on line 124 was always true
125 logging.debug("(monitor_count, msg_count) = (%d, %d)", handle["monitor_count"], msg_count)
126 # NOTE: monitor_worker が先に終了しないようにする
127 if handle["monitor_count"] >= (msg_count + 20):
128 logging.info(
129 "Terminate monitor worker, because the specified number of times has been reached."
130 )
131 break
133 sleep_until_next_iter(start_time, interval_sec)
134 except Exception:
135 unit_cooler.util.notify_error(config, traceback.format_exc())
136 ret = -1
138 logging.warning("Stop monitor worker")
139 return ret
142# NOTE: バルブを制御するワーカ
143def control_worker(config, message_queue, liveness_file, dummy_mode=False, speedup=1, msg_count=0): # noqa: PLR0913
144 global should_terminate
146 logging.info("Start control worker")
148 if dummy_mode:
149 logging.warning("DUMMY mode")
151 interval_sec = config["actuator"]["control"]["interval_sec"] / speedup
152 handle = unit_cooler.actuator.control.gen_handle(config, message_queue)
154 ret = 0
155 try:
156 while True:
157 start_time = time.time()
159 current_message = unit_cooler.actuator.control.get_control_message(
160 handle, get_last_control_message()
161 )
163 set_last_control_message(current_message)
165 unit_cooler.actuator.control.execute(config, current_message)
167 my_lib.footprint.update(liveness_file)
169 if should_terminate.is_set():
170 logging.info("Terminate control worker")
171 break
173 if msg_count != 0: 173 ↛ 179line 173 didn't jump to line 179 because the condition on line 173 was always true
174 logging.debug("(receive_count, msg_count) = (%d, %d)", handle["receive_count"], msg_count)
175 if handle["receive_count"] >= msg_count:
176 logging.info("Terminate control, because the specified number of times has been reached.")
177 break
179 sleep_until_next_iter(start_time, interval_sec)
180 except Exception:
181 logging.exception("Failed to control valve")
182 unit_cooler.util.notify_error(config, traceback.format_exc())
183 ret = -1
185 logging.warning("Stop control worker")
186 # NOTE: Queue を close した後に put されると ValueError が発生するので、
187 # 明示的に閉じるのをやめた。
188 # message_queue.close()
190 return ret
193def get_worker_def(config, message_queue, setting):
194 return [
195 {
196 "name": "subscribe_worker",
197 "param": [
198 subscribe_worker,
199 config,
200 setting["control_host"],
201 setting["pub_port"],
202 message_queue,
203 pathlib.Path(config["actuator"]["subscribe"]["liveness"]["file"]),
204 setting["msg_count"],
205 ],
206 },
207 {
208 "name": "monitor_worker",
209 "param": [
210 monitor_worker,
211 config,
212 pathlib.Path(config["actuator"]["monitor"]["liveness"]["file"]),
213 setting["dummy_mode"],
214 setting["speedup"],
215 setting["msg_count"],
216 ],
217 },
218 {
219 "name": "control_worker",
220 "param": [
221 control_worker,
222 config,
223 message_queue,
224 pathlib.Path(config["actuator"]["control"]["liveness"]["file"]),
225 setting["dummy_mode"],
226 setting["speedup"],
227 setting["msg_count"],
228 ],
229 },
230 ]
233def start(executor, worker_def):
234 global should_terminate
236 should_terminate.clear()
237 thread_list = []
239 for worker_info in worker_def:
240 future = executor.submit(*worker_info["param"])
241 thread_list.append({"name": worker_info["name"], "future": future})
243 return thread_list
246def term():
247 global should_terminate
249 should_terminate.set()
252if __name__ == "__main__":
253 # TEST Code
254 import multiprocessing
255 import os
256 import pathlib
258 import docopt
259 import my_lib.config
260 import my_lib.logger
261 import my_lib.pretty
262 import my_lib.webapp.config
264 import unit_cooler.actuator.valve
266 args = docopt.docopt(__doc__)
268 config_file = args["-c"]
269 control_host = os.environ.get("HEMS_CONTROL_HOST", args["-s"])
270 pub_port = int(os.environ.get("HEMS_PUB_PORT", args["-p"]))
271 speedup = int(args["-t"])
272 msg_count = int(args["-n"])
273 debug_mode = args["-D"]
275 my_lib.logger.init("test", level=logging.DEBUG if debug_mode else logging.INFO)
277 config = my_lib.config.load(config_file)
278 message_queue = multiprocessing.Queue()
279 event_queue = multiprocessing.Queue()
281 os.environ["DUMMY_MODE"] = "true"
283 my_lib.webapp.config.init(config["actuator"])
284 my_lib.webapp.log.init(config)
285 unit_cooler.actuator.work_log.init(config, event_queue)
287 unit_cooler.actuator.valve.init(config["actuator"]["control"]["valve"]["pin_no"])
288 unit_cooler.actuator.monitor.init(config["actuator"]["control"]["valve"]["pin_no"])
290 # NOTE: テストしやすいように、threading.Thread ではなく multiprocessing.pool.ThreadPool を使う
291 executor = concurrent.futures.ThreadPoolExecutor()
293 setting = {
294 "control_host": control_host,
295 "pub_port": pub_port,
296 "speedup": speedup,
297 "msg_count": msg_count,
298 "dummy_mode": True,
299 }
301 thread_list = start(executor, get_worker_def(config, message_queue, setting))
303 for thread_info in thread_list:
304 logging.info("Wait %s finish", thread_info["name"])
306 if thread_info["future"].result() != 0:
307 logging.warning("Error occurred in %s", thread_info["name"])
309 unit_cooler.actuator.work_log.term()
311 logging.info("Shutdown executor")
312 executor.shutdown()