Coverage for src/sharp_hems/sniffer.py: 69%

82 statements  

« prev     ^ index     » next       coverage.py v7.9.1, created at 2025-08-19 01:39 +0900

1#!/usr/bin/env python3 

2import json 

3import logging 

4import pickle 

5import struct 

6 

7counter_hist = {} 

8ieee_addr_list = [] 

9 

10# 電力に掛ける倍率 

11# NOTE: 

12# 電力会社のスマートメータの読み値と比較すると常に電力が小さいので、 

13# 一定の倍率を掛ける。 

14WATT_SCALE = 1.5 

15 

16 

17def dev_id_map_load(dev_cache_file): 

18 if dev_cache_file.exists(): 18 ↛ 22line 18 didn't jump to line 22 because the condition on line 18 was always true

19 with dev_cache_file.open("rb") as f: 

20 return pickle.load(f) # noqa: S301 

21 else: 

22 return {} 

23 

24 

25def dev_id_map_store(dev_cache_file, dev_id_map): 

26 logging.info("Store dev_id_map") 

27 

28 with dev_cache_file.open("wb") as f: 

29 pickle.dump(dev_id_map, f) 

30 

31 

32def dump_packet(data): 

33 return ",".join(f"{x:02X}" for x in list(data)) 

34 

35 

36def parse_packet_ieee_addr(packet): 

37 return ":".join(f"{x:02X}" for x in reversed(list(packet[4:12]))) 

38 

39 

40def parse_packet_dev_id(packet): 

41 dev_id = struct.unpack("<H", packet[4:6])[0] 

42 index = packet[6] 

43 

44 return { 

45 "dev_id": dev_id, 

46 "index": index, 

47 } 

48 

49 

50def parse_packet_measure(packet, dev_id_map): 

51 global counter_hist 

52 

53 dev_id = struct.unpack("<H", packet[5:7])[0] 

54 counter = packet[14] 

55 cur_time = struct.unpack("<H", packet[19:21])[0] 

56 cur_power = struct.unpack("<I", packet[26:30])[0] 

57 pre_time = struct.unpack("<H", packet[35:37])[0] 

58 pre_power = struct.unpack("<I", packet[42:46])[0] 

59 

60 if dev_id in dev_id_map: 60 ↛ 63line 60 didn't jump to line 63 because the condition on line 60 was always true

61 addr = dev_id_map[dev_id] 

62 else: 

63 addr = "UNKNOWN" 

64 logging.warning("dev_id = %s is unknown", f"0x{dev_id:04X}") 

65 logging.warning("dev_ip_map = %s", json.dumps(dev_id_map, indent=4)) 

66 

67 # NOTE: 同じデータが2回送られることがあるので、新しいデータ毎にインクリメント 

68 # しているフィールドを使ってはじく 

69 if dev_id in counter_hist and counter_hist[dev_id] == counter: 

70 logging.info("Packet duplication detected") 

71 return None 

72 counter_hist[dev_id] = counter 

73 

74 dif_time = cur_time - pre_time 

75 if dif_time < 0: 75 ↛ 76line 75 didn't jump to line 76 because the condition on line 75 was never true

76 dif_time += 0x10000 

77 if dif_time == 0: 77 ↛ 78line 77 didn't jump to line 78 because the condition on line 77 was never true

78 logging.info("Packet duplication detected") 

79 return None 

80 

81 dif_power = cur_power - pre_power 

82 if dif_power < 0: 82 ↛ 83line 82 didn't jump to line 83 because the condition on line 82 was never true

83 dif_power += 0x100000000 

84 

85 data = { 

86 "addr": addr, 

87 "dev_id": dev_id, 

88 "dev_id_str": f"0x{dev_id:04X}", 

89 "cur_time": cur_time, 

90 "cur_power": cur_power, 

91 "pre_time": pre_time, 

92 "pre_power": pre_power, 

93 "watt": round(float(dif_power) / dif_time * WATT_SCALE, 2), 

94 } 

95 

96 logging.debug("Receive packet: %s", data) 

97 

98 return data 

99 

100 

101def process_packet(handle, header, payload, on_capture): 

102 global ieee_addr_list # noqa: PLW0603 

103 

104 dev_id_map = dev_id_map_load(handle["device"]["cache"]) 

105 

106 if header[1] == 0x08: 

107 logging.debug("IEEE addr payload: %s", dump_packet(payload)) 

108 ieee_addr_list.append(parse_packet_ieee_addr(header + payload)) 

109 elif header[1] == 0x12: 

110 logging.debug("Dev ID payload: %s", dump_packet(payload)) 

111 data = parse_packet_dev_id(header + payload) 

112 if data["index"] < len(ieee_addr_list): 112 ↛ 113line 112 didn't jump to line 113 because the condition on line 112 was never true

113 if data["dev_id"] not in dev_id_map: 

114 logging.info("Find IEEE addr for dev_id=%s", "0x{:04X}".format(data["dev_id"])) 

115 dev_id_map[data["dev_id"]] = ieee_addr_list[data["index"]] 

116 dev_id_map_store(handle["device"]["cache"], dev_id_map) 

117 elif dev_id_map[data["dev_id"]] != ieee_addr_list[data["index"]]: 

118 logging.info("Update IEEE addr for dev_id=%s", "0x{:04X}".format(data["dev_id"])) 

119 dev_id_map[data["dev_id"]] = ieee_addr_list[data["index"]] 

120 dev_id_map_store(handle["device"]["cache"], dev_id_map) 

121 else: 

122 logging.warning("Unable to identify IEEE addr for dev_id=%s", "0x{:04X}".format(data["dev_id"])) 

123 

124 if data["index"] == (len(ieee_addr_list) - 1): 124 ↛ 126line 124 didn't jump to line 126 because the condition on line 124 was never true

125 # NOTE: 次の周期に備えてリストをクリアする 

126 logging.debug("Clear IEEE addr list") 

127 ieee_addr_list = [] 

128 

129 elif header[1] == 0x2C: 

130 try: 

131 logging.debug("Measure payload: %s", dump_packet(payload)) 

132 data = parse_packet_measure(header + payload, dev_id_map) 

133 if data is not None: 

134 on_capture(data) 

135 except Exception: 

136 logging.warning("Invalid packet: %s", dump_packet(header + payload)) 

137 

138 else: 

139 logging.debug("Unknown packet: %s", dump_packet(header + payload))