Coverage for flask/src/rasp_shutter/control/scheduler.py: 34%

289 statements  

« prev     ^ index     » next       coverage.py v7.9.1, created at 2025-08-23 19:38 +0900

1#!/usr/bin/env python3 

2import datetime 

3import enum 

4import logging 

5import os 

6import pathlib 

7import re 

8import threading 

9import time 

10import traceback 

11 

12import my_lib.footprint 

13import my_lib.serializer 

14import my_lib.webapp.config 

15import my_lib.webapp.log 

16import rasp_shutter.control.config 

17import rasp_shutter.control.webapi.control 

18import rasp_shutter.control.webapi.sensor 

19import schedule 

20 

21 

22class BRIGHTNESS_STATE(enum.IntEnum): # noqa: N801 

23 DARK = 0 

24 BRIGHT = 1 

25 UNKNOWN = 2 

26 

27 

28RETRY_COUNT = 3 

29 

30schedule_lock = None 

31schedule_data = None 

32should_terminate = threading.Event() 

33 

34# Worker-specific instances for pytest-xdist parallel execution 

35_scheduler_instances = {} 

36_schedule_data_instances = {} 

37_auto_control_events = {} 

38 

39 

40def get_scheduler(): 

41 """Get worker-specific scheduler instance for pytest-xdist parallel execution""" 

42 worker_id = os.environ.get("PYTEST_XDIST_WORKER", "main") 

43 

44 if worker_id not in _scheduler_instances: 44 ↛ 46line 44 didn't jump to line 46 because the condition on line 44 was never true

45 # Create a new scheduler instance for this worker 

46 _scheduler_instances[worker_id] = schedule.Scheduler() 

47 

48 return _scheduler_instances[worker_id] 

49 

50 

51def get_auto_control_event(): 

52 """テスト同期用のワーカー固有自動制御イベントを取得""" 

53 worker_id = os.environ.get("PYTEST_XDIST_WORKER", "main") 

54 

55 if worker_id not in _auto_control_events: 55 ↛ 56line 55 didn't jump to line 56 because the condition on line 55 was never true

56 _auto_control_events[worker_id] = threading.Event() 

57 

58 return _auto_control_events[worker_id] 

59 

60 

61def _signal_auto_control_completed(): 

62 """自動制御サイクルの完了をシグナル(テスト用)""" 

63 # テスト環境でのみイベントを設定 

64 if os.environ.get("PYTEST_CURRENT_TEST"): 

65 event = get_auto_control_event() 

66 event.set() 

67 

68 

69def wait_for_auto_control_completion(timeout=5.0): 

70 """自動制御の完了を待機(テスト用)""" 

71 if not os.environ.get("PYTEST_CURRENT_TEST"): 71 ↛ 72line 71 didn't jump to line 72 because the condition on line 71 was never true

72 return True 

73 

74 event = get_auto_control_event() 

75 event.clear() # 待機前にクリア 

76 return event.wait(timeout) 

77 

78 

79def get_schedule_data(): 

80 """Get worker-specific schedule data for pytest-xdist parallel execution""" 

81 worker_id = os.environ.get("PYTEST_XDIST_WORKER", "main") 

82 

83 if worker_id not in _schedule_data_instances: 

84 _schedule_data_instances[worker_id] = None 

85 

86 return _schedule_data_instances[worker_id] 

87 

88 

89def set_schedule_data(data): 

90 """Set worker-specific schedule data for pytest-xdist parallel execution""" 

91 worker_id = os.environ.get("PYTEST_XDIST_WORKER", "main") 

92 _schedule_data_instances[worker_id] = data 

93 

94 

95def init(): 

96 global schedule_lock # noqa: PLW0603 

97 global should_terminate 

98 

99 schedule_lock = threading.Lock() 

100 should_terminate.clear() 

101 

102 

103def term(): 

104 global should_terminate 

105 

106 should_terminate.set() 

107 

108 

109def brightness_text(sense_data, cur_schedule_data): 

