Coverage for src / rasp_shutter / control / scheduler.py: 90%

367 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-02-13 00:10 +0900

1#!/usr/bin/env python3 

2import datetime 

3import enum 

4import logging 

5import re 

6import threading 

7import time 

8import traceback 

9from typing import Any 

10 

11import my_lib.footprint 

12import my_lib.pytest_util 

13import my_lib.serializer 

14import my_lib.time 

15import my_lib.webapp.config 

16import my_lib.webapp.log 

17import schedule 

18 

19import rasp_shutter.config 

20import rasp_shutter.control.config 

21import rasp_shutter.control.webapi.control 

22import rasp_shutter.control.webapi.sensor 

23import rasp_shutter.type_defs 

24import rasp_shutter.util 

25 

26 

27class BRIGHTNESS_STATE(enum.IntEnum): 

28 DARK = 0 

29 BRIGHT = 1 

30 UNKNOWN = 2 

31 

32 

33RETRY_COUNT = 3 

34 

35# schedule ライブラリの曜日メソッド名(日曜始まり、wday[0]=日曜 に対応) 

36WEEKDAY_METHODS = ["sunday", "monday", "tuesday", "wednesday", "thursday", "friday", "saturday"] 

37 

38# 時刻フォーマット検証用パターン(HH:MM形式) 

39SCHEDULE_TIME_PATTERN = re.compile(r"\d{2}:\d{2}") 

40 

41should_terminate = threading.Event() 

42 

43# Worker-specific instances for pytest-xdist parallel execution 

44_scheduler_instances: dict[str, schedule.Scheduler] = {} 

45_schedule_data_instances: dict[str, rasp_shutter.type_defs.ScheduleData | None] = {} 

46_schedule_lock_instances: dict[str, threading.Lock] = {} 

47_auto_control_events: dict[str, threading.Event] = {} 

48 

49# ワーカー固有のループシーケンス番号(テスト同期用) 

50_loop_sequence: dict[str, int] = {} 

51_loop_condition: dict[str, threading.Condition] = {} 

52_loop_condition_lock = threading.Lock() 

53 

54 

55def get_scheduler() -> schedule.Scheduler: 

56 """Get worker-specific scheduler instance for pytest-xdist parallel execution""" 

57 worker_id = my_lib.pytest_util.get_worker_id() 

58 

59 if worker_id not in _scheduler_instances: 

60 # Create a new scheduler instance for this worker 

61 _scheduler_instances[worker_id] = schedule.Scheduler() 

62 

63 return _scheduler_instances[worker_id] 

64 

65 

66def get_schedule_lock() -> threading.Lock: 

67 """Get worker-specific schedule lock for pytest-xdist parallel execution""" 

68 worker_id = my_lib.pytest_util.get_worker_id() 

69 

70 if worker_id not in _schedule_lock_instances: 

71 _schedule_lock_instances[worker_id] = threading.Lock() 

72 

73 return _schedule_lock_instances[worker_id] 

74 

75 

76def clear_scheduler_jobs() -> None: 

77 """スケジューラのジョブとスケジュールデータをクリア(テスト用) 

78 

79 テスト間でスケジューラの状態が干渉しないようにするため、 

80 各テスト開始前に呼び出す。 

81 """ 

82 worker_id = my_lib.pytest_util.get_worker_id() 

83 

84 # スケジューラインスタンスのジョブをクリア 

85 if worker_id in _scheduler_instances: 85 ↛ 91line 85 didn't jump to line 91 because the condition on line 85 was always true

86 scheduler = _scheduler_instances[worker_id] 

87 scheduler.clear() 

88 logging.debug("Cleared scheduler jobs for worker %s", worker_id) 

89 

90 # スケジュールデータをクリア 

91 if worker_id in _schedule_data_instances: 91 ↛ exitline 91 didn't return from function 'clear_scheduler_jobs' because the condition on line 91 was always true

92 _schedule_data_instances[worker_id] = None 

93 logging.debug("Cleared schedule data for worker %s", worker_id) 

94 

95 

96def reset_loop_sequence() -> None: 

