Coverage for src/unit_cooler/actuator/valve.py: 95%

111 statements  

« prev     ^ index     » next       coverage.py v7.9.1, created at 2025-07-23 14:35 +0000

1#!/usr/bin/env python3 

2""" 

3電磁弁を可変デューティ制御します。 

4 

5Usage: 

6 valve.py [-c CONFIG] [-D] 

7 

8Options: 

9 -c CONFIG : CONFIG を設定ファイルとして読み込んで実行します。[default: config.yaml] 

10 -D : デバッグモードで動作します。 

11""" 

12 

13import logging 

14import pathlib 

15import threading 

16import time 

17 

18import my_lib.footprint 

19import my_lib.rpi 

20 

21import unit_cooler.actuator.work_log 

22import unit_cooler.const 

23 

24STAT_DIR_PATH = pathlib.Path("/dev/shm") # noqa: S108 

25 

26# STATE が WORKING になった際に作られるファイル。Duty 制御している場合、 

27# OFF Duty から ON Duty に遷移する度に変更日時が更新される。 

28# STATE が IDLE になった際に削除される。 

29# (OFF Duty になって実際にバルブを閉じただけでは削除されない) 

30STAT_PATH_VALVE_STATE_WORKING = STAT_DIR_PATH / "unit_cooler" / "valve" / "state" / "working" 

31 

32# STATE が IDLE になった際に作られるファイル。 

33# (OFF Duty になって実際にバルブを閉じただけでは作られない) 

34# STATE が WORKING になった際に削除される。 

35STAT_PATH_VALVE_STATE_IDLE = STAT_DIR_PATH / "unit_cooler" / "valve" / "state" / "idle" 

36 

37# 実際にバルブを開いた際に作られるファイル。 

38# 実際にバルブを閉じた際に削除される。 

39STAT_PATH_VALVE_OPEN = STAT_DIR_PATH / "unit_cooler" / "valve" / "open" 

40 

41# 実際にバルブを閉じた際に作られるファイル。 

42# 実際にバルブを開いた際に削除される。 

43STAT_PATH_VALVE_CLOSE = STAT_DIR_PATH / "unit_cooler" / "valve" / "close" 

44 

45pin_no = None 

46valve_lock = None 

47ctrl_hist = [] 

48config = None 

49 

50 

51def init(pin, valve_config): 

52 global pin_no # noqa: PLW0603 

53 global valve_lock # noqa: PLW0603 

54 global config # noqa: PLW0603 

55 

56 pin_no = pin 

57 valve_lock = threading.Lock() 

58 config = valve_config 

59 

60 my_lib.footprint.clear(STAT_PATH_VALVE_STATE_WORKING) 

61 my_lib.footprint.update(STAT_PATH_VALVE_STATE_IDLE) 

62 

63 my_lib.rpi.gpio.setwarnings(False) 

64 my_lib.rpi.gpio.setmode(my_lib.rpi.gpio.BCM) 

65 my_lib.rpi.gpio.setup(pin_no, my_lib.rpi.gpio.OUT) 

66 

67 set_state(unit_cooler.const.VALVE_STATE.CLOSE) 

68 

69 

70# NOTE: テスト用 

71def clear_stat(): 

72 global ctrl_hist # noqa: PLW0603 

73 

74 my_lib.footprint.clear(STAT_PATH_VALVE_STATE_WORKING) 

75 my_lib.footprint.clear(STAT_PATH_VALVE_STATE_IDLE) 

76 my_lib.footprint.clear(STAT_PATH_VALVE_OPEN) 

77 my_lib.footprint.clear(STAT_PATH_VALVE_CLOSE) 

78 ctrl_hist = [] 

79 

80 

81# NOTE: テスト用 

82def get_hist(): 

83 global ctrl_hist 

84 

85 return ctrl_hist 

86 

87 

88# NOTE: 実際にバルブを開きます。 

89# 現在のバルブの状態と、バルブが現在の状態になってからの経過時間を返します。 

90def set_state(valve_state): 

