Coverage for flask/src/rasp_water/control/webapi/valve.py: 93%

144 statements  

« prev     ^ index     » next       coverage.py v7.9.1, created at 2025-07-04 12:06 +0900

1#!/usr/bin/env python3 

2import logging 

3import multiprocessing 

4import os 

5import pathlib 

6import threading 

7import time 

8import traceback 

9 

10import flask_cors 

11import fluent.sender 

12import my_lib.flask_util 

13import my_lib.footprint 

14import my_lib.webapp.config 

15import my_lib.webapp.event 

16import my_lib.webapp.log 

17import rasp_water.control.valve 

18import rasp_water.control.weather_forecast 

19import rasp_water.control.weather_sensor 

20 

21import flask 

22 

# Event set by term() to ask the flow-notification worker loop to exit.
should_terminate = threading.Event()

# Thread running flow_notify_worker; None while no worker is registered.
worker = None

# multiprocessing.Manager that provides the flow-statistics queue; created by init().
flow_stat_manager = None

# Flask blueprint that carries the valve API routes defined in this module.
blueprint = flask.Blueprint("rasp-water-valve", __name__, url_prefix=my_lib.webapp.config.URL_PREFIX)

28 

29 

def init(config):
    """Initialise the valve module and start the flow-notification worker thread.

    A manager left over from an earlier call is shut down and replaced by a fresh
    ``multiprocessing.Manager``. A queue created from it is passed both to
    ``rasp_water.control.valve.init`` and to the ``flow_notify_worker`` thread.

    Args:
        config: Application configuration, forwarded unchanged to the valve
            module and to the worker.

    Raises:
        ValueError: If a worker thread is still registered.
    """
    global worker, flow_stat_manager  # noqa: PLW0603

    if worker is not None:
        raise ValueError("worker should be None")  # noqa: TRY003, EM101

    # Release the manager created by a previous init() before replacing it.
    previous_manager = flow_stat_manager
    if previous_manager is not None:
        previous_manager.shutdown()

    flow_stat_manager = multiprocessing.Manager()
    stat_queue = flow_stat_manager.Queue()

    rasp_water.control.valve.init(config, stat_queue)

    worker = threading.Thread(target=flow_notify_worker, args=(config, stat_queue))
    worker.start()

45 

46 

def term():
    """Stop the flow-notification worker and terminate the valve module.

    Returns immediately when no worker is registered.
    """
    global worker  # noqa: PLW0603

    running = worker
    if running is None:
        return

    # Ask the worker loop to exit and wait until it has finished.
    should_terminate.set()
    running.join()

    # Reset the module state so that init() can be called again.
    worker = None
    should_terminate.clear()

    rasp_water.control.valve.term()

60 

61 

def send_data(config, flow):
    """Send one flow reading to fluentd.

    Args:
        config: Application configuration. Reads ``config["fluent"]["host"]`` and
            the ``tag`` / ``hostname`` entries under ``config["fluent"]["data"]``.
        flow: Flow value to report; it is also written to the log with ``%.2f``.
    """
    logging.info("Send fluentd: flow = %.2f", flow)

    fluent_config = config["fluent"]
    sender = fluent.sender.FluentSender(fluent_config["data"]["tag"], host=fluent_config["host"])
    try:
        record = {"hostname": fluent_config["data"]["hostname"], "flow": flow}
        # emit() signals a failed send through its return value instead of raising.
        if not sender.emit("rasp", record):
            logging.warning("Failed to send fluentd: %s", sender.last_error)
    finally:
        # Close the sender even when building or emitting the record fails.
        sender.close()

67 

68 

def second_str(sec):
    """Format a duration given in seconds as a minutes/seconds string.

    Fractions of a second are truncated. Minutes carry the suffix "分" and seconds
    the suffix "秒", so the two parts can no longer run together (previously both
    11 s and 61 s rendered as "11"). The result is meant to be followed by "間" in
    the log sentences built by the callers.

    Args:
        sec: Duration in seconds (int or float).

    Returns:
        "<s>秒" below one minute, "<m>分" for whole minutes, otherwise "<m>分<s>秒".
    """
    total = int(sec)
    if total < 60:
        return f"{total}秒"

    minute, second = divmod(total, 60)
    if second == 0:
        return f"{minute}分"
    return f"{minute}分{second}秒"

83 

84 

