Coverage for src/unit_cooler/actuator/sensor.py: 85%

48 statements  

« prev     ^ index     » next       coverage.py v7.9.1, created at 2025-06-28 12:29 +0000

1#!/usr/bin/env python3 

2import logging 

3import os 

4import random 

5 

6import my_lib.rpi 

7 

8import unit_cooler.const 

9 

# Flow sensor driver instance; stays None until init() assigns it.
fd_q10c = None

# Pin number handed to init(); stays None until then.
pin_no = None

12 

13if os.environ.get("DUMMY_MODE", "false") != "true": # pragma: no cover 

14 from my_lib.sensor.fd_q10c import FD_Q10C 

15else: 

16 

class FD_Q10C:  # noqa: N801
    """Dummy replacement for the FD-Q10C flow sensor driver.

    The reported flow is derived from the valve GPIO level instead of real
    hardware, and the power state is tracked per pytest-xdist worker.
    """

    # Dict tracking the power state per worker (set to True when an instance
    # is created). Shared by all instances of the class.
    _power_states = {}

    def __init__(self, lock_file="DUMMY", timeout=2):  # noqa: ARG002
        """Mark the current worker's sensor as powered on.

        Both arguments are accepted and ignored.
        """
        self._power_states[self._get_worker_id()] = True

    def _get_worker_id(self):
        """Return the current worker ID ("" when PYTEST_XDIST_WORKER is unset)."""
        return os.environ.get("PYTEST_XDIST_WORKER", "")

    def get_value(self, force_power_on=True):
        """Return a simulated flow value based on the valve GPIO level.

        Args:
            force_power_on: When true, the current worker's power state is
                set to True before the valve is read.

        Returns:
            0 while the valve is not open; otherwise a random float in
            [1.0, 2.5).
        """
        if force_power_on:
            # Called with force_power_on=True: the power state becomes True.
            self._power_states[self._get_worker_id()] = True

        valve_level = my_lib.rpi.gpio.input(pin_no)
        if valve_level != unit_cooler.const.VALVE_STATE.OPEN.value:
            return 0

        return 1 + random.random() * 1.5  # noqa: S311

    def get_state(self):
        """Return the power state recorded for the current worker."""
        return self._power_states[self._get_worker_id()]

    def stop(self):
        """Record the current worker's sensor as powered off."""
        # Once stop is called, the power state becomes False.
        self._power_states[self._get_worker_id()] = False

51 

52 

def init(pin_no_):
    """Store the pin number and create the flow sensor driver.

    Args:
        pin_no_: Pin number saved in the module-level ``pin_no``.

    The new driver instance is saved in the module-level ``fd_q10c``.
    """
    global fd_q10c, pin_no  # noqa: PLW0603

    # The pin is recorded before the driver is constructed.
    pin_no = pin_no_
    fd_q10c = FD_Q10C()

59 

60 

def stop():
    """Stop flow sensing by stopping the FD-Q10C driver.

    Safe to call before init(): when no driver instance exists there is
    nothing to stop, so a warning is logged and the call returns. A
    RuntimeError raised by the driver's stop() is logged and suppressed.
    """
    logging.info("Stop flow sensing")

    if fd_q10c is None:
        # Without this guard the call below would raise AttributeError on
        # None, which the RuntimeError handler does not catch.
        logging.warning("FD-Q10C is not initialized; nothing to stop")
        return

    try:
        fd_q10c.stop()
    except RuntimeError:
        logging.exception("Failed to stop FD-Q10C")

70 

71 

def get_power_state():
    """Return whatever the flow sensor driver's get_state() reports."""
    return fd_q10c.get_state()

76 

77 

def get_flow(force_power_on=True):
    """Read the current flow from the sensor driver and log it.

    Args:
        force_power_on: Passed through to the driver's get_value().

    Returns:
        The value returned by the driver, or None when the driver returned
        None or raised any exception (the exception is logged).
    """
    measured = None
    try:
        measured = fd_q10c.get_value(force_power_on)
    except Exception:
        # Any failure here is treated as a possible bug: log it with the
        # traceback and report the flow as unknown.
        logging.exception("バグの可能性あり。")

    if measured is None:
        logging.info("Valve flow = UNKNOWN")
        return None

    logging.info("Valve flow = %.2f", measured)
    return measured