Coverage for src / server_list / spec / cpu_benchmark.py: 81%
329 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-31 11:45 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-31 11:45 +0000
1#!/usr/bin/env python3
2"""
3CPU Benchmark scraper from cpubenchmark.net
4Fetches multi-thread and single-thread performance scores and stores them in SQLite database.
5"""
7import logging
8import re
9import threading
10import time
11from typing import Any
13import bs4
14import requests
16from server_list.spec.db import get_connection
17from server_list.spec.db_config import get_cpu_spec_db_path
18from server_list.spec.models import CPUBenchmark
21# =============================================================================
22# In-memory cache with TTL for benchmark data
23# =============================================================================
class BenchmarkCache:
    """Thread-safe in-memory cache whose entries expire after a fixed TTL.

    Entry age is measured with ``time.monotonic()`` rather than the wall
    clock, so system clock adjustments can neither expire entries early nor
    keep stale entries alive.

    Note: a stored value of ``None`` cannot be told apart from a cache miss.
    """

    def __init__(self, ttl_seconds: int = 3600):
        """Create an empty cache.

        Args:
            ttl_seconds: Maximum age, in seconds, before an entry is treated
                as expired.
        """
        self._cache: dict[str, Any] = {}
        # Monotonic-clock reading taken when each key was stored
        self._timestamps: dict[str, float] = {}
        self._ttl = ttl_seconds
        self._lock = threading.Lock()

    def _drop(self, key: str) -> None:
        """Remove *key* from both internal maps. Caller must hold the lock."""
        self._cache.pop(key, None)
        self._timestamps.pop(key, None)

    def get(self, key: str) -> Any | None:
        """Return the cached value for *key*, or None if missing or expired.

        Expired entries are evicted as a side effect of the lookup.
        """
        with self._lock:
            if key not in self._cache:
                return None
            if time.monotonic() - self._timestamps[key] > self._ttl:
                self._drop(key)
                return None
            return self._cache[key]

    def set(self, key: str, value: Any) -> None:
        """Store *value* under *key* and restart its TTL."""
        with self._lock:
            self._cache[key] = value
            self._timestamps[key] = time.monotonic()

    def invalidate(self, key: str | None = None) -> None:
        """Drop a single entry, or every entry when *key* is None."""
        with self._lock:
            if key is None:
                self._cache.clear()
                self._timestamps.clear()
            else:
                self._drop(key)


# Global cache instance (1 hour TTL)
_benchmark_cache = BenchmarkCache(ttl_seconds=3600)
66# =============================================================================
67# Background fetch queue for on-demand benchmark retrieval
68# =============================================================================
class BackgroundFetchQueue:
    """Lock-protected set of CPU names whose background fetch is in flight.

    Used to avoid starting a second fetch for a CPU that is already being
    fetched, and to let callers ask whether a fetch is in progress.
    """

    def __init__(self):
        self._pending: set[str] = set()
        self._lock = threading.Lock()

    def is_pending(self, cpu_name: str) -> bool:
        """Return True while *cpu_name* is registered as in flight."""
        with self._lock:
            found = cpu_name in self._pending
        return found

    def add(self, cpu_name: str) -> bool:
        """Register *cpu_name* as in flight.

        Returns:
            True if the name was newly registered, False if it was already
            pending (in which case nothing changes).
        """
        with self._lock:
            newly_added = cpu_name not in self._pending
            if newly_added:
                self._pending.add(cpu_name)
            return newly_added

    def remove(self, cpu_name: str) -> None:
        """Unregister *cpu_name*; a name that is not pending is ignored."""
        with self._lock:
            self._pending.discard(cpu_name)


_fetch_queue = BackgroundFetchQueue()
def queue_background_fetch(cpu_name: str) -> bool:
    """Start a background fetch of the benchmark for *cpu_name*.

    The CPU is registered in the pending queue and a daemon thread is started
    to fetch and save the data. When the fetch yields a result, the frontend
    is notified via a CONTENT event.

    Args:
        cpu_name: CPU name to fetch.

    Returns:
        True if a new fetch was started, False if one was already pending.

    Raises:
        Exception: Whatever ``Thread.start()`` raises (typically RuntimeError)
            is propagated after the pending entry has been released.
    """
    if not _fetch_queue.add(cpu_name):
        logging.debug("Fetch already pending for: %s", cpu_name)
        return False

    def _fetch_task():
        try:
            result = fetch_and_save_benchmark(cpu_name)
            if result:
                # Import here to avoid circular import
                import my_lib.webapp.event

                my_lib.webapp.event.notify_event(my_lib.webapp.event.EVENT_TYPE.CONTENT)
                logging.info("Background fetch completed for: %s", cpu_name)
        except Exception:
            logging.exception("Background fetch failed for: %s", cpu_name)
        finally:
            # Always release the entry so the CPU can be queued again later
            _fetch_queue.remove(cpu_name)

    thread = threading.Thread(target=_fetch_task, daemon=True)
    try:
        thread.start()
    except Exception:
        # The worker never ran, so its ``finally`` clause cannot release the
        # entry; do it here or the CPU would stay "pending" forever.
        _fetch_queue.remove(cpu_name)
        raise

    logging.info("Queued background fetch for: %s", cpu_name)
    return True
