Coverage for src/unit_cooler/pubsub/publish.py: 96%
105 statements
« prev ^ index » next coverage.py v7.9.1, created at 2025-06-28 11:52 +0000
« prev ^ index » next coverage.py v7.9.1, created at 2025-06-28 11:52 +0000
1#!/usr/bin/env python3
2"""
3エアコン室外機の冷却モードの指示を出します。
5Usage:
6 publish.py [-c CONFIG] [-s SERVER_HOST] [-p SERVER_PORT] [-r REAL_PORT] [-n COUNT] [-t SPEEDUP] [-d] [-D]
8Options:
9 -c CONFIG : CONFIG を設定ファイルとして読み込んで実行します。 [default: config.yaml]
10 -s SERVER_HOST : サーバーのホスト名を指定します。 [default: localhost]
11 -p SERVER_PORT : ZeroMQ の サーバーを動作させるポートを指定します。 [default: 2222]
12 -r REAL_PORT : ZeroMQ の 本当のサーバーを動作させるポートを指定します。 [default: 2200]
13 -n COUNT : n 回制御メッセージを生成したら終了します。0 は制限なし。 [default: 1]
14 -t SPEEDUP : 時短モード。演算間隔を SPEEDUP 分の一にします。 [default: 20]
15 -d : ダミーモード(冷却モードをランダムに生成)で動作します。
16 -D : デバッグモードで動作します。
17"""
19import json
20import logging
21import time
23import zmq
25import unit_cooler.const
def wait_first_client(socket, timeout=10):
    """Wait until a first subscription arrives on *socket* or *timeout* elapses.

    Messages received on *socket* are interpreted as subscription events:
    a first byte of 1 means "subscribe". When such an event is received it is
    sent back out through the same socket and the function returns.

    Args:
        socket: ZeroMQ socket to poll (start_server passes its XPUB socket).
        timeout: Maximum time to wait, in seconds.

    Returns:
        None. A warning is logged if no subscription arrived within *timeout*.
    """
    start_time = time.time()

    logging.info("Waiting for first client connection...")
    poller = zmq.Poller()
    poller.register(socket, zmq.POLLIN)

    while True:
        # Poll in short slices so the timeout check below runs regularly.
        events = dict(poller.poll(100))
        if socket in events:
            event = socket.recv()
            if event[0] == 1:  # subscription started
                logging.info("First client connected.")
                # Process the subscription event
                socket.send(event)
                # The first client is here: stop waiting instead of polling
                # until the timeout and logging a misleading warning.
                return

        if time.time() - start_time > timeout:
            logging.warning("Timeout waiting for first client connection.")
            break
def start_server(server_port, func, interval_sec, msg_count=0):
    """Run a ZeroMQ XPUB server that periodically publishes ``func()`` as JSON.

    Each message is sent as ``"<PUBSUB_CH> <json>"`` where the channel name is
    ``unit_cooler.const.PUBSUB_CH`` and the payload is ``json.dumps(func())``.

    Args:
        server_port: TCP port to bind on all interfaces.
        func: Zero-argument callable returning a JSON-serializable message.
        interval_sec: Target interval between messages, in seconds. The actual
            sleep between messages is never shorter than 0.5 seconds.
        msg_count: When non-zero, the server stops after ``msg_count + 15``
            messages have been sent. 0 means run without a limit.

    Returns:
        None. Exceptions raised inside the publish loop are logged and end the
        loop; the socket and context are always released.
    """
    logging.info("Start ZMQ server (port: %d)...", server_port)

    context = zmq.Context()
    socket = context.socket(zmq.XPUB)

    # try/finally guarantees the socket and context are released even when
    # bind() or the wait fails, or the thread is interrupted.
    try:
        socket.bind(f"tcp://*:{server_port}")

        logging.info("Server initialize done.")

        # Wait for the first client connection
        wait_first_client(socket)

        send_count = 0
        try:
            while True:
                # Check for subscription events (non-blocking)
                try:
                    event = socket.recv(zmq.NOBLOCK)
                    if event[0] == 0:  # unsubscribe
                        logging.debug("Client unsubscribed.")
                    elif event[0] == 1:  # subscribe
                        logging.debug("New client subscribed.")
                    # Forward the event
                    socket.send(event)
                except zmq.Again:
                    pass  # no event pending

                start_time = time.time()
                socket.send_string(f"{unit_cooler.const.PUBSUB_CH} {json.dumps(func())}")

                if msg_count != 0:
                    send_count += 1
                    logging.debug("(send_count, msg_count) = (%d, %d)", send_count, msg_count)
                    # NOTE: A proxy sits in between, so send extra messages.
                    if send_count == (msg_count + 15):
                        logging.info("Terminate, because the specified number of times has been reached.")
                        break

                # Subtract the time spent producing/sending the message, but
                # always sleep at least 0.5 seconds.
                sleep_sec = max(interval_sec - (time.time() - start_time), 0.5)
                logging.debug("Sleep %.1f sec...", sleep_sec)
                time.sleep(sleep_sec)
        except Exception:
            logging.exception("Server failed")
    finally:
        socket.close()
        context.destroy()

    logging.warning("Stop ZMQ server")
