Coverage for flask/src/rasp_water/control/valve.py: 99%

189 statements  

« prev     ^ index     » next       coverage.py v7.9.1, created at 2025-07-04 12:06 +0900

1#!/usr/bin/env python3 

2import enum 

3import inspect 

4import logging 

5import os 

6import pathlib 

7import threading 

8import time 

9 

10import my_lib.footprint 

11import my_lib.rpi 

12import my_lib.webapp.config 

13 

14# バルブを一定期間開く際に作られるファイル。 

15# ファイルの内容はバルブを閉じるべき UNIX 時間。 

16STAT_PATH_VALVE_CONTROL_COMMAND = None 

17 

18# 実際にバルブを開いた際に作られるファイル。 

19# 実際にバルブを閉じた際に削除される。 

20STAT_PATH_VALVE_OPEN = None 

21 

22# 実際にバルブを閉じた際に作られるファイル。 

23# 実際にバルブを開いた際に削除される。 

24STAT_PATH_VALVE_CLOSE = None 

25 

26# 電磁弁制御用の GPIO 端子番号。 

27# この端子が H になった場合に、水が出るように回路を組んでおく。 

28GPIO_PIN_DEFAULT = 18 

29 

30 

31# 電磁弁を開いてからこの時間経過しても、水が流れていなかったらエラーにする 

32TIME_CLOSE_FAIL = 45 

33 

34# 電磁弁を閉じてからこの時間経過しても、水が流れていたらエラーにする 

35# (Pytest によるテストの際、時間を分単位で制御する関係上、60 より大きい値にしておく) 

36TIME_OPEN_FAIL = 61 

37 

38# この時間の間、異常な流量になっていたらエラーにする 

39TIME_OVER_FAIL = 5 

40 

41# この時間の間、流量が 0 だったら、今回の計測を停止する。 

42TIME_ZERO_TAIL = 5 

43 

44 

45class VALVE_STATE(enum.IntEnum): # noqa: N801 

46 OPEN = my_lib.rpi.gpio.level.HIGH.value 

47 CLOSE = my_lib.rpi.gpio.level.LOW.value 

48 

49 

50class CONTROL_MODE(enum.IntEnum): # noqa: N801 

51 TIMER = 1 

52 IDLE = 0 

53 

54 

55if (os.environ.get("DUMMY_MODE", "false") != "true") and ( 

56 os.environ.get("TEST", "false") != "true" 

57): # pragma: no cover 

58 

59 def conv_rawadc_to_flow(adc, offset): 

60 flow = max( 

61 ( 

62 ( 

63 adc 

64 * config["flow"]["sensor"]["adc"]["scale_value"] 

65 * config["flow"]["sensor"]["scale"]["max"] 

66 ) 

67 / 5000.0 

68 ) 

69 - offset, 

70 0, 

71 ) 

72 if flow < 0.01: 

73 flow = 0 

74 

75 return flow 

76 

77 def get_flow(offset=0): 

78 try: 

79 with pathlib.Path(config["flow"]["sensor"]["adc"]["value_file"]).open(mode="r") as f: 

80 return {"flow": conv_rawadc_to_flow(int(f.read()), offset), "result": "success"} 

81 except Exception: 

82 return {"flow": 0, "result": "fail"} 

83 

84else: 

85 import random 

86 

87 def get_flow(offset=0): # noqa: ARG001 

88 if my_lib.footprint.exists(STAT_PATH_VALVE_OPEN): 

89 if get_flow.prev_flow == 0: 

90 flow = config["flow"]["sensor"]["scale"]["max"] 

91 else: 

92 flow = max( 

93 0, 

94 min( 

95 get_flow.prev_flow 

96 + (random.random() - 0.5) * (config["flow"]["sensor"]["scale"]["max"] / 5.0), # noqa: S311 

97 config["flow"]["sensor"]["scale"]["max"], 

98 ), 

99 ) 

100 

101 get_flow.prev_flow = flow 

102 

103 return {"flow": flow, "result": "success"} 

104 else: 

105 if get_flow.prev_flow > 1: 

106 get_flow.prev_flow /= 5 

107 else: 

108 get_flow.prev_flow = max(0, get_flow.prev_flow - 0.5) 

109 

110 return {"flow": get_flow.prev_flow, "result": "success"} 

111 

112 get_flow.prev_flow = 0 

113 

114config = None 

115pin_no = GPIO_PIN_DEFAULT 

