Coverage for src/sharp_hems_server.py: 0%

17 statements  

« prev     ^ index     » next       coverage.py v7.9.1, created at 2025-08-19 01:39 +0900

1#!/usr/bin/env python3 

2""" 

3センサーからのパケットを Pub-Sub パターンで配信します。 

4 

5Usage: 

6 sharp_hmes_server.py [-c CONFIG] [-t SERIAL_PORT] [-p SERVER_PORT] [-D] 

7 

8Options: 

9 -c CONFIG : 設定ファイルを指定します。 [default: config.yaml] 

10 -t SERIAL_PORT : HEMS 中継器を接続するシリアルポートを指定します。 [default: /dev/ttyUSB0] 

11 -p SERVER_PORT : ZeroMQ の Pub サーバーを動作させるポートを指定します。 [default: 4444] 

12 -D : デバッグモードで動作します。 

13""" 

14 

15import logging 

16import os 

17import pathlib 

18import signal 

19 

20import sharp_hems.notify 

21import sharp_hems.serial_pubsub 

22 

23SCHEMA_CONFIG = "config.schema" 

24 

25 

26def sig_handler(num, frame): # noqa: ARG001 

27 logging.warning("Receive signal %d", num) 

28 

29 if num in (signal.SIGTERM, signal.SIGINT): 

30 sharp_hems.serial_pubsub.stop_server() 

31 

32 

33def start(): 

34 try: 

35 sharp_hems.serial_pubsub.start_server(serial_port, server_port, liveness_file) 

36 except: 

37 sharp_hems.notify.error(config) 

38 raise 

39 

40 

41###################################################################### 

42if __name__ == "__main__": 

43 import pathlib 

44 

45 import docopt 

46 import my_lib.config 

47 import my_lib.logger 

48 

49 args = docopt.docopt(__doc__) 

50 

51 config_file = args["-c"] 

52 serial_port = os.environ.get("HEMS_SERIAL_PORT", args["-t"]) 

53 server_port = int(os.environ.get("HEMS_SERVER_PORT", args["-p"])) 

54 debug_mode = args["-D"] 

55 

56 my_lib.logger.init("hems.wattmeter-sharp", level=logging.DEBUG if debug_mode else logging.INFO) 

57 

58 config = my_lib.config.load(config_file, pathlib.Path(SCHEMA_CONFIG)) 

59 

60 liveness_file = pathlib.Path(config["liveness"]["file"]["measure"]) 

61 

62 logging.info("Start server (serial: %s, port: %d)", serial_port, server_port) 

63 

64 start()