Coverage for src/sharp_hems/sniffer.py: 69%
82 statements
« prev ^ index » next coverage.py v7.9.1, created at 2025-08-19 01:39 +0900
« prev ^ index » next coverage.py v7.9.1, created at 2025-08-19 01:39 +0900
1#!/usr/bin/env python3
2import json
3import logging
4import pickle
5import struct
# Last counter byte seen per dev_id. parse_packet_measure() compares against
# it to drop measurement packets that were transmitted twice.
counter_hist = {}
# IEEE address strings collected from 0x08 packets, in arrival order.
# 0x12 packets refer to an entry by index; process_packet() empties the list
# once the entry at the last index has been referenced.
ieee_addr_list = []

# Multiplier applied to the computed power.
# NOTE:
# Compared with the readings of the power company's smart meter, the computed
# power is always lower, so a fixed multiplier is applied.
WATT_SCALE = 1.5
def dev_id_map_load(dev_cache_file):
    """Load the cached dev_id -> IEEE address map from disk.

    Args:
        dev_cache_file: Object exposing ``exists()`` and ``open()``
            (presumably a ``pathlib.Path`` -- confirm at the call site).

    Returns:
        The object unpickled from the cache file, or an empty dict when the
        file does not exist or its contents are truncated/corrupt.
    """
    if not dev_cache_file.exists():
        return {}

    try:
        with dev_cache_file.open("rb") as f:
            # SECURITY: unpickling can execute arbitrary code. This is only
            # acceptable as long as the cache file is written exclusively by
            # dev_id_map_store() and is not writable by untrusted parties.
            return pickle.load(f)  # noqa: S301
    except (EOFError, pickle.UnpicklingError):
        # process_packet() calls this for every packet, outside of its own
        # try/except; a damaged cache (e.g. a write that was interrupted)
        # would otherwise raise on every packet. Start over with an empty
        # map so it can be rebuilt from later address packets.
        logging.warning("Failed to load dev_id_map from %s", dev_cache_file)
        return {}
def dev_id_map_store(dev_cache_file, dev_id_map):
    """Write *dev_id_map* to the cache file in pickle format.

    Args:
        dev_cache_file: Object exposing ``open()``; opened in ``"wb"`` mode,
            so any existing content is replaced.
        dev_id_map: The mapping to persist.
    """
    logging.info("Store dev_id_map")

    with dev_cache_file.open("wb") as cache:
        writer = pickle.Pickler(cache)
        writer.dump(dev_id_map)
def dump_packet(data):
    """Format a sequence of byte values as comma-separated upper-case hex.

    Args:
        data: Iterable of integers (e.g. ``bytes``).

    Returns:
        A string such as ``"00,0A,FF"``; the empty string for empty input.
    """
    # Iterate the input directly; copying it into a list first is unnecessary.
    return ",".join(f"{x:02X}" for x in data)
def parse_packet_ieee_addr(packet):
    """Extract the address stored in bytes 4..11 of *packet*.

    The bytes are emitted in reverse order (last byte of the slice first) as
    colon-separated upper-case hex pairs.

    Args:
        packet: Indexable sequence of byte values (e.g. ``bytes``).

    Returns:
        A string such as ``"0B:0A:09:08:07:06:05:04"``. A packet shorter than
        12 bytes yields a correspondingly shorter string.
    """
    # The slice is already a sequence, so reversed() works on it directly.
    return ":".join(f"{x:02X}" for x in reversed(packet[4:12]))
def parse_packet_dev_id(packet):
    """Read the device id and index fields from *packet*.

    Args:
        packet: Bytes-like packet; bytes 4-5 hold the device id as a
            little-endian unsigned 16-bit value and byte 6 holds the index.

    Returns:
        A dict with the keys ``"dev_id"`` and ``"index"``.
    """
    (device_id,) = struct.unpack("<H", packet[4:6])
    slot = packet[6]

    return dict(dev_id=device_id, index=slot)
