Coverage for src/sharp_hems_logger.py: 0%

79 statements  

« prev     ^ index     » next       coverage.py v7.9.1, created at 2025-08-19 01:39 +0900

1#!/usr/bin/env python3 

2""" 

3センサーから収集した消費電力データを Fluentd を使って送信します。 

4 

5Usage: 

6 sharp_hmes_logger.py [-c CONFIG] [-s SERVER_HOST] [-p SERVER_PORT] [-n COUNT] [-d] [-D] 

7 

8Options: 

9 -c CONFIG : 設定ファイルを指定します。 [default: config.yaml] 

10 -s SERVER_HOST : サーバーのホスト名を指定します。 [default: localhost] 

11 -p SERVER_PORT : ZeroMQ の Pub サーバーを動作させるポートを指定します。 [default: 4444] 

12 -n COUNT : n 回制御メッセージを受信したら終了します。0 は制限なし。 [default: 0] 

13 -d : ダミーモードで動作します。 

14 -D : デバッグモードで動作します。 

15""" 

16 

17import logging 

18import os 

19import pathlib 

20import signal 

21import sys 

22 

23import my_lib.fluentd_util 

24import my_lib.footprint 

25import my_lib.pretty 

26 

27import sharp_hems.device 

28import sharp_hems.notify 

29import sharp_hems.serial_pubsub 

30import sharp_hems.sniffer 

31from sharp_hems.metrics.collector import MetricsCollector 

32 

33SCHEMA_CONFIG = "config.schema" 

34 

35# グローバル変数として保持(シグナルハンドラで使用) 

36_metrics_collector = None 

37_sender = None 

38 

39 

40def record_metrics(metrics_collector, data): 

41 """メトリクス収集を記録する""" 

42 try: 

43 name = sharp_hems.device.get_name(data["addr"]) 

44 if name is not None: 

45 metrics_collector.record_heartbeat(name) 

46 logging.debug("Recorded metrics for %s", name) 

47 except Exception: 

48 logging.exception("Failed to record metrics") 

49 

50 

51def fluent_send(sender, label, field, data, liveness_file): 

52 try: 

53 name = sharp_hems.device.get_name(data["addr"]) 

54 

55 if name is None: 

56 logging.warning("Unknown device: dev_id = %s", data["dev_id_str"]) 

57 return 

58 

59 data = { 

60 "hostname": name, 

61 field: int(data["watt"]), 

62 } 

63 

64 if my_lib.fluentd_util.send(sender, label, data): 

65 logging.info("Send: %s", data) 

66 my_lib.footprint.update(liveness_file) 

67 else: 

68 logging.error(sender.last_error) 

69 except Exception: 

70 sharp_hems.notify.error(config) 

71 

72 

73def process_packet(handle, header, payload): 

74 sharp_hems.device.reload(handle["device"]["define"]) 

75 

76 if handle["dummy_mode"]: 

77 

78 def on_data_received(data): 

79 logging.info(my_lib.pretty.format(data)) 

80 handle["packet"]["count"] += 1 

81 if (handle["packet"]["max"] != 0) and (handle["packet"]["count"] >= handle["packet"]["max"]): 

82 sharp_hems.serial_pubsub.stop_client() 

83 else: 

84 

85 def on_data_received(data): 

86 # Fluentdに送信 

87 fluent_send( 

88 handle["sender"], 

89 config["fluentd"]["data"]["label"], 

90 config["fluentd"]["data"]["field"], 

91 data, 

92 handle["liveness"], 

93 ) 

94 # メトリクス収集も記録 

95 if "metrics_collector" in handle: 

96 record_metrics(handle["metrics_collector"], data) 

97 

98 sharp_hems.sniffer.process_packet(handle, header, payload, on_data_received) 

99 

100 

101def cleanup(): 

102 """終了処理を実行します。""" 

103 global _metrics_collector, _sender 

104 

105 logging.info("Starting cleanup process...") 

106 

107 sharp_hems.serial_pubsub.stop_client() 

108 

109 # メトリクスコレクターをクローズ 

110 if _metrics_collector: 

