Coverage for src/unit_cooler/pubsub/publish.py: 96%

105 statements  

« prev     ^ index     » next       coverage.py v7.9.1, created at 2025-06-28 11:52 +0000

1#!/usr/bin/env python3 

2""" 

3エアコン室外機の冷却モードの指示を出します。 

4 

5Usage: 

6 publish.py [-c CONFIG] [-s SERVER_HOST] [-p SERVER_PORT] [-r REAL_PORT] [-n COUNT] [-t SPEEDUP] [-d] [-D] 

7 

8Options: 

9 -c CONFIG : CONFIG を設定ファイルとして読み込んで実行します。 [default: config.yaml] 

10 -s SERVER_HOST : サーバーのホスト名を指定します。 [default: localhost] 

11 -p SERVER_PORT : ZeroMQ の サーバーを動作させるポートを指定します。 [default: 2222] 

12 -r REAL_PORT : ZeroMQ の 本当のサーバーを動作させるポートを指定します。 [default: 2200] 

13 -n COUNT : n 回制御メッセージを生成したら終了します。0 は制限なし。 [default: 1] 

14 -t SPEEDUP : 時短モード。演算間隔を SPEEDUP 分の一にします。 [default: 20] 

15 -d : ダミーモード(冷却モードをランダムに生成)で動作します。 

16 -D : デバッグモードで動作します。 

17""" 

18 

19import json 

20import logging 

21import time 

22 

23import zmq 

24 

25import unit_cooler.const 

26 

27 

28def wait_first_client(socket, timeout=10): 

29 start_time = time.time() 

30 

31 logging.info("Waiting for first client connection...") 

32 poller = zmq.Poller() 

33 poller.register(socket, zmq.POLLIN) 

34 

35 while True: 

36 events = dict(poller.poll(100)) 

37 if socket in events: 

38 event = socket.recv() 

39 if event[0] == 1: # 購読開始 39 ↛ 44line 39 didn't jump to line 44 because the condition on line 39 was always true

40 logging.info("First client connected.") 

41 # 購読イベントを処理 

42 socket.send(event) 

43 

44 if time.time() - start_time > timeout: 

45 logging.warning("Timeout waiting for first client connection.") 

46 break 

47 

48 

49def start_server(server_port, func, interval_sec, msg_count=0): 

50 logging.info("Start ZMQ server (port: %d)...", server_port) 

51 

52 context = zmq.Context() 

53 

54 socket = context.socket(zmq.XPUB) 

55 socket.bind(f"tcp://*:{server_port}") 

56 

57 logging.info("Server initialize done.") 

58 

59 # 最初のクライアント接続を待つ 

60 wait_first_client(socket) 

61 

62 send_count = 0 

63 try: 

64 while True: 

65 # 購読イベントをチェック(ノンブロッキング) 

66 try: 

67 event = socket.recv(zmq.NOBLOCK) 

68 if event[0] == 0: # 購読解除 

69 logging.debug("Client unsubscribed.") 

70 elif event[0] == 1: # 購読開始 70 ↛ 73line 70 didn't jump to line 73 because the condition on line 70 was always true

71 logging.debug("New client subscribed.") 

72 # イベントを転送 

73 socket.send(event) 

74 except zmq.Again: 

75 pass # イベントなし 

76 

77 start_time = time.time() 

78 socket.send_string(f"{unit_cooler.const.PUBSUB_CH} {json.dumps(func())}") 

79 

80 if msg_count != 0: 80 ↛ 88line 80 didn't jump to line 88 because the condition on line 80 was always true

81 send_count += 1 

82 logging.debug("(send_count, msg_count) = (%d, %d)", send_count, msg_count) 

83 # NOTE: Proxy が間に入るので、多く回す 

84 if send_count == (msg_count + 15): 

85 logging.info("Terminate, because the specified number of times has been reached.") 

86 break 

87 

88 sleep_sec = max(interval_sec - (time.time() - start_time), 0.5) 

89 logging.debug("Seep %.1f sec...", sleep_sec) 

90 time.sleep(sleep_sec) 

91 except Exception: 

92 logging.exception("Server failed") 

93 

94 socket.close() 

95 context.destroy() 

96 

97 logging.warning("Stop ZMQ server") 

98 

99 

100# NOTE: Last Value Caching Proxy 

101# see https://zguide.zeromq.org/docs/chapter5/ 

102def start_proxy(server_host, server_port, proxy_port, msg_count=0): # noqa: PLR0915, C901 

