Coverage for flask/src/rasp_water/valve.py: 99%

187 statements  

« prev     ^ index     » next       coverage.py v7.9.1, created at 2025-06-28 13:51 +0900

1#!/usr/bin/env python3 

2import enum 

3import inspect 

4import logging 

5import os 

6import pathlib 

7import threading 

8import time 

9 

10import my_lib.footprint 

11import my_lib.rpi 

12import my_lib.webapp.config 

13 

14# バルブを一定期間開く際に作られるファイル。 

15# ファイルの内容はバルブを閉じるべき UNIX 時間。 

16STAT_PATH_VALVE_CONTROL_COMMAND = None 

17 

18# 実際にバルブを開いた際に作られるファイル。 

19# 実際にバルブを閉じた際に削除される。 

20STAT_PATH_VALVE_OPEN = None 

21 

22# 実際にバルブを閉じた際に作られるファイル。 

23# 実際にバルブを開いた際に削除される。 

24STAT_PATH_VALVE_CLOSE = None 

25 

26# 電磁弁制御用の GPIO 端子番号。 

27# この端子が H になった場合に、水が出るように回路を組んでおく。 

28GPIO_PIN_DEFAULT = 18 

29 

30 

31# 電磁弁を開いてからこの時間経過しても、水が流れていなかったらエラーにする 

32TIME_CLOSE_FAIL = 45 

33 

34# 電磁弁を閉じてからこの時間経過しても、水が流れていたらエラーにする 

35# (Pytest によるテストの際、時間を分単位で制御する関係上、60 より大きい値にしておく) 

36TIME_OPEN_FAIL = 61 

37 

38# この時間の間、異常な流量になっていたらエラーにする 

39TIME_OVER_FAIL = 5 

40 

41# この時間の間、流量が 0 だったら、今回の計測を停止する。 

42TIME_ZERO_TAIL = 5 

43 

44 

45class VALVE_STATE(enum.IntEnum): # noqa: N801 

46 OPEN = my_lib.rpi.gpio.level.HIGH.value 

47 CLOSE = my_lib.rpi.gpio.level.LOW.value 

48 

49 

50class CONTROL_MODE(enum.IntEnum): # noqa: N801 

51 TIMER = 1 

52 IDLE = 0 

53 

54 

55if (os.environ.get("DUMMY_MODE", "false") != "true") and ( 

56 os.environ.get("TEST", "false") != "true" 

57): # pragma: no cover 

58 

59 def conv_rawadc_to_flow(adc, offset): 

60 flow = max( 

61 ( 

62 ( 

63 adc 

64 * config["flow"]["sensor"]["adc"]["scale_value"] 

65 * config["flow"]["sensor"]["scale"]["max"] 

66 ) 

67 / 5000.0 

68 ) 

69 - offset, 

70 0, 

71 ) 

72 if flow < 0.01: 

73 flow = 0 

74 

75 return flow 

76 

77 def get_flow(offset=0): 

78 try: 

79 with pathlib.Path(config["flow"]["sensor"]["adc"]["value_file"]).open(mode="r") as f: 

80 return {"flow": conv_rawadc_to_flow(int(f.read()), offset), "result": "success"} 

81 except Exception: 

82 return {"flow": 0, "result": "fail"} 

83 

84else: 

85 import random 

86 

87 def get_flow(offset=0): # noqa: ARG001 

88 if my_lib.footprint.exists(STAT_PATH_VALVE_OPEN): 

89 if get_flow.prev_flow == 0: 

90 flow = config["flow"]["sensor"]["scale"]["max"] 

91 else: 

92 flow = max( 

93 0, 

94 min( 

95 get_flow.prev_flow 

96 + (random.random() - 0.5) * (config["flow"]["sensor"]["scale"]["max"] / 5.0), # noqa: S311 

97 config["flow"]["sensor"]["scale"]["max"], 

98 ), 

99 ) 

100 

101 get_flow.prev_flow = flow 

102 

103 return {"flow": flow, "result": "success"} 

104 else: 

105 if get_flow.prev_flow > 1: 

106 get_flow.prev_flow /= 5 

107 else: 

108 get_flow.prev_flow = max(0, get_flow.prev_flow - 0.5) 

109 

110 return {"flow": get_flow.prev_flow, "result": "success"} 

111 

112 get_flow.prev_flow = 0 

113 

114config = None 

