Coverage for flask/src/rasp_water/scheduler.py: 100%

128 statements  

« prev     ^ index     » next       coverage.py v7.6.8, created at 2024-11-24 13:56 +0900

1#!/usr/bin/env python3 

2import datetime 

3import logging 

4import pathlib 

5import re 

6import threading 

7import time 

8import traceback 

9 

10import my_lib.footprint 

11import my_lib.serializer 

12import my_lib.webapp.config 

13import my_lib.webapp.log 

14import rasp_water.webapp_valve 

15import schedule 

16 

# Number of attempts valve_auto_control() makes before it reports a failure.
RETRY_COUNT = 3

# Lock used by schedule_store() / schedule_load() around access to the schedule
# file. It stays None until init() replaces it with a threading.Lock.
schedule_lock = None
# When set, schedule_worker() clears the registered jobs and leaves its loop.
should_terminate = threading.Event()

21 

22 

def init():
    """Create the module-level lock that guards the schedule file.

    ``schedule_lock`` starts out as ``None``; schedule_store() and
    schedule_load() enter it in a ``with`` statement, so this has to run
    before either of them can succeed. Every call installs a fresh lock.
    """
    global schedule_lock  # noqa: PLW0603

    schedule_lock = threading.Lock()

26 

27 

def valve_auto_control_impl(config, period):
    """Make a single attempt to open the valve on behalf of the scheduler.

    Calls ``rasp_water.webapp_valve.set_valve_state(config, 1, period * 60,
    True, "scheduler")``.
    NOTE(review): ``period * 60`` presumably converts minutes to seconds --
    confirm against set_valve_state.

    Returns:
        True when the call completed without raising, False when any
        exception was raised (the exception is logged, not propagated).
    """
    try:
        # NOTE: Going through the web API would be a problem once
        # authentication is enabled, so the function is called directly.
        # (An earlier variant issued an HTTP POST with cmd/state/period/auto
        # parameters and checked for status code 200.)
        rasp_water.webapp_valve.set_valve_state(config, 1, period * 60, True, "scheduler")
    except Exception:
        logging.exception("Failed to control valve automatically")
        return False

    return True

44 

45 

def valve_auto_control(config, period):
    """Run the automatic valve control, retrying up to RETRY_COUNT times.

    Attempts stop at the first one that succeeds. If every attempt fails, a
    failure message is written to the web application log.

    Returns:
        True if any attempt succeeded, False otherwise.
    """
    logging.info("Starts automatic control of the valve")

    # any() short-circuits, so no further attempt is made after a success.
    succeeded = any(valve_auto_control_impl(config, period) for _ in range(RETRY_COUNT))
    if not succeeded:
        my_lib.webapp.log.info("😵 水やりの自動実行に失敗しました。")

    return succeeded

55 

56 

def schedule_validate(schedule_data):  # noqa: C901, PLR0911
    """Check that *schedule_data* has the structure the scheduler expects.

    The data must consist of exactly two entries, each providing:

    * ``is_active``: bool
    * ``time``: str in 24-hour ``HH:MM`` form (``00:00`` .. ``23:59``)
    * ``period``: int (bool is rejected)
    * ``wday``: seven bool flags

    The reason for the first violation found is logged as a warning.

    Returns:
        True when every check passes, False otherwise.
    """
    # The whole string has to be a real clock time. A bare search for
    # "\d{2}:\d{2}" would also accept values such as "25:00", "12:345" or
    # "x07:30", which are not usable as a schedule time.
    time_pattern = re.compile(r"(?:[01][0-9]|2[0-3]):[0-5][0-9]")

    if len(schedule_data) != 2:
        logging.warning("Count of entry is Invalid: %d", len(schedule_data))
        return False

    for entry in schedule_data:
        for key in ["is_active", "time", "period", "wday"]:
            if key not in entry:
                logging.warning("Does not contain %s", key)
                return False
        if type(entry["is_active"]) is not bool:
            logging.warning("Type of is_active is invalid: %s", type(entry["is_active"]))
            return False
        # A non-string time is reported as invalid instead of raising TypeError.
        if not isinstance(entry["time"], str) or not time_pattern.fullmatch(entry["time"]):
            logging.warning("Format of time is invalid: %s", entry["time"])
            return False
        # Exact type comparison on purpose: isinstance() would let bool through.
        if type(entry["period"]) is not int:
            logging.warning("Type of period is invalid: %s", type(entry["period"]))
            return False
        if len(entry["wday"]) != 7:
            logging.warning("Count of wday is Invalid: %d", len(entry["wday"]))
            return False
        for i, wday_flag in enumerate(entry["wday"]):
            if type(wday_flag) is not bool:
                logging.warning("Type of wday[%d] is Invalid: %s", i, type(entry["wday"][i]))
                return False
    return True

84 

85 

def schedule_store(schedule_data):
    """Persist *schedule_data* to the schedule file while holding schedule_lock.

    Any exception raised along the way is logged and reported to the web
    application log instead of being propagated; nothing is returned.
    """
    try:
        with schedule_lock:
            destination = my_lib.webapp.config.SCHEDULE_FILE_PATH
            my_lib.serializer.store(destination, schedule_data)
    except Exception:
        logging.exception("Failed to save schedule settings.")
        my_lib.webapp.log.error("😵 スケジュール設定の保存に失敗しました。")

94 

95 

