Coverage for src / server_list / spec / cache_manager.py: 90%

106 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-31 11:45 +0000

1#!/usr/bin/env python3 

2""" 

3Cache manager for server-list data. 

4Caches config, VM info, CPU benchmarks to SQLite for fast API responses. 

5Updates data in background and notifies via SSE. 

6""" 

7 

8import json 

9import logging 

10import pathlib 

11import sqlite3 

12import threading 

13from datetime import datetime 

14 

15import my_lib.config 

16import my_lib.webapp.event 

17 

18import server_list.spec.db 

19import server_list.spec.db_config 

20 

21UPDATE_INTERVAL_SEC = 300 # 5 minutes 

22 

23_update_thread: threading.Thread | None = None 

24_should_stop = threading.Event() 

25_db_lock = threading.Lock() 

26_watch_thread: threading.Thread | None = None 

27_watch_stop_event: threading.Event | None = None 

28 

29 

30CACHE_SCHEMA = """ 

31CREATE TABLE IF NOT EXISTS cache ( 

32 key TEXT PRIMARY KEY, 

33 value TEXT NOT NULL, 

34 updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP 

35) 

36""" 

37 

38 

39def init_db(): 

40 """Initialize the cache database.""" 

41 with server_list.spec.db.get_connection( 

42 server_list.spec.db_config.get_cache_db_path() 

43 ) as conn: 

44 conn.executescript(CACHE_SCHEMA) 

45 conn.commit() 

46 

47 

48def _get_cache(key: str) -> dict | None: 

49 """Internal: Get cached value by key. 

50 

51 For type-safe access, use the specialized getters: 

52 - get_config() for config cache (returns dict | None) 

53 

54 Args: 

55 key: Cache key to retrieve 

56 

57 Returns: 

58 Cached value as dict, or None if not found 

59 """ 

60 try: 

61 with _db_lock, server_list.spec.db.get_connection( 

62 server_list.spec.db_config.get_cache_db_path() 

63 ) as conn: 

64 cursor = conn.cursor() 

65 cursor.execute("SELECT value FROM cache WHERE key = ?", (key,)) 

66 row = cursor.fetchone() 

67 

68 if row: 

69 return json.loads(row[0]) 

70 except (sqlite3.Error, json.JSONDecodeError) as e: 

71 logging.warning("Failed to get cache for %s: %s", key, e) 

72 

73 return None 

74 

75 

76def _set_cache(key: str, value: dict): 

77 """Internal: Set cache value.""" 

78 try: 

79 with _db_lock, server_list.spec.db.get_connection( 

80 server_list.spec.db_config.get_cache_db_path() 

81 ) as conn: 

82 cursor = conn.cursor() 

83 cursor.execute(""" 

84 INSERT OR REPLACE INTO cache (key, value, updated_at) 

85 VALUES (?, ?, ?) 

86 """, (key, json.dumps(value, ensure_ascii=False), datetime.now().isoformat())) 

87 conn.commit() 

88 except sqlite3.Error as e: 

89 logging.warning("Failed to set cache for %s: %s", key, e) 

90 

91 

92def _get_cache_state(db_path: str | pathlib.Path) -> str | None: 

93 """キャッシュ DB の状態を取得する.""" 

94 try: 

95 with sqlite3.connect(db_path) as conn: 

96 cursor = conn.execute("SELECT MAX(updated_at) FROM cache") 

97 row = cursor.fetchone() 

98 return row[0] if row else None 

99 except sqlite3.Error: 

100 logging.exception("Failed to get cache state") 

101 return None 

102 

103 

104def load_config_from_file() -> dict | None: 

105 """Load config from YAML file with schema validation. 

106 

107 Uses my_lib.config.load() for consistent config loading 

108 with schema validation across the codebase. 

109 The config path is obtained from db_config for testability. 

110 Falls back to yaml.safe_load() if schema file is not available (e.g., in tests). 

111 """ 

112 import yaml 

113 

114 try: 

115 config_path = server_list.spec.db_config.get_config_path() 

116 if not config_path.exists(): 

117 return None 

118 

119 # Use centralized schema path from db module 

120 schema_path = server_list.spec.db.CONFIG_SCHEMA_PATH 

121 

122 # Use schema validation if available, otherwise fallback to yaml.safe_load 

123 if schema_path.exists(): 

124 return my_lib.config.load(config_path, schema_path) 

125 

126 with open(config_path, encoding="utf-8") as f: 

127 return yaml.safe_load(f) 

128 except Exception as e: 

129 logging.warning("Failed to load config: %s", e) 

130 return None 

131 

132 

133def get_config() -> dict | None: 

134 """Get config from cache, or load from file if not cached. 

135 

136 This is the recommended way to access config data, providing 

137 type-safe access with dict | None return type. 

138 

139 Returns: 

140 Config dictionary or None if not available 

141 """ 

142 cached = _get_cache("config") 

143 if cached: 

144 return cached 

145 

146 # Load from file and cache 

147 config = load_config_from_file() 

148 if config: 148 ↛ 150line 148 didn't jump to line 150 because the condition on line 148 was always true

149 _set_cache("config", config) 

150 return config 

151 

152 

153def update_all_caches(): 

154 """Update all caches from source data.""" 

155 updated = False 

156 

157 # Update config cache 

158 config = load_config_from_file() 

159 if config: 

160 old_config = _get_cache("config") 

161 if old_config != config: 

162 _set_cache("config", config) 

163 updated = True 

164 logging.info("Config cache updated") 

165 

166 if updated: 

167 logging.info("Cache updated") 

168 my_lib.webapp.event.notify_event(my_lib.webapp.event.EVENT_TYPE.CONTENT) 

169 

170 

171def _update_worker(): 

172 """Background worker that updates caches periodically.""" 

173 logging.info("Cache update worker started (interval: %d sec)", UPDATE_INTERVAL_SEC) 

174 

175 # Initial update 

176 update_all_caches() 

177 

178 while not _should_stop.wait(UPDATE_INTERVAL_SEC): 178 ↛ 179line 178 didn't jump to line 179 because the condition on line 178 was never true

179 update_all_caches() 

180 

181 logging.info("Cache update worker stopped") 

182 

183 

184def start_cache_worker(): 

185 """Start the background cache update worker.""" 

186 global _update_thread, _watch_thread, _watch_stop_event 

187 

188 init_db() 

189 

190 # Initial cache population 

191 _watch_stop_event, _watch_thread = my_lib.webapp.event.start_db_state_watcher( 

192 server_list.spec.db_config.get_cache_db_path(), 

193 _get_cache_state, 

194 my_lib.webapp.event.EVENT_TYPE.CONTENT, 

195 notify_on_first=True, 

196 ) 

197 

198 update_all_caches() 

199 

200 if _update_thread and _update_thread.is_alive(): 

201 return 

202 

203 _should_stop.clear() 

204 _update_thread = threading.Thread(target=_update_worker, daemon=True) 

205 _update_thread.start() 

206 

207 

208def stop_cache_worker(): 

209 """Stop the background cache update worker.""" 

210 global _watch_thread, _watch_stop_event 

211 _should_stop.set() 

212 if _update_thread: 212 ↛ 215line 212 didn't jump to line 215 because the condition on line 212 was always true

213 _update_thread.join(timeout=5) 

214 

215 if _watch_thread is not None and _watch_stop_event is not None: 215 ↛ exitline 215 didn't return from function 'stop_cache_worker' because the condition on line 215 was always true

216 my_lib.webapp.event.stop_db_state_watcher(_watch_stop_event, _watch_thread) 

217 _watch_thread = None 

218 _watch_stop_event = None