97 """ループシーケンス番号をリセット(テスト用) 

98 

99 テスト間でシーケンス番号が累積しないようにリセットする。 

100 """ 

101 worker_id = my_lib.pytest_util.get_worker_id() 

102 if worker_id in _loop_sequence: 

103 _loop_sequence[worker_id] = 0 

104 logging.debug("Reset loop sequence for worker %s", worker_id) 

105 

106 

107def get_auto_control_event(): 

108 """テスト同期用のワーカー固有自動制御イベントを取得""" 

109 worker_id = my_lib.pytest_util.get_worker_id() 

110 

111 if worker_id not in _auto_control_events: 

112 _auto_control_events[worker_id] = threading.Event() 

113 

114 return _auto_control_events[worker_id] 

115 

116 

117def _signal_auto_control_completed(): 

118 """自動制御サイクルの完了をシグナル(テスト用)""" 

119 # テスト環境でのみイベントを設定 

120 if rasp_shutter.util.is_pytest_running(): 120 ↛ exitline 120 didn't return from function '_signal_auto_control_completed' because the condition on line 120 was always true

121 event = get_auto_control_event() 

122 event.set() 

123 

124 

125def wait_for_auto_control_completion(timeout=5.0): 

126 """自動制御の完了を待機(テスト用)""" 

127 if not rasp_shutter.util.is_pytest_running(): 

128 return True 

129 

130 event = get_auto_control_event() 

131 event.clear() # 待機前にクリア 

132 return event.wait(timeout) 

133 

134 

135def _get_loop_condition() -> threading.Condition: 

136 """ループ完了通知用のConditionを取得(スレッドセーフ)""" 

137 worker_id = my_lib.pytest_util.get_worker_id() 

138 with _loop_condition_lock: 

139 if worker_id not in _loop_condition: 

140 _loop_condition[worker_id] = threading.Condition() 

141 return _loop_condition[worker_id] 

142 

143 

144def get_loop_sequence() -> int: 

145 """現在のループシーケンス番号を取得""" 

146 worker_id = my_lib.pytest_util.get_worker_id() 

147 return _loop_sequence.get(worker_id, 0) 

148 

149 

150def _increment_loop_sequence() -> None: 

151 """ループシーケンス番号をインクリメントして通知""" 

152 worker_id = my_lib.pytest_util.get_worker_id() 

153 condition = _get_loop_condition() 

154 with condition: 

155 _loop_sequence[worker_id] = _loop_sequence.get(worker_id, 0) + 1 

156 condition.notify_all() 

157 

158 

159def wait_for_loop_after(sequence: int, timeout: float = 10.0) -> bool: 

160 """指定シーケンス番号より大きくなるまで待機 

161 

162 Args: 

163 sequence: 待機開始時のシーケンス番号 

164 timeout: タイムアウト秒数 

165 

166 Returns: 

167 成功したら True、タイムアウトしたら False 

168 """ 

169 # NOTE: threading.Condition を使用して効率的に待機する。 

170 # _increment_loop_sequence() が notify_all() を呼ぶので、 

171 # シーケンス番号がインクリメントされた時点で即座に待機が終了する。 

172 # time_machine の影響を避けるため、time.perf_counter() でタイムアウトをチェックし、 

173 # Condition.wait() は短い間隔で呼び出す。 

174 condition = _get_loop_condition() 

175 start = time.perf_counter() # time_machineの影響を受けない 

176 poll_interval = 0.1 # 100ms間隔でCondition.waitを呼び出す 

177 

178 with condition: 

179 while time.perf_counter() - start < timeout: 

180 if get_loop_sequence() > sequence: 

181 return True 

182 # NOTE: Condition.wait() を使用することで、notify_all() 呼び出し時に 

183 # 即座に起床する。poll_interval は time_machine の影響を受ける可能性があるが、 

184 # 外側の time.perf_counter() チェックでタイムアウトを正確に管理する。 

185 condition.wait(timeout=poll_interval) 

186 

187 return get_loop_sequence() > sequence 

188 

189 

190def get_schedule_data() -> rasp_shutter.type_defs.ScheduleData | None: 

191 """Get worker-specific schedule data for pytest-xdist parallel execution""" 

