Coverage for src/unit_cooler/actuator/worker.py: 94%

158 statements  

« prev     ^ index     » next       coverage.py v7.9.1, created at 2025-07-23 14:35 +0000

1#!/usr/bin/env python3 

2""" 

3アクチュエータで動作するワーカです。 

4 

5Usage: 

6 worker.py [-c CONFIG] [-s CONTROL_HOST] [-p PUB_PORT] [-n COUNT] [-t SPEEDUP] [-D] 

7 

8Options: 

9 -c CONFIG : CONFIG を設定ファイルとして読み込んで実行します。 [default: config.yaml] 

10 -s CONTROL_HOST : コントローラのホスト名を指定します。 [default: localhost] 

11 -p PUB_PORT : ZeroMQ の Pub サーバーを動作させるポートを指定します。 [default: 2222] 

12 -n COUNT : n 回制御メッセージを受信したら終了します。0 は制限なし。 [default: 1] 

13 -t SPEEDUP : 時短モード。演算間隔を SPEEDUP 分の一にします。 [default: 1] 

14 -D : デバッグモードで動作します。 

15""" 

16 

17import concurrent.futures 

18import logging 

19import os 

20import pathlib 

21import threading 

22import time 

23import traceback 

24 

25import my_lib.footprint 

26 

27import unit_cooler.actuator.control 

28import unit_cooler.actuator.monitor 

29import unit_cooler.const 

30import unit_cooler.pubsub.subscribe 

31import unit_cooler.util 

32 

33# グローバル辞書(pytestワーカー毎に独立) 

34_control_messages = {} 

35_should_terminate = {} 

36 

37# メッセージの初期値 

38MESSAGE_INIT = {"mode_index": 0, "state": unit_cooler.const.COOLING_STATE.IDLE} 

39 

40 

41def get_worker_id(): 

42 return os.environ.get("PYTEST_XDIST_WORKER", "") 

43 

44 

45def get_last_control_message(): 

46 """グローバル辞書からlast_control_messageを取得""" 

47 worker_id = get_worker_id() 

48 if worker_id not in _control_messages: 

49 set_last_control_message(MESSAGE_INIT.copy()) 

50 

51 return _control_messages[worker_id] 

52 

53 

54def set_last_control_message(message): 

55 """グローバル辞書にlast_control_messageを設定""" 

56 _control_messages[get_worker_id()] = message 

57 

58 

59def get_should_terminate(): 

60 return _should_terminate.get(get_worker_id(), None) 

61 

62 

63def init_should_terminate(): 

64 should_terminate = get_should_terminate() 

65 

66 if should_terminate is None: 

67 _should_terminate[get_worker_id()] = threading.Event() 

68 

69 else: 

70 should_terminate.clear() 

71 

72 

73def collect_environmental_metrics(config, current_message): 

74 """環境データのメトリクス収集""" 

75 from unit_cooler.metrics import get_metrics_collector 

76 

77 try: 

78 metrics_db_path = config["actuator"]["metrics"]["data"] 

79 metrics_collector = get_metrics_collector(metrics_db_path) 

80 

81 # current_messageのsense_dataからセンサーデータを取得 

82 sense_data = current_message.get("sense_data", {}) 

83 

84 if sense_data: 

85 # 各センサーデータの最新値を取得 

86 temperature = None 

87 humidity = None 

88 lux = None 

89 solar_radiation = None 

90 rain_amount = None 

91 

92 if sense_data.get("temp") and len(sense_data["temp"]) > 0: 92 ↛ 94line 92 didn't jump to line 94 because the condition on line 92 was always true

93 temperature = sense_data["temp"][0].get("value") 

94 if sense_data.get("humi") and len(sense_data["humi"]) > 0: 94 ↛ 96line 94 didn't jump to line 96 because the condition on line 94 was always true

95 humidity = sense_data["humi"][0].get("value") 

96 if sense_data.get("lux") and len(sense_data["lux"]) > 0: 96 ↛ 98line 96 didn't jump to line 98 because the condition on line 96 was always true

97 lux = sense_data["lux"][0].get("value") 

98 if sense_data.get("solar_rad") and len(sense_data["solar_rad"]) > 0: 98 ↛ 102line 98 didn't jump to line 102 because the condition on line 98 was always true

99 solar_radiation = sense_data["solar_rad"][0].get("value") 

100 logging.debug("Solar radiation data found: %s W/m²", solar_radiation) 

101 else: 

102 logging.debug( 

103 "No solar radiation data in sense_data: %s", 

104 list(sense_data.keys()) if sense_data else "empty", 

105 ) 

106 if sense_data.get("rain") and len(sense_data["rain"]) > 0: 106 ↛ 110line 106 didn't jump to line 110 because the condition on line 106 was always true

107 rain_amount = sense_data["rain"][0].get("value") 

108 

109 # 環境データをメトリクスに記録 

