Coverage for src/unit_cooler/controller/engine.py: 100%

58 statements  

« prev     ^ index     » next       coverage.py v7.9.1, created at 2025-06-28 08:08 +0000

1#!/usr/bin/env python3 

2""" 

3屋外の気象情報とエアコンの稼働状況に基づき、冷却モードを決定します。 

4 

5Usage: 

6 engine.py [-c CONFIG] [-D] 

7 

8Options: 

9 -c CONFIG : CONFIG を設定ファイルとして読み込んで実行します。[default: config.yaml] 

10 -D : デバッグモードで動作します。 

11""" 

12 

13import copy 

14import logging 

15 

16import my_lib.notify.slack 

17 

18import unit_cooler.controller.message 

19import unit_cooler.controller.sensor 

20import unit_cooler.util 

21 

22# 最低でもこの時間は ON にする (テスト時含む) 

23ON_SEC_MIN = 5 

24# 最低でもこの時間は OFF にする (テスト時含む) 

25OFF_SEC_MIN = 5 

26 

27 

28def dummy_cooling_mode(): 

29 import random 

30 

31 current_mode = dummy_cooling_mode.prev_mode 

32 max_mode = len(unit_cooler.controller.message.CONTROL_MESSAGE_LIST) - 1 

33 

34 # 60%の確率で現状維持、40%の確率で変更 

35 if random.random() < 0.6: # noqa: S311 

36 cooling_mode = current_mode 

37 elif current_mode == 1: 

38 # モード1の場合、特別な処理 

39 if random.random() < 0.1: # noqa: S311 # 10%の確率で0へ 

40 cooling_mode = 0 

41 elif random.random() < 0.5: # noqa: S311 # 残り90%のうち50%で+1 

42 cooling_mode = min(current_mode + 1, max_mode) 

43 else: # 残り90%のうち50%で現状維持 

44 cooling_mode = current_mode 

45 elif current_mode == 0: 

46 # モード0の場合、+1のみ 

47 cooling_mode = 1 

48 elif current_mode == max_mode: 

49 # 最大モードの場合、-1のみ 

50 cooling_mode = current_mode - 1 

51 else: 

52 # その他の場合、50%で+1、50%で-1 

53 cooling_mode = current_mode + 1 if random.random() < 0.5 else current_mode - 1 # noqa: S311 

54 

55 dummy_cooling_mode.prev_mode = cooling_mode 

56 

57 logging.info("cooling_mode: %d (prev: %d)", cooling_mode, current_mode) 

58 

59 return {"cooling_mode": cooling_mode} 

60 

61 

62dummy_cooling_mode.prev_mode = 0 

63 

64 

65def judge_cooling_mode(config): 

66 logging.info("Judge cooling mode") 

67 

68 sense_data = unit_cooler.controller.sensor.get_sense_data(config) 

69 

70 try: 

71 cooler_activity = unit_cooler.controller.sensor.get_cooler_activity(sense_data) 

72 except RuntimeError as e: 

73 unit_cooler.util.notify_error(config, e.args[0]) 

74 cooler_activity = {"status": 0, "message": None} 

75 

76 if cooler_activity["status"] == 0: 

77 outdoor_status = {"status": None, "message": None} 

78 cooling_mode = 0 

79 else: 

80 outdoor_status = unit_cooler.controller.sensor.get_outdoor_status(sense_data) 

81 cooling_mode = max(cooler_activity["status"] + outdoor_status["status"], 0) 

82 

83 if cooler_activity["message"] is not None: 

84 logging.info(cooler_activity["message"]) 

85 if outdoor_status["message"] is not None: 

86 logging.info(outdoor_status["message"]) 

87 

88 logging.info( 

89 "cooling_mode: %d (cooler_status: %s, outdoor_status: %s)", 

90 cooling_mode, 

91 cooler_activity["status"], 

92 outdoor_status["status"], 

93 ) 

94 

95 return { 

96 "cooling_mode": cooling_mode, 

97 "cooler_status": cooler_activity, 

98 "outdoor_status": outdoor_status, 

99 "sense_data": sense_data, 

100 } 

101 

102 

103def gen_control_msg(config, dummy_mode=False, speedup=1): 

104 mode = dummy_cooling_mode() if dummy_mode else judge_cooling_mode(config) 

105 mode_index = min(mode["cooling_mode"], len(unit_cooler.controller.message.CONTROL_MESSAGE_LIST) - 1) 

106 control_msg = copy.deepcopy(unit_cooler.controller.message.CONTROL_MESSAGE_LIST[mode_index]) 

107 

108 # NOTE: 参考として、どのモードかも通知する 

109 control_msg["mode_index"] = mode_index 

110 

111 if dummy_mode: 

112 control_msg["duty"]["on_sec"] = max(control_msg["duty"]["on_sec"] / speedup, ON_SEC_MIN) 

113 control_msg["duty"]["off_sec"] = max(control_msg["duty"]["off_sec"] / speedup, OFF_SEC_MIN) 

114 

115 logging.info(control_msg) 

116 

117 return control_msg 

118 

119 

120if __name__ == "__main__": 

121 # TEST Code 

122 import docopt 

123 import my_lib.config 

124 import my_lib.logger 

125 import my_lib.pretty 

126 

127 args = docopt.docopt(__doc__) 

128 

129 config_file = args["-c"] 

130 debug_mode = args["-D"] 

131 

132 my_lib.logger.init("test", level=logging.DEBUG if debug_mode else logging.INFO) 

133 

134 config = my_lib.config.load(config_file) 

135 

136 logging.info(my_lib.pretty.format(gen_control_msg(config)))