Coverage for src/unit_cooler/metrics/analyzer.py: 16%

107 statements  

« prev     ^ index     » next       coverage.py v7.9.1, created at 2025-07-23 14:35 +0000

1""" 

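New metrics analysis for the outdoor unit cooler system.

Provides:
- Hourly box plots of cooling_mode, duty ratio, and valve operation counts
- Time-series trend graphs
- Correlation analysis against environmental factors (scatter plots and correlation coefficients)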

8""" 

9 

10import datetime 

11import logging 

12import zoneinfo 

13 

# Module logger; bound first so the optional-import guard below can use it.
logger = logging.getLogger(__name__)

# Analysis libraries are optional: record whether they could be imported so
# the analyzer can report an error instead of failing on a missing name.
try:
    import pandas as pd
    from scipy import stats
except ImportError:
    _ANALYSIS_AVAILABLE = False
    logger.warning("Analysis libraries not available. Install numpy, pandas, scipy for analytics.")
else:
    _ANALYSIS_AVAILABLE = True

from .collector import MetricsCollector, get_metrics_collector

# Time zone used to compute every query window in this module.
TIMEZONE = zoneinfo.ZoneInfo("Asia/Tokyo")

29 

30 

class MetricsAnalyzer:
    """Analyze cooling metrics and their relationship to environmental factors.

    Data is read through ``self.collector``. Every query window ends at the
    current time in ``TIMEZONE`` and reaches back the requested number of days.
    Computed statistics are converted to built-in ``float``/``int``/``bool``
    values, with ``None`` standing in for undefined results (NaN), so the
    returned structures can be serialized as strict JSON.
    """  # noqa: D203

    _MINUTES_PER_DAY = 24 * 60
    _HOURS_PER_DAY = 24
    # Historical fixed caps; kept as lower bounds so short windows behave as before.
    _DEFAULT_MINUTE_LIMIT = 7 * _MINUTES_PER_DAY  # 10080
    _DEFAULT_HOURLY_LIMIT = 7 * _HOURS_PER_DAY  # 168
    _CORRELATION_MINUTE_LIMIT = 30 * _MINUTES_PER_DAY  # 43200
    _ERROR_LIMIT = 1000

    _MIN_CORRELATION_SAMPLES = 10  # strictly more than this many rows are required
    _MAX_SCATTER_POINTS = 1000
    _SIGNIFICANCE_LEVEL = 0.05

    _ENV_FACTORS = ("temperature", "humidity", "lux", "solar_radiation", "rain_amount")
    _TARGET_METRICS = ("cooling_mode", "duty_ratio")

    _LIBS_MISSING_ERROR = "Analysis libraries not available"

    def __init__(self, collector: MetricsCollector | None = None):
        """Store *collector*, falling back to ``get_metrics_collector()`` when it is falsy."""
        self.collector = collector or get_metrics_collector()

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def get_hourly_boxplot_data(self, days: int = 7) -> dict:
        """Return per-hour box plot statistics for the last *days* days.

        Returns:
            A dict with ``cooling_mode_boxplot``, ``duty_ratio_boxplot`` (from
            minute data) and ``valve_operations_boxplot`` (from hourly data),
            each a list as produced by ``_calculate_hourly_boxplot``; or
            ``{"error": ...}`` when the analysis libraries are unavailable.
        """
        if not _ANALYSIS_AVAILABLE:
            return {"error": self._LIBS_MISSING_ERROR}

        start_time, end_time = self._time_window(days)
        minute_data = self.collector.get_minute_data(
            start_time,
            end_time,
            limit=self._row_limit(days, self._MINUTES_PER_DAY, self._DEFAULT_MINUTE_LIMIT),
        )
        hourly_data = self.collector.get_hourly_data(
            start_time,
            end_time,
            limit=self._row_limit(days, self._HOURS_PER_DAY, self._DEFAULT_HOURLY_LIMIT),
        )

        df_minute = self._with_hour_column(pd.DataFrame(minute_data))
        df_hourly = self._with_hour_column(pd.DataFrame(hourly_data))

        return {
            "cooling_mode_boxplot": self._calculate_hourly_boxplot(df_minute, "cooling_mode"),
            "duty_ratio_boxplot": self._calculate_hourly_boxplot(df_minute, "duty_ratio"),
            "valve_operations_boxplot": self._calculate_hourly_boxplot(df_hourly, "valve_operations"),
        }

    def get_timeseries_data(self, days: int = 7) -> dict:
        """Return raw time series for the last *days* days.

        ``cooling_mode`` and ``duty_ratio`` points whose value is ``None`` are
        skipped; ``valve_operations`` points are passed through unfiltered.
        Does not require the analysis libraries.
        """
        start_time, end_time = self._time_window(days)
        minute_data = self.collector.get_minute_data(
            start_time,
            end_time,
            limit=self._row_limit(days, self._MINUTES_PER_DAY, self._DEFAULT_MINUTE_LIMIT),
        )
        hourly_data = self.collector.get_hourly_data(
            start_time,
            end_time,
            limit=self._row_limit(days, self._HOURS_PER_DAY, self._DEFAULT_HOURLY_LIMIT),
        )

        return {
            "cooling_mode_timeseries": self._non_null_points(minute_data, "cooling_mode"),
            "duty_ratio_timeseries": self._non_null_points(minute_data, "duty_ratio"),
            "valve_operations_timeseries": [
                {"timestamp": row["timestamp"], "value": row["valve_operations"]} for row in hourly_data
            ],
        }

    def get_correlation_analysis(self, days: int = 30) -> dict:
        """Correlate environmental factors with system metrics over *days* days.

        Returns:
            ``{"correlations": ..., "scatter_data": ...}``, both keyed by target
            metric and then by environmental factor; or ``{"error": ...}`` when
            the analysis libraries are unavailable or no minute data exists.
        """
        if not _ANALYSIS_AVAILABLE:
            return {"error": self._LIBS_MISSING_ERROR}

        start_time, end_time = self._time_window(days)
        minute_data = self.collector.get_minute_data(
            start_time,
            end_time,
            limit=self._row_limit(days, self._MINUTES_PER_DAY, self._CORRELATION_MINUTE_LIMIT),
        )
        df = pd.DataFrame(minute_data)
        if df.empty:
            return {"error": "No data available for correlation analysis"}

        correlations: dict = {}
        scatter_data: dict = {}
        for target in self._TARGET_METRICS:
            correlations[target] = {}
            scatter_data[target] = {}
            for factor in self._ENV_FACTORS:
                summary, points = self._correlate_pair(df, target, factor)
                correlations[target][factor] = summary
                scatter_data[target][factor] = points

        return {"correlations": correlations, "scatter_data": scatter_data}

    def get_summary_statistics(self, days: int = 7) -> dict:
        """Return dashboard summary statistics for the last *days* days.

        Returns ``{"error": ...}`` when the analysis libraries are unavailable,
        matching the other pandas-based methods instead of raising NameError.
        """
        if not _ANALYSIS_AVAILABLE:
            return {"error": self._LIBS_MISSING_ERROR}

        start_time, end_time = self._time_window(days)
        minute_data = self.collector.get_minute_data(
            start_time,
            end_time,
            limit=self._row_limit(days, self._MINUTES_PER_DAY, self._DEFAULT_MINUTE_LIMIT),
        )
        hourly_data = self.collector.get_hourly_data(
            start_time,
            end_time,
            limit=self._row_limit(days, self._HOURS_PER_DAY, self._DEFAULT_HOURLY_LIMIT),
        )
        error_data = self.collector.get_error_data(start_time, end_time, limit=self._ERROR_LIMIT)

        df_minute = pd.DataFrame(minute_data)
        df_hourly = pd.DataFrame(hourly_data)

        return {
            "period_days": days,
            "total_data_points": len(minute_data),
            "total_errors": len(error_data),
            "cooling_mode": self._get_column_stats(df_minute, "cooling_mode"),
            "duty_ratio": self._get_column_stats(df_minute, "duty_ratio"),
            "valve_operations": self._get_column_stats(df_hourly, "valve_operations"),
            "environmental": {
                factor: self._get_column_stats(df_minute, factor) for factor in self._ENV_FACTORS
            },
        }

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _time_window(days):
        """Return ``(start, end)`` where *end* is now in ``TIMEZONE`` and *start* is *days* earlier."""
        end_time = datetime.datetime.now(TIMEZONE)
        return end_time - datetime.timedelta(days=days), end_time

    @staticmethod
    def _row_limit(days, rows_per_day: int, floor: int) -> int:
        """Return a row cap that scales with *days* but never drops below *floor*.

        The collector's ``limit`` is presumably a maximum row count (verify
        against the collector); a fixed cap would truncate windows longer than
        the one it was sized for.
        """
        return max(floor, int(days * rows_per_day))

    @staticmethod
    def _non_null_points(rows, key: str) -> list[dict]:
        """Build ``{"timestamp", "value"}`` points from *rows*, skipping ``None`` values of *key*."""
        return [{"timestamp": row["timestamp"], "value": row[key]} for row in rows if row[key] is not None]

    @staticmethod
    def _with_hour_column(df):
        """Parse ``timestamp`` and add an ``hour`` column in place; empty frames pass through."""
        if not df.empty:
            df["timestamp"] = pd.to_datetime(df["timestamp"])
            # NOTE(review): the hour is taken in whatever zone the parsed
            # timestamps carry - confirm the collector stores local time.
            df["hour"] = df["timestamp"].dt.hour
        return df

    @staticmethod
    def _to_float_or_none(value):
        """Convert *value* to ``float``, mapping NaN/NA to ``None``."""
        return None if pd.isna(value) else float(value)

    @staticmethod
    def _empty_correlation(sample_size: int) -> dict:
        """Return the correlation record used when no coefficient can be reported."""
        return {"correlation": None, "p_value": None, "significant": False, "sample_size": sample_size}

    def _correlate_pair(self, df, target: str, factor: str) -> tuple[dict, list[dict]]:
        """Compute the Pearson correlation and scatter points for one column pair.

        Returns:
            ``(summary, points)``. *summary* holds ``correlation``, ``p_value``,
            ``significant`` and ``sample_size``; *points* is a list of
            ``{"x": factor_value, "y": target_value}`` dicts, sampled down to
            ``_MAX_SCATTER_POINTS`` with a fixed seed. With too few rows, or a
            column missing from *df*, the summary is empty and *points* is ``[]``.
        """
        if target not in df.columns or factor not in df.columns:
            return self._empty_correlation(0), []

        # Keep only rows where both values are present.
        valid = df.dropna(subset=[target, factor])
        sample_size = len(valid)
        if sample_size <= self._MIN_CORRELATION_SAMPLES:
            return self._empty_correlation(sample_size), []

        corr_coef, p_value = stats.pearsonr(valid[factor], valid[target])
        if pd.isna(corr_coef) or pd.isna(p_value):
            # pearsonr yields NaN for constant input; report "undefined" rather
            # than a NaN that strict JSON cannot represent.
            summary = self._empty_correlation(sample_size)
        else:
            summary = {
                "correlation": float(corr_coef),
                "p_value": float(p_value),
                # bool(): the comparison yields a NumPy bool, which json cannot encode.
                "significant": bool(p_value < self._SIGNIFICANCE_LEVEL),
                "sample_size": sample_size,
            }

        if sample_size > self._MAX_SCATTER_POINTS:
            sampled = valid.sample(n=self._MAX_SCATTER_POINTS, random_state=42)
        else:
            sampled = valid
        points = [{"x": float(x), "y": float(y)} for x, y in zip(sampled[factor], sampled[target])]
        return summary, points

    @staticmethod
    def _empty_boxplot_entry(hour: int) -> dict:
        """Return the placeholder box plot record for an hour without data."""
        return {
            "hour": hour,
            "min": None,
            "q1": None,
            "median": None,
            "q3": None,
            "max": None,
            "count": 0,
            "outliers": [],
        }

    def _calculate_hourly_boxplot(self, df, column: str) -> list[dict]:
        """Calculate box plot statistics of *column* for each hour of the day.

        Returns ``[]`` when *df* is empty, lacks *column* or ``hour``, or has no
        non-null values in *column*; otherwise one record per hour 0-23, using
        a ``None``-filled placeholder for hours without data.
        """
        if df.empty or column not in df.columns or "hour" not in df.columns:
            return []

        values = df.dropna(subset=[column])
        if values.empty:
            return []

        entries = []
        for hour in range(24):
            hour_values = values.loc[values["hour"] == hour, column]
            if hour_values.empty:
                entries.append(self._empty_boxplot_entry(hour))
                continue
            entries.append(
                {
                    "hour": hour,
                    "min": float(hour_values.min()),
                    "q1": float(hour_values.quantile(0.25)),
                    "median": float(hour_values.median()),
                    "q3": float(hour_values.quantile(0.75)),
                    "max": float(hour_values.max()),
                    "count": len(hour_values),
                    "outliers": self._detect_outliers(hour_values),
                }
            )
        return entries

    def _detect_outliers(self, data) -> list[float]:
        """Return the values of *data* lying more than 1.5 IQR outside the quartiles."""
        q1 = data.quantile(0.25)
        q3 = data.quantile(0.75)
        iqr = q3 - q1
        lower_bound = q1 - 1.5 * iqr
        upper_bound = q3 + 1.5 * iqr

        outliers = data[(data < lower_bound) | (data > upper_bound)]
        return [float(x) for x in outliers.tolist()]

    def _get_column_stats(self, df, column: str) -> dict:
        """Return count/mean/median/std/min/max for the non-null values of *column*.

        Every statistic is ``None`` (with ``count`` 0) when there is no data.
        ``std`` is also ``None`` for a single sample, where pandas' default
        sample standard deviation is NaN.
        """
        no_data = {"count": 0, "mean": None, "median": None, "std": None, "min": None, "max": None}
        if df.empty or column not in df.columns:
            return no_data

        data = df[column].dropna()
        if data.empty:
            return no_data

        return {
            "count": len(data),
            "mean": self._to_float_or_none(data.mean()),
            "median": self._to_float_or_none(data.median()),
            "std": self._to_float_or_none(data.std()),
            "min": self._to_float_or_none(data.min()),
            "max": self._to_float_or_none(data.max()),
        }

253 

254 

def get_metrics_analyzer() -> MetricsAnalyzer:
    """Create and return a new MetricsAnalyzer constructed with no explicit collector."""
    analyzer = MetricsAnalyzer()
    return analyzer