111 try: 

112 _metrics_collector.close() 

113 logging.info("Closed metrics collector") 

114 except Exception: 

115 logging.exception("Failed to close metrics collector") 

116 

117 # Fluentd senderをクローズ 

118 if _sender: 

119 try: 

120 if hasattr(_sender, "close"): 

121 _sender.close() 

122 logging.info("Closed Fluentd sender") 

123 except Exception: 

124 logging.exception("Failed to close Fluentd sender") 

125 

126 logging.info("Cleanup completed") 

127 

128 

129def sig_handler(num, frame): # noqa: ARG001 

130 """シグナルハンドラー""" 

131 logging.warning("Received signal %d", num) 

132 

133 if num in (signal.SIGTERM, signal.SIGINT): 

134 cleanup() 

135 sys.exit(0) 

136 

137 

138def start(handle): 

139 try: 

140 sharp_hems.serial_pubsub.start_client(server_host, server_port, handle, process_packet) 

141 except Exception: 

142 sharp_hems.notify.error(config) 

143 raise 

144 finally: 

145 cleanup() 

146 

147 

148###################################################################### 

149if __name__ == "__main__": 

150 import pathlib 

151 

152 import docopt 

153 import my_lib.config 

154 import my_lib.logger 

155 

156 args = docopt.docopt(__doc__) 

157 

158 config_file = args["-c"] 

159 server_host = os.environ.get("HEMS_SERVER_HOST", args["-s"]) 

160 server_port = int(os.environ.get("HEMS_SERVER_PORT", args["-p"])) 

161 count = int(args["-n"]) 

162 dummy_mode = os.environ.get("DUMMY_MODE", args["-d"]) 

163 debug_mode = args["-D"] 

164 

165 my_lib.logger.init("hems.wattmeter-sharp", level=logging.DEBUG if debug_mode else logging.INFO) 

166 

167 config = my_lib.config.load(config_file, pathlib.Path(SCHEMA_CONFIG)) 

168 

169 dev_define_file = pathlib.Path(config["device"]["define"]) 

170 dev_cache_file = pathlib.Path(config["device"]["cache"]) 

171 liveness_file = pathlib.Path(config["liveness"]["file"]["measure"]) 

172 

173 logging.info("Start HEMS logger (server: %s:%d)", server_host, server_port) 

174 

175 if dummy_mode: 

176 logging.info("DUMMY MODE") 

177 

178 logging.info( 

179 "Initialize Fluentd sender (host: %s, tag: %s)", 

180 config["fluentd"]["host"], 

181 config["fluentd"]["data"]["tag"], 

182 ) 

183 sender = my_lib.fluentd_util.get_handle(config["fluentd"]["data"]["tag"], host=config["fluentd"]["host"]) 

184 _sender = sender # グローバル変数に保存(シグナルハンドラ用) 

185 

186 # メトリクスコレクターを初期化 

187 metrics_collector = None 

188 if "metrics" in config: 

189 metrics_db_path = pathlib.Path(config["metrics"]["data"]) 

190 metrics_collector = MetricsCollector(metrics_db_path) 

191 _metrics_collector = metrics_collector # グローバル変数に保存(シグナルハンドラ用) 

192 logging.info("Initialize metrics collector (db: %s)", metrics_db_path) 

193 

194 # シグナルハンドラーを設定 

195 signal.signal(signal.SIGTERM, sig_handler) 

196 signal.signal(signal.SIGINT, sig_handler) 

197 

198 handle = { 

199 "sender": sender, 

200 "device": { 

201 "define": dev_define_file, 

202 "cache": dev_cache_file, 

203 }, 

204 "data": { 

205 "label": config["fluentd"]["data"]["label"], 

206 "field": config["fluentd"]["data"]["field"], 

207 }, 

208 "dummy_mode": dummy_mode, 

209 "packet": { 

210 "count": 0, 

211 "max": count, 

212 }, 

213 "liveness": liveness_file, 

214 } 

215 

216 if metrics_collector: 

217 handle["metrics_collector"] = metrics_collector 

218 

219 start(handle)