Coverage for flask/src/rasp_water/control/scheduler.py: 99%

149 statements  

« prev     ^ index     » next       coverage.py v7.9.1, created at 2025-07-04 12:06 +0900

1#!/usr/bin/env python3 

2import datetime 

3import logging 

4import os 

5import re 

6import threading 

7import time 

8import traceback 

9 

10import my_lib.footprint 

11import my_lib.serializer 

12import my_lib.webapp.config 

13import my_lib.webapp.log 

14import rasp_water.control.webapi.valve 

15import rasp_water.control.webapi.test.time 

16import schedule 

17 

18RETRY_COUNT = 3 

19 

20schedule_lock = None 

21should_terminate = threading.Event() 

22 

23# Worker-specific scheduler instance for pytest-xdist parallel execution 

24_scheduler_instances = {} 

25 

26 

27def get_scheduler(): 

28 """Get worker-specific scheduler instance for pytest-xdist parallel execution""" 

29 worker_id = os.environ.get("PYTEST_XDIST_WORKER", "main") 

30 

31 if worker_id not in _scheduler_instances: 

32 # Create a new scheduler instance for this worker 

33 _scheduler_instances[worker_id] = schedule.Scheduler() 

34 

35 return _scheduler_instances[worker_id] 

36 

37def init(): 

38 global schedule_lock # noqa: PLW0603 

39 global should_terminate 

40 

41 schedule_lock = threading.Lock() 

42 should_terminate.clear() 

43 

44 

45def term(): 

46 global should_terminate 

47 

48 should_terminate.set() 

49 

50 

51def valve_auto_control_impl(config, period): 

52 try: 

53 # NOTE: Web 経由だと認証つけた場合に困るので、直接関数を呼ぶ 

54 rasp_water.control.webapi.valve.set_valve_state(config, 1, period * 60, True, "scheduler") 

55 return True 

56 

57 # logging.debug("Request scheduled execution to {url}".format(url=url)) 

58 # res = requests.post( 

59 # url, params={"cmd": 1, "state": 1, "period": period * 60, "auto": True} 

60 # ) 

61 # logging.debug(res.text) 

62 # return res.status_code == 200 

63 except Exception as e: 

64 logging.exception("Failed to control valve automatically: %s", str(e)) 

65 import traceback 

66 logging.error("Full traceback: %s", traceback.format_exc()) 

67 

68 return False 

69 

70 

71def valve_auto_control(config, period): 

72 logging.info("Starts automatic control of the valve") 

73 

74 for _ in range(RETRY_COUNT): 

75 if valve_auto_control_impl(config, period): 

76 return True 

77 

78 my_lib.webapp.log.info("😵 水やりの自動実行に失敗しました。") 

79 return False 

80 

81 

82def schedule_validate(schedule_data): # noqa: C901, PLR0911 

83 if len(schedule_data) != 2: 

84 logging.warning("Count of entry is Invalid: %d", len(schedule_data)) 

85 return False 

86 

87 for entry in schedule_data: 

88 for key in ["is_active", "time", "period", "wday"]: 

89 if key not in entry: 

90 logging.warning("Does not contain %s", key) 

91 return False 

92 if type(entry["is_active"]) is not bool: 

93 logging.warning("Type of is_active is invalid: %s", type(entry["is_active"])) 

94 return False 

95 if not re.compile(r"\d{2}:\d{2}").search(entry["time"]): 

96 logging.warning("Format of time is invalid: %s", entry["time"]) 

97 return False 

98 if type(entry["period"]) is not int: 

99 logging.warning("Type of period is invalid: %s", type(entry["period"])) 

100 return False 

101 if len(entry["wday"]) != 7: 

102 logging.warning("Count of wday is Invalid: %d", len(entry["wday"])) 

103 return False 

104 for i, wday_flag in enumerate(entry["wday"]): 

105 if type(wday_flag) is not bool: 

106 logging.warning("Type of wday[%d] is Invalid: %s", i, type(entry["wday"][i])) 

107 return False 

108 return True 

109 

110 

111def schedule_store(schedule_data): 

112 global schedule_lock 

113 try: 

114 with schedule_lock: 

115 my_lib.serializer.store(my_lib.webapp.config.SCHEDULE_FILE_PATH, schedule_data) 

116 except Exception: 

117 logging.exception("Failed to save schedule settings.") 

118 my_lib.webapp.log.error("😵 スケジュール設定の保存に失敗しました。") 

119 

120 

121def gen_schedule_default(): 

122 return [ 

123 { 

124 "is_active": False, 

125 "time": "00:00", 

126 "period": 1, 

127 "wday": [True] * 7, 

128 } 

129 ] * 2 

130 

131 

132def schedule_load(): 

133 global schedule_lock 

134 

135 schedule_default = gen_schedule_default() 

136 

137 try: 

138 with schedule_lock: 

139 schedule_data = my_lib.serializer.load(my_lib.webapp.config.SCHEDULE_FILE_PATH, schedule_default) 

140 if schedule_validate(schedule_data): 

141 return schedule_data 

142 except Exception: 

143 logging.exception("Failed to load schedule settings.") 

144 my_lib.webapp.log.error("😵 スケジュール設定の読み出しに失敗しました。") 

145 

146 return schedule_default 

147 

148 

149def set_schedule(config, schedule_data): # noqa: C901 

150 scheduler = get_scheduler() 

