Coverage for src/unit_cooler/pubsub/subscribe.py: 90%

35 statements  

« prev     ^ index     » next       coverage.py v7.9.1, created at 2025-06-28 12:29 +0000

1#!/usr/bin/env python3 

2import json 

3import logging 

4 

5import zmq 

6 

7import unit_cooler.const 

8 

9 

10def start_client(server_host, server_port, func, msg_count=0, should_terminate=None): 

11 logging.info("Start ZMQ client...") 

12 

13 context = zmq.Context() 

14 socket = context.socket(zmq.SUB) 

15 target = f"tcp://{server_host}:{server_port}" 

16 socket.connect(target) 

17 socket.setsockopt_string(zmq.SUBSCRIBE, unit_cooler.const.PUBSUB_CH) 

18 

19 # ノンブロッキング受信のためにタイムアウトを設定 

20 socket.setsockopt(zmq.RCVTIMEO, 1000) # 1秒タイムアウト 

21 

22 logging.info("Client initialize done.") 

23 

24 receive_count = 0 

25 while True: 

26 # 終了フラグをチェック 

27 if should_terminate and should_terminate.is_set(): 27 ↛ 28line 27 didn't jump to line 28 because the condition on line 27 was never true

28 logging.info("Terminate signal received, stopping ZMQ client") 

29 break 

30 

31 try: 

32 ch, json_str = socket.recv_string().split(" ", 1) 

33 json_data = json.loads(json_str) 

34 logging.debug("recv %s", json_data) 

35 func(json_data) 

36 

37 if msg_count != 0: 37 ↛ 25line 37 didn't jump to line 25 because the condition on line 37 was always true

38 receive_count += 1 

39 logging.debug("(receive_count, msg_count) = (%d, %d)", receive_count, msg_count) 

40 if receive_count == msg_count: 

41 logging.info("Terminate, because the specified number of times has been reached.") 

42 break 

43 except zmq.Again: 

44 # タイムアウト時は継続してループを回す 

45 continue 

46 

47 logging.warning("Stop ZMQ client") 

48 

49 socket.disconnect(target) 

50 socket.close() 

51 context.destroy()