116worker = None 

117should_terminate = threading.Event() 

118current_auto_mode = False # 現在の水やりが自動モードかどうか 

119 

120 

121# NOTE: STAT_PATH_VALVE_CONTROL_COMMAND の内容に基づいて、 

122# バルブを一定時間開けます。 

123# 時間を操作したテストを行うため、この関数の中では、 

124# time.time() の代わりに my_lib.rpi.gpio_time() を使う。 

125def control_worker(config, queue): # noqa: PLR0912, PLR0915, C901 

126 global should_terminate, current_auto_mode 

127 

128 sleep_sec = 0.1 

129 

130 logging.info("Start valve control worker") 

131 

132 time_open_start = None 

133 time_close = None 

134 flow = 0 

135 flow_sum = 0 

136 count_flow = 0 

137 count_zero = 0 

138 count_over = 0 

139 notify_last_time = None 

140 notify_last_flow_sum = 0 

141 notify_last_count = 0 

142 stop_measure = False 

143 

144 i = 0 

145 while True: 

146 if should_terminate.is_set(): 

147 break 

148 

149 if time_open_start is not None: 

150 flow = get_flow(config["flow"]["offset"])["flow"] 

151 logging.debug("Current flow: %.1f", flow) 

152 flow_sum += flow 

153 count_flow += 1 

154 

155 if (my_lib.rpi.gpio_time() - notify_last_time) > 10: 

156 # NOTE: 10秒ごとに途中集計を報告する 

157 queue.put( 

158 { 

159 "type": "instantaneous", 

160 "flow": float(flow_sum - notify_last_flow_sum) / (count_flow - notify_last_count), 

161 } 

162 ) 

163 

164 notify_last_time = my_lib.rpi.gpio_time() 

165 notify_last_flow_sum = flow_sum 

166 notify_last_count = count_flow 

167 

168 # NOTE: 以下の処理はファイルシステムへのアクセスが発生するので、実施頻度を落とす 

169 if i % 5 == 0: 

170 if time_open_start is None: 

171 if my_lib.footprint.exists(STAT_PATH_VALVE_OPEN): 

172 # NOTE: バルブが開かれていたら、状態を変更してトータルの水量の集計を開始する 

173 logging.info("Start flow measurement") 

174 

175 time_open_start = my_lib.rpi.gpio_time() 

176 notify_last_time = time_open_start 

177 # NOTE: バルブを閉じてから流量が 0 になるまでに再度開いた場合にエラーにならないようにする 

178 time_close = None 

179 else: 

180 if my_lib.footprint.exists(STAT_PATH_VALVE_CONTROL_COMMAND): 

181 # NOTE: バルブコマンドが存在したら、閉じる時間をチェックして、必要に応じて閉じる 

182 

183 try: 

184 time_to_close = my_lib.footprint.mtime(STAT_PATH_VALVE_CONTROL_COMMAND) 

185 if (my_lib.rpi.gpio_time() > time_to_close) or ( 

186 abs(my_lib.rpi.gpio_time() - time_to_close) < sleep_sec 

187 ): 

188 logging.info("Times is up, close valve") 

189 # NOTE: 下記の関数の中で 

190 # STAT_PATH_VALVE_CONTROL_COMMAND は削除される 

191 set_state(VALVE_STATE.CLOSE) 

192 time_close = my_lib.rpi.gpio_time() 

193 except Exception: 

194 logging.exception("Failed to read command") 

195 

196 if (time_close is None) and my_lib.footprint.exists(STAT_PATH_VALVE_CLOSE): 

197 logging.info("May be manually closed") 

198 set_state(VALVE_STATE.CLOSE) 

199 time_close = my_lib.rpi.gpio_time() 

200 

201 if (time_close is not None) and (time_open_start is not None): 

202 period_sec = my_lib.rpi.gpio_time() - time_open_start 

203 

204 if flow < 0.1: 

205 count_zero += 1 

206 

207 if flow > config["flow"]["threshold"]["error"]: 

208 count_over += 1 

209 

210 if count_over > TIME_OVER_FAIL: 

211 set_state(VALVE_STATE.CLOSE) 

212 queue.put({"type": "error", "message": "😵水が流れすぎています。"}) 

213 

214 # NOTE: バルブが閉じられた後、流量が 0 になっていたらトータル流量を報告する 

215 if count_zero > TIME_ZERO_TAIL: 

216 logging.info("Stop flow measurement") 