def _record_watering_metrics(config, stat):
    """Best-effort: store one finished watering in the metrics collector."""
    try:
        import rasp_water.metrics.collector

        operation_type = "auto" if stat.get("auto", False) else "manual"
        rasp_water.metrics.collector.record_watering(
            operation_type=operation_type,
            duration_seconds=stat["period"],
            volume_liters=stat["total"],
            metrics_data_path=config["metrics"]["data"],
        )
    except Exception as e:
        # Metrics must never interrupt the notification loop.
        logging.warning("Failed to record watering metrics: %s", e)


def _record_error_metrics(config, message):
    """Best-effort: store one valve-control error in the metrics collector."""
    try:
        import rasp_water.metrics.collector

        rasp_water.metrics.collector.record_error(
            error_type="valve_control",
            error_message=message,
            metrics_data_path=config["metrics"]["data"],
        )
    except Exception as e:
        # Metrics must never interrupt the notification loop.
        logging.warning("Failed to record error metrics: %s", e)


def _handle_flow_stat(config, stat):
    """Dispatch one queue entry according to its ``type`` field."""
    stat_type = stat["type"]

    if stat_type == "total":
        my_lib.webapp.log.info(
            "🚿 {time_str}間、約 {water:.2f}L の水やりを行いました。".format(
                time_str=second_str(stat["period"]), water=stat["total"]
            )
        )
        _record_watering_metrics(config, stat)
    elif stat_type == "instantaneous":
        send_data(config, stat["flow"])
    elif stat_type == "error":
        my_lib.webapp.log.error(stat["message"])
        _record_error_metrics(config, stat["message"])
    # Entries of any other type are ignored.


def flow_notify_worker(config, queue):
    """Poll the flow-statistics queue until ``should_terminate`` is set.

    Each iteration handles at most one queue entry and then sleeps for 0.1 s.
    "total" entries are logged and recorded as watering metrics, "instantaneous"
    entries are forwarded with ``send_data``, and "error" entries are logged and
    recorded as error metrics. The liveness footprint is refreshed on the first
    iteration and then every ``10 / sleep_sec`` iterations.

    Args:
        config: Application configuration; reads
            ``config["liveness"]["file"]["flow_notify"]`` and, for metrics,
            ``config["metrics"]["data"]``.
        queue: Queue object offering ``empty()`` and ``get()`` that delivers
            dict entries with a ``"type"`` key.
    """
    sleep_sec = 0.1
    # Number of loop iterations between two liveness updates.
    liveness_interval = round(10 / sleep_sec)

    liveness_file = pathlib.Path(config["liveness"]["file"]["flow_notify"])

    logging.info("Start flow notify worker")
    i = 0
    while not should_terminate.is_set():
        try:
            if not queue.empty():
                stat = queue.get()

                logging.debug("flow notify = %s", str(stat))

                _handle_flow_stat(config, stat)
            time.sleep(sleep_sec)
        except OverflowError:  # pragma: no cover
            # NOTE: this exception occurs when tests manipulate the date with a time freezer.
            logging.debug(traceback.format_exc())

        if i % liveness_interval == 0:
            my_lib.footprint.update(liveness_file)

        i += 1

    logging.info("Terminate flow notify worker")

151 

152 

def get_valve_state():
    """Return the current valve control mode as a JSON-ready dict.

    Returns:
        ``{"state": <mode value>, "remain": <remain>, "result": "success"}`` when
        the control mode can be read, otherwise
        ``{"state": 0, "remain": 0, "result": "fail"}``.
    """
    try:
        mode = rasp_water.control.valve.get_control_mode()

        return {
            "state": mode["mode"].value,
            "remain": mode["remain"],
            "result": "success",
        }
    except Exception:
        # Keep the best-effort fallback, but log the traceback so that the
        # cause of the failure can be diagnosed.
        logging.warning("Failed to get valve control mode", exc_info=True)

        return {"state": 0, "remain": 0, "result": "fail"}

166 

167 

