Coverage for src/sharp_hems/serial_pubsub.py: 88%

56 statements  

« prev     ^ index     » next       coverage.py v7.9.1, created at 2025-08-19 01:39 +0900

1#!/usr/bin/env python3 

2""" 

3センサーからのパケットを Pub-Sub パターンで配信します。 

4 

5Usage: 

6 serial_pubsub.py -S [-t SERIAL_PORT] [-p SERVER_PORT] [-d] 

7 serial_pubsub.py [-s SERVER_HOST] [-p SERVER_PORT] [-c CONFIG] [-d] 

8 

9Options: 

10 -S : サーバーモードで動作します。 

11 -s SERVER_HOST : サーバーのホスト名を指定します。 [default: localhost] 

12 -t SERIAL_PORT : HEMS 中継器を接続するシリアルポートを指定します。 [default: /dev/ttyUSB0] 

13 -p SERVER_PORT : ZeroMQ の Pub サーバーを動作させるポートを指定します。 [default: 4444] 

14 -c CONFIG : 設定ファイルを指定します。 [default: config.yaml] 

15 -d : デバッグモードで動作します。 

16""" 

17 

18import logging 

19import threading 

20 

21import my_lib.footprint 

22import serial 

23import zmq 

24 

# Topic prefix: the server prepends it to every published message and the
# client uses it as its SUBSCRIBE filter.
CH: str = "serial"

# Baud rate handed to serial.Serial when the server opens the port.
SER_BAUD: int = 115200

# Read timeout handed to serial.Serial (pySerial interprets it in seconds).
SER_TIMEOUT: int = 5

# Shutdown flags polled by the start_server / start_client loops; they are
# set by stop_server / stop_client and cleared again on each start.
should_terminate_server: threading.Event = threading.Event()
should_terminate_client: threading.Event = threading.Event()

31 

32 

def start_server(serial_port, server_port, liveness_file):
    """Forward packets read from a serial port to ZeroMQ subscribers.

    Binds a PUB socket on ``tcp://*:{server_port}``, opens ``serial_port`` at
    ``SER_BAUD`` with a read timeout of ``SER_TIMEOUT``, and then loops until
    ``should_terminate_server`` is set (see ``stop_server``). Every packet is
    published as the text ``"{CH} {header_hex} {payload_hex}"`` and
    ``liveness_file`` is refreshed via ``my_lib.footprint.update``.

    The termination flag is only checked between reads, so a stop request
    takes effect once the pending ``ser.read`` returns or times out.

    The serial port, the socket and the ZeroMQ context are always released
    on exit, including when initialization or the loop raises, so the server
    can be started again in the same process.

    Args:
        serial_port: Device name passed to ``serial.Serial``.
        server_port: TCP port the PUB socket binds to.
        liveness_file: Path passed to ``my_lib.footprint.update`` after each
            published packet.
    """
    logging.info("Start serial server...")

    should_terminate_server.clear()

    context = zmq.Context()
    socket = None
    ser = None
    try:
        socket = context.socket(zmq.PUB)
        socket.bind(f"tcp://*:{server_port}")

        ser = serial.Serial(serial_port, SER_BAUD, timeout=SER_TIMEOUT)

        logging.info("Server initialize done.")

        while not should_terminate_server.is_set():
            header = ser.read(2)

            if not header:
                # Read timed out with no data; go back and re-check the flag.
                continue
            if len(header) == 1:
                logging.debug("Short packet")
                continue

            header_hex = header.hex()
            # The second header byte drives the remaining read size.
            # NOTE(review): presumably a length field plus fixed framing
            # bytes -- confirm against the device protocol.
            payload_hex = ser.read(header[1] + 5 - 2).hex()

            logging.debug("send %s %s", header_hex, payload_hex)
            socket.send_string(f"{CH} {header_hex} {payload_hex}")

            my_lib.footprint.update(liveness_file)
    finally:
        if ser is not None:
            ser.close()
        if socket is not None:
            # linger=0 so that context.term() below cannot block on
            # messages that are still queued for subscribers.
            socket.close(linger=0)
        context.term()

    logging.warning("Stop serial server")