def queue_background_fetch_batch(cpu_names: list[str]) -> int:
    """Request a background fetch for every CPU in *cpu_names*.

    Returns:
        How many of the names actually started a new fetch, i.e. were not
        already pending.
    """
    return sum(1 for name in cpu_names if queue_background_fetch(name))
def is_fetch_pending(cpu_name: str) -> bool:
    """Tell whether a background fetch for *cpu_name* is currently in flight."""
    pending = _fetch_queue.is_pending(cpu_name)
    return pending
# cpubenchmark.net pages that are scraped for scores
MULTITHREAD_URL = "https://www.cpubenchmark.net/multithread/"
SINGLETHREAD_URL = "https://www.cpubenchmark.net/singleThread.html"
CPU_LIST_URL = "https://www.cpubenchmark.net/cpu_list.php"

# Request headers sent with every page fetch (browser-like User-Agent)
HEADERS = {
    "User-Agent": (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
        "AppleWebKit/537.36 (KHTML, like Gecko) "
        "Chrome/120.0.0.0 Safari/537.36"
    ),
}

# One row per CPU name, holding its multi-thread and single-thread scores
CPU_BENCHMARK_SCHEMA = """
CREATE TABLE IF NOT EXISTS cpu_benchmark (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    cpu_name TEXT UNIQUE NOT NULL,
    multi_thread_score INTEGER,
    single_thread_score INTEGER,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
"""
def init_db():
    """Create the ``cpu_benchmark`` table in the CPU spec database if absent."""
    db_path = get_cpu_spec_db_path()
    with get_connection(db_path) as conn:
        conn.executescript(CPU_BENCHMARK_SCHEMA)
        conn.commit()
def extract_model_number(cpu_name: str) -> str | None:
    """Pull a comparable model identifier out of a CPU name.

    Patterns are tried from most specific to most generic and the first one
    that matches wins. The match is lower-cased and has its spaces removed,
    e.g. ``"Intel Xeon E5-2699 v4"`` -> ``"e5-2699v4"``.

    Returns:
        The normalized model identifier, or None when no pattern matches.
    """
    model_patterns = (
        r"(E5-\d{4}\s*v\d)",  # Xeon E5-2699 v4
        r"(i[3579]-\d{4,5}\w*)",  # Core i5-1135G7, i7-12700K
        r"(Ryzen\s+\d+\s+\d{4}\w*)",  # Ryzen 9 5900X
        r"(EPYC\s+\d{4}\w*)",  # EPYC 7742
        r"(\d{4,5}\w*)",  # Generic model number
    )

    # Lazy generator: later patterns are only tried if earlier ones fail
    attempts = (re.search(pattern, cpu_name, re.IGNORECASE) for pattern in model_patterns)
    first_hit = next((hit for hit in attempts if hit), None)
    if first_hit is None:
        return None

    return first_hit.group(1).lower().replace(" ", "")
def normalize_cpu_name(cpu_name: str) -> str:
    """Return *cpu_name* in a canonical form suitable for matching.

    Whitespace runs are collapsed, everything from the first ``@`` onwards
    (clock speed info) is dropped, and trademark markers are removed.
    """
    collapsed = " ".join(cpu_name.split())

    # Keep only the part before the clock speed, e.g. "... @ 2.40GHz"
    cleaned = collapsed.partition("@")[0].strip()

    # Strip trademark symbols
    for marker in ("(R)", "(TM)", "®", "™"):
        cleaned = cleaned.replace(marker, "")

    # Removing markers can leave double spaces behind; collapse once more
    return " ".join(cleaned.split())