217 

218 # NOTE: 流量(L/min)の平均を求めてから期間(min)を掛ける 

219 total = float(flow_sum) / count_flow * period_sec / 60 

220 

221 logging.debug( 

222 "(flow_sum, count_flow, period_sec, total) = (%1.f, %d, %d, %.1f)", 

223 flow_sum, 

224 count_flow, 

225 period_sec, 

226 total, 

227 ) 

228 queue.put( 

229 { 

230 "type": "total", 

231 "period": period_sec, 

232 "total": total, 

233 "auto": current_auto_mode, 

234 } 

235 ) 

236 

237 if (period_sec > TIME_CLOSE_FAIL) and (total < 1): 

238 queue.put( 

239 { 

240 "type": "error", 

241 "message": "😵 元栓が閉まっている可能性があります。", 

242 } 

243 ) 

244 

245 stop_measure = True 

246 elif (my_lib.rpi.gpio_time() - time_close) > TIME_OPEN_FAIL: 

247 set_state(VALVE_STATE.CLOSE) 

248 queue.put( 

249 { 

250 "type": "error", 

251 "message": "😵 バルブを閉めても水が流れ続けています。", 

252 } 

253 ) 

254 stop_measure = True 

255 

256 if stop_measure: 

257 stop_measure = False 

258 time_open_start = None 

259 time_close = None 

260 flow_sum = 0 

261 count_flow = 0 

262 count_zero = 0 

263 count_over = 0 

264 

265 notify_last_time = None 

266 notify_last_flow_sum = 0 

267 notify_last_count = 0 

268 

269 time.sleep(sleep_sec) 

270 

271 if i % (10 / sleep_sec) == 0: 

272 my_lib.footprint.update(config["liveness"]["file"]["valve_control"]) 

273 

274 i += 1 

275 

276 logging.info("Terminate valve control worker") 

277 

278 

279def init(config_, queue, pin=GPIO_PIN_DEFAULT): 

280 global config # noqa: PLW0603 

281 global worker # noqa: PLW0603 

282 global pin_no # noqa: PLW0603 

283 global should_terminate 

284 global STAT_PATH_VALVE_CONTROL_COMMAND # noqa: PLW0603 

285 global STAT_PATH_VALVE_OPEN # noqa: PLW0603 

286 global STAT_PATH_VALVE_CLOSE # noqa: PLW0603 

287 

288 STAT_PATH_VALVE_CONTROL_COMMAND = my_lib.webapp.config.STAT_DIR_PATH / "valve" / "control" / "command" 

289 STAT_PATH_VALVE_OPEN = my_lib.webapp.config.STAT_DIR_PATH / "valve" / "open" 

290 STAT_PATH_VALVE_CLOSE = my_lib.webapp.config.STAT_DIR_PATH / "valve" / "close" 

291 

292 config = config_ 

293 

294 if worker is not None: 294 ↛ 295line 294 didn't jump to line 295 because the condition on line 294 was never true

295 raise ValueError("worker should be None") # noqa: TRY003, EM101 

296 

297 pin_no = pin 

298 

299 set_state(VALVE_STATE.CLOSE) 

300 

301 logging.info("Setting scale of ADC") 

302 if pathlib.Path(config["flow"]["sensor"]["adc"]["scale_file"]).exists(): 

303 with pathlib.Path(config["flow"]["sensor"]["adc"]["scale_file"]).open(mode="w") as f: 

304 f.write(str(config["flow"]["sensor"]["adc"]["scale_value"])) 

305 

306 should_terminate.clear() 

307 

308 worker = threading.Thread( 

309 target=control_worker, 

310 args=( 

311 config, 

312 queue, 

313 ), 

314 ) 

315 worker.start() 

316 

317 

318def term(): 

319 global worker # noqa: PLW0603 

320 

321 should_terminate.set() 

322 worker.join() 

323 

324 worker = None 

325 

326 my_lib.footprint.clear(STAT_PATH_VALVE_OPEN) 

327 my_lib.footprint.clear(STAT_PATH_VALVE_CLOSE) 

328 my_lib.footprint.clear(STAT_PATH_VALVE_CONTROL_COMMAND) 

329 

330 my_lib.rpi.gpio.cleanup() 

331 

332 

333# NOTE: 実際にバルブを開きます。 

334def set_state(valve_state): 

335 global pin_no 

336 