192 worker_id = my_lib.pytest_util.get_worker_id() 

193 

194 if worker_id not in _schedule_data_instances: 

195 _schedule_data_instances[worker_id] = None 

196 

197 return _schedule_data_instances[worker_id] 

198 

199 

200def set_schedule_data(data: rasp_shutter.type_defs.ScheduleData | dict[str, Any] | None) -> None: 

201 """Set worker-specific schedule data for pytest-xdist parallel execution""" 

202 worker_id = my_lib.pytest_util.get_worker_id() 

203 _schedule_data_instances[worker_id] = data # type: ignore[assignment] 

204 

205 

206def init() -> None: 

207 global should_terminate 

208 

209 # ワーカー固有のロックを初期化 

210 get_schedule_lock() 

211 should_terminate.clear() 

212 

213 

214def term(): 

215 global should_terminate 

216 

217 should_terminate.set() 

218 

219 

220def brightness_text( 

221 sense_data: rasp_shutter.type_defs.SensorData, 

222 cur_schedule_data: rasp_shutter.type_defs.ScheduleEntry | dict[str, Any], 

223) -> str: 

224 # TypedDictへの動的アクセスを避けるため、dictに展開 

225 schedule_dict: dict[str, Any] = {**cur_schedule_data} 

226 

227 def sensor_text(sensor: str) -> str: 

228 sensor_value = getattr(sense_data, sensor) 

229 current = sensor_value.value 

230 threshold = schedule_dict[sensor] 

231 if current > threshold: 

232 cmp = ">" 

233 elif current < threshold: 

234 cmp = "<" 

235 else: 

236 cmp = "=" 

237 return f"{sensor}: current {current:.1f} {cmp} threshold {threshold:.1f}" 

238 

239 text = [sensor_text(sensor) for sensor in ["solar_rad", "lux", "altitude"]] 

240 

241 return ", ".join(text) 

242 

243 

244def check_brightness(sense_data: rasp_shutter.type_defs.SensorData, action: str) -> BRIGHTNESS_STATE: 

245 if not sense_data.lux.valid or not sense_data.solar_rad.valid: 

246 return BRIGHTNESS_STATE.UNKNOWN 

247 

248 schedule_data = get_schedule_data() 

249 if schedule_data is None: 249 ↛ 251line 249 didn't jump to line 251 because the condition on line 249 was never true

250 # テスト間のクリア中は不明として扱う 

251 return BRIGHTNESS_STATE.UNKNOWN 

252 

253 # validがTrueの場合、valueはNoneではない 

254 lux_value = sense_data.lux.value 

255 solar_rad_value = sense_data.solar_rad.value 

256 altitude_value = sense_data.altitude.value 

257 assert lux_value is not None and solar_rad_value is not None # noqa: S101 

258 assert altitude_value is not None # noqa: S101 

259 

260 if action == "close": 

261 close_data = schedule_data["close"] 

262 if ( 

263 lux_value < close_data["lux"] 

264 or solar_rad_value < close_data["solar_rad"] 

265 or altitude_value < close_data["altitude"] 

266 ): 

267 logging.info("Getting darker %s", brightness_text(sense_data, close_data)) 

268 return BRIGHTNESS_STATE.DARK 

269 else: 

270 return BRIGHTNESS_STATE.BRIGHT 

271 else: 

272 open_data = schedule_data["open"] 

273 if ( 

274 lux_value > open_data["lux"] 

275 and solar_rad_value > open_data["solar_rad"] 

276 and altitude_value > open_data["altitude"] 

277 ): 

278 logging.info("Getting brighter %s", brightness_text(sense_data, open_data)) 

279 return BRIGHTNESS_STATE.BRIGHT 

280 else: 

281 return BRIGHTNESS_STATE.DARK 

282 

283 

284def exec_shutter_control_impl( 

285 config: rasp_shutter.config.AppConfig, 

286 state: str, 

287 mode: rasp_shutter.control.webapi.control.CONTROL_MODE, 

288 sense_data: rasp_shutter.type_defs.SensorData, 

289 user: str, 

290) -> bool: 

291 try: 

292 # NOTE: Web 経由だと認証つけた場合に困るので、直接関数を呼ぶ 