110 text = [ 

111 "{sensor}: current {current:.1f} {cmp} threshold {threshold:.1f}".format( 

112 sensor=sensor, 

113 current=sense_data[sensor]["value"], 

114 threshold=cur_schedule_data[sensor], 

115 cmp=">" 

116 if sense_data[sensor]["value"] > cur_schedule_data[sensor] 

117 else ("<" if sense_data[sensor]["value"] < cur_schedule_data[sensor] else "="), 

118 ) 

119 for sensor in ["solar_rad", "lux", "altitude"] 

120 ] 

121 

122 return ", ".join(text) 

123 

124 

125def check_brightness(sense_data, action): 

126 if (not sense_data["lux"]["valid"]) or (not sense_data["solar_rad"]["valid"]): 

127 return BRIGHTNESS_STATE.UNKNOWN 

128 

129 schedule_data = get_schedule_data() 

130 

131 if action == "close": 

132 if ( 

133 (sense_data["lux"]["value"] < schedule_data[action]["lux"]) 

134 and (sense_data["solar_rad"]["value"] < schedule_data[action]["solar_rad"]) 

135 ) or (sense_data["altitude"]["value"] < schedule_data[action]["altitude"]): 

136 logging.info("Getting darker %s", brightness_text(sense_data, schedule_data[action])) 

137 return BRIGHTNESS_STATE.DARK 

138 else: 

139 return BRIGHTNESS_STATE.BRIGHT 

140 else: # noqa: PLR5501 

141 if ( 

142 (sense_data["lux"]["value"] > schedule_data[action]["lux"]) 

143 and (sense_data["solar_rad"]["value"] > schedule_data[action]["solar_rad"]) 

144 and (sense_data["altitude"]["value"] > schedule_data[action]["altitude"]) 

145 ): 

146 logging.info("Getting brighter %s", brightness_text(sense_data, schedule_data[action])) 

147 return BRIGHTNESS_STATE.BRIGHT 

148 else: 

149 return BRIGHTNESS_STATE.DARK 

150 

151 

152def exec_shutter_control_impl(config, state, mode, sense_data, user): 

153 try: 

154 # NOTE: Web 経由だと認証つけた場合に困るので、直接関数を呼ぶ 

155 rasp_shutter.control.webapi.control.set_shutter_state( 

156 config, list(range(len(config["shutter"]))), state, mode, sense_data, user 

157 ) 

158 return True 

159 except Exception as e: 

160 logging.warning(e) 

161 logging.warning(traceback.format_exc()) 

162 

163 return False 

164 

165 

166def exec_shutter_control(config, state, mode, sense_data, user): 

167 logging.debug("Execute shutter control") 

168 

169 for _ in range(RETRY_COUNT): 

170 if exec_shutter_control_impl(config, state, mode, sense_data, user): 

171 return True 

172 logging.debug("Retry") 

173 

174 my_lib.webapp.log.info("😵 シャッターの制御に失敗しました。") 

175 return False 

176 

177 

178def shutter_auto_open(config): 

179 logging.debug("try auto open") 

180 

181 schedule_data = get_schedule_data() 

182 if not schedule_data["open"]["is_active"]: 

183 logging.debug("inactive") 

184 return 

185 

186 elapsed_pendiing_open = my_lib.footprint.elapsed(rasp_shutter.control.config.STAT_PENDING_OPEN) 

187 if elapsed_pendiing_open > 6 * 60 * 60: 

188 # NOTE: 暗くて開けるのを延期されている場合以外は処理を行わない。 

189 logging.debug("NOT pending") 

190 return 

191 

192 if ( 

193 my_lib.footprint.elapsed(rasp_shutter.control.config.STAT_AUTO_CLOSE) 

194 < rasp_shutter.control.config.EXEC_INTERVAL_AUTO_MIN * 60 

195 ): 

196 # NOTE: 自動で閉めてから時間が経っていない場合は、処理を行わない。 

197 logging.debug( 

198 "just closed before %d", my_lib.footprint.elapsed(rasp_shutter.control.config.STAT_AUTO_CLOSE) 

199 ) 

200 return 

201 

202 sense_data = rasp_shutter.control.webapi.sensor.get_sensor_data(config) 