115pin_no = GPIO_PIN_DEFAULT 

116worker = None 

117should_terminate = threading.Event() 

118 

119 

120# NOTE: STAT_PATH_VALVE_CONTROL_COMMAND の内容に基づいて、 

121# バルブを一定時間開けます。 

122# 時間を操作したテストを行うため、この関数の中では、 

123# time.time() の代わりに my_lib.rpi.gpio_time() を使う。 

124def control_worker(config, queue): # noqa: PLR0912, PLR0915, C901 

125 global should_terminate 

126 

127 sleep_sec = 0.1 

128 

129 logging.info("Start valve control worker") 

130 

131 time_open_start = None 

132 time_close = None 

133 flow = 0 

134 flow_sum = 0 

135 count_flow = 0 

136 count_zero = 0 

137 count_over = 0 

138 notify_last_time = None 

139 notify_last_flow_sum = 0 

140 notify_last_count = 0 

141 stop_measure = False 

142 

143 i = 0 

144 while True: 

145 if should_terminate.is_set(): 

146 break 

147 

148 if time_open_start is not None: 

149 flow = get_flow(config["flow"]["offset"])["flow"] 

150 logging.debug("Current flow: %.1f", flow) 

151 flow_sum += flow 

152 count_flow += 1 

153 

154 if (my_lib.rpi.gpio_time() - notify_last_time) > 10: 

155 # NOTE: 10秒ごとに途中集計を報告する 

156 queue.put( 

157 { 

158 "type": "instantaneous", 

159 "flow": float(flow_sum - notify_last_flow_sum) / (count_flow - notify_last_count), 

160 } 

161 ) 

162 

163 notify_last_time = my_lib.rpi.gpio_time() 

164 notify_last_flow_sum = flow_sum 

165 notify_last_count = count_flow 

166 

167 # NOTE: 以下の処理はファイルシステムへのアクセスが発生するので、実施頻度を落とす 

168 if i % 5 == 0: 

169 if time_open_start is None: 

170 if my_lib.footprint.exists(STAT_PATH_VALVE_OPEN): 

171 # NOTE: バルブが開かれていたら、状態を変更してトータルの水量の集計を開始する 

172 logging.info("Start flow measurement") 

173 

174 time_open_start = my_lib.rpi.gpio_time() 

175 notify_last_time = time_open_start 

176 # NOTE: バルブを閉じてから流量が 0 になるまでに再度開いた場合にエラーにならないようにする 

177 time_close = None 

178 else: 

179 if my_lib.footprint.exists(STAT_PATH_VALVE_CONTROL_COMMAND): 

180 # NOTE: バルブコマンドが存在したら、閉じる時間をチェックして、必要に応じて閉じる 

181 

182 try: 

183 time_to_close = my_lib.footprint.mtime(STAT_PATH_VALVE_CONTROL_COMMAND) 

184 if (my_lib.rpi.gpio_time() > time_to_close) or ( 

185 abs(my_lib.rpi.gpio_time() - time_to_close) < sleep_sec 

186 ): 

187 logging.info("Times is up, close valve") 

188 # NOTE: 下記の関数の中で 

189 # STAT_PATH_VALVE_CONTROL_COMMAND は削除される 

190 set_state(VALVE_STATE.CLOSE) 

191 time_close = my_lib.rpi.gpio_time() 

192 except Exception: 

193 logging.exception("Failed to read command") 

194 

195 if (time_close is None) and my_lib.footprint.exists(STAT_PATH_VALVE_CLOSE): 

196 logging.info("May be manually closed") 

197 set_state(VALVE_STATE.CLOSE) 

198 time_close = my_lib.rpi.gpio_time() 

199 

200 if (time_close is not None) and (time_open_start is not None): 

201 period_sec = my_lib.rpi.gpio_time() - time_open_start 

202 

203 if flow < 0.1: 

204 count_zero += 1 

205 

206 if flow > config["flow"]["threshold"]["error"]: 

207 count_over += 1 

208 

209 if count_over > TIME_OVER_FAIL: 

210 set_state(VALVE_STATE.CLOSE) 

211 queue.put({"type": "error", "message": "😵水が流れすぎています。"}) 

212 

213 # NOTE: バルブが閉じられた後、流量が 0 になっていたらトータル流量を報告する 

214 if count_zero > TIME_ZERO_TAIL: 