337 logging.debug( 

338 "set_state = %s from %s at %s:%d", 

339 valve_state, 

340 inspect.stack()[1].function, 

341 inspect.stack()[1].filename, 

342 inspect.stack()[1].lineno, 

343 ) 

344 

345 curr_state = get_state() 

346 

347 if valve_state != curr_state: 

348 logging.info("VALVE: %s -> %s", curr_state.name, valve_state.name) 

349 

350 my_lib.rpi.gpio.setwarnings(False) 

351 my_lib.rpi.gpio.setmode(my_lib.rpi.gpio.BCM) 

352 my_lib.rpi.gpio.setup(pin_no, my_lib.rpi.gpio.OUT) 

353 my_lib.rpi.gpio.output(pin_no, valve_state.value) 

354 

355 if valve_state == VALVE_STATE.OPEN: 

356 my_lib.footprint.clear(STAT_PATH_VALVE_CLOSE) 

357 my_lib.footprint.update(STAT_PATH_VALVE_OPEN) 

358 else: 

359 my_lib.footprint.clear(STAT_PATH_VALVE_OPEN) 

360 my_lib.footprint.clear(STAT_PATH_VALVE_CONTROL_COMMAND) 

361 my_lib.footprint.update(STAT_PATH_VALVE_CLOSE) 

362 

363 return get_state() 

364 

365 

366# NOTE: 実際のバルブの状態を返します 

367def get_state(): 

368 global pin_no 

369 

370 my_lib.rpi.gpio.setwarnings(False) 

371 my_lib.rpi.gpio.setmode(my_lib.rpi.gpio.BCM) 

372 my_lib.rpi.gpio.setup(pin_no, my_lib.rpi.gpio.OUT) 

373 

374 if my_lib.rpi.gpio.input(pin_no) == 1: 

375 return VALVE_STATE.OPEN 

376 else: 

377 return VALVE_STATE.CLOSE 

378 

379 

380def set_control_mode(open_sec, auto=False): 

381 global current_auto_mode 

382 current_auto_mode = auto 

383 

384 logging.info("Open valve for %d sec (auto=%s)", open_sec, auto) 

385 

386 set_state(VALVE_STATE.OPEN) 

387 my_lib.footprint.update(STAT_PATH_VALVE_CONTROL_COMMAND, my_lib.rpi.gpio_time() + open_sec) 

388 

389 

390def get_control_mode(): 

391 if my_lib.footprint.exists(STAT_PATH_VALVE_CONTROL_COMMAND): 

392 time_to_close = my_lib.footprint.mtime(STAT_PATH_VALVE_CONTROL_COMMAND) 

393 time_now = my_lib.rpi.gpio_time() 

394 

395 if time_to_close >= time_now: 

396 return { 

397 "mode": CONTROL_MODE.TIMER, 

398 "remain": time_to_close - time_now, 

399 } 

400 else: 

401 if (time_now - time_to_close) > 1: 401 ↛ 403line 401 didn't jump to line 403 because the condition on line 401 was always true

402 logging.warning("Timer control of the valve may be broken") 

403 return {"mode": CONTROL_MODE.TIMER, "remain": 0} 

404 else: 

405 return {"mode": CONTROL_MODE.IDLE, "remain": 0} 

406 

407 

408if __name__ == "__main__": 

409 from multiprocessing import Queue 

410 

411 import my_lib.config 

412 import my_lib.logger 

413 

414 my_lib.logger.init("test", level=logging.INFO) 

415 

416 config = my_lib.load() 

417 queue = Queue() 

418 init(config, queue) 

419 

420 set_state(VALVE_STATE.OPEN) 

421 time.sleep(0.5) 

422 logging.info("Flow: %.2f", get_flow(config["flow"]["offset"])["flow"]) 

423 time.sleep(0.5) 

424 logging.info("Flow: %.2f", get_flow(config["flow"]["offset"])["flow"]) 

425 set_state(VALVE_STATE.CLOSE) 

426 logging.info("Flow: %.2f", get_flow(config["flow"]["offset"])["flow"]) 

427 

428 set_control_mode(60) 

429 time.sleep(1) 

430 logging.info(get_control_mode()) 

431 time.sleep(1) 

432 logging.info(get_control_mode()) 

433 time.sleep(2) 

434 logging.info(get_control_mode()) 

435 

436 while True: 

437 info = queue.get() 

438 logging.info(info) 

439 

440 if info["type"] == "total": 

441 break 

442 

443 should_terminate.set()