Coverage for flask/src/rasp_water/scheduler.py: 99%

137 statements  

« prev     ^ index     » next       coverage.py v7.9.1, created at 2025-06-28 13:51 +0900

1#!/usr/bin/env python3 

2import datetime 

3import logging 

4import os 

5import re 

6import threading 

7import time 

8import traceback 

9 

10import my_lib.footprint 

11import my_lib.serializer 

12import my_lib.webapp.config 

13import my_lib.webapp.log 

14import rasp_water.webapp_valve 

15import schedule 

16 

# Number of attempts valve_auto_control() makes before reporting a failure.
RETRY_COUNT = 3

# Lock used by schedule_store()/schedule_load() around schedule file access.
# It stays None until init() installs a threading.Lock.
schedule_lock = None
# Set by term() and polled by schedule_worker() to leave its loop.
should_terminate = threading.Event()

# Worker-specific scheduler instance for pytest-xdist parallel execution
# (keyed by the PYTEST_XDIST_WORKER value, or "main" when it is unset).
_scheduler_instances = {}

24 

25 

def get_scheduler():
    """Return the scheduler that belongs to the current pytest-xdist worker.

    The worker is identified by the ``PYTEST_XDIST_WORKER`` environment
    variable, falling back to ``"main"`` when it is unset.  The scheduler is
    created on the first request for a given worker and reused afterwards.
    """
    key = os.environ.get("PYTEST_XDIST_WORKER", "main")

    try:
        return _scheduler_instances[key]
    except KeyError:
        # First request from this worker: create and remember its scheduler.
        instance = schedule.Scheduler()
        _scheduler_instances[key] = instance
        return instance

35 

36 

def init():
    """Prepare the module state for a fresh scheduler run.

    Clears the termination flag and installs a new lock in ``schedule_lock``.
    """
    global schedule_lock  # noqa: PLW0603

    # should_terminate is only mutated, never rebound, so it needs no `global`.
    should_terminate.clear()
    schedule_lock = threading.Lock()

43 

44 

def term():
    """Request termination by setting the ``should_terminate`` event."""
    # The event object is mutated in place, so no `global` statement is needed.
    should_terminate.set()

49 

50 

def valve_auto_control_impl(config, period):
    """Make a single attempt to switch the valve on for the scheduler.

    Calls ``rasp_water.webapp_valve.set_valve_state`` with ``period * 60`` as
    the duration argument (presumably minutes converted to seconds -- confirm
    against ``set_valve_state``).

    Returns:
        True when the call completes, False when it raises; the exception is
        logged and not propagated.
    """
    try:
        # NOTE: Call the function directly instead of going through the web
        # API, which would be a problem once authentication is enabled.
        rasp_water.webapp_valve.set_valve_state(config, 1, period * 60, True, "scheduler")
    except Exception:
        logging.exception("Failed to control valve automatically")
        return False
    else:
        return True

67 

68 

def valve_auto_control(config, period):
    """Run the automatic watering, trying up to ``RETRY_COUNT`` times.

    Returns:
        True as soon as one attempt succeeds; False after every attempt has
        failed, in which case the failure is also reported to the web-app log.
    """
    logging.info("Starts automatic control of the valve")

    # any() stops at the first successful attempt, so no extra attempts are made.
    if any(valve_auto_control_impl(config, period) for _ in range(RETRY_COUNT)):
        return True

    my_lib.webapp.log.info("😵 水やりの自動実行に失敗しました。")
    return False

78 

79 

_SCHEDULE_ENTRY_KEYS = ("is_active", "time", "period", "wday")

# "HH:MM" with an optional ":SS" part, limited to real clock values so that
# everything accepted here is also accepted when the job is registered.
_SCHEDULE_TIME_PATTERN = re.compile(r"(?:[01][0-9]|2[0-3]):[0-5][0-9](?::[0-5][0-9])?")