def _match_by_model_number(
    search_name: str, candidate_name: str, search_lower: str, candidate_lower: str
) -> float | None:
    """Score a candidate by comparing extracted model numbers.

    Args:
        search_name: CPU name being searched for (used for model extraction).
        candidate_name: Candidate CPU name (used for model extraction).
        search_lower: Lower-cased search name, scanned for a ``vN`` version.
        candidate_lower: Lower-cased candidate name, scanned for a ``vN`` version.

    Returns:
        1.0 for identical model numbers; 0.9 when one model number contains
        the other; 0.3 when they overlap but both names carry different
        ``vN`` versions; None when a model number is missing or the two do
        not overlap at all (the caller then falls through to other strategies).
    """
    wanted = extract_model_number(search_name)
    offered = extract_model_number(candidate_name)

    if not (wanted and offered):
        return None

    if wanted == offered:
        return 1.0

    if not (wanted in offered or offered in wanted):
        return None

    # Partial overlap: a version mismatch makes the candidate a poor match
    wanted_ver, offered_ver = (
        re.search(r"v(\d)", text) for text in (search_lower, candidate_lower)
    )
    if wanted_ver and offered_ver and wanted_ver.group(1) != offered_ver.group(1):
        return 0.3
    return 0.9
def _match_xeon_e5(search_lower: str, candidate_lower: str) -> float | None:
    """Dedicated matching for the Xeon E5 series.

    Returns:
        None unless both names contain an ``e5-NNNN`` model; 0.95 when the
        model matches and the ``vN`` versions agree (or both are absent);
        0.2 for any other combination.
    """
    model_re = r"e5-(\d{4})"
    wanted = re.search(model_re, search_lower)
    offered = re.search(model_re, candidate_lower)

    if wanted is None or offered is None:
        return None

    if wanted.group(1) != offered.group(1):
        return 0.2

    def version_of(text: str) -> str | None:
        hit = re.search(r"v(\d)", text)
        return hit.group(1) if hit else None

    # Same model: versions must agree, where "no version" only equals "no version"
    if version_of(search_lower) == version_of(candidate_lower):
        return 0.95
    return 0.2
def _match_core_i(search_lower: str, candidate_lower: str) -> float | None:
    """Dedicated matching for the Intel Core i series.

    Returns:
        None unless both names contain an ``iN-NNNN`` model; 0.95 when both
        the tier digit and the 4-5 digit model number agree; 0.2 otherwise.
    """
    core_re = r"i([3579])-(\d{4,5})"
    wanted = re.search(core_re, search_lower)
    offered = re.search(core_re, candidate_lower)

    if not (wanted and offered):
        return None

    # groups() is (tier, model number); both must be equal
    return 0.95 if wanted.groups() == offered.groups() else 0.2
def _match_by_word_overlap(search_lower: str, candidate_lower: str) -> float:
    """Fuzzy score based on how many search words also occur in the candidate.

    Returns:
        The fraction of distinct search words found in the candidate, scaled
        to the range 0.0-0.5; 0.0 when the search name has no words.
    """
    wanted_words = set(re.findall(r"\w+", search_lower))
    if not wanted_words:
        return 0.0

    shared = wanted_words.intersection(re.findall(r"\w+", candidate_lower))
    return len(shared) / len(wanted_words) * 0.5
def calculate_match_score(search_name: str, candidate_name: str) -> float:
    """Rate how well *candidate_name* matches *search_name*.

    Strategies are tried in order and the first one that produces a score
    decides the result:

    1. model-number comparison
    2. exact equality of the normalized, lower-cased names
    3. Xeon E5 specific comparison
    4. Core i specific comparison
    5. word-overlap fallback (always produces a score)
    """
    search_lower = normalize_cpu_name(search_name).lower()
    candidate_lower = normalize_cpu_name(candidate_name).lower()

    # 1. Precise matching on the model number (uses the raw names)
    score = _match_by_model_number(search_name, candidate_name, search_lower, candidate_lower)
    if score is not None:
        return score

    # 2. Exact match
    if search_lower == candidate_lower:
        return 1.0

    # 3./4. Series-specific handling: Xeon E5 first, then Core i
    for series_matcher in (_match_xeon_e5, _match_core_i):
        score = series_matcher(search_lower, candidate_lower)
        if score is not None:
            return score

    # 5. Fuzzy matching by word overlap
    return _match_by_word_overlap(search_lower, candidate_lower)