293 rasp_shutter.control.webapi.control.set_shutter_state( 

294 config, list(range(len(config.shutter))), state, mode, sense_data, user 

295 ) 

296 return True 

297 except Exception: 

298 logging.exception("Failed to control shutter") 

299 

300 return False 

301 

302 

303def exec_shutter_control( 

304 config: rasp_shutter.config.AppConfig, 

305 state: str, 

306 mode: rasp_shutter.control.webapi.control.CONTROL_MODE, 

307 sense_data: rasp_shutter.type_defs.SensorData, 

308 user: str, 

309) -> bool: 

310 logging.debug("Execute shutter control") 

311 

312 for _ in range(RETRY_COUNT): 

313 if exec_shutter_control_impl(config, state, mode, sense_data, user): 

314 return True 

315 logging.debug("Retry") 

316 

317 my_lib.webapp.log.info("😵 シャッターの制御に失敗しました。") 

318 return False 

319 

320 

321def shutter_auto_open(config: rasp_shutter.config.AppConfig) -> None: 

322 logging.debug("try auto open") 

323 

324 schedule_data = get_schedule_data() 

325 if schedule_data is None: 325 ↛ 327line 325 didn't jump to line 327 because the condition on line 325 was never true

326 # テスト間のクリア中は何もしない 

327 logging.debug("Schedule data not set, skipping auto open") 

328 return 

329 if not schedule_data["open"]["is_active"]: 

330 logging.debug("inactive") 

331 return 

332 

333 elapsed_pending_open = my_lib.footprint.elapsed(rasp_shutter.control.config.STAT_PENDING_OPEN.to_path()) 

334 if elapsed_pending_open > rasp_shutter.control.config.ELAPSED_PENDING_OPEN_MAX_SEC: 

335 # NOTE: 暗くて開けるのを延期されている場合以外は処理を行わない。 

336 logging.debug("NOT pending") 

337 return 

338 

339 elapsed_auto_close = my_lib.footprint.elapsed(rasp_shutter.control.config.STAT_AUTO_CLOSE.to_path()) 

340 if elapsed_auto_close < rasp_shutter.control.config.EXEC_INTERVAL_AUTO_MIN * 60: 340 ↛ 342line 340 didn't jump to line 342 because the condition on line 340 was never true

341 # NOTE: 自動で閉めてから時間が経っていない場合は、処理を行わない。 

342 logging.debug("just closed before %d", elapsed_auto_close) 

343 return 

344 

345 sense_data = rasp_shutter.control.webapi.sensor.get_sensor_data(config) 

346 if check_brightness(sense_data, "open") == BRIGHTNESS_STATE.BRIGHT: 

347 sensor_text = rasp_shutter.control.webapi.control.sensor_text(sense_data) 

348 my_lib.webapp.log.info(f"🌅 暗くて延期されていましたが、明るくなってきたので開けます。{sensor_text}") 

349 

350 exec_shutter_control( 

351 config, 

352 "open", 

353 rasp_shutter.control.webapi.control.CONTROL_MODE.AUTO, 

354 sense_data, 

355 "sensor", 

356 ) 

357 my_lib.footprint.clear(rasp_shutter.control.config.STAT_PENDING_OPEN.to_path()) 

358 my_lib.footprint.clear(rasp_shutter.control.config.STAT_AUTO_CLOSE.to_path()) 

359 else: 

360 logging.debug( 

361 "Skip pendding open (solar_rad: %.1f W/m^2, lux: %.1f LUX)", 

362 sense_data.solar_rad.value if sense_data.solar_rad.valid else -1, 

363 sense_data.lux.value if sense_data.lux.valid else -1, 

364 ) 

365 

366 

367def conv_schedule_time_to_datetime(schedule_time: str) -> datetime.datetime: 

368 now = my_lib.time.now() 

369 time_obj = datetime.datetime.strptime(schedule_time, "%H:%M").time() 

370 return datetime.datetime.combine(now.date(), time_obj, tzinfo=my_lib.time.get_zoneinfo()) 

371 

372 

373def shutter_auto_close(config: rasp_shutter.config.AppConfig) -> None: 