110 metrics_collector.update_environmental_data( 

111 temperature, humidity, lux, solar_radiation, rain_amount 

112 ) 

113 

114 except Exception: 

115 logging.exception("Failed to collect environmental metrics") 

116 

117 

118def queue_put(message_queue, message, liveness_file): 

119 message["state"] = unit_cooler.const.COOLING_STATE(message["state"]) 

120 

121 logging.info("Receive message: %s", message) 

122 

123 message_queue.put(message) 

124 my_lib.footprint.update(liveness_file) 

125 

126 

127def sleep_until_next_iter(start_time, interval_sec): 

128 sleep_sec = max(interval_sec - (time.time() - start_time), 0.5) 

129 logging.debug("Seep %.1f sec...", sleep_sec) 

130 

131 # should_terminate が設定されるまで待機(最大 sleep_sec 秒) 

132 get_should_terminate().wait(timeout=sleep_sec) 

133 

134 

135# NOTE: コントローラから制御指示を受け取ってキューに積むワーカ 

136def subscribe_worker(config, control_host, pub_port, message_queue, liveness_file, msg_count=0): # noqa: PLR0913 

137 logging.info("Start actuator subscribe worker (%s:%d)", control_host, pub_port) 

138 ret = 0 

139 try: 

140 unit_cooler.pubsub.subscribe.start_client( 

141 control_host, 

142 pub_port, 

143 lambda message: queue_put(message_queue, message, liveness_file), 

144 msg_count, 

145 ) 

146 except Exception: 

147 logging.exception("Failed to receive control message") 

148 unit_cooler.util.notify_error(config, traceback.format_exc()) 

149 ret = -1 

150 

151 logging.warning("Stop subscribe worker") 

152 return ret 

153 

154 

155# NOTE: バルブの状態をモニタするワーカ 

156def monitor_worker(config, liveness_file, dummy_mode=False, speedup=1, msg_count=0): 

157 logging.info("Start monitor worker") 

158 

159 interval_sec = config["actuator"]["monitor"]["interval_sec"] / speedup 

160 try: 

161 handle = unit_cooler.actuator.monitor.gen_handle(config, interval_sec) 

162 except Exception: 

163 logging.exception("Failed to create handle") 

164 

165 unit_cooler.actuator.work_log.add( 

166 "流量のロギングを開始できません。", unit_cooler.const.LOG_LEVEL.ERROR 

167 ) 

168 return -1 

169 

170 i = 0 

171 ret = 0 

172 try: 

173 while True: 

174 start_time = time.time() 

175 

176 need_logging = (i % handle["log_period"]) == 0 

177 i += 1 

178 

179 mist_condition = unit_cooler.actuator.monitor.get_mist_condition() 

180 unit_cooler.actuator.monitor.check(handle, mist_condition, need_logging) 

181 unit_cooler.actuator.monitor.send_mist_condition( 

182 handle, mist_condition, get_last_control_message(), dummy_mode 

183 ) 

184 

185 my_lib.footprint.update(liveness_file) 

186 

187 if get_should_terminate().is_set(): 

188 logging.info("Terminate monitor worker") 

189 break 

190 

191 if msg_count != 0: 191 ↛ 200line 191 didn't jump to line 200 because the condition on line 191 was always true

192 logging.debug("(monitor_count, msg_count) = (%d, %d)", handle["monitor_count"], msg_count) 

193 # NOTE: monitor_worker が先に終了しないようにする 

194 if handle["monitor_count"] >= (msg_count + 20): 

195 logging.info( 

196 "Terminate monitor worker, because the specified number of times has been reached." 

197 ) 

198 break 

199 

200 sleep_until_next_iter(start_time, interval_sec) 

201 except Exception: 

202 unit_cooler.util.notify_error(config, traceback.format_exc()) 

203 ret = -1 

204 

205 logging.warning("Stop monitor worker") 

206 return ret 

207 

208 

209# NOTE: バルブを制御するワーカ 

210def control_worker(config, message_queue, liveness_file, dummy_mode=False, speedup=1, msg_count=0): # noqa: PLR0913 

211 logging.info("Start control worker") 

212 

213 if dummy_mode: 

214 logging.warning("DUMMY mode") 

215 

216 interval_sec = config["actuator"]["control"]["interval_sec"] / speedup 

217 handle = unit_cooler.actuator.control.gen_handle(config, message_queue) 

218 

219 ret = 0 

220 try: 

221 while True: 

222 start_time = time.time() 

223 

224 current_message = unit_cooler.actuator.control.get_control_message( 

225 handle, get_last_control_message() 

226 ) 

227 

228 set_last_control_message(current_message) 

229 

230 unit_cooler.actuator.control.execute(config, current_message) 

231 

232 # 環境データのメトリクス収集(定期的に実行) 

233 try: 

234 collect_environmental_metrics(config, current_message) 

235 except Exception: 

236 logging.debug("Failed to collect environmental metrics") 

237 

238 my_lib.footprint.update(liveness_file) 

