Coverage for src/unit_cooler/actuator/valve.py: 97%
101 statements
« prev ^ index » next coverage.py v7.9.1, created at 2025-06-28 08:08 +0000
« prev ^ index » next coverage.py v7.9.1, created at 2025-06-28 08:08 +0000
1#!/usr/bin/env python3
2"""
3電磁弁を可変デューティ制御します。
5Usage:
6 valve.py [-c CONFIG] [-D]
8Options:
9 -c CONFIG : CONFIG を設定ファイルとして読み込んで実行します。[default: config.yaml]
10 -D : デバッグモードで動作します。
11"""
13import logging
14import pathlib
15import threading
16import time
18import my_lib.footprint
19import my_lib.rpi
21import unit_cooler.actuator.work_log
22import unit_cooler.const
24STAT_DIR_PATH = pathlib.Path("/dev/shm") # noqa: S108
26# STATE が WORKING になった際に作られるファイル。Duty 制御している場合、
27# OFF Duty から ON Duty に遷移する度に変更日時が更新される。
28# STATE が IDLE になった際に削除される。
29# (OFF Duty になって実際にバルブを閉じただけでは削除されない)
30STAT_PATH_VALVE_STATE_WORKING = STAT_DIR_PATH / "unit_cooler" / "valve" / "state" / "working"
32# STATE が IDLE になった際に作られるファイル。
33# (OFF Duty になって実際にバルブを閉じただけでは作られない)
34# STATE が WORKING になった際に削除される。
35STAT_PATH_VALVE_STATE_IDLE = STAT_DIR_PATH / "unit_cooler" / "valve" / "state" / "idle"
37# 実際にバルブを開いた際に作られるファイル。
38# 実際にバルブを閉じた際に削除される。
39STAT_PATH_VALVE_OPEN = STAT_DIR_PATH / "unit_cooler" / "valve" / "open"
41# 実際にバルブを閉じた際に作られるファイル。
42# 実際にバルブを開いた際に削除される。
43STAT_PATH_VALVE_CLOSE = STAT_DIR_PATH / "unit_cooler" / "valve" / "close"
45pin_no = None
46valve_lock = None
47ctrl_hist = []
50def init(pin):
51 global pin_no # noqa: PLW0603
52 global valve_lock # noqa: PLW0603
54 pin_no = pin
55 valve_lock = threading.Lock()
57 my_lib.footprint.clear(STAT_PATH_VALVE_STATE_WORKING)
58 my_lib.footprint.update(STAT_PATH_VALVE_STATE_IDLE)
60 my_lib.rpi.gpio.setwarnings(False)
61 my_lib.rpi.gpio.setmode(my_lib.rpi.gpio.BCM)
62 my_lib.rpi.gpio.setup(pin_no, my_lib.rpi.gpio.OUT)
64 set_state(unit_cooler.const.VALVE_STATE.CLOSE)
67# NOTE: テスト用
68def clear_stat():
69 global ctrl_hist # noqa: PLW0603
71 my_lib.footprint.clear(STAT_PATH_VALVE_STATE_WORKING)
72 my_lib.footprint.clear(STAT_PATH_VALVE_STATE_IDLE)
73 my_lib.footprint.clear(STAT_PATH_VALVE_OPEN)
74 my_lib.footprint.clear(STAT_PATH_VALVE_CLOSE)
75 ctrl_hist = []
78# NOTE: テスト用
79def get_hist():
80 global ctrl_hist
82 return ctrl_hist
85# NOTE: 実際にバルブを開きます。
86# 現在のバルブの状態と、バルブが現在の状態になってからの経過時間を返します。
87def set_state(valve_state):
88 global pin_no
89 global valve_lock
90 global ctrl_hist
92 with valve_lock:
93 curr_state = get_state()
95 if valve_state != curr_state:
96 logging.info("VALVE: %s -> %s", curr_state.name, valve_state.name)
97 # NOTE: テスト時のみ履歴を記録
98 import os
100 if os.environ.get("TEST") == "true": 100 ↛ 103line 100 didn't jump to line 103 because the condition on line 100 was always true
101 ctrl_hist.append(curr_state)
103 my_lib.rpi.gpio.output(pin_no, valve_state.value)
105 if valve_state == unit_cooler.const.VALVE_STATE.OPEN:
106 my_lib.footprint.clear(STAT_PATH_VALVE_CLOSE)
107 if not my_lib.footprint.exists(STAT_PATH_VALVE_OPEN):
108 my_lib.footprint.update(STAT_PATH_VALVE_OPEN)
109 else:
110 my_lib.footprint.clear(STAT_PATH_VALVE_OPEN)
111 if not my_lib.footprint.exists(STAT_PATH_VALVE_CLOSE):
112 my_lib.footprint.update(STAT_PATH_VALVE_CLOSE)
114 return get_status()
117# NOTE: 実際のバルブの状態を返します
118def get_state():
119 global pin_no
121 if my_lib.rpi.gpio.input(pin_no) == 1:
122 return unit_cooler.const.VALVE_STATE.OPEN
123 else:
124 return unit_cooler.const.VALVE_STATE.CLOSE
127# NOTE: 実際のバルブの状態と、その状態になってからの経過時間を返します
128def get_status():
129 global valve_lock
131 with valve_lock:
132 valve_state = get_state()
134 if valve_state == unit_cooler.const.VALVE_STATE.OPEN:
135 assert my_lib.footprint.exists(STAT_PATH_VALVE_OPEN) # noqa: S101
137 return {
138 "state": valve_state,
139 "duration": my_lib.footprint.elapsed(STAT_PATH_VALVE_OPEN),
140 }
141 else: # noqa: PLR5501
142 if my_lib.footprint.exists(STAT_PATH_VALVE_CLOSE): 142 ↛ 148line 142 didn't jump to line 148 because the condition on line 142 was always true
143 return {
144 "state": valve_state,
145 "duration": my_lib.footprint.elapsed(STAT_PATH_VALVE_CLOSE),
146 }
147 else:
148 return {"state": valve_state, "duration": 0}
151# NOTE: バルブを動作状態にします。
152# Duty 制御を実現するため、OFF Duty 期間の場合はバルブを閉じます。
153# 実際にバルブを開いてからの経過時間を返します。
154# duty_info = { "enable": bool, "on": on_sec, "off": off_sec }
155def set_cooling_working(duty_info):
156 logging.debug(duty_info)
158 my_lib.footprint.clear(STAT_PATH_VALVE_STATE_IDLE)
160 if not my_lib.footprint.exists(STAT_PATH_VALVE_STATE_WORKING):
161 my_lib.footprint.update(STAT_PATH_VALVE_STATE_WORKING)
162 unit_cooler.actuator.work_log.add("冷却を開始します。")
163 logging.info("COOLING: IDLE -> WORKING")
164 return set_state(unit_cooler.const.VALVE_STATE.OPEN)
166 if not duty_info["enable"]:
167 # NOTE Duty 制御しない場合
168 logging.info("COOLING: WORKING")
169 return set_state(unit_cooler.const.VALVE_STATE.OPEN)
171 status = get_status()
173 if status["state"] == unit_cooler.const.VALVE_STATE.OPEN:
174 # NOTE: 現在バルブが開かれている
175 if status["duration"] >= duty_info["on_sec"]:
176 logging.info("COOLING: WORKING (OFF duty, %d sec left)", duty_info["off_sec"])
177 unit_cooler.actuator.work_log.add("OFF Duty になったのでバルブを締めます。")
178 return set_state(unit_cooler.const.VALVE_STATE.CLOSE)
179 else:
180 logging.info("COOLING: WORKING (ON duty, %d sec left)", duty_info["on_sec"] - status["duration"])
182 return set_state(unit_cooler.const.VALVE_STATE.OPEN)
183 else: # noqa: PLR5501
184 # NOTE: 現在バルブが閉じられている
185 if status["duration"] >= duty_info["off_sec"]:
186 logging.info("COOLING: WORKING (ON duty, %d sec left)", duty_info["on_sec"])
187 unit_cooler.actuator.work_log.add("ON Duty になったのでバルブを開けます。")
188 return set_state(unit_cooler.const.VALVE_STATE.OPEN)
189 else:
190 logging.info(
191 "COOLING: WORKING (OFF duty, %d sec left)", duty_info["off_sec"] - status["duration"]
192 )
193 return set_state(unit_cooler.const.VALVE_STATE.CLOSE)
196def set_cooling_idle():
197 my_lib.footprint.clear(STAT_PATH_VALVE_STATE_WORKING)
199 if not my_lib.footprint.exists(STAT_PATH_VALVE_STATE_IDLE):
200 my_lib.footprint.update(STAT_PATH_VALVE_STATE_IDLE)
201 unit_cooler.actuator.work_log.add("冷却を停止しました。")
202 logging.info("COOLING: WORKING -> IDLE")
203 return set_state(unit_cooler.const.VALVE_STATE.CLOSE)
204 else:
205 logging.info("COOLING: IDLE")
206 return set_state(unit_cooler.const.VALVE_STATE.CLOSE)
209def set_cooling_state(control_message):
210 if control_message["state"] == unit_cooler.const.COOLING_STATE.WORKING:
211 return set_cooling_working(control_message["duty"])
212 else:
213 return set_cooling_idle()
216if __name__ == "__main__":
217 # TEST Code
218 import multiprocessing
220 import docopt
221 import my_lib.config
222 import my_lib.logger
223 import my_lib.pretty
225 args = docopt.docopt(__doc__)
227 config_file = args["-c"]
228 debug_mode = args["-D"]
230 my_lib.logger.init("test", level=logging.DEBUG if debug_mode else logging.INFO)
232 config = my_lib.config.load(config_file)
233 event_queue = multiprocessing.Queue()
235 my_lib.webapp.config.init(config["actuator"])
236 my_lib.webapp.log.init(config)
237 unit_cooler.actuator.work_log.init(config, event_queue)
238 init(config["actuator"]["control"]["valve"]["pin_no"])
240 while True:
241 set_cooling_state(
242 {
243 "state": unit_cooler.const.COOLING_STATE.WORKING,
244 "mode_index": 1,
245 "duty": {"enable": True, "on_sec": 1, "off_sec": 3},
246 }
247 )
248 time.sleep(1)