def _schedule_entry_is_valid(entry):  # noqa: C901, PLR0911
    """Validate one schedule entry, logging the reason when it is rejected."""
    if not isinstance(entry, dict):
        logging.warning("Type of entry is invalid: %s", type(entry))
        return False

    for key in _SCHEDULE_ENTRY_KEYS:
        if key not in entry:
            logging.warning("Does not contain %s", key)
            return False

    if type(entry["is_active"]) is not bool:
        logging.warning("Type of is_active is invalid: %s", type(entry["is_active"]))
        return False

    time_str = entry["time"]
    # fullmatch (not search) so surrounding garbage such as "x12:34y" is
    # rejected; the isinstance guard keeps non-string values from raising.
    if not isinstance(time_str, str) or not _SCHEDULE_TIME_PATTERN.fullmatch(time_str):
        logging.warning("Format of time is invalid: %s", time_str)
        return False

    # Exact type check on purpose: bool is a subclass of int and must not pass.
    if type(entry["period"]) is not int:
        logging.warning("Type of period is invalid: %s", type(entry["period"]))
        return False

    wday = entry["wday"]
    if not isinstance(wday, (list, tuple)):
        logging.warning("Type of wday is invalid: %s", type(wday))
        return False
    if len(wday) != 7:
        logging.warning("Count of wday is Invalid: %d", len(wday))
        return False
    for i, wday_flag in enumerate(wday):
        if type(wday_flag) is not bool:
            logging.warning("Type of wday[%d] is Invalid: %s", i, type(wday_flag))
            return False

    return True


def schedule_validate(schedule_data):
    """Check that ``schedule_data`` is a well-formed schedule.

    A valid schedule is a list (or tuple) of exactly two dict entries, each
    containing:

    * ``is_active``: bool
    * ``time``: str in ``HH:MM`` (optionally ``HH:MM:SS``) form, with the hour
      in 00-23 and minute/second in 00-59
    * ``period``: int (bool is rejected)
    * ``wday``: list/tuple of exactly seven bools

    Malformed input of any type yields False instead of an exception.

    Returns:
        True when the schedule is valid; otherwise False, after logging a
        warning that describes the first problem found.
    """
    if not isinstance(schedule_data, (list, tuple)):
        logging.warning("Type of schedule data is invalid: %s", type(schedule_data))
        return False

    if len(schedule_data) != 2:
        logging.warning("Count of entry is Invalid: %d", len(schedule_data))
        return False

    return all(_schedule_entry_is_valid(entry) for entry in schedule_data)

107 

108 

def schedule_store(schedule_data):
    """Write ``schedule_data`` to the schedule file while holding ``schedule_lock``.

    Any exception raised while locking or storing is logged and reported to
    the web-app log instead of being propagated to the caller.
    """
    try:
        with schedule_lock:
            destination = my_lib.webapp.config.SCHEDULE_FILE_PATH
            my_lib.serializer.store(destination, schedule_data)
    except Exception:
        logging.exception("Failed to save schedule settings.")
        my_lib.webapp.log.error("😵 スケジュール設定の保存に失敗しました。")

117 

118 

def gen_schedule_default():
    """Return the default schedule: two inactive entries at 00:00.

    Each entry is a dict with ``is_active`` (False), ``time`` ("00:00"),
    ``period`` (1) and ``wday`` (seven True flags).

    The entries and their ``wday`` lists are built independently.  Repeating
    a one-element list with ``* 2`` would make both slots refer to the same
    dict, so editing one default entry would silently edit the other as well.
    """
    return [
        {
            "is_active": False,
            "time": "00:00",
            "period": 1,
            "wday": [True] * 7,
        }
        for _ in range(2)
    ]

128 

129 

def schedule_load():
    """Load the schedule from the schedule file, falling back to the default.

    The file is read and validated while holding ``schedule_lock``.  The
    default schedule is returned when the loaded data fails validation or when
    loading/validation raises; in the latter case the error is logged and
    reported to the web-app log.
    """
    fallback = gen_schedule_default()

    try:
        with schedule_lock:
            loaded = my_lib.serializer.load(my_lib.webapp.config.SCHEDULE_FILE_PATH, fallback)
            is_valid = schedule_validate(loaded)
    except Exception:
        logging.exception("Failed to load schedule settings.")
        my_lib.webapp.log.error("😵 スケジュール設定の読み出しに失敗しました。")
    else:
        if is_valid:
            return loaded

    return fallback

145 

146 

