Coverage for src/unit_cooler/metrics/collector.py: 56%

176 statements  

« prev     ^ index     » next       coverage.py v7.9.1, created at 2025-07-23 14:35 +0000

1""" 

2New metrics collection system for outdoor unit cooler. 

3 

4Collects: 

5- 1分毎の cooling_mode の値 

6- 1分毎の Duty 比 (ON と ON+OFF の比率) 

7- 1分毎の 気温、照度、日射量、降水量、湿度 

8- 1時間あたりのバルブ操作回数 

9- ON している際の流量 

10- エラー発生 

11""" 

12 

13import datetime 

14import logging 

15import pathlib 

16import sqlite3 

17import threading 

18import zoneinfo 

19from contextlib import contextmanager 

20 

21TIMEZONE = zoneinfo.ZoneInfo("Asia/Tokyo") 

22DEFAULT_DB_PATH = pathlib.Path("data/metrics.db") 

23 

24logger = logging.getLogger(__name__) 

25 

26 

27class MetricsCollector: 

28 """Metrics collection system focused on cooling mode analysis.""" # noqa: D203 

29 

30 def __init__(self, db_path: str | pathlib.Path = DEFAULT_DB_PATH): 

31 """Initialize MetricsCollector with database path.""" 

32 self.db_path = pathlib.Path(db_path) 

33 self.db_path.parent.mkdir(parents=True, exist_ok=True) 

34 self._init_database() 

35 self._lock = threading.Lock() 

36 

37 # Current state tracking 

38 self._current_minute_data = {} 

39 self._current_hour_data = {"valve_operations": 0} 

40 self._last_minute = None 

41 self._last_hour = None 

42 

43 def _init_database(self): 

44 """Initialize database tables for new metrics schema.""" 

45 with self._get_db_connection() as conn: 

46 # 1分毎のメトリクス 

47 conn.execute(""" 

48 CREATE TABLE IF NOT EXISTS minute_metrics ( 

49 id INTEGER PRIMARY KEY AUTOINCREMENT, 

50 timestamp DATETIME NOT NULL, 

51 cooling_mode INTEGER, 

52 duty_ratio REAL, 

53 temperature REAL, 

54 humidity REAL, 

55 lux REAL, 

56 solar_radiation REAL, 

57 rain_amount REAL, 

58 flow_value REAL, 

59 created_at DATETIME DEFAULT CURRENT_TIMESTAMP, 

60 UNIQUE(timestamp) 

61 ) 

62 """) 

63 

64 # 1時間毎のメトリクス 

65 conn.execute(""" 

66 CREATE TABLE IF NOT EXISTS hourly_metrics ( 

67 id INTEGER PRIMARY KEY AUTOINCREMENT, 

68 timestamp DATETIME NOT NULL, 

69 valve_operations INTEGER DEFAULT 0, 

70 created_at DATETIME DEFAULT CURRENT_TIMESTAMP, 

71 UNIQUE(timestamp) 

72 ) 

73 """) 

74 

75 # エラー記録 

76 conn.execute(""" 

77 CREATE TABLE IF NOT EXISTS error_events ( 

78 id INTEGER PRIMARY KEY AUTOINCREMENT, 

79 timestamp DATETIME NOT NULL, 

80 error_type TEXT NOT NULL, 

81 error_message TEXT, 

82 created_at DATETIME DEFAULT CURRENT_TIMESTAMP 

83 ) 

84 """) 

85 

86 # インデックス作成 

87 conn.execute("CREATE INDEX IF NOT EXISTS idx_minute_timestamp ON minute_metrics(timestamp)") 

88 conn.execute("CREATE INDEX IF NOT EXISTS idx_hourly_timestamp ON hourly_metrics(timestamp)") 

89 conn.execute("CREATE INDEX IF NOT EXISTS idx_error_timestamp ON error_events(timestamp)") 

90 

91 @contextmanager 

92 def _get_db_connection(self): 

93 """Get database connection with proper error handling.""" 

94 conn = None 

95 try: 

96 conn = sqlite3.connect(str(self.db_path), timeout=30.0) 

97 conn.row_factory = sqlite3.Row 

98 yield conn 

99 conn.commit() 

100 except Exception: 

101 if conn: 

102 conn.rollback() 

103 logger.exception("Database error") 

104 raise 

105 finally: 

106 if conn: 106 ↛ exitline 106 didn't return from function '_get_db_connection' because the condition on line 106 was always true

107 conn.close() 

108 

109 def update_cooling_mode(self, cooling_mode: int): 

