Coverage for src/unit_cooler/actuator/control.py: 97%

46 statements  

« prev     ^ index     » next       coverage.py v7.9.1, created at 2025-06-28 08:08 +0000

1#!/usr/bin/env python3 

2import logging 

3import os 

4 

5import my_lib.time 

6 

7import unit_cooler.actuator.valve 

8import unit_cooler.const 

9import unit_cooler.util 

10 

11HAZARD_NOTIFY_INTERVAL_MIN = 30 

12 

13 

14def gen_handle(config, message_queue): 

15 return { 

16 "config": config, 

17 "message_queue": message_queue, 

18 "receive_time": my_lib.time.now(), 

19 "receive_count": 0, 

20 } 

21 

22 

23def hazard_register(config): 

24 my_lib.footprint.update(config["actuator"]["control"]["hazard"]["file"]) 

25 

26 

27def hazard_clear(config): 

28 my_lib.footprint.clear(config["actuator"]["control"]["hazard"]["file"]) 

29 

30 

31def hazard_notify(config, message): 

32 if ( 

33 my_lib.footprint.elapsed(config["actuator"]["control"]["hazard"]["file"]) / 60 

34 > HAZARD_NOTIFY_INTERVAL_MIN 

35 ): 

36 unit_cooler.actuator.work_log.add(message, unit_cooler.const.LOG_LEVEL.ERROR) 

37 

38 hazard_register(config) 

39 

40 unit_cooler.actuator.valve.set_state(unit_cooler.const.VALVE_STATE.CLOSE) 

41 

42 

43def hazard_check(config): 

44 if my_lib.footprint.exists(config["actuator"]["control"]["hazard"]["file"]): 

45 hazard_notify(config, "過去に水漏れもしくは電磁弁の故障が検出されているので制御を停止しています。") 

46 return True 

47 else: 

48 return False 

49 

50 

51def get_control_message_impl(handle, last_message): 

52 if handle["message_queue"].empty(): 

53 if (my_lib.time.now() - handle["receive_time"]).total_seconds() > handle["config"]["controller"][ 

54 "interval_sec" 

55 ] * 3: 

56 unit_cooler.actuator.work_log.add( 

57 "冷却モードの指示を受信できません。", unit_cooler.const.LOG_LEVEL.ERROR 

58 ) 

59 

60 return last_message 

61 

62 control_message = None 

63 while not handle["message_queue"].empty(): 63 ↛ 75line 63 didn't jump to line 75 because the condition on line 63 was always true

64 control_message = handle["message_queue"].get() 

65 

66 logging.info("Receive: %s", control_message) 

67 

68 handle["receive_time"] = my_lib.time.now() 

69 handle["receive_count"] += 1 

70 if os.environ.get("TEST", "false") == "true": 70 ↛ 63line 70 didn't jump to line 63 because the condition on line 70 was always true

71 # NOTE: テスト時は、コマンドの数を整合させたいので、 

72 # 1 回に1個のコマンドのみ処理する。 

73 break 

74 

75 if control_message["mode_index"] != last_message["mode_index"]: 

76 unit_cooler.actuator.work_log.add( 

77 ("冷却モードが変更されました。({before} → {after})").format( 

78 before="init" if last_message["mode_index"] == -1 else last_message["mode_index"], 

79 after=control_message["mode_index"], 

80 ) 

81 ) 

82 

83 return control_message 

84 

85 

86def get_control_message(handle, last_message): 

87 try: 

88 return get_control_message_impl(handle, last_message) 

89 except OverflowError: # pragma: no cover 

90 # NOTE: テストする際、timemachinefreezer 使って日付をいじるとこの例外が発生する 

91 logging.exception("Failed to get control message") 

92 return last_message 

93 

94 

95def execute(config, control_message): 

96 if hazard_check(config): 

97 control_message = {"mode_index": 0, "state": unit_cooler.const.COOLING_STATE.IDLE} 

98 

99 unit_cooler.actuator.valve.set_cooling_state(control_message)