Coverage for src/sharp_hems_logger.py: 0%
79 statements
« prev ^ index » next coverage.py v7.9.1, created at 2025-08-19 01:39 +0900
« prev ^ index » next coverage.py v7.9.1, created at 2025-08-19 01:39 +0900
1#!/usr/bin/env python3
2"""
3センサーから収集した消費電力データを Fluentd を使って送信します。
5Usage:
6 sharp_hmes_logger.py [-c CONFIG] [-s SERVER_HOST] [-p SERVER_PORT] [-n COUNT] [-d] [-D]
8Options:
9 -c CONFIG : 設定ファイルを指定します。 [default: config.yaml]
10 -s SERVER_HOST : サーバーのホスト名を指定します。 [default: localhost]
11 -p SERVER_PORT : ZeroMQ の Pub サーバーを動作させるポートを指定します。 [default: 4444]
12 -n COUNT : n 回制御メッセージを受信したら終了します。0 は制限なし。 [default: 0]
13 -d : ダミーモードで動作します。
14 -D : デバッグモードで動作します。
15"""
17import logging
18import os
19import pathlib
20import signal
21import sys
23import my_lib.fluentd_util
24import my_lib.footprint
25import my_lib.pretty
27import sharp_hems.device
28import sharp_hems.notify
29import sharp_hems.serial_pubsub
30import sharp_hems.sniffer
31from sharp_hems.metrics.collector import MetricsCollector
33SCHEMA_CONFIG = "config.schema"
35# グローバル変数として保持(シグナルハンドラで使用)
36_metrics_collector = None
37_sender = None
40def record_metrics(metrics_collector, data):
41 """メトリクス収集を記録する"""
42 try:
43 name = sharp_hems.device.get_name(data["addr"])
44 if name is not None:
45 metrics_collector.record_heartbeat(name)
46 logging.debug("Recorded metrics for %s", name)
47 except Exception:
48 logging.exception("Failed to record metrics")
51def fluent_send(sender, label, field, data, liveness_file):
52 try:
53 name = sharp_hems.device.get_name(data["addr"])
55 if name is None:
56 logging.warning("Unknown device: dev_id = %s", data["dev_id_str"])
57 return
59 data = {
60 "hostname": name,
61 field: int(data["watt"]),
62 }
64 if my_lib.fluentd_util.send(sender, label, data):
65 logging.info("Send: %s", data)
66 my_lib.footprint.update(liveness_file)
67 else:
68 logging.error(sender.last_error)
69 except Exception:
70 sharp_hems.notify.error(config)
73def process_packet(handle, header, payload):
74 sharp_hems.device.reload(handle["device"]["define"])
76 if handle["dummy_mode"]:
78 def on_data_received(data):
79 logging.info(my_lib.pretty.format(data))
80 handle["packet"]["count"] += 1
81 if (handle["packet"]["max"] != 0) and (handle["packet"]["count"] >= handle["packet"]["max"]):
82 sharp_hems.serial_pubsub.stop_client()
83 else:
85 def on_data_received(data):
86 # Fluentdに送信
87 fluent_send(
88 handle["sender"],
89 config["fluentd"]["data"]["label"],
90 config["fluentd"]["data"]["field"],
91 data,
92 handle["liveness"],
93 )
94 # メトリクス収集も記録
95 if "metrics_collector" in handle:
96 record_metrics(handle["metrics_collector"], data)
98 sharp_hems.sniffer.process_packet(handle, header, payload, on_data_received)
101def cleanup():
102 """終了処理を実行します。"""
103 global _metrics_collector, _sender
105 logging.info("Starting cleanup process...")
107 sharp_hems.serial_pubsub.stop_client()
109 # メトリクスコレクターをクローズ
110 if _metrics_collector:
111 try:
112 _metrics_collector.close()
113 logging.info("Closed metrics collector")
114 except Exception:
115 logging.exception("Failed to close metrics collector")
117 # Fluentd senderをクローズ
118 if _sender:
119 try:
120 if hasattr(_sender, "close"):
121 _sender.close()
122 logging.info("Closed Fluentd sender")
123 except Exception:
124 logging.exception("Failed to close Fluentd sender")
126 logging.info("Cleanup completed")
129def sig_handler(num, frame): # noqa: ARG001
130 """シグナルハンドラー"""
131 logging.warning("Received signal %d", num)
133 if num in (signal.SIGTERM, signal.SIGINT):
134 cleanup()
135 sys.exit(0)
138def start(handle):
139 try:
140 sharp_hems.serial_pubsub.start_client(server_host, server_port, handle, process_packet)
141 except Exception:
142 sharp_hems.notify.error(config)
143 raise
144 finally:
145 cleanup()
148######################################################################
149if __name__ == "__main__":
150 import pathlib
152 import docopt
153 import my_lib.config
154 import my_lib.logger
156 args = docopt.docopt(__doc__)
158 config_file = args["-c"]
159 server_host = os.environ.get("HEMS_SERVER_HOST", args["-s"])
160 server_port = int(os.environ.get("HEMS_SERVER_PORT", args["-p"]))
161 count = int(args["-n"])
162 dummy_mode = os.environ.get("DUMMY_MODE", args["-d"])
163 debug_mode = args["-D"]
165 my_lib.logger.init("hems.wattmeter-sharp", level=logging.DEBUG if debug_mode else logging.INFO)
167 config = my_lib.config.load(config_file, pathlib.Path(SCHEMA_CONFIG))
169 dev_define_file = pathlib.Path(config["device"]["define"])
170 dev_cache_file = pathlib.Path(config["device"]["cache"])
171 liveness_file = pathlib.Path(config["liveness"]["file"]["measure"])
173 logging.info("Start HEMS logger (server: %s:%d)", server_host, server_port)
175 if dummy_mode:
176 logging.info("DUMMY MODE")
178 logging.info(
179 "Initialize Fluentd sender (host: %s, tag: %s)",
180 config["fluentd"]["host"],
181 config["fluentd"]["data"]["tag"],
182 )
183 sender = my_lib.fluentd_util.get_handle(config["fluentd"]["data"]["tag"], host=config["fluentd"]["host"])
184 _sender = sender # グローバル変数に保存(シグナルハンドラ用)
186 # メトリクスコレクターを初期化
187 metrics_collector = None
188 if "metrics" in config:
189 metrics_db_path = pathlib.Path(config["metrics"]["data"])
190 metrics_collector = MetricsCollector(metrics_db_path)
191 _metrics_collector = metrics_collector # グローバル変数に保存(シグナルハンドラ用)
192 logging.info("Initialize metrics collector (db: %s)", metrics_db_path)
194 # シグナルハンドラーを設定
195 signal.signal(signal.SIGTERM, sig_handler)
196 signal.signal(signal.SIGINT, sig_handler)
198 handle = {
199 "sender": sender,
200 "device": {
201 "define": dev_define_file,
202 "cache": dev_cache_file,
203 },
204 "data": {
205 "label": config["fluentd"]["data"]["label"],
206 "field": config["fluentd"]["data"]["field"],
207 },
208 "dummy_mode": dummy_mode,
209 "packet": {
210 "count": 0,
211 "max": count,
212 },
213 "liveness": liveness_file,
214 }
216 if metrics_collector:
217 handle["metrics_collector"] = metrics_collector
219 start(handle)