110 """Update current cooling mode value.""" 

111 with self._lock: 

112 self._current_minute_data["cooling_mode"] = cooling_mode 

113 self._check_minute_boundary() 

114 

115 def update_duty_ratio(self, on_time: float, total_time: float): 

116 """Update duty ratio (ON time / total time).""" 

117 with self._lock: 

118 if total_time > 0: 118 ↛ 120line 118 didn't jump to line 120 because the condition on line 118 was always true

119 self._current_minute_data["duty_ratio"] = on_time / total_time 

120 self._check_minute_boundary() 

121 

122 def update_environmental_data( 

123 self, 

124 temperature: float | None = None, 

125 humidity: float | None = None, 

126 lux: float | None = None, 

127 solar_radiation: float | None = None, 

128 rain_amount: float | None = None, 

129 ): 

130 """Update environmental sensor data.""" 

131 with self._lock: 

132 if temperature is not None: 132 ↛ 134line 132 didn't jump to line 134 because the condition on line 132 was always true

133 self._current_minute_data["temperature"] = temperature 

134 if humidity is not None: 134 ↛ 136line 134 didn't jump to line 136 because the condition on line 134 was always true

135 self._current_minute_data["humidity"] = humidity 

136 if lux is not None: 136 ↛ 138line 136 didn't jump to line 138 because the condition on line 136 was always true

137 self._current_minute_data["lux"] = lux 

138 if solar_radiation is not None: 138 ↛ 140line 138 didn't jump to line 140 because the condition on line 138 was always true

139 self._current_minute_data["solar_radiation"] = solar_radiation 

140 if rain_amount is not None: 140 ↛ 142line 140 didn't jump to line 142 because the condition on line 140 was always true

141 self._current_minute_data["rain_amount"] = rain_amount 

142 self._check_minute_boundary() 

143 

144 def update_flow_value(self, flow_value: float): 

145 """Update flow value when valve is ON.""" 

146 with self._lock: 

147 self._current_minute_data["flow_value"] = flow_value 

148 self._check_minute_boundary() 

149 

150 def record_valve_operation(self): 

151 """Record a valve operation for hourly counting.""" 

152 with self._lock: 

153 self._current_hour_data["valve_operations"] += 1 

154 self._check_hour_boundary() 

155 

156 def record_error(self, error_type: str, error_message: str | None = None): 

157 """Record an error event.""" 

158 now = datetime.datetime.now(TIMEZONE) 

159 

160 try: 

161 with self._get_db_connection() as conn: 

162 conn.execute( 

163 """ 

164 INSERT INTO error_events (timestamp, error_type, error_message) 

165 VALUES (?, ?, ?) 

166 """, 

167 (now, error_type, error_message), 

168 ) 

169 logger.info("Recorded error: %s", error_type) 

170 except Exception: 

171 logger.exception("Failed to record error") 

172 

173 def _check_minute_boundary(self): 

174 """Check if we crossed a minute boundary and save data.""" 

175 now = datetime.datetime.now(TIMEZONE) 

176 current_minute = now.replace(second=0, microsecond=0) 

177 

178 if self._last_minute is None: 

179 self._last_minute = current_minute 

180 return 

181 

182 if current_minute > self._last_minute: 

183 self._save_minute_data(self._last_minute) 

184 self._current_minute_data = {} 

185 self._last_minute = current_minute 

186 

187 def _check_hour_boundary(self): 

188 """Check if we crossed an hour boundary and save data.""" 

189 now = datetime.datetime.now(TIMEZONE) 

190 current_hour = now.replace(minute=0, second=0, microsecond=0) 

191 

192 if self._last_hour is None: 

193 self._last_hour = current_hour 

194 return 

195 

196 if current_hour > self._last_hour: 

197 self._save_hour_data(self._last_hour) 

198 self._current_hour_data = {"valve_operations": 0} 

199 self._last_hour = current_hour 

200 

201 def _save_minute_data(self, timestamp: datetime.datetime): 

202 """Save accumulated minute data to database.""" 

203 if not self._current_minute_data: 203 ↛ 204line 203 didn't jump to line 204 because the condition on line 203 was never true

204 logger.debug("No current minute data to save for %s", timestamp) 

205 return 

206 

207 try: 

208 data = ( 

209 timestamp, 

210 self._current_minute_data.get("cooling_mode"), 

211 self._current_minute_data.get("duty_ratio"), 

212 self._current_minute_data.get("temperature"), 

213 self._current_minute_data.get("humidity"), 

214 self._current_minute_data.get("lux"), 

215 self._current_minute_data.get("solar_radiation"), 

216 self._current_minute_data.get("rain_amount"), 

217 self._current_minute_data.get("flow_value"), 

218 ) 

