Coverage for src/rasp_aqua/control.py: 92%

59 statements  

« prev     ^ index     » next       coverage.py v7.6.8, created at 2024-11-24 19:24 +0900

1#!/usr/bin/env python3 

2 

3import datetime 

4import logging 

5import pathlib 

6from multiprocessing import Queue 

7 

8import my_lib.rpi 

9import rasp_aqua.scheduler 

10import rasp_aqua.valve 

11 

12 

13def check_time_in_range(start, end): 

14 timezone = datetime.timezone(datetime.timedelta(hours=rasp_aqua.scheduler.timezone["offset"])) 

15 time_curr = datetime.datetime.now(tz=timezone).time() 

16 

17 time_start = datetime.datetime.strptime(start, "%H:%M").replace(tzinfo=timezone).time() 

18 time_end = datetime.datetime.strptime(end, "%H:%M").replace(tzinfo=timezone).time() 

19 

20 if time_start <= time_end: 

21 if time_start <= time_curr <= time_end: 

22 return (True, 0) 

23 else: 

24 return (False, 0) 

25 else: # noqa: PLR5501 

26 if time_curr >= time_start: 26 ↛ 27line 26 didn't jump to line 27 because the condition on line 26 was never true

27 return (True, 1) 

28 elif time_curr <= time_end: 

29 return (True, 2) 

30 else: 

31 return (False, 1) 

32 

33 

34# NOTE: 現在時刻に基づいてバルブの状態を設定する 

35def init_valve(config): 

36 logging.info( 

37 "Now is %s", 

38 datetime.datetime.now( 

39 tz=datetime.timezone(datetime.timedelta(hours=rasp_aqua.scheduler.timezone["offset"])) 

40 ).strftime("%Y-%m-%d %H:%M"), 

41 ) 

42 

43 for target in ["air", "co2"]: 

44 judge = check_time_in_range( 

45 config["valve"][target]["control"]["on"], config["valve"][target]["control"]["off"] 

46 ) 

47 if judge[0]: 

48 mode = "on" 

49 

50 if judge[1] == 0: 

51 reason = "now is between {start}-{end}".format( 

52 start=config["valve"][target]["control"]["on"], 

53 end=config["valve"][target]["control"]["off"], 

54 ) 

55 elif judge[1] == 1: 55 ↛ 56line 55 didn't jump to line 56 because the condition on line 55 was never true

56 reason = "now is after {start}".format( 

57 start=config["valve"][target]["control"]["on"], 

58 ) 

59 elif judge[1] == 2: 59 ↛ 78line 59 didn't jump to line 78 because the condition on line 59 was always true

60 reason = "now is before {end}".format( 

61 end=config["valve"][target]["control"]["off"], 

62 ) 

63 

64 else: 

65 mode = "off" 

66 

67 if judge[1] == 0: 

68 reason = "now is not between {start}-{end}".format( 

69 start=config["valve"][target]["control"]["on"], 

70 end=config["valve"][target]["control"]["off"], 

71 ) 

72 else: 

73 reason = "now is after {end} and before {start}".format( 

74 start=config["valve"][target]["control"]["on"], 

75 end=config["valve"][target]["control"]["off"], 

76 ) 

77 

78 logging.info("initialize %s %s, because %s", target, mode.upper(), reason) 

79 rasp_aqua.valve.control( 

80 rasp_aqua.valve.TARGET[target.upper()], 

81 my_lib.rpi.gpio.level[config["valve"][target]["mode"][mode]], 

82 ) 

83 

84 

85def set_schedule(config, queue): 

86 task_list = [] 

87 for target in ["air", "co2"]: 

88 for mode in ["on", "off"]: 

89 task_list.append( # noqa: PERF401 

90 { 

91 "name": f"{target} {mode}", 

92 "time": config["valve"][target]["control"][mode], 

93 "func": rasp_aqua.valve.control, 

94 "args": ( 

95 rasp_aqua.valve.TARGET[target.upper()], 

96 my_lib.rpi.gpio.level[config["valve"][target]["mode"][mode]], 

97 ), 

98 } 

99 ) 

100 

101 queue.put(task_list) 

102 

103 

104def execute(config, check_interval_sec=10): 

105 rasp_aqua.valve.init(config["valve"]["air"]["gpio"], config["valve"]["co2"]["gpio"]) 

106 

107 queue = Queue() 

108 

109 rasp_aqua.scheduler.init( 

110 config["timezone"], queue, pathlib.Path(config["liveness"]["file"]["scheduler"]), check_interval_sec 

111 ) 

112 

113 init_valve(config) 

114 set_schedule(config, queue) 

115 

116 

117def term(): 

118 if rasp_aqua.scheduler.worker is None: 118 ↛ 119line 118 didn't jump to line 119 because the condition on line 118 was never true

119 return 

120 

121 rasp_aqua.scheduler.should_terminate.set() 

122 rasp_aqua.scheduler.worker.join() 

123 rasp_aqua.scheduler.worker = None 

124 

125 rasp_aqua.scheduler.should_terminate.clear() 

126 

127 my_lib.rpi.gpio.cleanup()