def _extract_benchmark_score_from_chart_entry(entry_text: str) -> int | None:
    """Extract the benchmark score from the text of a chart entry.

    Args:
        entry_text: Text of the entry (e.g. "CPU Name(XX%)12,345$XXX").

    Returns:
        The benchmark score as an int, or None when no score is present.
    """
    # The score is the run of digits/commas right after a closing parenthesis
    hit = re.search(r"\)\s*([\d,]+)", entry_text)
    if hit is None:
        return None

    digits = hit.group(1).replace(",", "")
    try:
        return int(digits)
    except ValueError:
        # e.g. the captured text consisted of commas only
        return None
def _extract_benchmark_score_from_table_cell(cell_text: str) -> int | None:
    """Extract the benchmark score from the text of a table cell.

    Args:
        cell_text: Text of the cell.

    Returns:
        The int formed by all digits in the cell, or None when the cell
        contains no digits.
    """
    digits_only = "".join(re.findall(r"\d", cell_text))
    try:
        return int(digits_only)
    except ValueError:
        return None
def search_chart_page(url: str, cpu_name: str) -> tuple[str | None, int | None]:
    """Look up *cpu_name* on a chart page (multithread or singlethread).

    Args:
        url: Chart page to download.
        cpu_name: CPU name to match against the chart entries.

    Returns:
        ``(matched name, benchmark score)`` of the best-matching entry, or
        ``(None, None)`` when the page cannot be fetched or no entry matches
        with a score above 0.5.
    """
    try:
        page = requests.get(url, headers=HEADERS, timeout=30)
        page.raise_for_status()
    except requests.RequestException as e:
        logging.warning("Error fetching %s: %s", url, e)
        return None, None

    document = bs4.BeautifulSoup(page.text, "html.parser")

    winner_name = None
    winner_value = None
    top_confidence = 0.0

    for item in document.select("ul.chartlist li"):
        anchor = item.select_one("a")
        if not anchor:
            continue

        listed_name = anchor.get_text(strip=True)
        confidence = calculate_match_score(cpu_name, listed_name)

        # Only entries that clear the 0.5 threshold and beat the current best
        if confidence > top_confidence and confidence > 0.5:
            value = _extract_benchmark_score_from_chart_entry(item.get_text())
            if value is not None:
                winner_name, winner_value = listed_name, value
                top_confidence = confidence

    return winner_name, winner_value
def search_cpu_list(cpu_name: str) -> tuple[str | None, int | None]:
    """Look up *cpu_name* in the CPU list page (source of multi-thread scores).

    Returns:
        ``(matched name, benchmark score)`` of the best-matching table row,
        or ``(None, None)`` when the page cannot be fetched, the expected
        table is missing, or no row matches with a score above 0.5.
    """
    try:
        page = requests.get(CPU_LIST_URL, headers=HEADERS, timeout=30)
        page.raise_for_status()
    except requests.RequestException as e:
        logging.warning("Error fetching CPU list page: %s", e)
        return None, None

    document = bs4.BeautifulSoup(page.text, "html.parser")

    cpu_table = document.find("table", id="cputable")
    body = cpu_table.find("tbody") if cpu_table else None
    if not body:
        return None, None

    winner_name = None
    winner_value = None
    top_confidence = 0.0

    for table_row in body.find_all("tr"):
        columns = table_row.find_all("td")
        if len(columns) < 2:
            continue

        # First column holds the linked CPU name, second column the score
        anchor = columns[0].find("a")
        if not anchor:
            continue

        listed_name = anchor.get_text(strip=True)
        confidence = calculate_match_score(cpu_name, listed_name)

        # Only rows that clear the 0.5 threshold and beat the current best
        if confidence > top_confidence and confidence > 0.5:
            value = _extract_benchmark_score_from_table_cell(columns[1].get_text(strip=True))
            if value is not None:
                winner_name, winner_value = listed_name, value
                top_confidence = confidence

    return winner_name, winner_value
def search_cpu_benchmark(cpu_name: str) -> CPUBenchmark | None:
    """Look up multi-thread and single-thread scores on cpubenchmark.net.

    The multi-thread score is taken from the multithread chart, falling back
    to the CPU list page when the chart yields no score. The single-thread
    score comes from the single-thread chart.

    Returns:
        A CPUBenchmark named after the matched entry (multi-thread match
        preferred), or None if neither lookup matched anything.
    """
    query = normalize_cpu_name(cpu_name)

    # Multi-thread score: chart page first, CPU list as fallback
    multi_name, multi_score = search_chart_page(MULTITHREAD_URL, query)
    if not multi_score:
        multi_name, multi_score = search_cpu_list(query)

    # Single-thread score
    single_name, single_score = search_chart_page(SINGLETHREAD_URL, query)

    # Prefer the name matched for the multi-thread score
    matched_name = multi_name or single_name
    if matched_name:
        return CPUBenchmark(
            cpu_name=matched_name,
            multi_thread_score=multi_score,
            single_thread_score=single_score,
        )
    return None