219 logger.info("Saving minute metrics for %s: %s", timestamp, self._current_minute_data) 

220 

221 with self._get_db_connection() as conn: 

222 conn.execute( 

223 """ 

224 INSERT OR REPLACE INTO minute_metrics 

225 (timestamp, cooling_mode, duty_ratio, temperature, humidity, 

226 lux, solar_radiation, rain_amount, flow_value) 

227 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) 

228 """, 

229 data, 

230 ) 

231 logger.info("Successfully saved minute metrics for %s", timestamp) 

232 except Exception: 

233 logger.exception("Failed to save minute data") 

234 

235 def _save_hour_data(self, timestamp: datetime.datetime): 

236 """Save accumulated hour data to database.""" 

237 try: 

238 with self._get_db_connection() as conn: 

239 conn.execute( 

240 """ 

241 INSERT OR REPLACE INTO hourly_metrics 

242 (timestamp, valve_operations) 

243 VALUES (?, ?) 

244 """, 

245 (timestamp, self._current_hour_data["valve_operations"]), 

246 ) 

247 logger.debug("Saved hourly metrics for %s", timestamp) 

248 except Exception: 

249 logger.exception("Failed to save hour data") 

250 

251 def get_minute_data( 

252 self, 

253 start_time: datetime.datetime | None = None, 

254 end_time: datetime.datetime | None = None, 

255 limit: int = 1000, 

256 ) -> list: 

257 """Get minute-level metrics data.""" 

258 with self._get_db_connection() as conn: 

259 query = "SELECT * FROM minute_metrics" 

260 params = [] 

261 

262 if start_time or end_time: 

263 query += " WHERE" 

264 conditions = [] 

265 if start_time: 

266 conditions.append(" timestamp >= ?") 

267 params.append(start_time) 

268 if end_time: 

269 conditions.append(" timestamp <= ?") 

270 params.append(end_time) 

271 query += " AND".join(conditions) 

272 

273 query += " ORDER BY timestamp DESC LIMIT ?" 

274 params.append(limit) 

275 

276 return [dict(row) for row in conn.execute(query, params).fetchall()] 

277 

278 def get_hourly_data( 

279 self, 

280 start_time: datetime.datetime | None = None, 

281 end_time: datetime.datetime | None = None, 

282 limit: int = 168, 

283 ) -> list: # 1週間分 

284 """Get hourly-level metrics data.""" 

285 with self._get_db_connection() as conn: 

286 query = "SELECT * FROM hourly_metrics" 

287 params = [] 

288 

289 if start_time or end_time: 

290 query += " WHERE" 

291 conditions = [] 

292 if start_time: 

293 conditions.append(" timestamp >= ?") 

294 params.append(start_time) 

295 if end_time: 

296 conditions.append(" timestamp <= ?") 

297 params.append(end_time) 

298 query += " AND".join(conditions) 

299 

300 query += " ORDER BY timestamp DESC LIMIT ?" 

301 params.append(limit) 

302 

303 return [dict(row) for row in conn.execute(query, params).fetchall()] 

304 

305 def get_error_data( 

306 self, 

307 start_time: datetime.datetime | None = None, 

308 end_time: datetime.datetime | None = None, 

309 limit: int = 100, 

310 ) -> list: 

311 """Get error events data.""" 

312 with self._get_db_connection() as conn: 

313 query = "SELECT * FROM error_events" 

314 params = [] 

315 

316 if start_time or end_time: 

317 query += " WHERE" 

318 conditions = [] 

319 if start_time: 

320 conditions.append(" timestamp >= ?") 

321 params.append(start_time) 

322 if end_time: 

323 conditions.append(" timestamp <= ?") 

324 params.append(end_time) 

325 query += " AND".join(conditions) 

326 

327 query += " ORDER BY timestamp DESC LIMIT ?" 

328 params.append(limit) 

329 

330 return [dict(row) for row in conn.execute(query, params).fetchall()] 

331 

332 

333# Global instance 

334_metrics_collector = None 

335 

336 

337def get_metrics_collector(db_path: str | pathlib.Path = DEFAULT_DB_PATH) -> MetricsCollector: 

338 """Get global metrics collector instance.""" 

339 global _metrics_collector # noqa: PLW0603 

340 if _metrics_collector is None: 

341 _metrics_collector = MetricsCollector(db_path) 

342 return _metrics_collector