Coverage for src/actuator.py: 96%

64 statements  

« prev     ^ index     » next       coverage.py v7.9.1, created at 2025-07-23 14:35 +0000

1#!/usr/bin/env python3 

2""" 

3電磁弁を制御してエアコン室外機の冷却を行います。 

4 

5Usage: 

6 actuator.py [-c CONFIG] [-s CONTROL_HOST] [-p PUB_PORT] [-l LOG_PORT] [-n COUNT] [-d] [-t SPEEDUP] [-D] 

7 

8Options: 

9 -c CONFIG : CONFIG を設定ファイルとして読み込んで実行します。 [default: config.yaml] 

10 -s CONTROL_HOST : コントローラのホスト名を指定します。 [default: localhost] 

11 -p PUB_PORT : コントローラの ZeroMQ Pub サーバーのポートを指定します。 [default: 2222] 

12 -l LOG_PORT : 動作ログを提供する WEB サーバーのポートを指定します。 [default: 5001] 

13 -n COUNT : n 回制御メッセージを受信したら終了します。0 は制限なし。 [default: 0] 

14 -d : ダミーモードで実行します。 

15 -t SPEEDUP : 時短モード。演算間隔を SPEEDUP 分の一にします。 [default: 1] 

16 -D : デバッグモードで動作します。 

17""" 

18 

19import concurrent.futures 

20import logging 

21import multiprocessing 

22import os 

23import signal 

24import time 

25 

26SCHEMA_CONFIG = "config.schema" 

27 

28 

29def sig_handler(num, frame): # noqa: ARG001 

30 import unit_cooler.actuator.worker 

31 

32 logging.warning("Receive signal %d", num) 

33 

34 if num == signal.SIGTERM: 

35 unit_cooler.actuator.worker.term() 

36 

37 

38def wait_before_start(config): 

39 for i in range(config["actuator"]["control"]["interval_sec"]): 

40 logging.info( 

41 "Wait for the old Pod to finish (%3d / %3d)", i + 1, config["actuator"]["control"]["interval_sec"] 

42 ) 

43 time.sleep(1) 

44 

45 

46def start(config, arg): 

47 global log_server_handle # noqa: PLW0603 

48 

49 setting = { 

50 "control_host": "localhost", 

51 "pub_port": 2222, 

52 "log_port": 5001, 

53 "dummy_mode": False, 

54 "speedup": 1, 

55 "msg_count": 0, 

56 "debug_mode": False, 

57 } 

58 setting.update(arg) 

59 

60 logging.info("Using ZMQ server of %s:%d", setting["control_host"], setting["pub_port"]) 

61 

62 # NOTE: オプションでダミーモードが指定された場合、環境変数もそれに揃えておく 

63 if setting["dummy_mode"]: 

64 logging.warning("Set dummy mode") 

65 os.environ["DUMMY_MODE"] = "true" 

66 

67 manager = multiprocessing.Manager() 

68 message_queue = manager.Queue() 

69 event_queue = manager.Queue() 

70 

71 if not setting["dummy_mode"] and (os.environ.get("TEST", "false") != "true"): 

72 # NOTE: 動作開始前に待つ。これを行わないと、複数の Pod が電磁弁を制御することに 

73 # なり、電磁弁の故障を誤判定する可能性がある。 

74 wait_before_start(config) 

75 

76 import unit_cooler.actuator.monitor 

77 import unit_cooler.actuator.valve 

78 import unit_cooler.actuator.work_log 

79 import unit_cooler.actuator.worker 

80 

81 unit_cooler.actuator.work_log.init(config, event_queue) 

82 

83 logging.info("Initialize valve") 

84 unit_cooler.actuator.valve.init(config["actuator"]["control"]["valve"]["pin_no"], config) 

85 unit_cooler.actuator.monitor.init(config["actuator"]["control"]["valve"]["pin_no"]) 

86 

87 # NOTE: Blueprint のパス指定を YAML で行いたいので、my_lib.webapp の import 順を制御 

88 import unit_cooler.actuator.web_server 

89 

90 try: 

91 logging.info("Starting web server on port %d", setting["log_port"]) 

92 log_server_handle = unit_cooler.actuator.web_server.start(config, event_queue, setting["log_port"]) 

93 logging.info("Web server started successfully") 

94 except Exception: 

95 logging.exception("Failed to start web server") 

96 raise 

97 

98 executor = concurrent.futures.ThreadPoolExecutor() 

99 

100 thread_list = unit_cooler.actuator.worker.start( 

101 executor, unit_cooler.actuator.worker.get_worker_def(config, message_queue, setting) 

102 ) 

103 

104 signal.signal(signal.SIGTERM, sig_handler) 

105 

106 return (executor, thread_list, log_server_handle) 

107 

108 

109def wait_and_term(executor, thread_list, log_server_handle, terminate=True): # noqa: ARG001 

110 import unit_cooler.actuator.web_server 

111 import unit_cooler.actuator.work_log 

112 

113 ret = 0 

114 for thread_info in thread_list: 

115 logging.info("Wait %s finish", thread_info["name"]) 

116 

117 if thread_info["future"].result() != 0: 

118 logging.error("Error occurred in %s", thread_info["name"]) 

119 ret = -1 

120 

121 unit_cooler.actuator.worker.term() 

122 

123 logging.info("Shutdown executor") 

124 executor.shutdown(wait=True) 

125 

126 unit_cooler.actuator.web_server.term(log_server_handle) 

127 unit_cooler.actuator.work_log.term() 

128 

129 logging.warning("Terminate unit_cooler") 

130 

131 return ret 

132 

133 

134###################################################################### 

135if __name__ == "__main__": 

136 import pathlib 

137 import sys 

138 

139 import docopt 

140 import my_lib.config 

141 import my_lib.logger 

142 

143 args = docopt.docopt(__doc__) 

144 

145 config_file = args["-c"] 

146 control_host = os.environ.get("HEMS_CONTROL_HOST", args["-s"]) 

147 pub_port = int(os.environ.get("HEMS_PUB_PORT", args["-p"])) 

148 log_port = int(os.environ.get("HEMS_LOG_PORT", args["-l"])) 

149 dummy_mode = os.environ.get("DUMMY_MODE", args["-d"]) 

150 speedup = int(args["-t"]) 

151 msg_count = int(args["-n"]) 

152 debug_mode = args["-D"] 

153 

154 my_lib.logger.init("hems.unit_cooler", level=logging.DEBUG if debug_mode else logging.INFO) 

155 

156 config = my_lib.config.load(config_file, pathlib.Path(SCHEMA_CONFIG)) 

157 sys.exit( 

158 wait_and_term( 

159 *start( 

160 config, 

161 { 

162 "control_host": control_host, 

163 "pub_port": pub_port, 

164 "log_port": log_port, 

165 "dummy_mode": dummy_mode, 

166 "speedup": speedup, 

167 "msg_count": msg_count, 

168 "debug_mode": debug_mode, 

169 }, 

170 ), 

171 terminate=False, 

172 ) 

173 )