151 scheduler.clear() 

152 

153 # DUMMY_MODEでは、scheduleライブラリの時刻関数をmy_lib.time.now()に設定 

154 if os.environ.get("DUMMY_MODE", "false") == "true": 154 ↛ 160line 154 didn't jump to line 160 because the condition on line 154 was always true

155 scheduler._time_func = my_lib.time.now 

156 

157 # DUMMY_MODEでは、スケジュールライブラリのモッキングはしない 

158 # 代わりに、テスト用APIでモックした時刻を利用 

159 

160 for entry in schedule_data: 

161 if not entry["is_active"]: 

162 continue 

163 

164 if entry["wday"][0]: 

165 scheduler.every().sunday.at(entry["time"], my_lib.time.get_pytz()).do( 

166 valve_auto_control, config, entry["period"] 

167 ) 

168 if entry["wday"][1]: 

169 scheduler.every().monday.at(entry["time"], my_lib.time.get_pytz()).do( 

170 valve_auto_control, config, entry["period"] 

171 ) 

172 if entry["wday"][2]: 

173 scheduler.every().tuesday.at(entry["time"], my_lib.time.get_pytz()).do( 

174 valve_auto_control, config, entry["period"] 

175 ) 

176 if entry["wday"][3]: 

177 scheduler.every().wednesday.at(entry["time"], my_lib.time.get_pytz()).do( 

178 valve_auto_control, config, entry["period"] 

179 ) 

180 if entry["wday"][4]: 

181 scheduler.every().thursday.at(entry["time"], my_lib.time.get_pytz()).do( 

182 valve_auto_control, config, entry["period"] 

183 ) 

184 if entry["wday"][5]: 

185 scheduler.every().friday.at(entry["time"], my_lib.time.get_pytz()).do( 

186 valve_auto_control, config, entry["period"] 

187 ) 

188 if entry["wday"][6]: 

189 scheduler.every().saturday.at(entry["time"], my_lib.time.get_pytz()).do( 

190 valve_auto_control, config, entry["period"] 

191 ) 

192 

193 for job in scheduler.get_jobs(): 

194 logging.info("Next run: %s", job.next_run) 

195 

196 idle_sec = scheduler.idle_seconds 

197 if idle_sec is not None: 

198 hours, remainder = divmod(idle_sec, 3600) 

199 minutes, seconds = divmod(remainder, 60) 

200 

201 logging.info( 

202 "Now is %s, time to next jobs is %d hour(s) %d minute(s) %d second(s)", 

203 my_lib.time.now().strftime("%Y-%m-%d %H:%M"), 

204 hours, 

205 minutes, 

206 seconds, 

207 ) 

208 

209 return idle_sec 

210 

211 

212def schedule_worker(config, queue): 

213 global should_terminate 

214 

215 sleep_sec = 0.25 

216 scheduler = get_scheduler() 

217 

218 logging.info("Load schedule") 

219 set_schedule(config, schedule_load()) 

220 

221 logging.info("Start schedule worker") 

222 

223 i = 0 

224 while True: 

225 if should_terminate.is_set(): 

226 scheduler.clear() 

227 break 

228 try: 

229 if not queue.empty(): 

230 schedule_data = queue.get() 

231 set_schedule(config, schedule_data) 

232 

233 # デバッグ: スケジューラーの実行前に時刻とジョブ状態をログ出力 

234 current_time = my_lib.time.now() 

235 pending_jobs = len(scheduler.jobs) 

236 if pending_jobs > 0: 

237 next_run = scheduler.next_run 

238 logging.debug("Scheduler check - Current time (my_lib): %s, System time: %s, Pending jobs: %d, Next run: %s", 

239 current_time, datetime.datetime.now(), pending_jobs, next_run) 

240 

241 # DUMMY_MODEでは、scheduleライブラリの内部時刻をmy_lib.time.now()に強制同期 

242 if os.environ.get("DUMMY_MODE", "false") == "true": 

243 scheduler._time_func = my_lib.time.now 

244 

245 scheduler.run_pending() 

246 logging.debug("Sleep %.1f sec...", sleep_sec) 

247 time.sleep(sleep_sec) 

248 except OverflowError: # pragma: no cover 

249 # NOTE: テストする際、freezer 使って日付をいじるとこの例外が発生する 

250 logging.debug(traceback.format_exc()) 

251 

252 if i % (10 / sleep_sec) == 0: 

253 my_lib.footprint.update(config["liveness"]["file"]["scheduler"]) 

254 

255 i += 1 

256 

257 logging.info("Terminate schedule worker") 

258 

259 

260if __name__ == "__main__": 

261 import datetime 

262 import multiprocessing 

263 import multiprocessing.pool 

264 

265 import logger 

266 import my_lib.webapp.config 

267 

268 logger.init("test", level=logging.DEBUG) 

269 

270 def test_func(): 

271 logging.info("TEST") 

272 

273 should_terminate.set() 

274 

275 queue = multiprocessing.Queue() 

276 

277 pool = multiprocessing.pool.ThreadPool(processes=1) 

278 result = pool.apply_async(schedule_worker, (queue,)) 

279 

280 exec_time = my_lib.time.now() + datetime.timedelta(seconds=5) 

281 queue.put([{"time": exec_time.strftime("%H:%M"), "func": test_func}]) 

282 

283 # NOTE: 終了するのを待つ 

284 result.get()