91 global pin_no 

92 global valve_lock 

93 global ctrl_hist 

94 

95 with valve_lock: 

96 curr_state = get_state() 

97 

98 if valve_state != curr_state: 

99 logging.info("VALVE: %s -> %s", curr_state.name, valve_state.name) 

100 # NOTE: テスト時のみ履歴を記録 

101 import os 

102 

103 if os.environ.get("TEST") == "true": 103 ↛ 107line 103 didn't jump to line 107 because the condition on line 103 was always true

104 ctrl_hist.append(curr_state) 

105 

106 # メトリクス記録 

107 try: 

108 from unit_cooler.metrics import get_metrics_collector 

109 

110 global config 

111 

112 if config and "actuator" in config and "metrics" in config["actuator"]: 112 ↛ 119line 112 didn't jump to line 119 because the condition on line 112 was always true

113 metrics_db_path = config["actuator"]["metrics"]["data"] 

114 metrics_collector = get_metrics_collector(metrics_db_path) 

115 metrics_collector.record_valve_operation() 

116 except Exception: 

117 logging.debug("Failed to record valve operation metrics") 

118 

119 my_lib.rpi.gpio.output(pin_no, valve_state.value) 

120 

121 if valve_state == unit_cooler.const.VALVE_STATE.OPEN: 

122 my_lib.footprint.clear(STAT_PATH_VALVE_CLOSE) 

123 if not my_lib.footprint.exists(STAT_PATH_VALVE_OPEN): 

124 my_lib.footprint.update(STAT_PATH_VALVE_OPEN) 

125 else: 

126 my_lib.footprint.clear(STAT_PATH_VALVE_OPEN) 

127 if not my_lib.footprint.exists(STAT_PATH_VALVE_CLOSE): 

128 my_lib.footprint.update(STAT_PATH_VALVE_CLOSE) 

129 

130 return get_status() 

131 

132 

133# NOTE: 実際のバルブの状態を返します 

134def get_state(): 

135 global pin_no 

136 

137 if my_lib.rpi.gpio.input(pin_no) == 1: 

138 return unit_cooler.const.VALVE_STATE.OPEN 

139 else: 

140 return unit_cooler.const.VALVE_STATE.CLOSE 

141 

142 

143# NOTE: 実際のバルブの状態と、その状態になってからの経過時間を返します 

144def get_status(): 

145 global valve_lock 

146 

147 with valve_lock: 

148 valve_state = get_state() 

149 

150 if valve_state == unit_cooler.const.VALVE_STATE.OPEN: 

151 assert my_lib.footprint.exists(STAT_PATH_VALVE_OPEN) # noqa: S101 

152 

153 return { 

154 "state": valve_state, 

155 "duration": my_lib.footprint.elapsed(STAT_PATH_VALVE_OPEN), 

156 } 

157 else: # noqa: PLR5501 

158 if my_lib.footprint.exists(STAT_PATH_VALVE_CLOSE): 158 ↛ 164line 158 didn't jump to line 164 because the condition on line 158 was always true

159 return { 

160 "state": valve_state, 

161 "duration": my_lib.footprint.elapsed(STAT_PATH_VALVE_CLOSE), 

162 } 

163 else: 

164 return {"state": valve_state, "duration": 0} 

165 

166 

167# NOTE: バルブを動作状態にします。 

168# Duty 制御を実現するため、OFF Duty 期間の場合はバルブを閉じます。 

169# 実際にバルブを開いてからの経過時間を返します。 

170# duty_info = { "enable": bool, "on": on_sec, "off": off_sec } 

171def set_cooling_working(duty_info): 

172 logging.debug(duty_info) 

173 

174 my_lib.footprint.clear(STAT_PATH_VALVE_STATE_IDLE) 

175 

176 if not my_lib.footprint.exists(STAT_PATH_VALVE_STATE_WORKING): 

177 my_lib.footprint.update(STAT_PATH_VALVE_STATE_WORKING) 

178 unit_cooler.actuator.work_log.add("冷却を開始します。") 