374 logging.debug("try auto close") 

375 

376 schedule_data = get_schedule_data() 

377 if schedule_data is None: 377 ↛ 379line 377 didn't jump to line 379 because the condition on line 377 was never true

378 # テスト間のクリア中は何もしない 

379 logging.debug("Schedule data not set, skipping auto close") 

380 return 

381 if not schedule_data["close"]["is_active"]: 

382 logging.debug("inactive") 

383 return 

384 elif abs( 

385 my_lib.time.now() - conv_schedule_time_to_datetime(schedule_data["open"]["time"]) 

386 ) < datetime.timedelta(minutes=1): 

387 # NOTE: 開ける時刻付近の場合は処理しない 

388 logging.debug("near open time") 

389 return 

390 elif ( 

391 my_lib.time.now() <= conv_schedule_time_to_datetime(schedule_data["open"]["time"]) 

392 ) or my_lib.footprint.exists(rasp_shutter.control.config.STAT_PENDING_OPEN.to_path()): 

393 # NOTE: 開ける時刻よりも早い場合は処理しない 

394 logging.debug("before open time") 

395 return 

396 elif conv_schedule_time_to_datetime(schedule_data["close"]["time"]) <= my_lib.time.now(): 

397 # NOTE: スケジュールで閉めていた場合は処理しない 

398 logging.debug("after close time") 

399 return 

400 elif ( 

401 my_lib.footprint.elapsed(rasp_shutter.control.config.STAT_AUTO_CLOSE.to_path()) 

402 <= rasp_shutter.control.config.ELAPSED_AUTO_CLOSE_MAX_SEC 

403 ): 

404 # NOTE: 12時間以内に自動で閉めていた場合は処理しない 

405 logging.debug("already close") 

406 return 

407 

408 for index in range(len(config.shutter)): 

409 elapsed_open = my_lib.footprint.elapsed( 

410 rasp_shutter.control.webapi.control.exec_stat_file("open", index) 

411 ) 

412 if elapsed_open < rasp_shutter.control.config.EXEC_INTERVAL_AUTO_MIN * 60: 

413 # NOTE: 自動で開けてから時間が経っていない場合は、処理を行わない。 

414 logging.debug("just opened before %d sec (%d)", elapsed_open, index) 

415 return 

416 

417 sense_data = rasp_shutter.control.webapi.sensor.get_sensor_data(config) 

418 if check_brightness(sense_data, "close") == BRIGHTNESS_STATE.DARK: 

419 sensor_text = rasp_shutter.control.webapi.control.sensor_text(sense_data) 

420 my_lib.webapp.log.info( 

421 f"🌇 予定より早いですが、暗くなってきたので閉めます。{sensor_text}", 

422 ) 

423 

424 exec_shutter_control( 

425 config, 

426 "close", 

427 rasp_shutter.control.webapi.control.CONTROL_MODE.AUTO, 

428 sense_data, 

429 "sensor", 

430 ) 

431 logging.info("Set Auto CLOSE") 

432 my_lib.footprint.update(rasp_shutter.control.config.STAT_AUTO_CLOSE.to_path()) 

433 

434 # NOTE: まだ明るくなる可能性がある時間帯の場合、再度自動的に開けるようにする 

435 hour = my_lib.time.now().hour 

436 if ( 

437 hour > rasp_shutter.control.config.HOUR_MORNING_START 

438 and hour < rasp_shutter.control.config.HOUR_PENDING_OPEN_END 

439 ): 

440 logging.info("Set Pending OPEN") 

441 my_lib.footprint.update(rasp_shutter.control.config.STAT_PENDING_OPEN.to_path()) 

442 

443 else: # pragma: no cover 

444 # NOTE: pending close の制御は無いのでここには来ない。 

445 logging.debug( 

446 "Skip pendding close (solar_rad: %.1f W/m^2, lux: %.1f LUX)", 

447 sense_data.solar_rad.value if sense_data.solar_rad.valid else -1, 

448 sense_data.lux.value if sense_data.lux.valid else -1, 

449 ) 

450 

451 

452def shutter_auto_control(config: rasp_shutter.config.AppConfig) -> None: 