239 

240 if get_should_terminate().is_set(): 

241 logging.info("Terminate control worker") 

242 break 

243 

244 if msg_count != 0: 244 ↛ 250line 244 didn't jump to line 250 because the condition on line 244 was always true

245 logging.debug("(receive_count, msg_count) = (%d, %d)", handle["receive_count"], msg_count) 

246 if handle["receive_count"] >= msg_count: 

247 logging.info("Terminate control, because the specified number of times has been reached.") 

248 break 

249 

250 sleep_until_next_iter(start_time, interval_sec) 

251 except Exception: 

252 logging.exception("Failed to control valve") 

253 unit_cooler.util.notify_error(config, traceback.format_exc()) 

254 ret = -1 

255 

256 logging.warning("Stop control worker") 

257 # NOTE: Queue を close した後に put されると ValueError が発生するので、 

258 # 明示的に閉じるのをやめた。 

259 # message_queue.close() 

260 

261 return ret 

262 

263 

264def get_worker_def(config, message_queue, setting): 

265 return [ 

266 { 

267 "name": "subscribe_worker", 

268 "param": [ 

269 subscribe_worker, 

270 config, 

271 setting["control_host"], 

272 setting["pub_port"], 

273 message_queue, 

274 pathlib.Path(config["actuator"]["subscribe"]["liveness"]["file"]), 

275 setting["msg_count"], 

276 ], 

277 }, 

278 { 

279 "name": "monitor_worker", 

280 "param": [ 

281 monitor_worker, 

282 config, 

283 pathlib.Path(config["actuator"]["monitor"]["liveness"]["file"]), 

284 setting["dummy_mode"], 

285 setting["speedup"], 

286 setting["msg_count"], 

287 ], 

288 }, 

289 { 

290 "name": "control_worker", 

291 "param": [ 

292 control_worker, 

293 config, 

294 message_queue, 

295 pathlib.Path(config["actuator"]["control"]["liveness"]["file"]), 

296 setting["dummy_mode"], 

297 setting["speedup"], 

298 setting["msg_count"], 

299 ], 

300 }, 

301 ] 

302 

303 

304def start(executor, worker_def): 

305 init_should_terminate() 

306 thread_list = [] 

307 

308 for worker_info in worker_def: 

309 future = executor.submit(*worker_info["param"]) 

310 thread_list.append({"name": worker_info["name"], "future": future}) 

311 

312 return thread_list 

313 

314 

315def term(): 

316 logging.info("Terminate actuator worker") 

317 get_should_terminate().set() 

318 

319 

320if __name__ == "__main__": 

321 # TEST Code 

322 import multiprocessing 

323 import os 

324 import pathlib 

325 

326 import docopt 

327 import my_lib.config 

328 import my_lib.logger 

329 import my_lib.pretty 

330 import my_lib.webapp.config 

331 

332 import unit_cooler.actuator.valve 

333 

334 args = docopt.docopt(__doc__) 

335 

336 config_file = args["-c"] 

337 control_host = os.environ.get("HEMS_CONTROL_HOST", args["-s"]) 

338 pub_port = int(os.environ.get("HEMS_PUB_PORT", args["-p"])) 

339 speedup = int(args["-t"]) 

340 msg_count = int(args["-n"]) 

341 debug_mode = args["-D"] 

342 

343 my_lib.logger.init("test", level=logging.DEBUG if debug_mode else logging.INFO) 

344 

345 config = my_lib.config.load(config_file) 

346 message_queue = multiprocessing.Queue() 

347 event_queue = multiprocessing.Queue() 

348 

349 os.environ["DUMMY_MODE"] = "true" 

350 

351 my_lib.webapp.config.init(config["actuator"]) 

352 my_lib.webapp.log.init(config) 

353 unit_cooler.actuator.work_log.init(config, event_queue) 

354 

355 unit_cooler.actuator.valve.init(config["actuator"]["control"]["valve"]["pin_no"], config) 

356 unit_cooler.actuator.monitor.init(config["actuator"]["control"]["valve"]["pin_no"]) 

357 

358 # NOTE: テストしやすいように、threading.Thread ではなく multiprocessing.pool.ThreadPool を使う 

359 executor = concurrent.futures.ThreadPoolExecutor() 

360 

361 setting = { 

362 "control_host": control_host, 

363 "pub_port": pub_port, 

364 "speedup": speedup, 

365 "msg_count": msg_count, 

366 "dummy_mode": True, 

367 } 

368 

369 thread_list = start(executor, get_worker_def(config, message_queue, setting)) 

370 

371 for thread_info in thread_list: 

372 logging.info("Wait %s finish", thread_info["name"]) 

373 

374 if thread_info["future"].result() != 0: 

375 logging.warning("Error occurred in %s", thread_info["name"]) 

376 

377 unit_cooler.actuator.work_log.term() 

378 

379 logging.info("Shutdown executor") 

380 executor.shutdown()