179 logging.info("COOLING: IDLE -> WORKING") 

180 return set_state(unit_cooler.const.VALVE_STATE.OPEN) 

181 

182 if not duty_info["enable"]: 

183 # NOTE Duty 制御しない場合 

184 logging.info("COOLING: WORKING") 

185 return set_state(unit_cooler.const.VALVE_STATE.OPEN) 

186 

187 status = get_status() 

188 

189 if status["state"] == unit_cooler.const.VALVE_STATE.OPEN: 

190 # NOTE: 現在バルブが開かれている 

191 if status["duration"] >= duty_info["on_sec"]: 

192 logging.info("COOLING: WORKING (OFF duty, %d sec left)", duty_info["off_sec"]) 

193 unit_cooler.actuator.work_log.add("OFF Duty になったのでバルブを締めます。") 

194 return set_state(unit_cooler.const.VALVE_STATE.CLOSE) 

195 else: 

196 logging.info("COOLING: WORKING (ON duty, %d sec left)", duty_info["on_sec"] - status["duration"]) 

197 

198 return set_state(unit_cooler.const.VALVE_STATE.OPEN) 

199 else: # noqa: PLR5501 

200 # NOTE: 現在バルブが閉じられている 

201 if status["duration"] >= duty_info["off_sec"]: 

202 logging.info("COOLING: WORKING (ON duty, %d sec left)", duty_info["on_sec"]) 

203 unit_cooler.actuator.work_log.add("ON Duty になったのでバルブを開けます。") 

204 return set_state(unit_cooler.const.VALVE_STATE.OPEN) 

205 else: 

206 logging.info( 

207 "COOLING: WORKING (OFF duty, %d sec left)", duty_info["off_sec"] - status["duration"] 

208 ) 

209 return set_state(unit_cooler.const.VALVE_STATE.CLOSE) 

210 

211 

212def set_cooling_idle(): 

213 my_lib.footprint.clear(STAT_PATH_VALVE_STATE_WORKING) 

214 

215 if not my_lib.footprint.exists(STAT_PATH_VALVE_STATE_IDLE): 

216 my_lib.footprint.update(STAT_PATH_VALVE_STATE_IDLE) 

217 unit_cooler.actuator.work_log.add("冷却を停止しました。") 

218 logging.info("COOLING: WORKING -> IDLE") 

219 return set_state(unit_cooler.const.VALVE_STATE.CLOSE) 

220 else: 

221 logging.info("COOLING: IDLE") 

222 return set_state(unit_cooler.const.VALVE_STATE.CLOSE) 

223 

224 

225def set_cooling_state(control_message): 

226 if control_message["state"] == unit_cooler.const.COOLING_STATE.WORKING: 

227 return set_cooling_working(control_message["duty"]) 

228 else: 

229 return set_cooling_idle() 

230 

231 

232if __name__ == "__main__": 

233 # TEST Code 

234 import multiprocessing 

235 

236 import docopt 

237 import my_lib.config 

238 import my_lib.logger 

239 import my_lib.pretty 

240 

241 args = docopt.docopt(__doc__) 

242 

243 config_file = args["-c"] 

244 debug_mode = args["-D"] 

245 

246 my_lib.logger.init("test", level=logging.DEBUG if debug_mode else logging.INFO) 

247 

248 config = my_lib.config.load(config_file) 

249 event_queue = multiprocessing.Queue() 

250 

251 my_lib.webapp.config.init(config["actuator"]) 

252 my_lib.webapp.log.init(config) 

253 unit_cooler.actuator.work_log.init(config, event_queue) 

254 init(config["actuator"]["control"]["valve"]["pin_no"]) 

255 

256 while True: 

257 set_cooling_state( 

258 { 

259 "state": unit_cooler.const.COOLING_STATE.WORKING, 

260 "mode_index": 1, 

261 "duty": {"enable": True, "on_sec": 1, "off_sec": 3}, 

262 } 

263 ) 

264 time.sleep(1)