def parse_packet_measure(packet, dev_id_map):
    """Decode a measurement packet into a reading dict.

    All multi-byte fields are little-endian and unsigned:

    * bytes 5-6: device id (16 bit)
    * byte 14: counter used for duplicate detection
    * bytes 19-20 / 26-29: current time (16 bit) / current power (32 bit)
    * bytes 35-36 / 42-45: previous time (16 bit) / previous power (32 bit)

    Args:
        packet: Bytes-like packet (header followed by payload).
        dev_id_map: Mapping from device id to address; used to fill "addr".

    Returns:
        A dict with the keys "addr", "dev_id", "dev_id_str", "cur_time",
        "cur_power", "pre_time", "pre_power" and "watt", or ``None`` when the
        packet is considered a duplicate.

    Raises:
        struct.error, IndexError: If the packet is too short.
    """
    (dev_id,) = struct.unpack_from("<H", packet, 5)
    counter = packet[14]
    (cur_time,) = struct.unpack_from("<H", packet, 19)
    (cur_power,) = struct.unpack_from("<I", packet, 26)
    (pre_time,) = struct.unpack_from("<H", packet, 35)
    (pre_power,) = struct.unpack_from("<I", packet, 42)

    if dev_id in dev_id_map:
        addr = dev_id_map[dev_id]
    else:
        addr = "UNKNOWN"
        logging.warning("dev_id = %s is unknown", f"0x{dev_id:04X}")
        logging.warning("dev_id_map = %s", json.dumps(dev_id_map, indent=4))

    # NOTE: The same data is sometimes sent twice, so reject repeats using the
    # field that is incremented for every new piece of data.
    # counter_hist is only mutated here, never rebound, so no `global` needed.
    if counter_hist.get(dev_id) == counter:
        logging.info("Packet duplication detected")
        return None
    counter_hist[dev_id] = counter

    # The time field is 16 bits wide; the modulo undoes a wrap-around between
    # the previous and the current sample.
    dif_time = (cur_time - pre_time) % 0x10000
    if dif_time == 0:
        # No time elapsed between the two samples: treat it as a duplicate
        # (this also avoids dividing by zero below).
        logging.info("Packet duplication detected")
        return None

    # Same wrap-around handling for the 32-bit power field.
    dif_power = (cur_power - pre_power) % 0x100000000

    data = {
        "addr": addr,
        "dev_id": dev_id,
        "dev_id_str": f"0x{dev_id:04X}",
        "cur_time": cur_time,
        "cur_power": cur_power,
        "pre_time": pre_time,
        "pre_power": pre_power,
        # Power delta per time delta, scaled by the module-level correction.
        "watt": round(dif_power / dif_time * WATT_SCALE, 2),
    }

    logging.debug("Receive packet: %s", data)

    return data
def process_packet(handle, header, payload, on_capture):
    """Dispatch one received packet according to its type byte ``header[1]``.

    * ``0x08``: append the parsed IEEE address to ``ieee_addr_list``.
    * ``0x12``: associate a device id with the IEEE address at the given
      index, persisting the map when an entry is added or changed.
    * ``0x2C``: parse a measurement and hand it to *on_capture*.
    * anything else: logged at debug level and ignored.

    Args:
        handle: Nested mapping; ``handle["device"]["cache"]`` is the cache
            file passed to dev_id_map_load() / dev_id_map_store().
        header: Packet header; concatenated with *payload* before parsing.
        payload: Packet payload.
        on_capture: Callable invoked with the measurement dict.
    """
    global ieee_addr_list  # noqa: PLW0603

    cache_file = handle["device"]["cache"]
    dev_id_map = dev_id_map_load(cache_file)
    packet_type = header[1]

    if packet_type == 0x08:
        logging.debug("IEEE addr payload: %s", dump_packet(payload))
        ieee_addr_list.append(parse_packet_ieee_addr(header + payload))
        return

    if packet_type == 0x12:
        logging.debug("Dev ID payload: %s", dump_packet(payload))
        entry = parse_packet_dev_id(header + payload)
        dev_id = entry["dev_id"]
        index = entry["index"]

        if index >= len(ieee_addr_list):
            logging.warning("Unable to identify IEEE addr for dev_id=%s", f"0x{dev_id:04X}")
        else:
            addr = ieee_addr_list[index]
            # Pick the log message for a new or changed mapping; None means
            # the cached entry is already correct and nothing is written.
            if dev_id not in dev_id_map:
                message = "Find IEEE addr for dev_id=%s"
            elif dev_id_map[dev_id] != addr:
                message = "Update IEEE addr for dev_id=%s"
            else:
                message = None

            if message is not None:
                logging.info(message, f"0x{dev_id:04X}")
                dev_id_map[dev_id] = addr
                dev_id_map_store(cache_file, dev_id_map)

        if index == len(ieee_addr_list) - 1:
            # NOTE: Clear the list in preparation for the next cycle.
            logging.debug("Clear IEEE addr list")
            ieee_addr_list = []
        return

    if packet_type == 0x2C:
        try:
            logging.debug("Measure payload: %s", dump_packet(payload))
            measurement = parse_packet_measure(header + payload, dev_id_map)
            if measurement is not None:
                on_capture(measurement)
        except Exception:
            # NOTE(review): this also catches exceptions raised by on_capture,
            # which are then reported as an invalid packet.
            logging.warning("Invalid packet: %s", dump_packet(header + payload))
        return

    logging.debug("Unknown packet: %s", dump_packet(header + payload))