Coverage for src/rasp_aqua/control.py: 92%
59 statements
« prev ^ index » next coverage.py v7.6.8, created at 2024-11-24 19:24 +0900
« prev ^ index » next coverage.py v7.6.8, created at 2024-11-24 19:24 +0900
1#!/usr/bin/env python3
3import datetime
4import logging
5import pathlib
6from multiprocessing import Queue
8import my_lib.rpi
9import rasp_aqua.scheduler
10import rasp_aqua.valve
def check_time_in_range(start, end, now=None):
    """Tell whether the current time of day lies inside the ``start``-``end`` window.

    Args:
        start: Window start as an ``"%H:%M"`` string.
        end: Window end as an ``"%H:%M"`` string.
        now: Optional naive ``datetime.time`` to evaluate instead of the wall
            clock. When omitted, the current time is taken in the fixed-offset
            timezone given by ``rasp_aqua.scheduler.timezone["offset"]`` (hours).

    Returns:
        A ``(in_range, case)`` tuple.

        * ``start <= end`` (window within one day): ``case`` is always ``0``
          and ``in_range`` is ``start <= now <= end``.
        * ``start > end`` (window wraps past midnight):
          ``(True, 1)`` when ``now >= start``, ``(True, 2)`` when
          ``now <= end``, otherwise ``(False, 1)``.

    Raises:
        ValueError: If ``start`` or ``end`` does not match ``"%H:%M"``.
    """
    if now is None:
        timezone = datetime.timezone(datetime.timedelta(hours=rasp_aqua.scheduler.timezone["offset"]))
        time_curr = datetime.datetime.now(tz=timezone).time()
    else:
        time_curr = now

    # ``datetime.time()`` always returns a naive time, so attaching a tzinfo
    # to the parsed value beforehand has no effect; compare naive times only.
    time_start = datetime.datetime.strptime(start, "%H:%M").time()
    time_end = datetime.datetime.strptime(end, "%H:%M").time()

    if time_start <= time_end:
        # Window does not cross midnight; both bounds are inclusive.
        return (time_start <= time_curr <= time_end, 0)

    # Window crosses midnight: it is active late in the day or early the next.
    if time_curr >= time_start:
        return (True, 1)
    if time_curr <= time_end:
        return (True, 2)
    return (False, 1)
# NOTE: Set each valve's state according to the current time.
def init_valve(config):
    """Drive the "air" and "co2" valves to the state their schedule implies right now.

    For each target, the configured ``control.on`` / ``control.off`` times are
    passed to ``check_time_in_range``; the resulting mode ("on" or "off") is
    logged together with the reason and then applied through
    ``rasp_aqua.valve.control`` using the GPIO level configured for that mode.

    Args:
        config: Mapping providing ``config["valve"][target]["control"]["on"|"off"]``
            and ``config["valve"][target]["mode"]["on"|"off"]`` for each target.
    """
    tz = datetime.timezone(datetime.timedelta(hours=rasp_aqua.scheduler.timezone["offset"]))
    logging.info("Now is %s", datetime.datetime.now(tz=tz).strftime("%Y-%m-%d %H:%M"))

    for target in ["air", "co2"]:
        valve_conf = config["valve"][target]
        time_on = valve_conf["control"]["on"]
        time_off = valve_conf["control"]["off"]

        in_range, case = check_time_in_range(time_on, time_off)
        mode = "on" if in_range else "off"

        if in_range:
            # case 0: plain window, 1: after start (wrapping), 2: before end (wrapping)
            if case == 0:
                reason = "now is between {start}-{end}".format(start=time_on, end=time_off)
            elif case == 1:
                reason = "now is after {start}".format(start=time_on)
            elif case == 2:
                reason = "now is before {end}".format(end=time_off)
        elif case == 0:
            reason = "now is not between {start}-{end}".format(start=time_on, end=time_off)
        else:
            reason = "now is after {end} and before {start}".format(start=time_on, end=time_off)

        logging.info("initialize %s %s, because %s", target, mode.upper(), reason)

        gpio_level = my_lib.rpi.gpio.level[valve_conf["mode"][mode]]
        rasp_aqua.valve.control(rasp_aqua.valve.TARGET[target.upper()], gpio_level)
def set_schedule(config, queue):
    """Build the on/off task list for both valves and put it on ``queue``.

    One task dict is produced per (target, mode) pair, in the order
    air-on, air-off, co2-on, co2-off. Each task carries a display name, the
    configured time string, the callable ``rasp_aqua.valve.control`` and the
    positional arguments to call it with.

    Args:
        config: Mapping providing ``config["valve"][target]["control"][mode]``
            and ``config["valve"][target]["mode"][mode]``.
        queue: Object with a ``put`` method; it receives the whole list in a
            single ``put`` call.
    """
    valve_conf = config["valve"]

    task_list = [
        {
            "name": f"{target} {mode}",
            "time": valve_conf[target]["control"][mode],
            "func": rasp_aqua.valve.control,
            "args": (
                rasp_aqua.valve.TARGET[target.upper()],
                my_lib.rpi.gpio.level[valve_conf[target]["mode"][mode]],
            ),
        }
        for target in ("air", "co2")
        for mode in ("on", "off")
    ]

    queue.put(task_list)
def execute(config, check_interval_sec=10):
    """Initialise the valves and the scheduler, then apply and queue the schedule.

    Steps, in order: initialise ``rasp_aqua.valve`` with the air and co2 GPIO
    settings, create a multiprocessing ``Queue``, initialise
    ``rasp_aqua.scheduler`` with it, set the valves to their current expected
    state, and finally enqueue the on/off task list.

    Args:
        config: Mapping providing ``valve``, ``timezone`` and
            ``liveness.file.scheduler`` entries.
        check_interval_sec: Passed through unchanged to
            ``rasp_aqua.scheduler.init``.
    """
    valve_conf = config["valve"]
    rasp_aqua.valve.init(valve_conf["air"]["gpio"], valve_conf["co2"]["gpio"])

    queue = Queue()

    timezone_conf = config["timezone"]
    liveness_file = pathlib.Path(config["liveness"]["file"]["scheduler"])
    rasp_aqua.scheduler.init(timezone_conf, queue, liveness_file, check_interval_sec)

    init_valve(config)
    set_schedule(config, queue)
def term():
    """Stop the scheduler worker, if one exists, and release the GPIO.

    Does nothing at all when ``rasp_aqua.scheduler.worker`` is ``None``.
    Otherwise it sets the ``should_terminate`` event, joins the worker, resets
    the worker reference to ``None``, clears the event again and finally calls
    ``my_lib.rpi.gpio.cleanup()``.
    """
    scheduler = rasp_aqua.scheduler

    if scheduler.worker is not None:
        # Signal the worker to stop and wait until it has finished.
        scheduler.should_terminate.set()
        scheduler.worker.join()
        scheduler.worker = None

        # Reset the event after the worker is gone.
        scheduler.should_terminate.clear()

        my_lib.rpi.gpio.cleanup()