Coverage for src/unit_cooler/actuator/worker.py: 98%

114 statements  

« prev     ^ index     » next       coverage.py v7.9.1, created at 2025-06-28 11:52 +0000

1#!/usr/bin/env python3 

2""" 

3アクチュエータで動作するワーカです。 

4 

5Usage: 

6 worker.py [-c CONFIG] [-s CONTROL_HOST] [-p PUB_PORT] [-n COUNT] [-t SPEEDUP] [-D] 

7 

8Options: 

9 -c CONFIG : CONFIG を設定ファイルとして読み込んで実行します。 [default: config.yaml] 

10 -s CONTROL_HOST : コントローラのホスト名を指定します。 [default: localhost] 

11 -p PUB_PORT : ZeroMQ の Pub サーバーを動作させるポートを指定します。 [default: 2222] 

12 -n COUNT : n 回制御メッセージを受信したら終了します。0 は制限なし。 [default: 1] 

13 -t SPEEDUP : 時短モード。演算間隔を SPEEDUP 分の一にします。 [default: 1] 

14 -D : デバッグモードで動作します。 

15""" 

16 

17import concurrent.futures 

18import logging 

19import pathlib 

20import threading 

21import time 

22import traceback 

23 

24import my_lib.footprint 

25 

26import unit_cooler.actuator.control 

27import unit_cooler.actuator.monitor 

28import unit_cooler.const 

29import unit_cooler.pubsub.subscribe 

30import unit_cooler.util 

31 

32# スレッドローカル変数として定義 

33thread_local = threading.local() 

34 

35 

36def get_last_control_message(): 

37 """スレッドローカルなlast_control_messageを取得""" 

38 if not hasattr(thread_local, "last_control_message"): 

39 thread_local.last_control_message = {"mode_index": -1, "state": unit_cooler.const.COOLING_STATE.IDLE} 

40 return thread_local.last_control_message 

41 

42 

43def set_last_control_message(message): 

44 """スレッドローカルなlast_control_messageを設定""" 

45 thread_local.last_control_message = message 

46 

47 

48should_terminate = threading.Event() 

49 

50 

51def queue_put(message_queue, message, liveness_file): 

52 message["state"] = unit_cooler.const.COOLING_STATE(message["state"]) 

53 

54 logging.info("Receive message: %s", message) 

55 

56 message_queue.put(message) 

57 my_lib.footprint.update(liveness_file) 

58 

59 

60def sleep_until_next_iter(start_time, interval_sec): 

61 sleep_sec = max(interval_sec - (time.time() - start_time), 0.5) 

62 logging.debug("Seep %.1f sec...", sleep_sec) 

63 time.sleep(sleep_sec) 

64 

65 

66# NOTE: コントローラから制御指示を受け取ってキューに積むワーカ 

67def subscribe_worker(config, control_host, pub_port, message_queue, liveness_file, msg_count=0): # noqa: PLR0913 

68 logging.info("Start actuator subscribe worker (%s:%d)", control_host, pub_port) 

69 ret = 0 

70 try: 

71 unit_cooler.pubsub.subscribe.start_client( 

72 control_host, 

73 pub_port, 

74 lambda message: queue_put(message_queue, message, liveness_file), 

75 msg_count, 

76 ) 

77 except Exception: 

78 logging.exception("Failed to receive control message") 

79 unit_cooler.util.notify_error(config, traceback.format_exc()) 

80 ret = -1 

81 

82 logging.warning("Stop subscribe worker") 

83 return ret 

84 

85 

86# NOTE: バルブの状態をモニタするワーカ 

87def monitor_worker(config, liveness_file, dummy_mode=False, speedup=1, msg_count=0): 

88 global should_terminate 

89 

90 logging.info("Start monitor worker") 

91 

92 interval_sec = config["actuator"]["monitor"]["interval_sec"] / speedup 

93 try: 

94 handle = unit_cooler.actuator.monitor.gen_handle(config, interval_sec) 

95 except Exception: 

96 logging.exception("Failed to create handle") 

97 

98 unit_cooler.actuator.work_log.add( 

99 "流量のロギングを開始できません。", unit_cooler.const.LOG_LEVEL.ERROR 

100 ) 

101 return -1 

102 

103 i = 0 

104 ret = 0 

105 try: 

106 while True: 

107 start_time = time.time() 

108 

109 need_logging = (i % handle["log_period"]) == 0 

110 i += 1 

111 

112 mist_condition = unit_cooler.actuator.monitor.get_mist_condition() 

113 unit_cooler.actuator.monitor.check(handle, mist_condition, need_logging) 

114 unit_cooler.actuator.monitor.send_mist_condition( 

115 handle, mist_condition, get_last_control_message(), dummy_mode 

116 ) 

117 

118 my_lib.footprint.update(liveness_file) 

119 

120 if should_terminate.is_set(): 

121 logging.info("Terminate monitor worker") 

122 break 

123 

124 if msg_count != 0: 124 ↛ 133line 124 didn't jump to line 133 because the condition on line 124 was always true

125 logging.debug("(monitor_count, msg_count) = (%d, %d)", handle["monitor_count"], msg_count) 

126 # NOTE: monitor_worker が先に終了しないようにする 

127 if handle["monitor_count"] >= (msg_count + 20): 

128 logging.info( 

129 "Terminate monitor worker, because the specified number of times has been reached." 

130 ) 

131 break 

132 

133 sleep_until_next_iter(start_time, interval_sec) 

134 except Exception: 

135 unit_cooler.util.notify_error(config, traceback.format_exc()) 

136 ret = -1 

137 

138 logging.warning("Stop monitor worker") 

139 return ret 

140 

141 

142# NOTE: バルブを制御するワーカ 