103 logging.info("Start ZMQ proxy server (front: %s:%d, port: %d)...", server_host, server_port, proxy_port) 

104 

105 context = zmq.Context() 

106 

107 frontend = context.socket(zmq.SUB) 

108 frontend.connect(f"tcp://{server_host}:{server_port}") 

109 frontend.setsockopt_string(zmq.SUBSCRIBE, unit_cooler.const.PUBSUB_CH) 

110 

111 backend = context.socket(zmq.XPUB) 

112 backend.setsockopt(zmq.XPUB_VERBOSE, 1) 

113 backend.bind(f"tcp://*:{proxy_port}") 

114 

115 cache = {} 

116 

117 poller = zmq.Poller() 

118 poller.register(frontend, zmq.POLLIN) 

119 poller.register(backend, zmq.POLLIN) 

120 

121 subscribed = False # NOTE: テスト用 

122 proxy_count = 0 

123 while True: 

124 try: 

125 events = dict(poller.poll(100)) 

126 except KeyboardInterrupt: # pragma: no cover 

127 break 

128 

129 if frontend in events: 

130 recv_data = frontend.recv_string() 

131 ch, json_str = recv_data.split(" ", 1) 

132 logging.debug("Store cache") 

133 cache[ch] = json_str 

134 

135 logging.info("Proxy message") 

136 backend.send_string(recv_data) 

137 

138 if subscribed: 138 ↛ 141line 138 didn't jump to line 141 because the condition on line 138 was always true

139 proxy_count += 1 

140 

141 if backend in events: 

142 logging.debug("Backend event") 

143 event = backend.recv() 

144 if event[0] == 0: 

145 logging.info("Client unsubscribed.") 

146 elif event[0] == 1: 

147 logging.info("New client subscribed.") 

148 subscribed = True 

149 ch = event[1:].decode("utf-8") 

150 if ch in cache: 

151 logging.info("Send cache") 

152 backend.send_string(f"{unit_cooler.const.PUBSUB_CH} {cache[unit_cooler.const.PUBSUB_CH]}") 

153 proxy_count += 1 

154 else: 

155 logging.warning("Cache is empty") 

156 else: # pragma: no cover 

157 pass 

158 

159 if msg_count != 0: 159 ↛ 123line 159 didn't jump to line 123 because the condition on line 159 was always true

160 logging.debug("(proxy_count, msg_count) = (%d, %d)", proxy_count, msg_count) 

161 if proxy_count == msg_count: 

162 logging.info("Terminate, because the specified number of times has been reached.") 

163 break 

164 

165 frontend.close() 

166 backend.close() 

167 context.destroy() 

168 

169 logging.warning("Stop ZMQ proxy server") 

170 

171 

172if __name__ == "__main__": 

173 # TEST Code 

174 import os 

175 import threading 

176 

177 import docopt 

178 import my_lib.config 

179 import my_lib.logger 

180 import my_lib.pretty 

181 

182 import unit_cooler.actuator.subscribe 

183 import unit_cooler.const 

184 import unit_cooler.controller.engine 

185 

186 args = docopt.docopt(__doc__) 

187 

188 config_file = args["-c"] 

189 server_host = args["-s"] 

190 server_port = int(os.environ.get("HEMS_SERVER_PORT", args["-p"])) 

191 real_port = int(args["-r"]) 

192 msg_count = int(args["-n"]) 

193 speedup = int(args["-t"]) 

194 dummy_mode = args["-D"] 

195 debug_mode = args["-D"] 

196 

197 my_lib.logger.init("test", level=logging.DEBUG if debug_mode else logging.INFO) 

198 

199 config = my_lib.config.load(config_file) 

200 

201 proxy_thread = threading.Thread( 

202 target=start_proxy, 

203 args=(server_host, real_port, server_port, msg_count), 

204 ) 

205 proxy_thread.start() 

206 

207 server_thread = threading.Thread( 

208 target=start_server, 

209 args=( 

210 real_port, 

211 lambda: unit_cooler.controller.engine.gen_control_msg(config, dummy_mode, speedup), 

212 config["controller"]["interval_sec"] / speedup, 

213 msg_count, 

214 ), 

215 ) 

216 server_thread.start() 

217 

218 unit_cooler.actuator.subscribe.start_client( 

219 server_host, 

220 server_port, 

221 lambda message: logging.info("receive: %s", message), 

222 msg_count, 

223 ) 

224 

225 server_thread.join() 

226 proxy_thread.join()