def gen_schedule_default():
    """Return the default schedule: two inactive entries at 00:00, every weekday.

    Each entry is a separate dict with its own ``wday`` list, so editing one
    entry never affects the other. (Building the list with ``[entry] * 2``
    would put the very same dict object in both slots.)
    """
    return [
        {
            "is_active": False,
            "time": "00:00",
            "period": 1,
            "wday": [True] * 7,
        }
        for _ in range(2)
    ]

105 

106 

def schedule_load():
    """Load the schedule from the schedule file, falling back to the default.

    The file is read while holding schedule_lock and the result is checked
    with schedule_validate(). The default schedule is returned when the loaded
    data fails validation or when anything raises; in the latter case the
    error is logged and reported to the web application log.
    """
    fallback = gen_schedule_default()

    try:
        with schedule_lock:
            loaded = my_lib.serializer.load(my_lib.webapp.config.SCHEDULE_FILE_PATH, fallback)
            return loaded if schedule_validate(loaded) else fallback
    except Exception:
        logging.exception("Failed to load schedule settings.")
        my_lib.webapp.log.error("😵 スケジュール設定の読み出しに失敗しました。")

    return fallback

122 

123 

def set_schedule(config, schedule_data):  # noqa: C901
    """Replace all registered jobs with the ones described by *schedule_data*.

    For every active entry, one weekly job is registered per enabled weekday
    flag; each job calls valve_auto_control(config, entry["period"]) at
    entry["time"] in my_lib.webapp.config.TIMEZONE_PYTZ.

    Returns:
        The value of schedule.idle_seconds() after registration (None when it
        reports None).
    """
    # entry["wday"][n] switches the n-th name below on or off (Sunday first).
    weekday_names = ("sunday", "monday", "tuesday", "wednesday", "thursday", "friday", "saturday")

    schedule.clear()

    for entry in schedule_data:
        if not entry["is_active"]:
            continue

        for day_index, weekday_name in enumerate(weekday_names):
            if not entry["wday"][day_index]:
                continue

            weekly_job = getattr(schedule.every(), weekday_name)
            weekly_job.at(entry["time"], my_lib.webapp.config.TIMEZONE_PYTZ).do(
                valve_auto_control, config, entry["period"]
            )

    for job in schedule.get_jobs():
        logging.info("Next run: %s", job.next_run)

    idle_sec = schedule.idle_seconds()
    if idle_sec is None:
        return idle_sec

    hours, remainder = divmod(idle_sec, 3600)
    minutes, seconds = divmod(remainder, 60)
    local_tz = datetime.timezone(datetime.timedelta(hours=my_lib.webapp.config.TIMEZONE_OFFSET))

    logging.info(
        "Now is %s, time to next jobs is %d hour(s) %d minute(s) %d second(s)",
        datetime.datetime.now(tz=local_tz).strftime("%Y-%m-%d %H:%M"),
        hours,
        minutes,
        seconds,
    )

    return idle_sec

179 

180 

def schedule_worker(config, queue):
    """Run the scheduler loop until ``should_terminate`` is set.

    On start the stored schedule is loaded and registered. Each iteration
    then:

    * applies and stores a new schedule if one is waiting in *queue*,
    * runs the jobs that are due,
    * sleeps for 0.25 seconds,
    * updates the liveness file named by
      ``config["liveness"]["file"]["scheduler"]`` on every
      ``10 / sleep_sec``-th iteration.

    When termination is requested, all jobs are cleared before returning.
    """
    sleep_sec = 0.25
    liveness_file = pathlib.Path(config["liveness"]["file"]["scheduler"])

    logging.info("Load schedule")
    set_schedule(config, schedule_load())

    logging.info("Start schedule worker")

    loop_count = 0
    while not should_terminate.is_set():
        try:
            if not queue.empty():
                new_schedule = queue.get()
                set_schedule(config, new_schedule)
                schedule_store(new_schedule)

            schedule.run_pending()
            logging.debug("Sleep %.1f sec...", sleep_sec)
            time.sleep(sleep_sec)
        except OverflowError:  # pragma: no cover
            # NOTE: In tests, manipulating the date with freezer triggers this exception.
            logging.debug(traceback.format_exc())

        if loop_count % (10 / sleep_sec) == 0:
            my_lib.footprint.update(liveness_file)

        loop_count += 1

    # Termination was requested: drop every registered job before leaving.
    schedule.clear()

    logging.info("Terminate schedule worker")

217 

218 

if __name__ == "__main__":
    # Manual smoke run: start schedule_worker on a one-thread pool, push one
    # schedule through the queue and wait for the worker to return.
    import datetime
    from multiprocessing import Queue
    from multiprocessing.pool import ThreadPool

    import logger
    import my_lib.webapp.config

    logger.init("test", level=logging.DEBUG)

    def test_func():
        logging.info("TEST")

        # Ask schedule_worker to leave its loop.
        should_terminate.set()

    queue = Queue()

    pool = ThreadPool(processes=1)
    # NOTE(review): schedule_worker is defined as (config, queue), but only the
    # queue is passed here, and init() is never called so schedule_lock is
    # still None -- this demo looks out of date; confirm before relying on it.
    result = pool.apply_async(schedule_worker, (queue,))

    exec_time = datetime.datetime.now(my_lib.webapp.config.TIMEZONE) + datetime.timedelta(seconds=5)
    # NOTE(review): this entry only has "time" and "func", whereas
    # set_schedule() reads "is_active", "wday" and "period" -- verify.
    queue.put([{"time": exec_time.strftime("%H:%M"), "func": test_func}])

    # NOTE: Wait for the worker to finish.
    result.get()