203 if check_brightness(sense_data, "open") == BRIGHTNESS_STATE.BRIGHT: 

204 sensor_text = rasp_shutter.control.webapi.control.sensor_text(sense_data) 

205 my_lib.webapp.log.info(f"📝 暗くて延期されていましたが、明るくなってきたので開けます。{sensor_text}") 

206 

207 exec_shutter_control( 

208 config, 

209 "open", 

210 rasp_shutter.control.webapi.control.CONTROL_MODE.AUTO, 

211 sense_data, 

212 "sensor", 

213 ) 

214 my_lib.footprint.clear(rasp_shutter.control.config.STAT_PENDING_OPEN) 

215 my_lib.footprint.clear(rasp_shutter.control.config.STAT_AUTO_CLOSE) 

216 else: 

217 logging.debug( 

218 "Skip pendding open (solar_rad: %.1f W/m^2, lux: %.1f LUX)", 

219 sense_data["solar_rad"]["value"] if sense_data["solar_rad"]["valid"] else -1, 

220 sense_data["lux"]["value"] if sense_data["lux"]["valid"] else -1, 

221 ) 

222 

223 

224def conv_schedule_time_to_datetime(schedule_time): 

225 return ( 

226 datetime.datetime.strptime( 

227 my_lib.time.now().strftime("%Y/%m/%d ") + schedule_time, 

228 "%Y/%m/%d %H:%M", 

229 ) 

230 ).replace( 

231 tzinfo=my_lib.time.get_zoneinfo(), 

232 day=my_lib.time.now().day, 

233 ) 

234 

235 

236def shutter_auto_close(config): 

237 logging.debug("try auto close") 

238 

239 schedule_data = get_schedule_data() 

240 if not schedule_data["close"]["is_active"]: 

241 logging.debug("inactive") 

242 return 

243 elif abs( 

244 my_lib.time.now() - conv_schedule_time_to_datetime(schedule_data["open"]["time"]) 

245 ) < datetime.timedelta(minutes=1): 

246 # NOTE: 開ける時刻付近の場合は処理しない 

247 logging.debug("near open time") 

248 return 

249 elif ( 

250 my_lib.time.now() <= conv_schedule_time_to_datetime(schedule_data["open"]["time"]) 

251 ) or my_lib.footprint.exists(rasp_shutter.control.config.STAT_PENDING_OPEN): 

252 # NOTE: 開ける時刻よりも早い場合は処理しない 

253 logging.debug("before open time") 

254 return 

255 elif conv_schedule_time_to_datetime(schedule_data["close"]["time"]) <= my_lib.time.now(): 

256 # NOTE: スケジュールで閉めていた場合は処理しない 

257 logging.debug("after close time") 

258 return 

259 elif my_lib.footprint.elapsed(rasp_shutter.control.config.STAT_AUTO_CLOSE) <= 12 * 60 * 60: 

260 # NOTE: 12時間以内に自動で閉めていた場合は処理しない 

261 logging.debug("already close") 

262 return 

263 

264 for index in range(len(config["shutter"])): 

265 if ( 

266 my_lib.footprint.elapsed(rasp_shutter.control.webapi.control.exec_stat_file("open", index)) 

267 < rasp_shutter.control.config.EXEC_INTERVAL_AUTO_MIN * 60 

268 ): 

269 # NOTE: 自動で開けてから時間が経っていない場合は、処理を行わない。 

270 logging.debug( 

271 "just opened before %d sec (%d)", 

272 my_lib.footprint.elapsed(rasp_shutter.control.webapi.control.exec_stat_file("open", index)), 

273 index, 

274 ) 

275 return 

276 

277 sense_data = rasp_shutter.control.webapi.sensor.get_sensor_data(config) 

278 if check_brightness(sense_data, "close") == BRIGHTNESS_STATE.DARK: 

279 sensor_text = rasp_shutter.control.webapi.control.sensor_text(sense_data) 

280 my_lib.webapp.log.info( 

281 f"📝 予定より早いですが、暗くなってきたので閉めます。{sensor_text}", 

282 ) 

283 