def set_schedule(config, schedule_data):
    """Replace all jobs of the current scheduler with those in ``schedule_data``.

    For every active entry, one weekly job is registered per enabled weekday
    (``entry["wday"]`` is indexed 0 = Sunday ... 6 = Saturday) at
    ``entry["time"]``; each job calls ``valve_auto_control(config, entry["period"])``.

    Returns:
        The scheduler's ``idle_seconds`` value after registration, which is
        None when that attribute is None (no log line is written then).
    """
    # Attribute names on the job builder, in the same order as entry["wday"].
    weekday_names = (
        "sunday",
        "monday",
        "tuesday",
        "wednesday",
        "thursday",
        "friday",
        "saturday",
    )

    scheduler = get_scheduler()
    scheduler.clear()

    for entry in schedule_data:
        if not entry["is_active"]:
            continue

        for wday_index, weekday in enumerate(weekday_names):
            if not entry["wday"][wday_index]:
                continue

            # NOTE(review): my_lib.time is not among the imports visible at
            # the top of this file; presumably it is loaded as a side effect
            # of another my_lib import -- confirm.
            weekly_job = getattr(scheduler.every(), weekday)
            weekly_job.at(entry["time"], my_lib.time.get_pytz()).do(
                valve_auto_control, config, entry["period"]
            )

    for job in scheduler.get_jobs():
        logging.info("Next run: %s", job.next_run)

    idle_sec = scheduler.idle_seconds
    if idle_sec is None:
        return idle_sec

    hours, remainder = divmod(idle_sec, 3600)
    minutes, seconds = divmod(remainder, 60)

    logging.info(
        "Now is %s, time to next jobs is %d hour(s) %d minute(s) %d second(s)",
        my_lib.time.now().strftime("%Y-%m-%d %H:%M"),
        hours,
        minutes,
        seconds,
    )

    return idle_sec

201 

202 

def schedule_worker(config, queue):
    """Run the scheduler loop until ``should_terminate`` is set.

    The stored schedule is loaded and registered first.  Each iteration then
    applies a new schedule if one is waiting in ``queue``, runs pending jobs
    and sleeps for 0.25 seconds.  The liveness file named by
    ``config["liveness"]["file"]["scheduler"]`` is updated on every 40th
    iteration, starting with the first.  When the termination flag is seen,
    the scheduler's jobs are cleared and the function returns.
    """
    sleep_sec = 0.25
    # 10 / 0.25 = 40 iterations, i.e. roughly every 10 seconds of sleeping.
    liveness_interval = 10 / sleep_sec
    scheduler = get_scheduler()

    logging.info("Load schedule")
    set_schedule(config, schedule_load())

    logging.info("Start schedule worker")

    iteration = 0
    while not should_terminate.is_set():
        try:
            if not queue.empty():
                set_schedule(config, queue.get())

            scheduler.run_pending()
            logging.debug("Sleep %.1f sec...", sleep_sec)
            time.sleep(sleep_sec)
        except OverflowError:  # pragma: no cover
            # NOTE: This exception is raised in tests when freezer is used to
            # manipulate the date.
            logging.debug(traceback.format_exc())

        if iteration % liveness_interval == 0:
            my_lib.footprint.update(config["liveness"]["file"]["scheduler"])

        iteration += 1

    # Reached only through the termination flag; drop the remaining jobs.
    scheduler.clear()

    logging.info("Terminate schedule worker")

237 

238 

if __name__ == "__main__":
    # Manual smoke test: start schedule_worker() on a one-thread pool, push a
    # schedule entry through the queue and wait for the worker to return.
    # NOTE(review): this script looks stale relative to the functions in this
    # module -- see the review notes below before relying on it.
    import datetime
    import multiprocessing
    import multiprocessing.pool

    import logger
    import my_lib.webapp.config

    logger.init("test", level=logging.DEBUG)

    def test_func():
        logging.info("TEST")

        # Ask the worker loop to exit.
        should_terminate.set()

    queue = multiprocessing.Queue()

    pool = multiprocessing.pool.ThreadPool(processes=1)
    # NOTE(review): schedule_worker is defined as (config, queue), but only
    # the queue is passed here, so the call is missing one positional argument.
    result = pool.apply_async(schedule_worker, (queue,))

    # NOTE(review): my_lib.time is not among the imports visible in this file
    # -- confirm it is loaded by one of the other my_lib imports.
    exec_time = my_lib.time.now() + datetime.timedelta(seconds=5)
    # NOTE(review): set_schedule() reads entry["is_active"], entry["wday"] and
    # entry["period"] and always schedules valve_auto_control; this entry only
    # carries "time" and "func", so it does not match that format.
    queue.put([{"time": exec_time.strftime("%H:%M"), "func": test_func}])

    # NOTE: Wait for the worker to finish.
    result.get()