Coverage for src/actuator.py: 100%

57 statements  

« prev     ^ index     » next       coverage.py v7.9.1, created at 2025-06-28 08:08 +0000

1#!/usr/bin/env python3 

2""" 

3電磁弁を制御してエアコン室外機の冷却を行います。 

4 

5Usage: 

6 actuator.py [-c CONFIG] [-s CONTROL_HOST] [-p PUB_PORT] [-n COUNT] [-d] [-t SPEEDUP] [-D] 

7 

8Options: 

9 -c CONFIG : CONFIG を設定ファイルとして読み込んで実行します。 [default: config.yaml] 

10 -s CONTROL_HOST : コントローラのホスト名を指定します。 [default: localhost] 

11 -p PUB_PORT : ZeroMQ の Pub サーバーを動作させるポートを指定します。 [default: 2222] 

12 -n COUNT : n 回制御メッセージを受信したら終了します。0 は制限なし。 [default: 0] 

13 -d : ダミーモードで実行します。 

14 -t SPEEDUP : 時短モード。演算間隔を SPEEDUP 分の一にします。 [default: 1] 

15 -D : デバッグモードで動作します。 

16""" 

17 

18import concurrent.futures 

19import logging 

20import multiprocessing 

21import os 

22import signal 

23import time 

24 

25SCHEMA_CONFIG = "config.schema" 

26 

27 

28def sig_handler(num, frame): # noqa: ARG001 

29 import unit_cooler.actuator.worker 

30 

31 logging.warning("Receive signal %d", num) 

32 

33 if num == signal.SIGTERM: 

34 unit_cooler.actuator.worker.term() 

35 

36 

37def wait_before_start(config): 

38 for i in range(config["actuator"]["control"]["interval_sec"]): 

39 logging.info( 

40 "Wait for the old Pod to finish (%3d / %3d)", i + 1, config["actuator"]["control"]["interval_sec"] 

41 ) 

42 time.sleep(1) 

43 

44 

45def start(config, arg): 

46 global should_terminate # noqa: PLW0603 

47 global log_server_handle # noqa: PLW0603 

48 

49 setting = { 

50 "control_host": "localhost", 

51 "pub_port": 2222, 

52 "log_port": 5001, 

53 "dummy_mode": False, 

54 "speedup": 1, 

55 "msg_count": 0, 

56 "debug_mode": False, 

57 } 

58 setting.update(arg) 

59 

60 should_terminate = False 

61 

62 logging.info("Using ZMQ server of %s:%d", setting["control_host"], setting["pub_port"]) 

63 

64 # NOTE: オプションでダミーモードが指定された場合、環境変数もそれに揃えておく 

65 if setting["dummy_mode"]: 

66 logging.warning("Set dummy mode") 

67 os.environ["DUMMY_MODE"] = "true" 

68 

69 manager = multiprocessing.Manager() 

70 message_queue = manager.Queue() 

71 event_queue = manager.Queue() 

72 

73 # NOTE: Blueprint のパス指定を YAML で行いたいので、my_lib.webapp の import 順を制御 

74 import unit_cooler.actuator.log_server 

75 

76 log_server_handle = unit_cooler.actuator.log_server.start(config, event_queue, setting["log_port"]) 

77 

78 if not setting["dummy_mode"] and (os.environ.get("TEST", "false") != "true"): 

79 # NOTE: 動作開始前に待つ。これを行わないと、複数の Pod が電磁弁を制御することに 

80 # なり、電磁弁の故障を誤判定する可能性がある。 

81 wait_before_start(config) 

82 

83 import unit_cooler.actuator.work_log 

84 import unit_cooler.actuator.worker 

85 

86 unit_cooler.actuator.work_log.init(config, event_queue) 

87 

88 logging.info("Initialize valve") 

89 unit_cooler.actuator.valve.init(config["actuator"]["control"]["valve"]["pin_no"]) 

90 unit_cooler.actuator.monitor.init(config["actuator"]["control"]["valve"]["pin_no"]) 

91 

92 executor = concurrent.futures.ThreadPoolExecutor() 

93 

94 thread_list = unit_cooler.actuator.worker.start( 

95 executor, unit_cooler.actuator.worker.get_worker_def(config, message_queue, setting) 

96 ) 

97 

98 signal.signal(signal.SIGTERM, sig_handler) 

99 

100 return (executor, thread_list, log_server_handle) 

101 

102 

103def wait_and_term(executor, thread_list, log_server_handle, terminate=True): 

104 global should_terminate # noqa: PLW0603 

105 

106 import unit_cooler.actuator.log_server 

107 import unit_cooler.actuator.work_log 

108 

109 should_terminate = terminate 

110 

111 ret = 0 

112 for thread_info in thread_list: 

113 logging.info("Wait %s finish", thread_info["name"]) 

114 

115 if thread_info["future"].result() != 0: 

116 logging.error("Error occurred in %s", thread_info["name"]) 

117 ret = -1 

118 

119 logging.info("Shutdown executor") 

120 executor.shutdown(wait=True) 

121 

122 unit_cooler.actuator.log_server.term(log_server_handle) 

123 unit_cooler.actuator.work_log.term() 

124 

125 logging.warning("Terminate unit_cooler") 

126 

127 return ret 

128 

129 

130###################################################################### 

131if __name__ == "__main__": 

132 import pathlib 

133 import sys 

134 

135 import docopt 

136 import my_lib.config 

137 import my_lib.logger 

138 

139 args = docopt.docopt(__doc__) 

140 

141 config_file = args["-c"] 

142 control_host = os.environ.get("HEMS_CONTROL_HOST", args["-s"]) 

143 pub_port = int(os.environ.get("HEMS_PUB_PORT", args["-p"])) 

144 dummy_mode = os.environ.get("DUMMY_MODE", args["-d"]) 

145 speedup = int(args["-t"]) 

146 msg_count = int(args["-n"]) 

147 debug_mode = args["-D"] 

148 

149 my_lib.logger.init("hems.unit_cooler", level=logging.DEBUG if debug_mode else logging.INFO) 

150 

151 config = my_lib.config.load(config_file, pathlib.Path(SCHEMA_CONFIG)) 

152 

153 sys.exit( 

154 wait_and_term( 

155 *start( 

156 config, 

157 { 

158 "control_host": control_host, 

159 "pub_port": pub_port, 

160 "log_port": config["actuator"]["log_server"]["webapp"]["port"], 

161 "dummy_mode": dummy_mode, 

162 "speedup": speedup, 

163 "msg_count": msg_count, 

164 "debug_mode": debug_mode, 

165 }, 

166 ), 

167 terminate=False, 

168 ) 

169 )