Coverage for src/sharp_hems_dump.py: 0%

27 statements  

« prev     ^ index     » next       coverage.py v7.9.1, created at 2025-08-19 01:39 +0900

1#!/usr/bin/env python3 

2""" 

3センサーのシリアル出力をダンプします。 

4 

5Usage: 

6 sharp_hmes_dump.py [-c CONFIG] [-s SERVER_HOST] [-p SERVER_PORT] [-o FILE] [-D] 

7 

8Options: 

9 -c CONFIG : 設定ファイルを指定します。 [default: config.yaml] 

10 -s SERVER_HOST : サーバーのホスト名を指定します。 [default: localhost] 

11 -p SERVER_PORT : ZeroMQ の Pub サーバーを動作させるポートを指定します。 [default: 4444] 

12 -o FILE : 出力ファイル名 [default: packet.dump] 

13 -D : デバッグモードで動作します。 

14""" 

15 

16import logging 

17import os 

18import pathlib 

19import time 

20 

21import my_lib.fluentd_util 

22import my_lib.footprint 

23import my_lib.serializer 

24 

25import sharp_hems.device 

26import sharp_hems.notify 

27import sharp_hems.serial_pubsub 

28import sharp_hems.sniffer 

29 

30SCHEMA_CONFIG = "config.schema" 

31 

32start_time = None 

33packet_list = [] 

34 

35 

36def process_packet(handle, header, payload): 

37 global start_time # noqa: PLW0603 

38 global packet_list 

39 

40 now = time.time() 

41 if start_time is None: 

42 start_time = now 

43 

44 packet_list.append([now - start_time, header, payload]) 

45 logging.info("Receive %d packet(s) ", len(packet_list)) 

46 

47 my_lib.serializer.store(handle["dump_file"], packet_list) 

48 

49 

50def start(handle): 

51 try: 

52 sharp_hems.serial_pubsub.start_client(server_host, server_port, handle, process_packet) 

53 except: 

54 sharp_hems.notify.error(config) 

55 raise 

56 

57 

58###################################################################### 

59if __name__ == "__main__": 

60 import pathlib 

61 

62 import docopt 

63 import my_lib.config 

64 import my_lib.logger 

65 

66 args = docopt.docopt(__doc__) 

67 

68 config_file = args["-c"] 

69 server_host = os.environ.get("HEMS_SERVER_HOST", args["-s"]) 

70 server_port = int(os.environ.get("HEMS_SERVER_PORT", args["-p"])) 

71 dump_file = args["-o"] 

72 debug_mode = args["-D"] 

73 

74 my_lib.logger.init("hems.wattmeter-sharp", level=logging.DEBUG if debug_mode else logging.INFO) 

75 

76 config = my_lib.config.load(config_file, pathlib.Path(SCHEMA_CONFIG)) 

77 

78 dev_define_file = pathlib.Path(config["device"]["define"]) 

79 dev_cache_file = pathlib.Path(config["device"]["cache"]) 

80 liveness_file = pathlib.Path(config["liveness"]["file"]["measure"]) 

81 

82 logging.info("Start HEMS logger (server: %s:%d)", server_host, server_port) 

83 

84 logging.info( 

85 "Initialize Fluentd sender (host: %s, tag: %s)", 

86 config["fluentd"]["host"], 

87 config["fluentd"]["data"]["tag"], 

88 ) 

89 sender = my_lib.fluentd_util.get_handle(config["fluentd"]["data"]["tag"], host=config["fluentd"]["host"]) 

90 

91 start( 

92 { 

93 "sender": sender, 

94 "dump_file": dump_file, 

95 } 

96 )