Coverage for src/unit_cooler/actuator/valve.py: 97%

101 statements  

« prev     ^ index     » next       coverage.py v7.9.1, created at 2025-06-28 08:08 +0000

1#!/usr/bin/env python3 

2""" 

3電磁弁を可変デューティ制御します。 

4 

5Usage: 

6 valve.py [-c CONFIG] [-D] 

7 

8Options: 

9 -c CONFIG : CONFIG を設定ファイルとして読み込んで実行します。[default: config.yaml] 

10 -D : デバッグモードで動作します。 

11""" 

12 

13import logging 

14import pathlib 

15import threading 

16import time 

17 

18import my_lib.footprint 

19import my_lib.rpi 

20 

21import unit_cooler.actuator.work_log 

22import unit_cooler.const 

23 

24STAT_DIR_PATH = pathlib.Path("/dev/shm") # noqa: S108 

25 

26# STATE が WORKING になった際に作られるファイル。Duty 制御している場合、 

27# OFF Duty から ON Duty に遷移する度に変更日時が更新される。 

28# STATE が IDLE になった際に削除される。 

29# (OFF Duty になって実際にバルブを閉じただけでは削除されない) 

30STAT_PATH_VALVE_STATE_WORKING = STAT_DIR_PATH / "unit_cooler" / "valve" / "state" / "working" 

31 

32# STATE が IDLE になった際に作られるファイル。 

33# (OFF Duty になって実際にバルブを閉じただけでは作られない) 

34# STATE が WORKING になった際に削除される。 

35STAT_PATH_VALVE_STATE_IDLE = STAT_DIR_PATH / "unit_cooler" / "valve" / "state" / "idle" 

36 

37# 実際にバルブを開いた際に作られるファイル。 

38# 実際にバルブを閉じた際に削除される。 

39STAT_PATH_VALVE_OPEN = STAT_DIR_PATH / "unit_cooler" / "valve" / "open" 

40 

41# 実際にバルブを閉じた際に作られるファイル。 

42# 実際にバルブを開いた際に削除される。 

43STAT_PATH_VALVE_CLOSE = STAT_DIR_PATH / "unit_cooler" / "valve" / "close" 

44 

45pin_no = None 

46valve_lock = None 

47ctrl_hist = [] 

48 

49 

50def init(pin): 

51 global pin_no # noqa: PLW0603 

52 global valve_lock # noqa: PLW0603 

53 

54 pin_no = pin 

55 valve_lock = threading.Lock() 

56 

57 my_lib.footprint.clear(STAT_PATH_VALVE_STATE_WORKING) 

58 my_lib.footprint.update(STAT_PATH_VALVE_STATE_IDLE) 

59 

60 my_lib.rpi.gpio.setwarnings(False) 

61 my_lib.rpi.gpio.setmode(my_lib.rpi.gpio.BCM) 

62 my_lib.rpi.gpio.setup(pin_no, my_lib.rpi.gpio.OUT) 

63 

64 set_state(unit_cooler.const.VALVE_STATE.CLOSE) 

65 

66 

67# NOTE: テスト用 

68def clear_stat(): 

69 global ctrl_hist # noqa: PLW0603 

70 

71 my_lib.footprint.clear(STAT_PATH_VALVE_STATE_WORKING) 

72 my_lib.footprint.clear(STAT_PATH_VALVE_STATE_IDLE) 

73 my_lib.footprint.clear(STAT_PATH_VALVE_OPEN) 

74 my_lib.footprint.clear(STAT_PATH_VALVE_CLOSE) 

75 ctrl_hist = [] 

76 

77 

78# NOTE: テスト用 

79def get_hist(): 

80 global ctrl_hist 

81 

82 return ctrl_hist 

83 

84 

85# NOTE: 実際にバルブを開きます。 

86# 現在のバルブの状態と、バルブが現在の状態になってからの経過時間を返します。 

87def set_state(valve_state): 

88 global pin_no 

89 global valve_lock 

90 global ctrl_hist 

91 

92 with valve_lock: 

93 curr_state = get_state() 

94 

95 if valve_state != curr_state: 

96 logging.info("VALVE: %s -> %s", curr_state.name, valve_state.name) 

97 # NOTE: テスト時のみ履歴を記録 

98 import os 

99 

100 if os.environ.get("TEST") == "true": 100 ↛ 103line 100 didn't jump to line 103 because the condition on line 100 was always true

101 ctrl_hist.append(curr_state) 

102 

103 my_lib.rpi.gpio.output(pin_no, valve_state.value) 

104 

105 if valve_state == unit_cooler.const.VALVE_STATE.OPEN: 

106 my_lib.footprint.clear(STAT_PATH_VALVE_CLOSE) 

107 if not my_lib.footprint.exists(STAT_PATH_VALVE_OPEN): 

108 my_lib.footprint.update(STAT_PATH_VALVE_OPEN) 

109 else: 

110 my_lib.footprint.clear(STAT_PATH_VALVE_OPEN) 

111 if not my_lib.footprint.exists(STAT_PATH_VALVE_CLOSE): 

112 my_lib.footprint.update(STAT_PATH_VALVE_CLOSE) 

113 

114 return get_status() 

115 

116 

117# NOTE: 実際のバルブの状態を返します 

118def get_state(): 

119 global pin_no 

120 

121 if my_lib.rpi.gpio.input(pin_no) == 1: 

122 return unit_cooler.const.VALVE_STATE.OPEN 

123 else: 

124 return unit_cooler.const.VALVE_STATE.CLOSE 

125 

126 

127# NOTE: 実際のバルブの状態と、その状態になってからの経過時間を返します 

