Coverage for src/unit_cooler/actuator/worker.py: 94%
158 statements
« prev ^ index » next coverage.py v7.9.1, created at 2025-07-23 14:35 +0000
« prev ^ index » next coverage.py v7.9.1, created at 2025-07-23 14:35 +0000
1#!/usr/bin/env python3
2"""
3アクチュエータで動作するワーカです。
5Usage:
6 worker.py [-c CONFIG] [-s CONTROL_HOST] [-p PUB_PORT] [-n COUNT] [-t SPEEDUP] [-D]
8Options:
9 -c CONFIG : CONFIG を設定ファイルとして読み込んで実行します。 [default: config.yaml]
10 -s CONTROL_HOST : コントローラのホスト名を指定します。 [default: localhost]
11 -p PUB_PORT : ZeroMQ の Pub サーバーを動作させるポートを指定します。 [default: 2222]
12 -n COUNT : n 回制御メッセージを受信したら終了します。0 は制限なし。 [default: 1]
13 -t SPEEDUP : 時短モード。演算間隔を SPEEDUP 分の一にします。 [default: 1]
14 -D : デバッグモードで動作します。
15"""
17import concurrent.futures
18import logging
19import os
20import pathlib
21import threading
22import time
23import traceback
25import my_lib.footprint
27import unit_cooler.actuator.control
28import unit_cooler.actuator.monitor
29import unit_cooler.const
30import unit_cooler.pubsub.subscribe
31import unit_cooler.util
# Module-level dictionaries keyed by worker id (kept independent per pytest worker)
_control_messages = {}
_should_terminate = {}

# Initial value of the control message
MESSAGE_INIT = {"mode_index": 0, "state": unit_cooler.const.COOLING_STATE.IDLE}
def get_worker_id():
    """Return the pytest-xdist worker id for this process.

    The value is read from the ``PYTEST_XDIST_WORKER`` environment variable;
    an empty string is returned when the variable is not set.
    """
    return os.getenv("PYTEST_XDIST_WORKER", "")
def get_last_control_message():
    """Return the last control message stored for the current worker id.

    When nothing has been stored yet, a copy of ``MESSAGE_INIT`` is stored
    first and then returned.
    """
    key = get_worker_id()
    try:
        return _control_messages[key]
    except KeyError:
        # First access for this worker: seed the entry with the initial message.
        set_last_control_message(MESSAGE_INIT.copy())
        return _control_messages[key]
def set_last_control_message(message):
    """Store ``message`` as the last control message of the current worker id."""
    worker_id = get_worker_id()
    _control_messages[worker_id] = message
def get_should_terminate():
    """Return the termination event of the current worker id, or ``None`` if none exists yet."""
    worker_id = get_worker_id()
    return _should_terminate.get(worker_id)
def init_should_terminate():
    """Prepare the termination event of the current worker id.

    An existing event is cleared; otherwise a new ``threading.Event`` is
    created and registered.
    """
    event = get_should_terminate()
    if event is not None:
        event.clear()
        return

    _should_terminate[get_worker_id()] = threading.Event()
def _first_sensor_value(sense_data, key):
    """Return the ``"value"`` of the first entry stored under ``key``, or ``None`` when there is none."""
    entries = sense_data.get(key)
    if not entries:
        return None
    return entries[0].get("value")