284 exec_shutter_control( 

285 config, 

286 "close", 

287 rasp_shutter.control.webapi.control.CONTROL_MODE.AUTO, 

288 sense_data, 

289 "sensor", 

290 ) 

291 logging.info("Set Auto CLOSE") 

292 my_lib.footprint.update(rasp_shutter.control.config.STAT_AUTO_CLOSE) 

293 

294 # NOTE: まだ明るくなる可能性がある時間帯の場合、再度自動的に開けるようにする 

295 hour = my_lib.time.now().hour 

296 if (hour > 5) and (hour < 13): 

297 logging.info("Set Pending OPEN") 

298 my_lib.footprint.update(rasp_shutter.control.config.STAT_PENDING_OPEN) 

299 

300 else: # pragma: no cover 

301 # NOTE: pending close の制御は無いのでここには来ない。 

302 logging.debug( 

303 "Skip pendding close (solar_rad: %.1f W/m^2, lux: %.1f LUX)", 

304 sense_data["solar_rad"]["value"] if sense_data["solar_rad"]["valid"] else -1, 

305 sense_data["lux"]["value"] if sense_data["lux"]["valid"] else -1, 

306 ) 

307 

308 

309def shutter_auto_control(config): 

310 hour = my_lib.time.now().hour 

311 

312 # NOTE: 時間帯によって自動制御の内容を分ける 

313 if (hour > 5) and (hour < 12): 

314 shutter_auto_open(config) 

315 

316 if (hour > 5) and (hour < 20): 

317 shutter_auto_close(config) 

318 

319 # テスト同期用の完了シグナル 

320 _signal_auto_control_completed() 

321 

322 

323def shutter_schedule_control(config, state): 

324 logging.info("Execute schedule control") 

325 

326 sense_data = rasp_shutter.control.webapi.sensor.get_sensor_data(config) 

327 

328 if check_brightness(sense_data, state) == BRIGHTNESS_STATE.UNKNOWN: 

329 error_sensor = [] 

330 

331 if not sense_data["solar_rad"]["valid"]: 

332 error_sensor.append("日射センサ") 

333 if not sense_data["lux"]["valid"]: 

334 error_sensor.append("照度センサ") 

335 

336 my_lib.webapp.log.error( 

337 "😵 {error_sensor}の値が不明なので{state}るのを見合わせました。".format( 

338 error_sensor="と".join(error_sensor), 

339 state="開け" if state == "open" else "閉め", 

340 ) 

341 ) 

342 return 

343 

344 if state == "open": 

345 if check_brightness(sense_data, state) == BRIGHTNESS_STATE.DARK: 

346 sensor_text = rasp_shutter.control.webapi.control.sensor_text(sense_data) 

347 my_lib.webapp.log.info(f"📝 まだ暗いので開けるのを見合わせました。{sensor_text}") 

348 

349 rasp_shutter.control.webapi.control.cmd_hist_push( 

350 { 

351 "cmd": "pending", 

352 "state": state, 

353 } 

354 ) 

355 

356 # NOTE: 暗いので開けれなかったことを通知 

357 logging.info("Set Pending OPEN") 

358 my_lib.footprint.update(rasp_shutter.control.config.STAT_PENDING_OPEN) 

359 else: 

360 # NOTE: ここにきたときのみ、スケジュールに従って開ける 

361 exec_shutter_control( 

362 config, 

363 state, 

364 rasp_shutter.control.webapi.control.CONTROL_MODE.SCHEDULE, 

365 sense_data, 

366 "scheduler", 

367 ) 

368 else: 

369 my_lib.footprint.clear(rasp_shutter.control.config.STAT_PENDING_OPEN) 

370 exec_shutter_control( 

371 config, 

372 state, 

373 rasp_shutter.control.webapi.control.CONTROL_MODE.SCHEDULE, 

374 sense_data, 

375 "scheduler", 

376 ) 

377 

378 

379def schedule_validate(schedule_data): # noqa: C901, PLR0911 

380 if len(schedule_data) != 2: 

381 logging.warning("Count of entry is Invalid: %d", len(schedule_data)) 