128def get_status(): 

129 global valve_lock 

130 

131 with valve_lock: 

132 valve_state = get_state() 

133 

134 if valve_state == unit_cooler.const.VALVE_STATE.OPEN: 

135 assert my_lib.footprint.exists(STAT_PATH_VALVE_OPEN) # noqa: S101 

136 

137 return { 

138 "state": valve_state, 

139 "duration": my_lib.footprint.elapsed(STAT_PATH_VALVE_OPEN), 

140 } 

141 else: # noqa: PLR5501 

142 if my_lib.footprint.exists(STAT_PATH_VALVE_CLOSE): 142 ↛ 148line 142 didn't jump to line 148 because the condition on line 142 was always true

143 return { 

144 "state": valve_state, 

145 "duration": my_lib.footprint.elapsed(STAT_PATH_VALVE_CLOSE), 

146 } 

147 else: 

148 return {"state": valve_state, "duration": 0} 

149 

150 

151# NOTE: バルブを動作状態にします。 

152# Duty 制御を実現するため、OFF Duty 期間の場合はバルブを閉じます。 

153# 実際にバルブを開いてからの経過時間を返します。 

154# duty_info = { "enable": bool, "on": on_sec, "off": off_sec } 

155def set_cooling_working(duty_info): 

156 logging.debug(duty_info) 

157 

158 my_lib.footprint.clear(STAT_PATH_VALVE_STATE_IDLE) 

159 

160 if not my_lib.footprint.exists(STAT_PATH_VALVE_STATE_WORKING): 

161 my_lib.footprint.update(STAT_PATH_VALVE_STATE_WORKING) 

162 unit_cooler.actuator.work_log.add("冷却を開始します。") 

163 logging.info("COOLING: IDLE -> WORKING") 

164 return set_state(unit_cooler.const.VALVE_STATE.OPEN) 

165 

166 if not duty_info["enable"]: 

167 # NOTE Duty 制御しない場合 

168 logging.info("COOLING: WORKING") 

169 return set_state(unit_cooler.const.VALVE_STATE.OPEN) 

170 

171 status = get_status() 

172 

173 if status["state"] == unit_cooler.const.VALVE_STATE.OPEN: 

174 # NOTE: 現在バルブが開かれている 

175 if status["duration"] >= duty_info["on_sec"]: 

176 logging.info("COOLING: WORKING (OFF duty, %d sec left)", duty_info["off_sec"]) 

177 unit_cooler.actuator.work_log.add("OFF Duty になったのでバルブを締めます。") 

178 return set_state(unit_cooler.const.VALVE_STATE.CLOSE) 

179 else: 

180 logging.info("COOLING: WORKING (ON duty, %d sec left)", duty_info["on_sec"] - status["duration"]) 

181 

182 return set_state(unit_cooler.const.VALVE_STATE.OPEN) 

183 else: # noqa: PLR5501 

184 # NOTE: 現在バルブが閉じられている 

185 if status["duration"] >= duty_info["off_sec"]: 

186 logging.info("COOLING: WORKING (ON duty, %d sec left)", duty_info["on_sec"]) 

187 unit_cooler.actuator.work_log.add("ON Duty になったのでバルブを開けます。") 

188 return set_state(unit_cooler.const.VALVE_STATE.OPEN) 

189 else: 

190 logging.info( 

191 "COOLING: WORKING (OFF duty, %d sec left)", duty_info["off_sec"] - status["duration"] 

192 ) 

193 return set_state(unit_cooler.const.VALVE_STATE.CLOSE) 

194 

195 

196def set_cooling_idle(): 

197 my_lib.footprint.clear(STAT_PATH_VALVE_STATE_WORKING) 

198 

199 if not my_lib.footprint.exists(STAT_PATH_VALVE_STATE_IDLE): 

200 my_lib.footprint.update(STAT_PATH_VALVE_STATE_IDLE) 

201 unit_cooler.actuator.work_log.add("冷却を停止しました。") 

202 logging.info("COOLING: WORKING -> IDLE") 

203 return set_state(unit_cooler.const.VALVE_STATE.CLOSE) 

204 else: 

205 logging.info("COOLING: IDLE") 

206 return set_state(unit_cooler.const.VALVE_STATE.CLOSE) 

207 

208 

209def set_cooling_state(control_message): 

210 if control_message["state"] == unit_cooler.const.COOLING_STATE.WORKING: 

211 return set_cooling_working(control_message["duty"]) 

212 else: 

213 return set_cooling_idle() 

214 

215 

216if __name__ == "__main__": 

217 # TEST Code 

218 import multiprocessing 

219 

220 import docopt 

221 import my_lib.config 

222 import my_lib.logger 

223 import my_lib.pretty 

224 

225 args = docopt.docopt(__doc__) 

226 

227 config_file = args["-c"] 

228 debug_mode = args["-D"] 

229 

230 my_lib.logger.init("test", level=logging.DEBUG if debug_mode else logging.INFO) 

231 

232 config = my_lib.config.load(config_file) 

233 event_queue = multiprocessing.Queue() 

234 

235 my_lib.webapp.config.init(config["actuator"]) 

236 my_lib.webapp.log.init(config) 

237 unit_cooler.actuator.work_log.init(config, event_queue) 

238 init(config["actuator"]["control"]["valve"]["pin_no"]) 

239 

240 while True: 

241 set_cooling_state( 

242 { 

243 "state": unit_cooler.const.COOLING_STATE.WORKING, 

244 "mode_index": 1, 

245 "duty": {"enable": True, "on_sec": 1, "off_sec": 3}, 

246 } 

247 ) 

248 time.sleep(1)