Coverage for src/sharp_hems/serial_pubsub.py: 88%
56 statements
« prev ^ index » next coverage.py v7.9.1, created at 2025-08-19 01:39 +0900
« prev ^ index » next coverage.py v7.9.1, created at 2025-08-19 01:39 +0900
1#!/usr/bin/env python3
2"""
3センサーからのパケットを Pub-Sub パターンで配信します。
5Usage:
6 serial_pubsub.py -S [-t SERIAL_PORT] [-p SERVER_PORT] [-d]
7 serial_pubsub.py [-s SERVER_HOST] [-p SERVER_PORT] [-c CONFIG] [-d]
9Options:
10 -S : サーバーモードで動作します。
11 -s SERVER_HOST : サーバーのホスト名を指定します。 [default: localhost]
12 -t SERIAL_PORT : HEMS 中継器を接続するシリアルポートを指定します。 [default: /dev/ttyUSB0]
13 -p SERVER_PORT : ZeroMQ の Pub サーバーを動作させるポートを指定します。 [default: 4444]
14 -c CONFIG : 設定ファイルを指定します。 [default: config.yaml]
15 -d : デバッグモードで動作します。
16"""
18import logging
19import threading
21import my_lib.footprint
22import serial
23import zmq
25CH = "serial"
26SER_BAUD = 115200
27SER_TIMEOUT = 5
29should_terminate_server = threading.Event()
30should_terminate_client = threading.Event()
33def start_server(serial_port, server_port, liveness_file):
34 global should_terminate_server
35 logging.info("Start serial server...")
37 should_terminate_server.clear()
38 context = zmq.Context()
40 socket = context.socket(zmq.PUB)
41 socket.bind(f"tcp://*:{server_port}")
43 ser = serial.Serial(serial_port, SER_BAUD, timeout=SER_TIMEOUT)
45 logging.info("Server initialize done.")
47 while True:
48 if should_terminate_server.is_set():
49 break
51 header = ser.read(2)
53 if len(header) == 0: 53 ↛ 54line 53 didn't jump to line 54 because the condition on line 53 was never true
54 continue
55 elif len(header) == 1: # noqa: RET507
56 logging.debug("Short packet")
57 continue
59 header_hex = header.hex()
60 payload_hex = ser.read(header[1] + 5 - 2).hex()
62 logging.debug("send %s %s", header_hex, payload_hex)
63 socket.send_string(f"{CH} {header_hex} {payload_hex}")
65 my_lib.footprint.update(liveness_file)
67 logging.warning("Stop serial server")
70def stop_server():
71 global should_terminate_server
73 should_terminate_server.set()
76def start_client(server_host, server_port, handle, func):
77 global should_terminate_client
78 logging.info("Start serial client...")
80 should_terminate_client.clear()
81 socket = zmq.Context().socket(zmq.SUB)
82 socket.connect(f"tcp://{server_host}:{server_port}")
83 socket.setsockopt_string(zmq.SUBSCRIBE, CH)
84 socket.setsockopt(zmq.RCVTIMEO, 1000) # 1秒のタイムアウト
86 logging.info("Client initialize done.")
88 while True:
89 if should_terminate_client.is_set(): 89 ↛ 90line 89 didn't jump to line 90 because the condition on line 89 was never true
90 logging.info("Terminate serial client")
91 break
93 try:
94 ch, header_hex, payload_hex = socket.recv_string().split(" ", 2)
95 logging.debug("recv %s %s", header_hex, payload_hex)
96 func(handle, bytes.fromhex(header_hex), bytes.fromhex(payload_hex))
97 except zmq.error.Again:
98 continue
100 logging.warning("Stop serial client")
103def stop_client():
104 global should_terminate_client
106 should_terminate_client.set()
109if __name__ == "__main__":
110 import pathlib
112 import docopt
113 import my_lib.config
114 import my_lib.logger
116 import sharp_hems.sniffer
118 args = docopt.docopt(__doc__)
120 config_file = args["-c"]
121 is_server_mode = args["-S"]
122 server_host = args["-s"]
123 server_port = int(args["-p"])
124 serial_port = args["-t"]
125 debug_mode = args["-d"]
127 my_lib.logger.init("test", level=logging.DEBUG if debug_mode else logging.INFO)
129 config = my_lib.config.load(config_file)
131 dev_cache_file = pathlib.Path(config["device"]["cache"])
132 liveness_file = pathlib.Path(config["liveness"]["file"]["measure"])
134 def log_data(data):
135 logging.info(data)
137 def process_packet(handle, header, payload):
138 sharp_hems.sniffer.process_packet(handle, header, payload, log_data)
140 if is_server_mode:
141 logging.info("Start server")
142 start_server(serial_port, server_port, liveness_file)
143 else:
144 logging.info("Start client")
145 start_client(server_host, server_port, {"device": {"cache": dev_cache_file}}, process_packet)