Coverage for src/unit_cooler/actuator/valve.py: 95%
111 statements
« prev ^ index » next coverage.py v7.9.1, created at 2025-07-23 14:35 +0000
« prev ^ index » next coverage.py v7.9.1, created at 2025-07-23 14:35 +0000
1#!/usr/bin/env python3
2"""
3電磁弁を可変デューティ制御します。
5Usage:
6 valve.py [-c CONFIG] [-D]
8Options:
9 -c CONFIG : CONFIG を設定ファイルとして読み込んで実行します。[default: config.yaml]
10 -D : デバッグモードで動作します。
11"""
13import logging
14import pathlib
15import threading
16import time
18import my_lib.footprint
19import my_lib.rpi
21import unit_cooler.actuator.work_log
22import unit_cooler.const
24STAT_DIR_PATH = pathlib.Path("/dev/shm") # noqa: S108
26# STATE が WORKING になった際に作られるファイル。Duty 制御している場合、
27# OFF Duty から ON Duty に遷移する度に変更日時が更新される。
28# STATE が IDLE になった際に削除される。
29# (OFF Duty になって実際にバルブを閉じただけでは削除されない)
30STAT_PATH_VALVE_STATE_WORKING = STAT_DIR_PATH / "unit_cooler" / "valve" / "state" / "working"
32# STATE が IDLE になった際に作られるファイル。
33# (OFF Duty になって実際にバルブを閉じただけでは作られない)
34# STATE が WORKING になった際に削除される。
35STAT_PATH_VALVE_STATE_IDLE = STAT_DIR_PATH / "unit_cooler" / "valve" / "state" / "idle"
37# 実際にバルブを開いた際に作られるファイル。
38# 実際にバルブを閉じた際に削除される。
39STAT_PATH_VALVE_OPEN = STAT_DIR_PATH / "unit_cooler" / "valve" / "open"
41# 実際にバルブを閉じた際に作られるファイル。
42# 実際にバルブを開いた際に削除される。
43STAT_PATH_VALVE_CLOSE = STAT_DIR_PATH / "unit_cooler" / "valve" / "close"
45pin_no = None
46valve_lock = None
47ctrl_hist = []
48config = None
51def init(pin, valve_config):
52 global pin_no # noqa: PLW0603
53 global valve_lock # noqa: PLW0603
54 global config # noqa: PLW0603
56 pin_no = pin
57 valve_lock = threading.Lock()
58 config = valve_config
60 my_lib.footprint.clear(STAT_PATH_VALVE_STATE_WORKING)
61 my_lib.footprint.update(STAT_PATH_VALVE_STATE_IDLE)
63 my_lib.rpi.gpio.setwarnings(False)
64 my_lib.rpi.gpio.setmode(my_lib.rpi.gpio.BCM)
65 my_lib.rpi.gpio.setup(pin_no, my_lib.rpi.gpio.OUT)
67 set_state(unit_cooler.const.VALVE_STATE.CLOSE)
70# NOTE: テスト用
71def clear_stat():
72 global ctrl_hist # noqa: PLW0603
74 my_lib.footprint.clear(STAT_PATH_VALVE_STATE_WORKING)
75 my_lib.footprint.clear(STAT_PATH_VALVE_STATE_IDLE)
76 my_lib.footprint.clear(STAT_PATH_VALVE_OPEN)
77 my_lib.footprint.clear(STAT_PATH_VALVE_CLOSE)
78 ctrl_hist = []
81# NOTE: テスト用
82def get_hist():
83 global ctrl_hist
85 return ctrl_hist
88# NOTE: 実際にバルブを開きます。
89# 現在のバルブの状態と、バルブが現在の状態になってからの経過時間を返します。
90def set_state(valve_state):
91 global pin_no
92 global valve_lock
93 global ctrl_hist
95 with valve_lock:
96 curr_state = get_state()
98 if valve_state != curr_state:
99 logging.info("VALVE: %s -> %s", curr_state.name, valve_state.name)
100 # NOTE: テスト時のみ履歴を記録
101 import os
103 if os.environ.get("TEST") == "true": 103 ↛ 107line 103 didn't jump to line 107 because the condition on line 103 was always true
104 ctrl_hist.append(curr_state)
106 # メトリクス記録
107 try:
108 from unit_cooler.metrics import get_metrics_collector
110 global config
112 if config and "actuator" in config and "metrics" in config["actuator"]: 112 ↛ 119line 112 didn't jump to line 119 because the condition on line 112 was always true
113 metrics_db_path = config["actuator"]["metrics"]["data"]
114 metrics_collector = get_metrics_collector(metrics_db_path)
115 metrics_collector.record_valve_operation()
116 except Exception:
117 logging.debug("Failed to record valve operation metrics")
119 my_lib.rpi.gpio.output(pin_no, valve_state.value)
121 if valve_state == unit_cooler.const.VALVE_STATE.OPEN:
122 my_lib.footprint.clear(STAT_PATH_VALVE_CLOSE)
123 if not my_lib.footprint.exists(STAT_PATH_VALVE_OPEN):
124 my_lib.footprint.update(STAT_PATH_VALVE_OPEN)
125 else:
126 my_lib.footprint.clear(STAT_PATH_VALVE_OPEN)
127 if not my_lib.footprint.exists(STAT_PATH_VALVE_CLOSE):
128 my_lib.footprint.update(STAT_PATH_VALVE_CLOSE)
130 return get_status()
133# NOTE: 実際のバルブの状態を返します
134def get_state():
135 global pin_no
137 if my_lib.rpi.gpio.input(pin_no) == 1:
138 return unit_cooler.const.VALVE_STATE.OPEN
139 else:
140 return unit_cooler.const.VALVE_STATE.CLOSE
143# NOTE: 実際のバルブの状態と、その状態になってからの経過時間を返します
144def get_status():
145 global valve_lock
147 with valve_lock:
148 valve_state = get_state()
150 if valve_state == unit_cooler.const.VALVE_STATE.OPEN:
151 assert my_lib.footprint.exists(STAT_PATH_VALVE_OPEN) # noqa: S101
153 return {
154 "state": valve_state,
155 "duration": my_lib.footprint.elapsed(STAT_PATH_VALVE_OPEN),
156 }
157 else: # noqa: PLR5501
158 if my_lib.footprint.exists(STAT_PATH_VALVE_CLOSE): 158 ↛ 164line 158 didn't jump to line 164 because the condition on line 158 was always true
159 return {
160 "state": valve_state,
161 "duration": my_lib.footprint.elapsed(STAT_PATH_VALVE_CLOSE),
162 }
163 else:
164 return {"state": valve_state, "duration": 0}
167# NOTE: バルブを動作状態にします。
168# Duty 制御を実現するため、OFF Duty 期間の場合はバルブを閉じます。
169# 実際にバルブを開いてからの経過時間を返します。
170# duty_info = { "enable": bool, "on": on_sec, "off": off_sec }
171def set_cooling_working(duty_info):
172 logging.debug(duty_info)
174 my_lib.footprint.clear(STAT_PATH_VALVE_STATE_IDLE)
176 if not my_lib.footprint.exists(STAT_PATH_VALVE_STATE_WORKING):
177 my_lib.footprint.update(STAT_PATH_VALVE_STATE_WORKING)
178 unit_cooler.actuator.work_log.add("冷却を開始します。")
179 logging.info("COOLING: IDLE -> WORKING")
180 return set_state(unit_cooler.const.VALVE_STATE.OPEN)
182 if not duty_info["enable"]:
183 # NOTE Duty 制御しない場合
184 logging.info("COOLING: WORKING")
185 return set_state(unit_cooler.const.VALVE_STATE.OPEN)
187 status = get_status()
189 if status["state"] == unit_cooler.const.VALVE_STATE.OPEN:
190 # NOTE: 現在バルブが開かれている
191 if status["duration"] >= duty_info["on_sec"]:
192 logging.info("COOLING: WORKING (OFF duty, %d sec left)", duty_info["off_sec"])
193 unit_cooler.actuator.work_log.add("OFF Duty になったのでバルブを締めます。")
194 return set_state(unit_cooler.const.VALVE_STATE.CLOSE)
195 else:
196 logging.info("COOLING: WORKING (ON duty, %d sec left)", duty_info["on_sec"] - status["duration"])
198 return set_state(unit_cooler.const.VALVE_STATE.OPEN)
199 else: # noqa: PLR5501
200 # NOTE: 現在バルブが閉じられている
201 if status["duration"] >= duty_info["off_sec"]:
202 logging.info("COOLING: WORKING (ON duty, %d sec left)", duty_info["on_sec"])
203 unit_cooler.actuator.work_log.add("ON Duty になったのでバルブを開けます。")
204 return set_state(unit_cooler.const.VALVE_STATE.OPEN)
205 else:
206 logging.info(
207 "COOLING: WORKING (OFF duty, %d sec left)", duty_info["off_sec"] - status["duration"]
208 )
209 return set_state(unit_cooler.const.VALVE_STATE.CLOSE)
212def set_cooling_idle():
213 my_lib.footprint.clear(STAT_PATH_VALVE_STATE_WORKING)
215 if not my_lib.footprint.exists(STAT_PATH_VALVE_STATE_IDLE):
216 my_lib.footprint.update(STAT_PATH_VALVE_STATE_IDLE)
217 unit_cooler.actuator.work_log.add("冷却を停止しました。")
218 logging.info("COOLING: WORKING -> IDLE")
219 return set_state(unit_cooler.const.VALVE_STATE.CLOSE)
220 else:
221 logging.info("COOLING: IDLE")
222 return set_state(unit_cooler.const.VALVE_STATE.CLOSE)
225def set_cooling_state(control_message):
226 if control_message["state"] == unit_cooler.const.COOLING_STATE.WORKING:
227 return set_cooling_working(control_message["duty"])
228 else:
229 return set_cooling_idle()
232if __name__ == "__main__":
233 # TEST Code
234 import multiprocessing
236 import docopt
237 import my_lib.config
238 import my_lib.logger
239 import my_lib.pretty
241 args = docopt.docopt(__doc__)
243 config_file = args["-c"]
244 debug_mode = args["-D"]
246 my_lib.logger.init("test", level=logging.DEBUG if debug_mode else logging.INFO)
248 config = my_lib.config.load(config_file)
249 event_queue = multiprocessing.Queue()
251 my_lib.webapp.config.init(config["actuator"])
252 my_lib.webapp.log.init(config)
253 unit_cooler.actuator.work_log.init(config, event_queue)
254 init(config["actuator"]["control"]["valve"]["pin_no"])
256 while True:
257 set_cooling_state(
258 {
259 "state": unit_cooler.const.COOLING_STATE.WORKING,
260 "mode_index": 1,
261 "duty": {"enable": True, "on_sec": 1, "off_sec": 3},
262 }
263 )
264 time.sleep(1)