453 hour = my_lib.time.now().hour 

454 cfg = rasp_shutter.control.config 

455 

456 # NOTE: 時間帯によって自動制御の内容を分ける 

457 if hour > cfg.HOUR_MORNING_START and hour < cfg.HOUR_AUTO_OPEN_END: 

458 shutter_auto_open(config) 

459 

460 if hour > cfg.HOUR_MORNING_START and hour < cfg.HOUR_AUTO_CLOSE_END: 

461 shutter_auto_close(config) 

462 

463 # テスト同期用の完了シグナル 

464 _signal_auto_control_completed() 

465 

466 

467def shutter_schedule_control(config: rasp_shutter.config.AppConfig, state: str) -> None: 

468 logging.info("Execute schedule control") 

469 

470 sense_data = rasp_shutter.control.webapi.sensor.get_sensor_data(config) 

471 

472 if check_brightness(sense_data, state) == BRIGHTNESS_STATE.UNKNOWN: 

473 error_sensor = [] 

474 

475 if not sense_data.solar_rad.valid: 

476 error_sensor.append("日射センサ") 

477 if not sense_data.lux.valid: 

478 error_sensor.append("照度センサ") 

479 

480 error_sensor_text = "と".join(error_sensor) 

481 state_text = rasp_shutter.type_defs.state_to_action_text(state) 

482 my_lib.webapp.log.error(f"😵 {error_sensor_text}の値が不明なので{state_text}るのを見合わせました。") 

483 _signal_auto_control_completed() 

484 return 

485 

486 if state == "open": 

487 if check_brightness(sense_data, state) == BRIGHTNESS_STATE.DARK: 487 ↛ 503line 487 didn't jump to line 503 because the condition on line 487 was always true

488 sensor_text = rasp_shutter.control.webapi.control.sensor_text(sense_data) 

489 my_lib.webapp.log.info(f"📝 まだ暗いので開けるのを見合わせました。{sensor_text}") 

490 

491 rasp_shutter.control.webapi.control.cmd_hist_push( 

492 { 

493 "cmd": "pending", 

494 "state": state, 

495 } 

496 ) 

497 

498 # NOTE: 暗いので開けれなかったことを通知 

499 logging.info("Set Pending OPEN") 

500 my_lib.footprint.update(rasp_shutter.control.config.STAT_PENDING_OPEN.to_path()) 

501 else: 

502 # NOTE: ここにきたときのみ、スケジュールに従って開ける 

503 exec_shutter_control( 

504 config, 

505 state, 

506 rasp_shutter.control.webapi.control.CONTROL_MODE.SCHEDULE, 

507 sense_data, 

508 "scheduler", 

509 ) 

510 else: 

511 my_lib.footprint.clear(rasp_shutter.control.config.STAT_PENDING_OPEN.to_path()) 

512 exec_shutter_control( 

513 config, 

514 state, 

515 rasp_shutter.control.webapi.control.CONTROL_MODE.SCHEDULE, 

516 sense_data, 

517 "scheduler", 

518 ) 

519 

520 # テスト同期用の完了シグナル 

521 _signal_auto_control_completed() 

522 

523 

524SCHEDULE_FIELD_TYPES: dict[str, type] = { 

525 "is_active": bool, 

526 "lux": int, 

527 "altitude": int, 

528 "solar_rad": int, 

529} 

530 

531 

532def schedule_validate(schedule_data: dict) -> bool: 

533 if len(schedule_data) != 2: 

534 logging.warning("Count of entry is Invalid: %d", len(schedule_data)) 

535 return False 

536 

537 for entry in schedule_data.values(): 

538 for key in ["is_active", "time", "wday", "solar_rad", "lux", "altitude"]: 

539 if key not in entry: 

540 logging.warning("Does not contain %s", key) 

541 return False 

542 

543 # 辞書ベースのループで型チェック 

544 for field, expected_type in SCHEDULE_FIELD_TYPES.items(): 

545 if not isinstance(entry.get(field), expected_type): 

546 logging.warning("Type of %s is invalid: %s", field, type(entry.get(field))) 

547 return False 

548 