def judge_execute(config, state, auto):
    """Decide whether a requested valve operation should be carried out.

    Only automatic requests with ``state == 1`` can be vetoed. For those, the
    rain sensor is consulted first and the weather forecast second; if either
    reports rain, the request is skipped with a log message. When the
    ``DUMMY_MODE`` environment variable is "true", a rain report never vetoes.

    Args:
        config: Application configuration, forwarded to the rain lookups.
        state: Requested valve state.
        auto: Truthy when the request comes from automatic control.

    Returns:
        True when the operation should run, False when it is skipped.
    """
    if not (state == 1 and auto):
        return True

    def dummy_mode():
        # NOTE: in dummy mode, always water regardless of rain (for CI tests).
        return os.environ.get("DUMMY_MODE", "false") == "true"

    rained, rain_fall_sum = rasp_water.control.weather_sensor.get_rain_fall(config)
    if rained:
        if dummy_mode():
            return True

        my_lib.webapp.log.info(
            f"☂ 前回の水やりから {rain_fall_sum:.0f}mm の雨が降ったため、自動での水やりを見合わせます。"
        )
        return False

    rain_expected, rain_fall_sum = rasp_water.control.weather_forecast.get_rain_fall(config)
    if not rain_expected:
        return True

    if dummy_mode():
        return True

    my_lib.webapp.log.info(
        f"☂ 前後で {rain_fall_sum:.0f}mm の雨が降る予報があるため、自動での水やりを見合わせます。"
    )
    return False

196 

197 

def set_valve_state(config, state, period, auto, host=""):
    """Apply a valve request and return the resulting valve state.

    The request is first checked with ``judge_execute``. If it is accepted,
    ``state == 1`` starts watering for ``period`` via ``set_control_mode``, and
    any other state closes the valve. A CONTROL event is published whether or
    not the request was accepted.

    Args:
        config: Application configuration, forwarded to ``judge_execute``.
        state: Requested valve state; 1 starts watering.
        period: Watering duration, passed to ``second_str`` and ``set_control_mode``.
        auto: Truthy for automatic control, falsy for manual control.
        host: Optional requester label appended to the log message.

    Returns:
        The dict produced by ``get_valve_state()``.
    """
    if judge_execute(config, state, auto):
        mode_label = "🕑 自動" if auto else "🔧 手動"
        requester = f"(by {host})" if host != "" else ""

        if state == 1:
            my_lib.webapp.log.info(
                "{auto}で{period_str}間の水やりを開始します。{by}".format(
                    auto=mode_label,
                    period_str=second_str(period),
                    by=requester,
                )
            )
            rasp_water.control.valve.set_control_mode(period, auto)
        else:
            my_lib.webapp.log.info(
                "{auto}で水やりを終了します。{by}".format(
                    auto=mode_label,
                    by=requester,
                )
            )
            rasp_water.control.valve.set_state(rasp_water.control.valve.VALVE_STATE.CLOSE)

    # Both the accepted and the skipped path notify listeners and report the state.
    my_lib.webapp.event.notify_event(my_lib.webapp.event.EVENT_TYPE.CONTROL)
    return get_valve_state()

225 

226 

def _parse_bool_arg(value):
    """Interpret a query-string value as a boolean.

    ``bool("false")`` and ``bool("0")`` are both True, so converting with
    ``type=bool`` turns every non-empty value into True. Here the common
    spellings of "false" (and the empty string) yield False; every other value
    keeps yielding True as before.
    """
    return value.strip().lower() not in ("", "0", "false", "no", "off")


@blueprint.route("/api/valve_ctrl", methods=["GET", "POST"])
@my_lib.flask_util.support_jsonp
@flask_cors.cross_origin()
def api_valve_ctrl():
    """Get or set the valve state.

    Query parameters:
        cmd: 1 to set the valve state; any other value only reads it (default 0).
        state: Requested valve state (default 0).
        period: Watering duration (default 0).
        auto: Boolean flag for automatic control (default False).

    Returns:
        A JSON response containing ``"cmd"`` ("set" or "get") merged with the
        valve-state dict.
    """
    args = flask.request.args
    cmd = args.get("cmd", 0, type=int)
    state = args.get("state", 0, type=int)
    period = args.get("period", 0, type=int)
    auto = args.get("auto", False, type=_parse_bool_arg)

    config = flask.current_app.config["CONFIG"]

    if cmd == 1:
        user = my_lib.flask_util.auth_user(flask.request)
        return flask.jsonify({"cmd": "set", **set_valve_state(config, state, period, auto, user)})

    return flask.jsonify({"cmd": "get", **get_valve_state()})

243 

244 

@blueprint.route("/api/valve_flow", methods=["GET"])
@my_lib.flask_util.support_jsonp
@flask_cors.cross_origin()
def api_valve_flow():
    """Return the current flow reading as JSON: ``{"cmd": "get", "flow": ...}``."""
    flow_offset = flask.current_app.config["CONFIG"]["flow"]["offset"]
    reading = rasp_water.control.valve.get_flow(flow_offset)

    return flask.jsonify({"cmd": "get", "flow": reading["flow"]})