215 logging.info("Stop flow measurement") 

216 

217 # NOTE: 流量(L/min)の平均を求めてから期間(min)を掛ける 

218 total = float(flow_sum) / count_flow * period_sec / 60 

219 

220 logging.debug( 

221 "(flow_sum, count_flow, period_sec, total) = (%1.f, %d, %d, %.1f)", 

222 flow_sum, 

223 count_flow, 

224 period_sec, 

225 total, 

226 ) 

227 

228 queue.put( 

229 { 

230 "type": "total", 

231 "period": period_sec, 

232 "total": total, 

233 } 

234 ) 

235 

236 if (period_sec > TIME_CLOSE_FAIL) and (total < 1): 

237 queue.put( 

238 { 

239 "type": "error", 

240 "message": "😵 元栓が閉まっている可能性があります。", 

241 } 

242 ) 

243 

244 stop_measure = True 

245 elif (my_lib.rpi.gpio_time() - time_close) > TIME_OPEN_FAIL: 

246 set_state(VALVE_STATE.CLOSE) 

247 queue.put( 

248 { 

249 "type": "error", 

250 "message": "😵 バルブを閉めても水が流れ続けています。", 

251 } 

252 ) 

253 stop_measure = True 

254 

255 if stop_measure: 

256 stop_measure = False 

257 time_open_start = None 

258 time_close = None 

259 flow_sum = 0 

260 count_flow = 0 

261 count_zero = 0 

262 count_over = 0 

263 

264 notify_last_time = None 

265 notify_last_flow_sum = 0 

266 notify_last_count = 0 

267 

268 time.sleep(sleep_sec) 

269 

270 if i % (10 / sleep_sec) == 0: 

271 my_lib.footprint.update(config["liveness"]["file"]["valve_control"]) 

272 

273 i += 1 

274 

275 logging.info("Terminate valve control worker") 

276 

277 

278def init(config_, queue, pin=GPIO_PIN_DEFAULT): 

279 global config # noqa: PLW0603 

280 global worker # noqa: PLW0603 

281 global pin_no # noqa: PLW0603 

282 global should_terminate 

283 global STAT_PATH_VALVE_CONTROL_COMMAND # noqa: PLW0603 

284 global STAT_PATH_VALVE_OPEN # noqa: PLW0603 

285 global STAT_PATH_VALVE_CLOSE # noqa: PLW0603 

286 

287 STAT_PATH_VALVE_CONTROL_COMMAND = my_lib.webapp.config.STAT_DIR_PATH / "valve" / "control" / "command" 

288 STAT_PATH_VALVE_OPEN = my_lib.webapp.config.STAT_DIR_PATH / "valve" / "open" 

289 STAT_PATH_VALVE_CLOSE = my_lib.webapp.config.STAT_DIR_PATH / "valve" / "close" 

290 

291 config = config_ 

292 

293 if worker is not None: 293 ↛ 294line 293 didn't jump to line 294 because the condition on line 293 was never true

294 raise ValueError("worker should be None") # noqa: TRY003, EM101 

295 

296 pin_no = pin 

297 

298 set_state(VALVE_STATE.CLOSE) 

299 

300 logging.info("Setting scale of ADC") 

301 if pathlib.Path(config["flow"]["sensor"]["adc"]["scale_file"]).exists(): 

302 with pathlib.Path(config["flow"]["sensor"]["adc"]["scale_file"]).open(mode="w") as f: 

303 f.write(str(config["flow"]["sensor"]["adc"]["scale_value"])) 

304 

305 should_terminate.clear() 

306 

307 worker = threading.Thread( 

308 target=control_worker, 

309 args=( 

310 config, 

311 queue, 

312 ), 

313 ) 

314 worker.start() 

315 

316 

317def term(): 

318 global worker # noqa: PLW0603 

319 

320 should_terminate.set() 

321 worker.join() 

322 

323 worker = None 

324 

325 my_lib.footprint.clear(STAT_PATH_VALVE_OPEN) 

326 my_lib.footprint.clear(STAT_PATH_VALVE_CLOSE) 

327 my_lib.footprint.clear(STAT_PATH_VALVE_CONTROL_COMMAND) 

328 

329 my_lib.rpi.gpio.cleanup() 

330 

331 

332# NOTE: 実際にバルブを開きます。 

333def set_state(valve_state): 

334 global pin_no 

335 