# NOTE: Last Value Caching Proxy
# see https://zguide.zeromq.org/docs/chapter5/
def start_proxy(server_host, server_port, proxy_port, msg_count=0):  # noqa: PLR0915, C901
    """Run a last-value-caching proxy between a publisher and its subscribers.

    The proxy subscribes to ``unit_cooler.const.PUBSUB_CH`` on the upstream
    server, republishes every received message on its own XPUB socket, and
    remembers the latest payload per channel. When a new client subscribes to a
    channel that has a cached payload, that payload is sent immediately.

    Args:
        server_host: Host name of the upstream server.
        server_port: Port of the upstream server.
        proxy_port: TCP port on which the proxy itself listens.
        msg_count: When non-zero, the proxy stops once it has delivered at
            least this many messages (forwarded after a client subscribed, or
            replayed from the cache). 0 means run without a limit.

    Returns:
        None. Both sockets and the context are always released on exit.
    """
    logging.info("Start ZMQ proxy server (front: %s:%d, port: %d)...", server_host, server_port, proxy_port)

    context = zmq.Context()
    frontend = context.socket(zmq.SUB)
    backend = context.socket(zmq.XPUB)

    # try/finally guarantees both sockets and the context are released even
    # when setup or the proxy loop raises.
    try:
        frontend.connect(f"tcp://{server_host}:{server_port}")
        frontend.setsockopt_string(zmq.SUBSCRIBE, unit_cooler.const.PUBSUB_CH)

        backend.setsockopt(zmq.XPUB_VERBOSE, 1)
        backend.bind(f"tcp://*:{proxy_port}")

        cache = {}

        poller = zmq.Poller()
        poller.register(frontend, zmq.POLLIN)
        poller.register(backend, zmq.POLLIN)

        subscribed = False  # NOTE: for testing
        proxy_count = 0
        while True:
            try:
                events = dict(poller.poll(100))
            except KeyboardInterrupt:  # pragma: no cover
                break

            if frontend in events:
                recv_data = frontend.recv_string()
                # Messages have the form "<channel> <json>".
                ch, json_str = recv_data.split(" ", 1)
                logging.debug("Store cache")
                cache[ch] = json_str

                logging.info("Proxy message")
                backend.send_string(recv_data)

                # Only count messages forwarded after a client has subscribed.
                if subscribed:
                    proxy_count += 1

            if backend in events:
                logging.debug("Backend event")
                event = backend.recv()
                if event[0] == 0:
                    logging.info("Client unsubscribed.")
                elif event[0] == 1:
                    logging.info("New client subscribed.")
                    subscribed = True
                    # The bytes after the first one are the subscribed channel.
                    ch = event[1:].decode("utf-8")
                    if ch in cache:
                        logging.info("Send cache")
                        # Replay the payload of the channel that was actually
                        # looked up, so the check and the access always agree.
                        backend.send_string(f"{ch} {cache[ch]}")
                        proxy_count += 1
                    else:
                        logging.warning("Cache is empty")
                else:  # pragma: no cover
                    pass

            if msg_count != 0:
                logging.debug("(proxy_count, msg_count) = (%d, %d)", proxy_count, msg_count)
                # Use >=: one iteration can both forward a message and replay
                # the cache, so the counter may step past msg_count.
                if proxy_count >= msg_count:
                    logging.info("Terminate, because the specified number of times has been reached.")
                    break
    finally:
        frontend.close()
        backend.close()
        context.destroy()

    logging.warning("Stop ZMQ proxy server")
if __name__ == "__main__":
    # TEST Code
    # Starts the proxy and the server in background threads, then runs a
    # subscriber in the main thread that logs every received message.
    import os
    import threading

    import docopt
    import my_lib.config
    import my_lib.logger
    import my_lib.pretty

    import unit_cooler.actuator.subscribe
    import unit_cooler.const
    import unit_cooler.controller.engine

    args = docopt.docopt(__doc__)

    config_file = args["-c"]
    server_host = args["-s"]
    # The HEMS_SERVER_PORT environment variable takes precedence over -p.
    server_port = int(os.environ.get("HEMS_SERVER_PORT", args["-p"]))
    real_port = int(args["-r"])
    msg_count = int(args["-n"])
    speedup = int(args["-t"])
    # -d selects dummy mode; -D selects debug mode.
    dummy_mode = args["-d"]
    debug_mode = args["-D"]

    my_lib.logger.init("test", level=logging.DEBUG if debug_mode else logging.INFO)

    config = my_lib.config.load(config_file)

    # The proxy connects to the real server port and serves clients on server_port.
    proxy_thread = threading.Thread(
        target=start_proxy,
        args=(server_host, real_port, server_port, msg_count),
    )
    proxy_thread.start()

    server_thread = threading.Thread(
        target=start_server,
        args=(
            real_port,
            lambda: unit_cooler.controller.engine.gen_control_msg(config, dummy_mode, speedup),
            config["controller"]["interval_sec"] / speedup,
            msg_count,
        ),
    )
    server_thread.start()

    unit_cooler.actuator.subscribe.start_client(
        server_host,
        server_port,
        lambda message: logging.info("receive: %s", message),
        msg_count,
    )

    server_thread.join()
    proxy_thread.join()