def save_benchmark(cpu_name: str, multi_thread: int | None, single_thread: int | None):
    """Insert or replace the benchmark row for *cpu_name*.

    Args:
        cpu_name: Name the row is stored under.
        multi_thread: Multi-thread score, or None if unknown.
        single_thread: Single-thread score, or None if unknown.
    """
    values = (cpu_name, multi_thread, single_thread)
    with get_connection(get_cpu_spec_db_path()) as conn:
        conn.cursor().execute("""
            INSERT OR REPLACE INTO cpu_benchmark (cpu_name, multi_thread_score, single_thread_score, updated_at)
            VALUES (?, ?, ?, CURRENT_TIMESTAMP)
        """, values)
        conn.commit()

    # The cached "all benchmarks" snapshot is stale once a row changes
    _benchmark_cache.invalidate("all_benchmarks")
def _escape_like(text: str) -> str:
    """Escape the LIKE metacharacters (backslash, percent, underscore) in *text*."""
    return text.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")


def get_benchmark(cpu_name: str) -> CPUBenchmark | None:
    """Look up the stored benchmark for *cpu_name* in the database.

    Strategies are tried in order until one yields a row:

    1. exact match on the name as given
    2. substring match (LIKE) on the name as given
    3. substring match (LIKE) on the normalized name
    4. equality of the extracted model numbers

    Blank names are never used for substring matching, because the pattern
    ``%%`` would match an arbitrary row, and ``%`` / ``_`` inside a name are
    escaped so they are matched literally instead of acting as wildcards.

    Args:
        cpu_name: CPU name to look up.

    Returns:
        The matching CPUBenchmark, or None if nothing matched.
    """
    normalized_name = normalize_cpu_name(cpu_name)
    logging.debug("Looking up CPU benchmark for: %s (normalized: %s)", cpu_name, normalized_name)

    with get_connection(get_cpu_spec_db_path()) as conn:
        cursor = conn.cursor()

        # 1. Exact match
        cursor.execute("""
            SELECT cpu_name, multi_thread_score, single_thread_score
            FROM cpu_benchmark
            WHERE cpu_name = ?
        """, (cpu_name,))
        row = cursor.fetchone()

        # 2./3. Substring match, original name first, then the normalized one.
        # dict.fromkeys de-duplicates while keeping that order, so the same
        # query is not issued twice when normalization changes nothing.
        for needle in dict.fromkeys((cpu_name, normalized_name)):
            if row:
                break
            if not needle.strip():
                continue
            cursor.execute("""
                SELECT cpu_name, multi_thread_score, single_thread_score
                FROM cpu_benchmark
                WHERE cpu_name LIKE ? ESCAPE '\\'
            """, (f"%{_escape_like(needle)}%",))
            row = cursor.fetchone()

        # 4. Model-number match against every stored name
        if not row and (model := extract_model_number(cpu_name)):
            cursor.execute("""
                SELECT cpu_name, multi_thread_score, single_thread_score
                FROM cpu_benchmark
            """)
            for candidate in cursor.fetchall():
                if extract_model_number(candidate[0]) == model:
                    row = candidate
                    break

    if row:
        logging.debug("Found benchmark for %s: multi=%s, single=%s", cpu_name, row[1], row[2])
        return CPUBenchmark(
            cpu_name=row[0],
            multi_thread_score=row[1],
            single_thread_score=row[2],
        )

    logging.debug("No benchmark found for: %s", cpu_name)
    return None
def get_all_benchmarks() -> dict[str, CPUBenchmark]:
    """Return every stored benchmark, keyed by CPU name.

    The result of the single database query is kept in the module-level
    in-memory cache (1 hour TTL) and served from there on later calls.

    Returns:
        Dict mapping CPU name to CPUBenchmark.
    """
    cache_key = "all_benchmarks"

    # Serve from the cache when possible
    snapshot = _benchmark_cache.get(cache_key)
    if snapshot is not None:
        return snapshot

    # Cache miss: load everything from the database
    benchmarks: dict[str, CPUBenchmark] = {}
    with get_connection(get_cpu_spec_db_path()) as conn:
        cursor = conn.cursor()
        cursor.execute("""
            SELECT cpu_name, multi_thread_score, single_thread_score
            FROM cpu_benchmark
        """)
        for name, multi, single in cursor.fetchall():
            benchmarks[name] = CPUBenchmark(
                cpu_name=name,
                multi_thread_score=multi,
                single_thread_score=single,
            )

    # Remember the snapshot for subsequent calls
    _benchmark_cache.set(cache_key, benchmarks)
    return benchmarks