68 

69 

def stop_server():
    """Request that the loop in ``start_server`` exit.

    Only sets ``should_terminate_server``; it does not wait for the loop to
    finish.
    """
    should_terminate_server.set()

74 

75 

def start_client(server_host, server_port, handle, func):
    """Subscribe to the serial publisher and dispatch each packet to ``func``.

    Connects a SUB socket to ``tcp://{server_host}:{server_port}``, filters
    on the ``CH`` prefix, and loops until ``should_terminate_client`` is set
    (see ``stop_client``). Each received message is split into channel,
    header and payload; header and payload are decoded from hex and passed
    on as ``func(handle, header_bytes, payload_bytes)``.

    The socket and the ZeroMQ context are always released on exit, including
    when ``func`` raises.

    Args:
        server_host: Host name of the publisher.
        server_port: TCP port of the publisher.
        handle: Opaque object forwarded unchanged as the first argument of
            ``func``.
        func: Callable invoked once per received packet.
    """
    logging.info("Start serial client...")

    should_terminate_client.clear()

    # Keep a reference to the context so it can be terminated on exit.
    context = zmq.Context()
    socket = context.socket(zmq.SUB)
    try:
        socket.connect(f"tcp://{server_host}:{server_port}")
        socket.setsockopt_string(zmq.SUBSCRIBE, CH)
        # 1 second receive timeout, so the termination flag is polled
        # regularly even when no packets arrive.
        socket.setsockopt(zmq.RCVTIMEO, 1000)

        logging.info("Client initialize done.")

        while True:
            if should_terminate_client.is_set():
                logging.info("Terminate serial client")
                break

            try:
                _, header_hex, payload_hex = socket.recv_string().split(" ", 2)
                logging.debug("recv %s %s", header_hex, payload_hex)
                func(handle, bytes.fromhex(header_hex), bytes.fromhex(payload_hex))
            except zmq.error.Again:
                # Receive timed out; loop around and re-check the flag.
                continue
    finally:
        socket.close(linger=0)
        context.term()

    logging.warning("Stop serial client")

101 

102 

def stop_client():
    """Request that the loop in ``start_client`` exit.

    Only sets ``should_terminate_client``; it does not wait for the loop to
    finish.
    """
    should_terminate_client.set()

107 

108 

if __name__ == "__main__":
    # Command-line entry point: run as the publishing server (-S) or as a
    # subscribing client that logs every decoded packet.
    import pathlib

    import docopt
    import my_lib.config
    import my_lib.logger

    import sharp_hems.sniffer

    # The module docstring doubles as the docopt usage specification.
    opts = docopt.docopt(__doc__)

    config_file = opts["-c"]
    is_server_mode = opts["-S"]
    server_host = opts["-s"]
    server_port = int(opts["-p"])
    serial_port = opts["-t"]
    debug_mode = opts["-d"]

    if debug_mode:
        log_level = logging.DEBUG
    else:
        log_level = logging.INFO
    my_lib.logger.init("test", level=log_level)

    config = my_lib.config.load(config_file)

    dev_cache_file = pathlib.Path(config["device"]["cache"])
    liveness_file = pathlib.Path(config["liveness"]["file"]["measure"])

    def log_data(data):
        """Log one decoded record at INFO level."""
        logging.info(data)

    def process_packet(handle, header, payload):
        """Pass a received packet to the sniffer, with ``log_data`` as its callback."""
        sharp_hems.sniffer.process_packet(handle, header, payload, log_data)

    if is_server_mode:
        logging.info("Start server")
        start_server(serial_port, server_port, liveness_file)
    else:
        logging.info("Start client")
        client_handle = {"device": {"cache": dev_cache_file}}
        start_client(server_host, server_port, client_handle, process_packet)