382 return False 

383 

384 for entry in schedule_data.values(): 

385 for key in ["is_active", "time", "wday", "solar_rad", "lux", "altitude"]: 

386 if key not in entry: 

387 logging.warning("Does not contain %s", key) 

388 return False 

389 if type(entry["is_active"]) is not bool: 

390 logging.warning("Type of is_active is invalid: %s", type(entry["is_active"])) 

391 return False 

392 if type(entry["lux"]) is not int: 

393 logging.warning("Type of lux is invalid: %s", type(entry["lux"])) 

394 return False 

395 if type(entry["altitude"]) is not int: 395 ↛ 396line 395 didn't jump to line 396 because the condition on line 395 was never true

396 logging.warning("Type of altitude is invalid: %s", type(entry["altitude"])) 

397 return False 

398 if type(entry["solar_rad"]) is not int: 

399 logging.warning("Type of solar_rad is invalid: %s", type(entry["solar_rad"])) 

400 return False 

401 if not re.compile(r"\d{2}:\d{2}").search(entry["time"]): 

402 logging.warning("Format of time is invalid: %s", entry["time"]) 

403 return False 

404 if len(entry["wday"]) != 7: 

405 logging.warning("Count of wday is Invalid: %d", len(entry["wday"])) 

406 return False 

407 for i, wday_flag in enumerate(entry["wday"]): 

408 if type(wday_flag) is not bool: 

409 logging.warning("Type of wday[%d] is Invalid: %s", i, type(entry["wday"][i])) 

410 return False 

411 return True 

412 

413 

414def schedule_store(schedule_data): 

415 global schedule_lock 

416 try: 

417 with schedule_lock: 

418 my_lib.serializer.store(my_lib.webapp.config.SCHEDULE_FILE_PATH, schedule_data) 

419 except Exception: 

420 logging.exception("Failed to save schedule settings.") 

421 my_lib.webapp.log.error("😵 スケジュール設定の保存に失敗しました。") 

422 

423 

424def gen_schedule_default(): 

425 schedule_data = { 

426 "is_active": False, 

427 "time": "00:00", 

428 "solar_rad": 0, 

429 "lux": 0, 

430 "altitude": 0, 

431 "wday": [True] * 7, 

432 } 

433 

434 return { 

435 "open": schedule_data | {"time": "08:00", "solar_rad": 150, "lux": 1000}, 

436 "close": schedule_data | {"time": "17:00", "solar_rad": 80, "lux": 1200}, 

437 } 

438 

439 

440def schedule_load(): 

441 global schedule_lock 

442 

443 schedule_default = gen_schedule_default() 

444 

445 try: 

446 with schedule_lock: 

447 schedule_data = my_lib.serializer.load(my_lib.webapp.config.SCHEDULE_FILE_PATH, schedule_default) 

448 if schedule_validate(schedule_data): 

449 return schedule_data 

450 except Exception: 

451 logging.exception("Failed to load schedule settings.") 

452 my_lib.webapp.log.error("😵 スケジュール設定の読み出しに失敗しました。") 

453 

454 return schedule_default 

455 

456 

457def set_schedule(config, schedule_data): # noqa: C901 

458 scheduler = get_scheduler() 

459 scheduler.clear() 

460 

461 for state, entry in schedule_data.items(): 

462 if not entry["is_active"]: 

463 continue 

464 

465 if entry["wday"][0]: 

466 scheduler.every().sunday.at(entry["time"], my_lib.time.get_pytz()).do( 

467 shutter_schedule_control, config, state 

468 ) 

469 if entry["wday"][1]: 

470 scheduler.every().monday.at(entry["time"], my_lib.time.get_pytz()).do( 

471 shutter_schedule_control, config, state 

472 ) 

473 if entry["wday"][2]: 

474 scheduler.every().tuesday.at(entry["time"], my_lib.time.get_pytz()).do( 

475 shutter_schedule_control, config, state 

476 ) 

477 if entry["wday"][3]: 

478 scheduler.every().wednesday.at(entry["time"], my_lib.time.get_pytz()).do( 

479 shutter_schedule_control, config, state 

480 ) 

