Coverage for src/unit_cooler/pubsub/subscribe.py: 90%
35 statements
« prev ^ index » next coverage.py v7.9.1, created at 2025-06-28 12:29 +0000
« prev ^ index » next coverage.py v7.9.1, created at 2025-06-28 12:29 +0000
1#!/usr/bin/env python3
2import json
3import logging
5import zmq
7import unit_cooler.const
10def start_client(server_host, server_port, func, msg_count=0, should_terminate=None):
11 logging.info("Start ZMQ client...")
13 context = zmq.Context()
14 socket = context.socket(zmq.SUB)
15 target = f"tcp://{server_host}:{server_port}"
16 socket.connect(target)
17 socket.setsockopt_string(zmq.SUBSCRIBE, unit_cooler.const.PUBSUB_CH)
19 # ノンブロッキング受信のためにタイムアウトを設定
20 socket.setsockopt(zmq.RCVTIMEO, 1000) # 1秒タイムアウト
22 logging.info("Client initialize done.")
24 receive_count = 0
25 while True:
26 # 終了フラグをチェック
27 if should_terminate and should_terminate.is_set(): 27 ↛ 28line 27 didn't jump to line 28 because the condition on line 27 was never true
28 logging.info("Terminate signal received, stopping ZMQ client")
29 break
31 try:
32 ch, json_str = socket.recv_string().split(" ", 1)
33 json_data = json.loads(json_str)
34 logging.debug("recv %s", json_data)
35 func(json_data)
37 if msg_count != 0: 37 ↛ 25line 37 didn't jump to line 25 because the condition on line 37 was always true
38 receive_count += 1
39 logging.debug("(receive_count, msg_count) = (%d, %d)", receive_count, msg_count)
40 if receive_count == msg_count:
41 logging.info("Terminate, because the specified number of times has been reached.")
42 break
43 except zmq.Again:
44 # タイムアウト時は継続してループを回す
45 continue
47 logging.warning("Stop ZMQ client")
49 socket.disconnect(target)
50 socket.close()
51 context.destroy()