549 if not SCHEDULE_TIME_PATTERN.search(entry["time"]): 

550 logging.warning("Format of time is invalid: %s", entry["time"]) 

551 return False 

552 if len(entry["wday"]) != 7: 

553 logging.warning("Count of wday is Invalid: %d", len(entry["wday"])) 

554 return False 

555 for i, wday_flag in enumerate(entry["wday"]): 

556 if not isinstance(wday_flag, bool): 

557 logging.warning("Type of wday[%d] is Invalid: %s", i, type(entry["wday"][i])) 

558 return False 

559 return True 

560 

561 

562def schedule_store(schedule_data: dict) -> None: 

563 schedule_path = my_lib.webapp.config.SCHEDULE_FILE_PATH 

564 assert schedule_path is not None, "SCHEDULE_FILE_PATH not configured" # noqa: S101 

565 

566 try: 

567 with get_schedule_lock(): 

568 my_lib.serializer.store(schedule_path, schedule_data) 

569 except Exception: 

570 logging.exception("Failed to save schedule settings.") 

571 my_lib.webapp.log.error("😵 スケジュール設定の保存に失敗しました。") 

572 

573 

574def gen_schedule_default(): 

575 _cfg = rasp_shutter.control.config 

576 schedule_data = { 

577 "is_active": False, 

578 "time": "00:00", 

579 "solar_rad": 0, 

580 "lux": 0, 

581 "altitude": 0, 

582 "wday": [True] * 7, 

583 } 

584 

585 return { 

586 "open": schedule_data 

587 | { 

588 "time": _cfg.DEFAULT_OPEN_TIME, 

589 "solar_rad": _cfg.DEFAULT_OPEN_SOLAR_RAD, 

590 "lux": _cfg.DEFAULT_OPEN_LUX, 

591 }, 

592 "close": schedule_data 

593 | { 

594 "time": _cfg.DEFAULT_CLOSE_TIME, 

595 "solar_rad": _cfg.DEFAULT_CLOSE_SOLAR_RAD, 

596 "lux": _cfg.DEFAULT_CLOSE_LUX, 

597 }, 

598 } 

599 

600 

601def schedule_load() -> dict: 

602 schedule_default = gen_schedule_default() 

603 schedule_path = my_lib.webapp.config.SCHEDULE_FILE_PATH 

604 assert schedule_path is not None, "SCHEDULE_FILE_PATH not configured" # noqa: S101 

605 

606 try: 

607 with get_schedule_lock(): 

608 schedule_data = my_lib.serializer.load(schedule_path, schedule_default) 

609 if schedule_validate(schedule_data): 

610 return schedule_data 

611 except Exception: 

612 logging.exception("Failed to load schedule settings.") 

613 my_lib.webapp.log.error("😵 スケジュール設定の読み出しに失敗しました。") 

614 

615 return schedule_default 

616 

617 

618def set_schedule(config: rasp_shutter.config.AppConfig, schedule_data: dict) -> None: 

619 scheduler = get_scheduler() 

620 scheduler.clear() 

621 

622 for state, entry in schedule_data.items(): 

623 if not entry["is_active"]: 

624 continue 

625 

626 for i, wday_method_name in enumerate(WEEKDAY_METHODS): 

627 if entry["wday"][i]: 

628 wday_method = getattr(scheduler.every(), wday_method_name) 

629 wday_method.at(entry["time"], my_lib.time.get_pytz()).do( 

630 shutter_schedule_control, config, state 

631 ) 

632 

633 for job in scheduler.get_jobs(): 

634 logging.info("Next run: %s", job.next_run) 

635 

636 idle_sec = scheduler.idle_seconds 

637 if idle_sec is not None: 

638 hours, remainder = divmod(idle_sec, 3600) 

639 minutes, seconds = divmod(remainder, 60) 

640 

641 logging.info( 

642 "Now is %s, time to next jobs is %d hour(s) %d minute(s) %d second(s)", 

643 my_lib.time.now().strftime("%Y-%m-%d %H:%M"), 

644 hours, 

645 minutes, 

646 seconds, 

647 ) 

648 

649 scheduler.every(1).seconds.do(shutter_auto_control, config) 

650 