481 if entry["wday"][4]: 

482 scheduler.every().thursday.at(entry["time"], my_lib.time.get_pytz()).do( 

483 shutter_schedule_control, config, state 

484 ) 

485 if entry["wday"][5]: 

486 scheduler.every().friday.at(entry["time"], my_lib.time.get_pytz()).do( 

487 shutter_schedule_control, config, state 

488 ) 

489 if entry["wday"][6]: 

490 scheduler.every().saturday.at(entry["time"], my_lib.time.get_pytz()).do( 

491 shutter_schedule_control, config, state 

492 ) 

493 

494 for job in scheduler.get_jobs(): 

495 logging.info("Next run: %s", job.next_run) 

496 

497 idle_sec = scheduler.idle_seconds 

498 if idle_sec is not None: 

499 hours, remainder = divmod(idle_sec, 3600) 

500 minutes, seconds = divmod(remainder, 60) 

501 

502 logging.info( 

503 "Now is %s, time to next jobs is %d hour(s) %d minute(s) %d second(s)", 

504 my_lib.time.now().strftime("%Y-%m-%d %H:%M"), 

505 hours, 

506 minutes, 

507 seconds, 

508 ) 

509 

510 scheduler.every(1).seconds.do(shutter_auto_control, config) 

511 

512 

513def schedule_worker(config, queue): 

514 global should_terminate 

515 

516 sleep_sec = 0.5 

517 scheduler = get_scheduler() 

518 

519 liveness_file = pathlib.Path(config["liveness"]["file"]["scheduler"]) 

520 

521 logging.info("Load schedule") 

522 schedule_data = schedule_load() 

523 set_schedule_data(schedule_data) 

524 

525 set_schedule(config, schedule_data) 

526 

527 logging.info("Start schedule worker") 

528 

529 i = 0 

530 while True: 

531 if should_terminate.is_set(): 

532 scheduler.clear() 

533 break 

534 

535 try: 

536 if not queue.empty(): 

537 schedule_data = queue.get() 

538 set_schedule_data(schedule_data) 

539 set_schedule(config, schedule_data) 

540 schedule_store(schedule_data) 

541 

542 idle_sec = scheduler.idle_seconds 

543 if idle_sec is not None: 

544 hours, remainder = divmod(idle_sec, 3600) 

545 minutes, seconds = divmod(remainder, 60) 

546 

547 scheduler.run_pending() 

548 

549 logging.debug("Sleep %.1f sec...", sleep_sec) 

550 time.sleep(sleep_sec) 

551 except OverflowError: # pragma: no cover 

552 # NOTE: テストする際、freezer 使って日付をいじるとこの例外が発生する 

553 logging.debug(traceback.format_exc()) 

554 

555 if i % (10 / sleep_sec) == 0: 

556 my_lib.footprint.update(liveness_file) 

557 

558 i += 1 

559 

560 logging.info("Terminate schedule worker") 

561 

562 

563if __name__ == "__main__": 

564 import multiprocessing 

565 import multiprocessing.pool 

566 

567 import my_lib.config 

568 import my_lib.logger 

569 

570 my_lib.logger.init("test", level=logging.DEBUG) 

571 

572 def test_func(): 

573 logging.info("TEST") 

574 

575 should_terminate.set() 

576 

577 config = my_lib.config.load() 

578 queue = multiprocessing.Queue() 

579 

580 init() 

581 

582 pool = multiprocessing.pool.ThreadPool(processes=1) 

583 result = pool.apply_async(schedule_worker, (config, queue)) 

584 

585 exec_time = my_lib.time.now() + datetime.timedelta(seconds=5) 

586 queue.put( 

587 { 

588 "open": { 

589 "time": exec_time.strftime("%H:%M"), 

590 "is_active": True, 

591 "wday": [True] * 7, 

592 "solar_rad": 0, 

593 "lux": 0, 

594 "altitude": 0, 

595 "func": test_func, 

596 } 

597 } 

598 ) 

599 

600 # NOTE: 終了するのを待つ 

601 result.get()