def collect_environmental_metrics(config, current_message):
    """Record the environmental readings carried by a control message.

    Args:
        config: Configuration mapping; ``config["actuator"]["metrics"]["data"]``
            is passed to ``get_metrics_collector``.
        current_message: Control message mapping. Its optional ``"sense_data"``
            entry maps sensor names (``temp``, ``humi``, ``lux``, ``solar_rad``,
            ``rain``) to sequences of mappings that hold a ``"value"``.

    Nothing is recorded when ``sense_data`` is missing or empty. Any exception
    raised while collecting is logged and suppressed, so metrics collection is
    best effort.
    """
    from unit_cooler.metrics import get_metrics_collector

    try:
        metrics_db_path = config["actuator"]["metrics"]["data"]
        metrics_collector = get_metrics_collector(metrics_db_path)

        # Sensor readings are taken from the sense_data of current_message.
        sense_data = current_message.get("sense_data", {})
        if not sense_data:
            return

        # The first entry of each sensor is used (the original comment calls
        # this the latest value).
        temperature = _first_sensor_value(sense_data, "temp")
        humidity = _first_sensor_value(sense_data, "humi")
        lux = _first_sensor_value(sense_data, "lux")

        solar_radiation = _first_sensor_value(sense_data, "solar_rad")
        if sense_data.get("solar_rad"):
            logging.debug("Solar radiation data found: %s W/m²", solar_radiation)
        else:
            # sense_data is known to be non-empty here, so its keys are always listed.
            logging.debug("No solar radiation data in sense_data: %s", list(sense_data.keys()))

        rain_amount = _first_sensor_value(sense_data, "rain")

        # Hand the environmental readings to the metrics collector.
        metrics_collector.update_environmental_data(temperature, humidity, lux, solar_radiation, rain_amount)
    except Exception:
        logging.exception("Failed to collect environmental metrics")
def queue_put(message_queue, message, liveness_file):
    """Normalize a received control message, enqueue it and update the liveness footprint.

    ``message["state"]`` is converted in place to a
    ``unit_cooler.const.COOLING_STATE`` member before the message is put on
    ``message_queue``.
    """
    state = unit_cooler.const.COOLING_STATE(message["state"])
    message["state"] = state

    logging.info("Receive message: %s", message)

    message_queue.put(message)
    my_lib.footprint.update(liveness_file)
def sleep_until_next_iter(start_time, interval_sec):
    """Wait until the next iteration is due, or until termination is requested.

    Args:
        start_time: ``time.time()`` value taken at the start of the current iteration.
        interval_sec: Desired interval between iteration starts, in seconds.

    The wait lasts for the remainder of the interval but never less than
    0.5 seconds, and it ends early when the termination event is set.
    """
    elapsed_sec = time.time() - start_time
    sleep_sec = max(interval_sec - elapsed_sec, 0.5)
    logging.debug("Sleep %.1f sec...", sleep_sec)

    # Block until should_terminate is set (for at most sleep_sec seconds).
    get_should_terminate().wait(timeout=sleep_sec)
# NOTE: Worker that receives control instructions from the controller and puts them on the queue
def subscribe_worker(config, control_host, pub_port, message_queue, liveness_file, msg_count=0):  # noqa: PLR0913
    """Subscribe to control messages and forward each one to ``message_queue``.

    Returns 0 when the subscription client returns normally and -1 when it
    raises; in the latter case the error is logged and reported through
    ``unit_cooler.util.notify_error``.
    """
    logging.info("Start actuator subscribe worker (%s:%d)", control_host, pub_port)

    def on_message(message):
        queue_put(message_queue, message, liveness_file)

    status = 0
    try:
        unit_cooler.pubsub.subscribe.start_client(control_host, pub_port, on_message, msg_count)
    except Exception:
        logging.exception("Failed to receive control message")
        unit_cooler.util.notify_error(config, traceback.format_exc())
        status = -1

    logging.warning("Stop subscribe worker")
    return status
