# Coverage report for src/unit_cooler/metrics/collector.py: 56% of 176 statements covered.
# Generated by coverage.py v7.9.1 at 2025-07-23 14:35 +0000.
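"""
New metrics collection system for outdoor unit cooler.

Collects:
- cooling_mode value, every minute
- Duty ratio (ratio of ON time to ON+OFF time), every minute
- Temperature, illuminance, solar radiation, precipitation and humidity, every minute
- Number of valve operations per hour
- Flow rate while the valve is ON
- Error occurrences
"""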
13import datetime
14import logging
15import pathlib
16import sqlite3
17import threading
18import zoneinfo
19from contextlib import contextmanager
21TIMEZONE = zoneinfo.ZoneInfo("Asia/Tokyo")
22DEFAULT_DB_PATH = pathlib.Path("data/metrics.db")
24logger = logging.getLogger(__name__)
27class MetricsCollector:
28 """Metrics collection system focused on cooling mode analysis.""" # noqa: D203
30 def __init__(self, db_path: str | pathlib.Path = DEFAULT_DB_PATH):
31 """Initialize MetricsCollector with database path."""
32 self.db_path = pathlib.Path(db_path)
33 self.db_path.parent.mkdir(parents=True, exist_ok=True)
34 self._init_database()
35 self._lock = threading.Lock()
37 # Current state tracking
38 self._current_minute_data = {}
39 self._current_hour_data = {"valve_operations": 0}
40 self._last_minute = None
41 self._last_hour = None
43 def _init_database(self):
44 """Initialize database tables for new metrics schema."""
45 with self._get_db_connection() as conn:
46 # 1分毎のメトリクス
47 conn.execute("""
48 CREATE TABLE IF NOT EXISTS minute_metrics (
49 id INTEGER PRIMARY KEY AUTOINCREMENT,
50 timestamp DATETIME NOT NULL,
51 cooling_mode INTEGER,
52 duty_ratio REAL,
53 temperature REAL,
54 humidity REAL,
55 lux REAL,
56 solar_radiation REAL,
57 rain_amount REAL,
58 flow_value REAL,
59 created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
60 UNIQUE(timestamp)
61 )
62 """)
64 # 1時間毎のメトリクス
65 conn.execute("""
66 CREATE TABLE IF NOT EXISTS hourly_metrics (
67 id INTEGER PRIMARY KEY AUTOINCREMENT,
68 timestamp DATETIME NOT NULL,
69 valve_operations INTEGER DEFAULT 0,
70 created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
71 UNIQUE(timestamp)
72 )
73 """)
75 # エラー記録
76 conn.execute("""
77 CREATE TABLE IF NOT EXISTS error_events (
78 id INTEGER PRIMARY KEY AUTOINCREMENT,
79 timestamp DATETIME NOT NULL,
80 error_type TEXT NOT NULL,
81 error_message TEXT,
82 created_at DATETIME DEFAULT CURRENT_TIMESTAMP
83 )
84 """)
86 # インデックス作成
87 conn.execute("CREATE INDEX IF NOT EXISTS idx_minute_timestamp ON minute_metrics(timestamp)")
88 conn.execute("CREATE INDEX IF NOT EXISTS idx_hourly_timestamp ON hourly_metrics(timestamp)")
89 conn.execute("CREATE INDEX IF NOT EXISTS idx_error_timestamp ON error_events(timestamp)")
91 @contextmanager
92 def _get_db_connection(self):
93 """Get database connection with proper error handling."""
94 conn = None
95 try:
96 conn = sqlite3.connect(str(self.db_path), timeout=30.0)
97 conn.row_factory = sqlite3.Row
98 yield conn
99 conn.commit()
100 except Exception:
101 if conn:
102 conn.rollback()
103 logger.exception("Database error")
104 raise
105 finally:
106 if conn: 106 ↛ exitline 106 didn't return from function '_get_db_connection' because the condition on line 106 was always true
107 conn.close()
109 def update_cooling_mode(self, cooling_mode: int):
110 """Update current cooling mode value."""
111 with self._lock:
112 self._current_minute_data["cooling_mode"] = cooling_mode
113 self._check_minute_boundary()
115 def update_duty_ratio(self, on_time: float, total_time: float):
116 """Update duty ratio (ON time / total time)."""
117 with self._lock:
118 if total_time > 0: 118 ↛ 120line 118 didn't jump to line 120 because the condition on line 118 was always true
119 self._current_minute_data["duty_ratio"] = on_time / total_time
120 self._check_minute_boundary()
122 def update_environmental_data(
123 self,
124 temperature: float | None = None,
125 humidity: float | None = None,
126 lux: float | None = None,
127 solar_radiation: float | None = None,
128 rain_amount: float | None = None,
129 ):
130 """Update environmental sensor data."""
131 with self._lock:
132 if temperature is not None: 132 ↛ 134line 132 didn't jump to line 134 because the condition on line 132 was always true
133 self._current_minute_data["temperature"] = temperature
134 if humidity is not None: 134 ↛ 136line 134 didn't jump to line 136 because the condition on line 134 was always true
135 self._current_minute_data["humidity"] = humidity
136 if lux is not None: 136 ↛ 138line 136 didn't jump to line 138 because the condition on line 136 was always true
137 self._current_minute_data["lux"] = lux
138 if solar_radiation is not None: 138 ↛ 140line 138 didn't jump to line 140 because the condition on line 138 was always true
139 self._current_minute_data["solar_radiation"] = solar_radiation
140 if rain_amount is not None: 140 ↛ 142line 140 didn't jump to line 142 because the condition on line 140 was always true
141 self._current_minute_data["rain_amount"] = rain_amount
142 self._check_minute_boundary()
144 def update_flow_value(self, flow_value: float):
145 """Update flow value when valve is ON."""
146 with self._lock:
147 self._current_minute_data["flow_value"] = flow_value
148 self._check_minute_boundary()
150 def record_valve_operation(self):
151 """Record a valve operation for hourly counting."""
152 with self._lock:
153 self._current_hour_data["valve_operations"] += 1
154 self._check_hour_boundary()
156 def record_error(self, error_type: str, error_message: str | None = None):
157 """Record an error event."""
158 now = datetime.datetime.now(TIMEZONE)
160 try:
161 with self._get_db_connection() as conn:
162 conn.execute(
163 """
164 INSERT INTO error_events (timestamp, error_type, error_message)
165 VALUES (?, ?, ?)
166 """,
167 (now, error_type, error_message),
168 )
169 logger.info("Recorded error: %s", error_type)
170 except Exception:
171 logger.exception("Failed to record error")
173 def _check_minute_boundary(self):
174 """Check if we crossed a minute boundary and save data."""
175 now = datetime.datetime.now(TIMEZONE)
176 current_minute = now.replace(second=0, microsecond=0)
178 if self._last_minute is None:
179 self._last_minute = current_minute
180 return
182 if current_minute > self._last_minute:
183 self._save_minute_data(self._last_minute)
184 self._current_minute_data = {}
185 self._last_minute = current_minute
187 def _check_hour_boundary(self):
188 """Check if we crossed an hour boundary and save data."""
189 now = datetime.datetime.now(TIMEZONE)
190 current_hour = now.replace(minute=0, second=0, microsecond=0)
192 if self._last_hour is None:
193 self._last_hour = current_hour
194 return
196 if current_hour > self._last_hour:
197 self._save_hour_data(self._last_hour)
198 self._current_hour_data = {"valve_operations": 0}
199 self._last_hour = current_hour
201 def _save_minute_data(self, timestamp: datetime.datetime):
202 """Save accumulated minute data to database."""
203 if not self._current_minute_data: 203 ↛ 204line 203 didn't jump to line 204 because the condition on line 203 was never true
204 logger.debug("No current minute data to save for %s", timestamp)
205 return
207 try:
208 data = (
209 timestamp,
210 self._current_minute_data.get("cooling_mode"),
211 self._current_minute_data.get("duty_ratio"),
212 self._current_minute_data.get("temperature"),
213 self._current_minute_data.get("humidity"),
214 self._current_minute_data.get("lux"),
215 self._current_minute_data.get("solar_radiation"),
216 self._current_minute_data.get("rain_amount"),
217 self._current_minute_data.get("flow_value"),
218 )
219 logger.info("Saving minute metrics for %s: %s", timestamp, self._current_minute_data)
221 with self._get_db_connection() as conn:
222 conn.execute(
223 """
224 INSERT OR REPLACE INTO minute_metrics
225 (timestamp, cooling_mode, duty_ratio, temperature, humidity,
226 lux, solar_radiation, rain_amount, flow_value)
227 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
228 """,
229 data,
230 )
231 logger.info("Successfully saved minute metrics for %s", timestamp)
232 except Exception:
233 logger.exception("Failed to save minute data")
235 def _save_hour_data(self, timestamp: datetime.datetime):
236 """Save accumulated hour data to database."""
237 try:
238 with self._get_db_connection() as conn:
239 conn.execute(
240 """
241 INSERT OR REPLACE INTO hourly_metrics
242 (timestamp, valve_operations)
243 VALUES (?, ?)
244 """,
245 (timestamp, self._current_hour_data["valve_operations"]),
246 )
247 logger.debug("Saved hourly metrics for %s", timestamp)
248 except Exception:
249 logger.exception("Failed to save hour data")
251 def get_minute_data(
252 self,
253 start_time: datetime.datetime | None = None,
254 end_time: datetime.datetime | None = None,
255 limit: int = 1000,
256 ) -> list:
257 """Get minute-level metrics data."""
258 with self._get_db_connection() as conn:
259 query = "SELECT * FROM minute_metrics"
260 params = []
262 if start_time or end_time:
263 query += " WHERE"
264 conditions = []
265 if start_time:
266 conditions.append(" timestamp >= ?")
267 params.append(start_time)
268 if end_time:
269 conditions.append(" timestamp <= ?")
270 params.append(end_time)
271 query += " AND".join(conditions)
273 query += " ORDER BY timestamp DESC LIMIT ?"
274 params.append(limit)
276 return [dict(row) for row in conn.execute(query, params).fetchall()]
278 def get_hourly_data(
279 self,
280 start_time: datetime.datetime | None = None,
281 end_time: datetime.datetime | None = None,
282 limit: int = 168,
283 ) -> list: # 1週間分
284 """Get hourly-level metrics data."""
285 with self._get_db_connection() as conn:
286 query = "SELECT * FROM hourly_metrics"
287 params = []
289 if start_time or end_time:
290 query += " WHERE"
291 conditions = []
292 if start_time:
293 conditions.append(" timestamp >= ?")
294 params.append(start_time)
295 if end_time:
296 conditions.append(" timestamp <= ?")
297 params.append(end_time)
298 query += " AND".join(conditions)
300 query += " ORDER BY timestamp DESC LIMIT ?"
301 params.append(limit)
303 return [dict(row) for row in conn.execute(query, params).fetchall()]
305 def get_error_data(
306 self,
307 start_time: datetime.datetime | None = None,
308 end_time: datetime.datetime | None = None,
309 limit: int = 100,
310 ) -> list:
311 """Get error events data."""
312 with self._get_db_connection() as conn:
313 query = "SELECT * FROM error_events"
314 params = []
316 if start_time or end_time:
317 query += " WHERE"
318 conditions = []
319 if start_time:
320 conditions.append(" timestamp >= ?")
321 params.append(start_time)
322 if end_time:
323 conditions.append(" timestamp <= ?")
324 params.append(end_time)
325 query += " AND".join(conditions)
327 query += " ORDER BY timestamp DESC LIMIT ?"
328 params.append(limit)
330 return [dict(row) for row in conn.execute(query, params).fetchall()]
# Module-wide collector, created lazily by get_metrics_collector().
_metrics_collector = None


def get_metrics_collector(db_path: str | pathlib.Path = DEFAULT_DB_PATH) -> MetricsCollector:
    """Return the shared MetricsCollector, building it on first use.

    ``db_path`` is only used by the call that creates the instance; once a
    collector exists it is returned as-is and the argument is ignored.
    """
    global _metrics_collector  # noqa: PLW0603
    if _metrics_collector is not None:
        return _metrics_collector
    _metrics_collector = MetricsCollector(db_path)
    return _metrics_collector