Coverage for src/sharp_hems_dump.py: 0%
27 statements
« prev ^ index » next coverage.py v7.9.1, created at 2025-08-19 01:39 +0900
« prev ^ index » next coverage.py v7.9.1, created at 2025-08-19 01:39 +0900
1#!/usr/bin/env python3
2"""
3センサーのシリアル出力をダンプします。
5Usage:
6 sharp_hmes_dump.py [-c CONFIG] [-s SERVER_HOST] [-p SERVER_PORT] [-o FILE] [-D]
8Options:
9 -c CONFIG : 設定ファイルを指定します。 [default: config.yaml]
10 -s SERVER_HOST : サーバーのホスト名を指定します。 [default: localhost]
11 -p SERVER_PORT : ZeroMQ の Pub サーバーを動作させるポートを指定します。 [default: 4444]
12 -o FILE : 出力ファイル名 [default: packet.dump]
13 -D : デバッグモードで動作します。
14"""
16import logging
17import os
18import pathlib
19import time
21import my_lib.fluentd_util
22import my_lib.footprint
23import my_lib.serializer
25import sharp_hems.device
26import sharp_hems.notify
27import sharp_hems.serial_pubsub
28import sharp_hems.sniffer
30SCHEMA_CONFIG = "config.schema"
32start_time = None
33packet_list = []
36def process_packet(handle, header, payload):
37 global start_time # noqa: PLW0603
38 global packet_list
40 now = time.time()
41 if start_time is None:
42 start_time = now
44 packet_list.append([now - start_time, header, payload])
45 logging.info("Receive %d packet(s) ", len(packet_list))
47 my_lib.serializer.store(handle["dump_file"], packet_list)
50def start(handle):
51 try:
52 sharp_hems.serial_pubsub.start_client(server_host, server_port, handle, process_packet)
53 except:
54 sharp_hems.notify.error(config)
55 raise
58######################################################################
59if __name__ == "__main__":
60 import pathlib
62 import docopt
63 import my_lib.config
64 import my_lib.logger
66 args = docopt.docopt(__doc__)
68 config_file = args["-c"]
69 server_host = os.environ.get("HEMS_SERVER_HOST", args["-s"])
70 server_port = int(os.environ.get("HEMS_SERVER_PORT", args["-p"]))
71 dump_file = args["-o"]
72 debug_mode = args["-D"]
74 my_lib.logger.init("hems.wattmeter-sharp", level=logging.DEBUG if debug_mode else logging.INFO)
76 config = my_lib.config.load(config_file, pathlib.Path(SCHEMA_CONFIG))
78 dev_define_file = pathlib.Path(config["device"]["define"])
79 dev_cache_file = pathlib.Path(config["device"]["cache"])
80 liveness_file = pathlib.Path(config["liveness"]["file"]["measure"])
82 logging.info("Start HEMS logger (server: %s:%d)", server_host, server_port)
84 logging.info(
85 "Initialize Fluentd sender (host: %s, tag: %s)",
86 config["fluentd"]["host"],
87 config["fluentd"]["data"]["tag"],
88 )
89 sender = my_lib.fluentd_util.get_handle(config["fluentd"]["data"]["tag"], host=config["fluentd"]["host"])
91 start(
92 {
93 "sender": sender,
94 "dump_file": dump_file,
95 }
96 )