336 logging.debug( 

337 "set_state = %s from %s at %s:%d", 

338 valve_state, 

339 inspect.stack()[1].function, 

340 inspect.stack()[1].filename, 

341 inspect.stack()[1].lineno, 

342 ) 

343 

344 curr_state = get_state() 

345 

346 if valve_state != curr_state: 

347 logging.info("VALVE: %s -> %s", curr_state.name, valve_state.name) 

348 

349 my_lib.rpi.gpio.setwarnings(False) 

350 my_lib.rpi.gpio.setmode(my_lib.rpi.gpio.BCM) 

351 my_lib.rpi.gpio.setup(pin_no, my_lib.rpi.gpio.OUT) 

352 my_lib.rpi.gpio.output(pin_no, valve_state.value) 

353 

354 if valve_state == VALVE_STATE.OPEN: 

355 my_lib.footprint.clear(STAT_PATH_VALVE_CLOSE) 

356 my_lib.footprint.update(STAT_PATH_VALVE_OPEN) 

357 else: 

358 my_lib.footprint.clear(STAT_PATH_VALVE_OPEN) 

359 my_lib.footprint.clear(STAT_PATH_VALVE_CONTROL_COMMAND) 

360 my_lib.footprint.update(STAT_PATH_VALVE_CLOSE) 

361 

362 return get_state() 

363 

364 

365# NOTE: 実際のバルブの状態を返します 

366def get_state(): 

367 global pin_no 

368 

369 my_lib.rpi.gpio.setwarnings(False) 

370 my_lib.rpi.gpio.setmode(my_lib.rpi.gpio.BCM) 

371 my_lib.rpi.gpio.setup(pin_no, my_lib.rpi.gpio.OUT) 

372 

373 if my_lib.rpi.gpio.input(pin_no) == 1: 

374 return VALVE_STATE.OPEN 

375 else: 

376 return VALVE_STATE.CLOSE 

377 

378 

379def set_control_mode(open_sec): 

380 logging.info("Open valve for %d sec", open_sec) 

381 

382 set_state(VALVE_STATE.OPEN) 

383 my_lib.footprint.update(STAT_PATH_VALVE_CONTROL_COMMAND, my_lib.rpi.gpio_time() + open_sec) 

384 

385 

386def get_control_mode(): 

387 if my_lib.footprint.exists(STAT_PATH_VALVE_CONTROL_COMMAND): 

388 time_to_close = my_lib.footprint.mtime(STAT_PATH_VALVE_CONTROL_COMMAND) 

389 time_now = my_lib.rpi.gpio_time() 

390 

391 if time_to_close >= time_now: 

392 return { 

393 "mode": CONTROL_MODE.TIMER, 

394 "remain": time_to_close - time_now, 

395 } 

396 else: 

397 if (time_now - time_to_close) > 1: 397 ↛ 399line 397 didn't jump to line 399 because the condition on line 397 was always true

398 logging.warning("Timer control of the valve may be broken") 

399 return {"mode": CONTROL_MODE.TIMER, "remain": 0} 

400 else: 

401 return {"mode": CONTROL_MODE.IDLE, "remain": 0} 

402 

403 

404if __name__ == "__main__": 

405 from multiprocessing import Queue 

406 

407 import my_lib.config 

408 import my_lib.logger 

409 

410 my_lib.logger.init("test", level=logging.INFO) 

411 

412 config = my_lib.load() 

413 queue = Queue() 

414 init(config, queue) 

415 

416 set_state(VALVE_STATE.OPEN) 

417 time.sleep(0.5) 

418 logging.info("Flow: %.2f", get_flow(config["flow"]["offset"])["flow"]) 

419 time.sleep(0.5) 

420 logging.info("Flow: %.2f", get_flow(config["flow"]["offset"])["flow"]) 

421 set_state(VALVE_STATE.CLOSE) 

422 logging.info("Flow: %.2f", get_flow(config["flow"]["offset"])["flow"]) 

423 

424 set_control_mode(60) 

425 time.sleep(1) 

426 logging.info(get_control_mode()) 

427 time.sleep(1) 

428 logging.info(get_control_mode()) 

429 time.sleep(2) 

430 logging.info(get_control_mode()) 

431 

432 while True: 

433 info = queue.get() 

434 logging.info(info) 

435 

436 if info["type"] == "total": 

437 break 

438 

439 should_terminate.set()