# NOTE: Worker that monitors the state of the valve
def monitor_worker(config, liveness_file, dummy_mode=False, speedup=1, msg_count=0):
    """Periodically check the mist condition and report it.

    Args:
        config: Configuration mapping; ``config["actuator"]["monitor"]["interval_sec"]``
            gives the base interval.
        liveness_file: Path handed to ``my_lib.footprint.update`` on every iteration.
        dummy_mode: Forwarded to ``unit_cooler.actuator.monitor.send_mist_condition``.
        speedup: The interval is divided by this factor.
        msg_count: When non-zero, the loop ends once the handle's
            ``monitor_count`` reaches ``msg_count + 20``.

    Returns:
        0 on a normal stop, -1 when the handle cannot be created or the loop raises.
    """
    logging.info("Start monitor worker")

    interval_sec = config["actuator"]["monitor"]["interval_sec"] / speedup
    try:
        handle = unit_cooler.actuator.monitor.gen_handle(config, interval_sec)
    except Exception:
        logging.exception("Failed to create handle")

        # NOTE(review): unit_cooler.actuator.work_log is not among the imports
        # visible at the top of this module; presumably it is loaded by one of
        # the imported modules - confirm.
        unit_cooler.actuator.work_log.add(
            "流量のロギングを開始できません。", unit_cooler.const.LOG_LEVEL.ERROR
        )
        return -1

    i = 0
    ret = 0
    try:
        while True:
            start_time = time.time()

            need_logging = (i % handle["log_period"]) == 0
            i += 1

            mist_condition = unit_cooler.actuator.monitor.get_mist_condition()
            unit_cooler.actuator.monitor.check(handle, mist_condition, need_logging)
            unit_cooler.actuator.monitor.send_mist_condition(
                handle, mist_condition, get_last_control_message(), dummy_mode
            )

            my_lib.footprint.update(liveness_file)

            if get_should_terminate().is_set():
                logging.info("Terminate monitor worker")
                break

            if msg_count != 0:
                logging.debug("(monitor_count, msg_count) = (%d, %d)", handle["monitor_count"], msg_count)
                # NOTE: The margin of 20 keeps monitor_worker from terminating first.
                if handle["monitor_count"] >= (msg_count + 20):
                    logging.info(
                        "Terminate monitor worker, because the specified number of times has been reached."
                    )
                    break

            sleep_until_next_iter(start_time, interval_sec)
    except Exception:
        # Log the failure like the other workers do, in addition to notifying it.
        logging.exception("Failed to monitor valve")
        unit_cooler.util.notify_error(config, traceback.format_exc())
        ret = -1

    logging.warning("Stop monitor worker")
    return ret
# NOTE: Worker that controls the valve
def control_worker(config, message_queue, liveness_file, dummy_mode=False, speedup=1, msg_count=0):  # noqa: PLR0913
    """Repeatedly fetch the current control message and execute it.

    Args:
        config: Configuration mapping; ``config["actuator"]["control"]["interval_sec"]``
            gives the base interval.
        message_queue: Queue handed to ``unit_cooler.actuator.control.gen_handle``.
        liveness_file: Path handed to ``my_lib.footprint.update`` on every iteration.
        dummy_mode: When true, a warning is logged at start-up.
        speedup: The interval is divided by this factor.
        msg_count: When non-zero, the loop ends once the handle's
            ``receive_count`` reaches ``msg_count``.

    Returns:
        0 on a normal stop, -1 when the loop raises. An exception raised by
        ``gen_handle`` is not caught here and propagates to the caller.
    """
    logging.info("Start control worker")

    if dummy_mode:
        logging.warning("DUMMY mode")

    interval_sec = config["actuator"]["control"]["interval_sec"] / speedup
    handle = unit_cooler.actuator.control.gen_handle(config, message_queue)

    ret = 0
    try:
        while True:
            start_time = time.time()

            current_message = unit_cooler.actuator.control.get_control_message(
                handle, get_last_control_message()
            )

            set_last_control_message(current_message)

            unit_cooler.actuator.control.execute(config, current_message)

            # Collect environmental metrics on every iteration (best effort).
            try:
                collect_environmental_metrics(config, current_message)
            except Exception:
                # Keep the traceback so that a failure can be diagnosed at debug level.
                logging.debug("Failed to collect environmental metrics", exc_info=True)

            my_lib.footprint.update(liveness_file)

            if get_should_terminate().is_set():
                logging.info("Terminate control worker")
                break

            if msg_count != 0:
                logging.debug("(receive_count, msg_count) = (%d, %d)", handle["receive_count"], msg_count)
                if handle["receive_count"] >= msg_count:
                    logging.info("Terminate control, because the specified number of times has been reached.")
                    break

            sleep_until_next_iter(start_time, interval_sec)
    except Exception:
        logging.exception("Failed to control valve")
        unit_cooler.util.notify_error(config, traceback.format_exc())
        ret = -1

    logging.warning("Stop control worker")
    # NOTE: The queue is intentionally not closed here, because a put() after
    # close() raises ValueError.

    return ret