651 

652def schedule_worker(config: rasp_shutter.config.AppConfig, queue) -> None: 

653 global should_terminate 

654 

655 # DUMMY_MODEではより短い間隔でループして、テストの応答性を向上 

656 # 本番環境では0.5秒、テスト環境では0.1秒 

657 sleep_sec = 0.1 if rasp_shutter.util.is_dummy_mode() else 0.5 

658 scheduler = get_scheduler() 

659 

660 liveness_file = config.liveness.file.scheduler 

661 

662 logging.info("Load schedule") 

663 schedule_data = schedule_load() 

664 set_schedule_data(schedule_data) 

665 

666 set_schedule(config, schedule_data) 

667 

668 logging.info("Start schedule worker") 

669 

670 i = 0 

671 while True: 

672 if should_terminate.is_set(): 

673 scheduler.clear() 

674 break 

675 

676 run_pending_elapsed = 0.0 

677 try: 

678 loop_start = time.perf_counter() 

679 

680 if not queue.empty(): 

681 schedule_data = queue.get() 

682 set_schedule_data(schedule_data) 

683 set_schedule(config, schedule_data) 

684 schedule_store(schedule_data) 

685 

686 idle_sec = scheduler.idle_seconds # noqa: F841 

687 

688 run_pending_start = time.perf_counter() 

689 scheduler.run_pending() 

690 run_pending_elapsed = time.perf_counter() - run_pending_start 

691 

692 time.sleep(sleep_sec) 

693 

694 loop_elapsed = time.perf_counter() - loop_start 

695 

696 # 1秒以上かかった場合は警告ログを出力 

697 if loop_elapsed > 1.0: 697 ↛ 698line 697 didn't jump to line 698 because the condition on line 697 was never true

698 logging.warning( 

699 "Scheduler loop took %.2fs (run_pending: %.2fs, sequence: %d)", 

700 loop_elapsed, 

701 run_pending_elapsed, 

702 get_loop_sequence(), 

703 ) 

704 except OverflowError: # pragma: no cover 

705 # NOTE: テストする際、freezer 使って日付をいじるとこの例外が発生する 

706 logging.debug(traceback.format_exc()) 

707 except Exception: # pragma: no cover 

708 # NOTE: その他の例外(ログシステムのBrokenPipeError、IOErrorなど)が発生しても 

709 # スケジューラループを継続する。例外でループが停止すると、テストの同期が 

710 # 取れなくなり、タイムアウトエラーが発生する。 

711 logging.warning("Exception in scheduler loop, continuing: %s", traceback.format_exc()) 

712 finally: 

713 # NOTE: 例外が発生してもシーケンス番号を更新する。 

714 # テスト同期で使用されるため、ループが動いていることを常に示す必要がある。 

715 _increment_loop_sequence() 

716 

717 if i % (10 / sleep_sec) == 0: 

718 my_lib.footprint.update(liveness_file) 

719 

720 i += 1 

721 

722 logging.info("Terminate schedule worker") 

723 

724 

725if __name__ == "__main__": 

726 import multiprocessing 

727 import multiprocessing.pool 

728 

729 import my_lib.config 

730 import my_lib.logger 

731 

732 my_lib.logger.init("test", level=logging.DEBUG) 

733 

734 def test_func(): 

735 logging.info("TEST") 

736 

737 should_terminate.set() 

738 

739 config = my_lib.config.load() 

740 queue: multiprocessing.Queue[dict] = multiprocessing.Queue() 

741 

742 init() 

743 

744 pool = multiprocessing.pool.ThreadPool(processes=1) 

745 result = pool.apply_async(schedule_worker, (config, queue)) 

746 

747 exec_time = my_lib.time.now() + datetime.timedelta(seconds=5) 

748 queue.put( 

749 { 

750 "open": { 

751 "time": exec_time.strftime("%H:%M"), 

752 "is_active": True, 

753 "wday": [True] * 7, 

754 "solar_rad": 0, 

755 "lux": 0, 

756 "altitude": 0, 

757 "func": test_func, 

758 } 

759 } 

760 ) 

761 

762 # NOTE: 終了するのを待つ 

763 result.get()