Coverage for src / rasp_shutter / control / webapi / test / sync.py: 32%
161 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-02-13 00:10 +0900
« prev ^ index » next coverage.py v7.13.1, created at 2026-02-13 00:10 +0900
1#!/usr/bin/env python3
2"""テスト同期用API
4テストでの非同期処理の同期化を支援するAPIを提供します。
5"""
7import logging
8import threading
9import time
11import flask
12import my_lib.footprint
13import my_lib.notify.slack
14import my_lib.time
15import my_lib.webapp.config
17import rasp_shutter.control.config
18import rasp_shutter.control.scheduler
19import rasp_shutter.control.webapi.control
20import rasp_shutter.util
22blueprint = flask.Blueprint("rasp-shutter-test-sync", __name__, url_prefix=my_lib.webapp.config.URL_PREFIX)
24# イベント管理用の辞書
25_events: dict[str, threading.Event] = {}
26_events_lock = threading.Lock()
29def _get_event(event_name: str) -> threading.Event:
30 """イベントを取得(存在しない場合は作成)"""
31 with _events_lock:
32 if event_name not in _events:
33 _events[event_name] = threading.Event()
34 return _events[event_name]
37@blueprint.route("/api/test/sync/wait/<event_name>", methods=["POST"])
38@rasp_shutter.util.require_dummy_mode
39def wait_for_event(event_name: str):
40 """
41 イベントの完了を待機
43 Args:
44 event_name: イベント名
46 Query Params:
47 timeout: タイムアウト秒数(デフォルト: 30)
49 Returns:
50 JSON: 待機結果
51 """
52 # FlaskのTypeConversionDictの型定義が不完全なため抑制
53 timeout = flask.request.args.get("timeout", 30, type=float) # type: ignore[arg-type]
55 event = _get_event(event_name)
56 event.clear() # 待機前にクリア
58 logging.info("Waiting for event '%s' (timeout: %s sec)", event_name, timeout)
60 if event.wait(timeout):
61 logging.info("Event '%s' received", event_name)
62 return {
63 "success": True,
64 "event": event_name,
65 "received": True,
66 }
67 else:
68 logging.warning("Timeout waiting for event '%s'", event_name)
69 return {
70 "success": False,
71 "event": event_name,
72 "received": False,
73 "error": "Timeout waiting for event",
74 }, 408
77@blueprint.route("/api/test/sync/signal/<event_name>", methods=["POST"])
78@rasp_shutter.util.require_dummy_mode
79def signal_event(event_name: str):
80 """
81 イベント完了をシグナル
83 Args:
84 event_name: イベント名
86 Returns:
87 JSON: シグナル結果
88 """
89 event = _get_event(event_name)
90 event.set()
92 logging.info("Event '%s' signaled", event_name)
94 return {
95 "success": True,
96 "event": event_name,
97 "signaled": True,
98 }
101@blueprint.route("/api/test/sync/clear/<event_name>", methods=["POST"])
102@rasp_shutter.util.require_dummy_mode
103def clear_event(event_name: str):
104 """
105 イベントをクリア
107 Args:
108 event_name: イベント名
110 Returns:
111 JSON: クリア結果
112 """
113 event = _get_event(event_name)
114 event.clear()
116 logging.info("Event '%s' cleared", event_name)
118 return {
119 "success": True,
120 "event": event_name,
121 "cleared": True,
122 }
125@blueprint.route("/api/test/scheduler/state", methods=["GET"])
126@rasp_shutter.util.require_dummy_mode
127def get_scheduler_state():
128 """
129 スケジューラーの状態を取得
131 Returns:
132 JSON: スケジューラー状態
133 """
134 scheduler = rasp_shutter.control.scheduler.get_scheduler()
135 schedule_data = rasp_shutter.control.scheduler.get_schedule_data()
137 jobs = [
138 {
139 "next_run": job.next_run.isoformat() if job.next_run else None,
140 # partialオブジェクトの場合__name__がないためgetattr使用
141 "job_func": getattr(job.job_func, "__name__", str(job.job_func)) if job.job_func else None,
142 }
143 for job in scheduler.get_jobs()
144 ]
146 idle_sec = scheduler.idle_seconds
148 return {
149 "success": True,
150 "current_time": my_lib.time.now().isoformat(),
151 "idle_seconds": idle_sec,
152 "job_count": len(jobs),
153 "jobs": jobs,
154 "schedule_data": schedule_data,
155 }
158@blueprint.route("/api/test/scheduler/trigger", methods=["POST"])
159@rasp_shutter.util.require_dummy_mode
160def trigger_scheduler():
161 """
162 スケジューラーのジョブを即座に実行
164 Query Params:
165 job_index: 実行するジョブのインデックス(省略時は全ジョブ)
167 Returns:
168 JSON: 実行結果
169 """
170 scheduler = rasp_shutter.control.scheduler.get_scheduler()
172 job_index = flask.request.args.get("job_index", type=int)
174 try:
175 jobs = scheduler.get_jobs()
177 if job_index is not None:
178 if 0 <= job_index < len(jobs):
179 jobs[job_index].run()
180 return {
181 "success": True,
182 "executed_job": job_index,
183 }
184 else:
185 return {"error": f"Invalid job index: {job_index}"}, 400
187 # 全ジョブを実行
188 executed = 0
189 for job in jobs:
190 job.run()
191 executed += 1
193 return {
194 "success": True,
195 "executed_jobs": executed,
196 }
198 except Exception as e:
199 logging.exception("Error triggering scheduler")
200 return {"error": str(e)}, 500
203@blueprint.route("/api/test/scheduler/wait_auto_control", methods=["POST"])
204@rasp_shutter.util.require_dummy_mode
205def wait_auto_control():
206 """
207 自動制御の完了を待機
209 Query Params:
210 timeout: タイムアウト秒数(デフォルト: 30)
212 Returns:
213 JSON: 待機結果
214 """
215 timeout = flask.request.args.get("timeout", 30.0, type=float)
217 success = rasp_shutter.control.scheduler.wait_for_auto_control_completion(timeout)
219 return {
220 "success": success,
221 "completed": success,
222 }
225@blueprint.route("/api/test/scheduler/loop_sequence", methods=["GET"])
226@rasp_shutter.util.require_dummy_mode
227def get_loop_sequence():
228 """
229 現在のループシーケンス番号を取得
231 Returns:
232 JSON: シーケンス番号
233 """
234 import my_lib.pytest_util
236 worker_id = my_lib.pytest_util.get_worker_id()
237 sequence = rasp_shutter.control.scheduler.get_loop_sequence()
239 logging.debug("get_loop_sequence: worker_id=%s, sequence=%d", worker_id, sequence)
241 return {
242 "success": True,
243 "sequence": sequence,
244 "worker_id": worker_id,
245 "current_time": my_lib.time.now().isoformat(),
246 }
249@blueprint.route("/api/test/scheduler/wait_loop", methods=["POST"])
250@rasp_shutter.util.require_dummy_mode
251def wait_loop():
252 """
253 指定シーケンス番号より大きくなるまで待機
255 Query Params:
256 sequence: 待機開始時のシーケンス番号
257 timeout: タイムアウト秒数(デフォルト: 10)
259 Returns:
260 JSON: 待機結果
261 """
262 sequence = flask.request.args.get("sequence", 0, type=int)
263 timeout = flask.request.args.get("timeout", 10.0, type=float)
265 success = rasp_shutter.control.scheduler.wait_for_loop_after(sequence, timeout)
266 current_sequence = rasp_shutter.control.scheduler.get_loop_sequence()
268 if success:
269 return {
270 "success": True,
271 "waited": True,
272 "start_sequence": sequence,
273 "current_sequence": current_sequence,
274 "current_time": my_lib.time.now().isoformat(),
275 }
276 else:
277 return {
278 "success": False,
279 "waited": False,
280 "start_sequence": sequence,
281 "current_sequence": current_sequence,
282 "error": "Timeout waiting for scheduler loop",
283 }, 408
286@blueprint.route("/api/test/state/shutter", methods=["GET"])
287@rasp_shutter.util.require_dummy_mode
288def get_shutter_state():
289 """
290 シャッターの詳細状態を取得
292 Returns:
293 JSON: シャッター状態
294 """
295 config = flask.current_app.config["CONFIG"]
297 states = []
298 for index, shutter in enumerate(config.shutter):
299 open_elapsed = my_lib.footprint.elapsed(
300 rasp_shutter.control.webapi.control.exec_stat_file("open", index)
301 )
302 close_elapsed = my_lib.footprint.elapsed(
303 rasp_shutter.control.webapi.control.exec_stat_file("close", index)
304 )
306 states.append(
307 {
308 "index": index,
309 "name": shutter.name if hasattr(shutter, "name") else f"shutter_{index}",
310 "open_elapsed_sec": open_elapsed,
311 "close_elapsed_sec": close_elapsed,
312 }
313 )
315 pending_open = my_lib.footprint.exists(rasp_shutter.control.config.STAT_PENDING_OPEN.to_path())
316 pending_open_elapsed = my_lib.footprint.elapsed(rasp_shutter.control.config.STAT_PENDING_OPEN.to_path())
318 auto_close_elapsed = my_lib.footprint.elapsed(rasp_shutter.control.config.STAT_AUTO_CLOSE.to_path())
320 return {
321 "success": True,
322 "current_time": my_lib.time.now().isoformat(),
323 "shutters": states,
324 "pending_open": pending_open,
325 "pending_open_elapsed_sec": pending_open_elapsed,
326 "auto_close_elapsed_sec": auto_close_elapsed,
327 }
330@blueprint.route("/api/test/state/reset", methods=["POST"])
331@rasp_shutter.util.require_dummy_mode
332def reset_test_state():
333 """
334 テスト状態をリセット
336 Returns:
337 JSON: リセット結果
338 """
339 config = flask.current_app.config["CONFIG"]
341 # 制御統計をクリア
342 rasp_shutter.control.webapi.control.clean_stat_exec(config)
344 # 自動制御状態をクリア
345 rasp_shutter.control.config.STAT_AUTO_CLOSE.unlink(missing_ok=True)
346 rasp_shutter.control.config.STAT_PENDING_OPEN.unlink(missing_ok=True)
348 # Slack通知履歴をクリア
349 my_lib.notify.slack._interval_clear()
350 my_lib.notify.slack._hist_clear()
352 # 制御ログをクリア
353 rasp_shutter.control.webapi.control.cmd_hist.clear()
355 # イベントをクリア
356 with _events_lock:
357 for event in _events.values():
358 event.clear()
360 logging.info("Test state reset completed")
362 return {
363 "success": True,
364 "message": "Test state reset completed",
365 }
368@blueprint.route("/api/ctrl/clear", methods=["POST"])
369@rasp_shutter.util.require_dummy_mode
370def clear_control_log():
371 """
372 制御ログをクリア(E2Eテスト用)
374 Returns:
375 JSON: クリア結果
376 """
377 rasp_shutter.control.webapi.control.cmd_hist.clear()
379 return {
380 "success": True,
381 "message": "Control log cleared",
382 }
385@blueprint.route("/api/test/schedule/reset", methods=["POST"])
386@rasp_shutter.util.require_dummy_mode
387def reset_schedule():
388 """
389 スケジュールをリセット(E2Eテスト用)
391 スケジューラのジョブとスケジュールデータをクリアして、
392 テスト間の干渉を防ぐ。
394 Returns:
395 JSON: リセット結果
396 """
397 rasp_shutter.control.scheduler.clear_scheduler_jobs()
399 logging.info("Schedule reset completed")
401 return {
402 "success": True,
403 "message": "Schedule reset completed",
404 }
407@blueprint.route("/api/test/wait_condition", methods=["POST"])
408@rasp_shutter.util.require_dummy_mode
409def wait_condition():
410 """
411 条件が満たされるまで待機
413 JSON Body:
414 type: 条件タイプ ("ctrl_log_count", "app_log_count", "shutter_state")
415 value: 期待値
416 timeout: タイムアウト秒数
418 Returns:
419 JSON: 待機結果
420 """
421 data = flask.request.get_json()
422 if not data:
423 return {"error": "JSON body required"}, 400
425 condition_type = data.get("type")
426 expected_value = data.get("value")
427 timeout = data.get("timeout", 10.0)
429 if not condition_type or expected_value is None:
430 return {"error": "type and value are required"}, 400
432 # NOTE: time.perf_counter() を使用(time_machine の影響を受けない)
433 start_time = time.perf_counter()
434 poll_interval = 0.1
435 current_value = None
437 while time.perf_counter() - start_time < timeout:
438 if condition_type == "ctrl_log_count":
439 current_value = len(rasp_shutter.control.webapi.control.cmd_hist)
440 elif condition_type == "shutter_state":
441 # TODO: シャッター状態のチェック実装
442 current_value = None
443 else:
444 current_value = None
446 if current_value is not None and current_value >= expected_value:
447 return {
448 "success": True,
449 "condition_met": True,
450 "current_value": current_value,
451 "elapsed_sec": time.perf_counter() - start_time,
452 }
454 time.sleep(poll_interval)
456 return {
457 "success": False,
458 "condition_met": False,
459 "current_value": current_value,
460 "elapsed_sec": timeout,
461 "error": "Timeout waiting for condition",
462 }, 408