def get_worker_def(config, message_queue, setting):
    """Build the list of worker definitions consumed by ``start``.

    Each entry is a dict with a ``"name"`` and a ``"param"`` list whose first
    element is the worker function and whose remaining elements are its
    positional arguments.
    """

    def liveness_path(section):
        return pathlib.Path(config["actuator"][section]["liveness"]["file"])

    subscribe_param = [
        subscribe_worker,
        config,
        setting["control_host"],
        setting["pub_port"],
        message_queue,
        liveness_path("subscribe"),
        setting["msg_count"],
    ]
    monitor_param = [
        monitor_worker,
        config,
        liveness_path("monitor"),
        setting["dummy_mode"],
        setting["speedup"],
        setting["msg_count"],
    ]
    control_param = [
        control_worker,
        config,
        message_queue,
        liveness_path("control"),
        setting["dummy_mode"],
        setting["speedup"],
        setting["msg_count"],
    ]

    return [
        {"name": "subscribe_worker", "param": subscribe_param},
        {"name": "monitor_worker", "param": monitor_param},
        {"name": "control_worker", "param": control_param},
    ]
def start(executor, worker_def):
    """Reset the termination event and submit every worker to ``executor``.

    Returns a list of dicts, one per worker, holding its ``"name"`` and the
    ``"future"`` returned by ``executor.submit``.
    """
    init_should_terminate()

    launched = []
    for spec in worker_def:
        submitted = executor.submit(*spec["param"])
        launched.append({"name": spec["name"], "future": submitted})

    return launched
def term():
    """Ask the workers to stop by setting the termination event of the current worker id."""
    logging.info("Terminate actuator worker")
    stop_event = get_should_terminate()
    stop_event.set()
if __name__ == "__main__":
    # TEST Code: runs the three workers in dummy mode from the command line.
    import multiprocessing
    import os
    import pathlib

    import docopt
    import my_lib.config
    import my_lib.logger
    import my_lib.pretty
    import my_lib.webapp.config

    import unit_cooler.actuator.valve

    args = docopt.docopt(__doc__)

    config_file = args["-c"]
    # Environment variables take precedence over the command line options.
    control_host = os.environ.get("HEMS_CONTROL_HOST", args["-s"])
    pub_port = int(os.environ.get("HEMS_PUB_PORT", args["-p"]))
    speedup = int(args["-t"])
    msg_count = int(args["-n"])
    debug_mode = args["-D"]

    my_lib.logger.init("test", level=logging.DEBUG if debug_mode else logging.INFO)

    config = my_lib.config.load(config_file)
    message_queue = multiprocessing.Queue()
    event_queue = multiprocessing.Queue()

    os.environ["DUMMY_MODE"] = "true"

    my_lib.webapp.config.init(config["actuator"])

    # These modules are used below but were not imported explicitly. They are
    # imported only after my_lib.webapp.config.init() so that the original
    # initialization order is not disturbed.
    # NOTE(review): confirm whether they read the web app config at import time.
    import my_lib.webapp.log

    import unit_cooler.actuator.work_log

    my_lib.webapp.log.init(config)
    unit_cooler.actuator.work_log.init(config, event_queue)

    unit_cooler.actuator.valve.init(config["actuator"]["control"]["valve"]["pin_no"], config)
    unit_cooler.actuator.monitor.init(config["actuator"]["control"]["valve"]["pin_no"])

    # NOTE: To make testing easier, a concurrent.futures.ThreadPoolExecutor is
    # used instead of threading.Thread.
    executor = concurrent.futures.ThreadPoolExecutor()

    setting = {
        "control_host": control_host,
        "pub_port": pub_port,
        "speedup": speedup,
        "msg_count": msg_count,
        "dummy_mode": True,
    }

    thread_list = start(executor, get_worker_def(config, message_queue, setting))

    for thread_info in thread_list:
        logging.info("Wait %s finish", thread_info["name"])

        # A worker returns 0 on a normal stop and -1 on failure.
        if thread_info["future"].result() != 0:
            logging.warning("Error occurred in %s", thread_info["name"])

    unit_cooler.actuator.work_log.term()

    logging.info("Shutdown executor")
    executor.shutdown()