143def control_worker(config, message_queue, liveness_file, dummy_mode=False, speedup=1, msg_count=0): # noqa: PLR0913 

144 global should_terminate 

145 

146 logging.info("Start control worker") 

147 

148 if dummy_mode: 

149 logging.warning("DUMMY mode") 

150 

151 interval_sec = config["actuator"]["control"]["interval_sec"] / speedup 

152 handle = unit_cooler.actuator.control.gen_handle(config, message_queue) 

153 

154 ret = 0 

155 try: 

156 while True: 

157 start_time = time.time() 

158 

159 current_message = unit_cooler.actuator.control.get_control_message( 

160 handle, get_last_control_message() 

161 ) 

162 

163 set_last_control_message(current_message) 

164 

165 unit_cooler.actuator.control.execute(config, current_message) 

166 

167 my_lib.footprint.update(liveness_file) 

168 

169 if should_terminate.is_set(): 

170 logging.info("Terminate control worker") 

171 break 

172 

173 if msg_count != 0: 173 ↛ 179line 173 didn't jump to line 179 because the condition on line 173 was always true

174 logging.debug("(receive_count, msg_count) = (%d, %d)", handle["receive_count"], msg_count) 

175 if handle["receive_count"] >= msg_count: 

176 logging.info("Terminate control, because the specified number of times has been reached.") 

177 break 

178 

179 sleep_until_next_iter(start_time, interval_sec) 

180 except Exception: 

181 logging.exception("Failed to control valve") 

182 unit_cooler.util.notify_error(config, traceback.format_exc()) 

183 ret = -1 

184 

185 logging.warning("Stop control worker") 

186 # NOTE: Queue を close した後に put されると ValueError が発生するので、 

187 # 明示的に閉じるのをやめた。 

188 # message_queue.close() 

189 

190 return ret 

191 

192 

193def get_worker_def(config, message_queue, setting): 

194 return [ 

195 { 

196 "name": "subscribe_worker", 

197 "param": [ 

198 subscribe_worker, 

199 config, 

200 setting["control_host"], 

201 setting["pub_port"], 

202 message_queue, 

203 pathlib.Path(config["actuator"]["subscribe"]["liveness"]["file"]), 

204 setting["msg_count"], 

205 ], 

206 }, 

207 { 

208 "name": "monitor_worker", 

209 "param": [ 

210 monitor_worker, 

211 config, 

212 pathlib.Path(config["actuator"]["monitor"]["liveness"]["file"]), 

213 setting["dummy_mode"], 

214 setting["speedup"], 

215 setting["msg_count"], 

216 ], 

217 }, 

218 { 

219 "name": "control_worker", 

220 "param": [ 

221 control_worker, 

222 config, 

223 message_queue, 

224 pathlib.Path(config["actuator"]["control"]["liveness"]["file"]), 

225 setting["dummy_mode"], 

226 setting["speedup"], 

227 setting["msg_count"], 

228 ], 

229 }, 

230 ] 

231 

232 

233def start(executor, worker_def): 

234 global should_terminate 

235 

236 should_terminate.clear() 

237 thread_list = [] 

238 

239 for worker_info in worker_def: 

240 future = executor.submit(*worker_info["param"]) 

241 thread_list.append({"name": worker_info["name"], "future": future}) 

242 

243 return thread_list 

244 

245 

246def term(): 

247 global should_terminate 

248 

249 should_terminate.set() 

250 

251 

252if __name__ == "__main__": 

253 # TEST Code 

254 import multiprocessing 

255 import os 

256 import pathlib 

257 

258 import docopt 

259 import my_lib.config 

260 import my_lib.logger 

261 import my_lib.pretty 

262 import my_lib.webapp.config 

263 

264 import unit_cooler.actuator.valve 

265 

266 args = docopt.docopt(__doc__) 

267 

268 config_file = args["-c"] 

269 control_host = os.environ.get("HEMS_CONTROL_HOST", args["-s"]) 

270 pub_port = int(os.environ.get("HEMS_PUB_PORT", args["-p"])) 

271 speedup = int(args["-t"]) 

272 msg_count = int(args["-n"]) 

273 debug_mode = args["-D"] 

274 

275 my_lib.logger.init("test", level=logging.DEBUG if debug_mode else logging.INFO) 

276 

277 config = my_lib.config.load(config_file) 

278 message_queue = multiprocessing.Queue() 

279 event_queue = multiprocessing.Queue() 

280 

281 os.environ["DUMMY_MODE"] = "true" 

282 

283 my_lib.webapp.config.init(config["actuator"]) 

284 my_lib.webapp.log.init(config) 

285 unit_cooler.actuator.work_log.init(config, event_queue) 

286 

287 unit_cooler.actuator.valve.init(config["actuator"]["control"]["valve"]["pin_no"]) 

288 unit_cooler.actuator.monitor.init(config["actuator"]["control"]["valve"]["pin_no"]) 

289 

290 # NOTE: テストしやすいように、threading.Thread ではなく multiprocessing.pool.ThreadPool を使う 

291 executor = concurrent.futures.ThreadPoolExecutor() 

292 

293 setting = { 

294 "control_host": control_host, 

295 "pub_port": pub_port, 

296 "speedup": speedup, 

297 "msg_count": msg_count, 

298 "dummy_mode": True, 

299 } 

300 

301 thread_list = start(executor, get_worker_def(config, message_queue, setting)) 

302 

303 for thread_info in thread_list: 

304 logging.info("Wait %s finish", thread_info["name"]) 

305 

306 if thread_info["future"].result() != 0: 

307 logging.warning("Error occurred in %s", thread_info["name"]) 

308 

309 unit_cooler.actuator.work_log.term() 

310 

311 logging.info("Shutdown executor") 

312 executor.shutdown()