def _find_benchmark_match(
    cpu_name: str, all_benchmarks: dict[str, CPUBenchmark]
) -> CPUBenchmark | None:
    """Find a matching benchmark for a CPU name using various strategies.

    Matching strategies (in order of priority):
    1. Exact match
    2. Substring match (original or normalized name contained in a DB name)
    3. Model number match

    A blank name never matches by substring or model number: an empty string
    is contained in every DB name and would otherwise select whichever
    benchmark happens to come first.

    Args:
        cpu_name: CPU name to look up
        all_benchmarks: Dict of all benchmarks from database

    Returns:
        Matching CPUBenchmark or None if not found
    """
    # Try exact match first
    if cpu_name in all_benchmarks:
        return all_benchmarks[cpu_name]

    # Nothing meaningful to search for
    if not cpu_name.strip():
        return None

    # Try fuzzy (substring) matching; skip the normalized name if it is empty
    # (e.g. the input consisted only of "@ 2.40GHz")
    normalized_name = normalize_cpu_name(cpu_name)
    for db_name, benchmark in all_benchmarks.items():
        if cpu_name in db_name or (normalized_name and normalized_name in db_name):
            return benchmark

    # Try model number matching
    if model := extract_model_number(cpu_name):
        for db_name, benchmark in all_benchmarks.items():
            if extract_model_number(db_name) == model:
                return benchmark

    return None
def get_benchmarks_batch(cpu_names: list[str]) -> dict[str, CPUBenchmark | None]:
    """Resolve benchmarks for several CPU names at once.

    All stored benchmarks are loaded once, then each requested name is
    matched against that set with the usual matching strategies.

    Args:
        cpu_names: List of CPU names to look up

    Returns:
        Dict mapping each requested CPU name to its CPUBenchmark, or to None
        when no match was found.
    """
    known = get_all_benchmarks()

    resolved: dict[str, CPUBenchmark | None] = {}
    for requested in cpu_names:
        resolved[requested] = _find_benchmark_match(requested, known)
    return resolved
def clear_benchmark(cpu_name: str):
    """Delete the stored benchmark row whose name equals *cpu_name*."""
    db_path = get_cpu_spec_db_path()
    with get_connection(db_path) as conn:
        conn.cursor().execute("DELETE FROM cpu_benchmark WHERE cpu_name = ?", (cpu_name,))
        conn.commit()

    # The cached "all benchmarks" snapshot is stale once a row is removed
    _benchmark_cache.invalidate("all_benchmarks")
def fetch_and_save_benchmark(cpu_name: str) -> CPUBenchmark | None:
    """Fetch the benchmark for *cpu_name* from the web and store it.

    The scores are saved under the requested *cpu_name*, not under the name
    of the matched entry.

    Returns:
        The CPUBenchmark found on the web, or None if nothing was found (in
        which case nothing is saved).
    """
    logging.info("Fetching CPU benchmark from web for: %s", cpu_name)
    found = search_cpu_benchmark(cpu_name)

    if not found:
        logging.warning("Could not find benchmark data for: %s", cpu_name)
        return None

    logging.info("Found benchmark for %s: multi=%s, single=%s",
                 cpu_name, found.multi_thread_score, found.single_thread_score)
    save_benchmark(cpu_name, found.multi_thread_score, found.single_thread_score)
    return found
def main():
    """Manual smoke test: re-fetch the benchmarks of a few sample CPUs."""
    logging.basicConfig(level=logging.INFO)
    init_db()

    for cpu in ("Core i5-1135G7", "Intel Xeon E5-2699 v4"):
        logging.info("Searching for: %s", cpu)

        # Drop any stored row so the data is fetched again
        clear_benchmark(cpu)

        # Fetch from web
        outcome = fetch_and_save_benchmark(cpu)
        if not outcome:
            logging.info(" Not found")
        else:
            logging.info(" Found: %s", outcome)

        # Be nice to the server
        time.sleep(2)


if __name__ == "__main__":
    main()