Coverage for src/unit_cooler/controller/engine.py: 98%

62 statements  

« prev     ^ index     » next       coverage.py v7.9.1, created at 2025-07-23 14:35 +0000

1#!/usr/bin/env python3 

2""" 

3屋外の気象情報とエアコンの稼働状況に基づき、冷却モードを決定します。 

4 

5Usage: 

6 engine.py [-c CONFIG] [-D] 

7 

8Options: 

9 -c CONFIG : CONFIG を設定ファイルとして読み込んで実行します。[default: config.yaml] 

10 -D : デバッグモードで動作します。 

11""" 

12 

13import copy 

14import logging 

15 

16import my_lib.notify.slack 

17 

18import unit_cooler.controller.message 

19import unit_cooler.controller.sensor 

20import unit_cooler.util 

21 

22# 最低でもこの時間は ON にする (テスト時含む) 

23ON_SEC_MIN = 5 

24# 最低でもこの時間は OFF にする (テスト時含む) 

25OFF_SEC_MIN = 5 

26 

27 

28def dummy_cooling_mode(): 

29 import random 

30 

31 current_mode = dummy_cooling_mode.prev_mode 

32 max_mode = len(unit_cooler.controller.message.CONTROL_MESSAGE_LIST) - 1 

33 

34 # 60%の確率で現状維持、40%の確率で変更 

35 if random.random() < 0.6: # noqa: S311 

36 cooling_mode = current_mode 

37 elif current_mode == 1: 

38 # モード1の場合、特別な処理 

39 if random.random() < 0.1: # noqa: S311 # 10%の確率で0へ 

40 cooling_mode = 0 

41 elif random.random() < 0.5: # noqa: S311 # 残り90%のうち50%で+1 

42 cooling_mode = min(current_mode + 1, max_mode) 

43 else: # 残り90%のうち50%で現状維持 

44 cooling_mode = current_mode 

45 elif current_mode == 0: 

46 # モード0の場合、+1のみ 

47 cooling_mode = 1 

48 elif current_mode == max_mode: 48 ↛ 50line 48 didn't jump to line 50 because the condition on line 48 was never true

49 # 最大モードの場合、-1のみ 

50 cooling_mode = current_mode - 1 

51 else: 

52 # その他の場合、50%で+1、50%で-1 

53 cooling_mode = current_mode + 1 if random.random() < 0.5 else current_mode - 1 # noqa: S311 

54 

55 dummy_cooling_mode.prev_mode = cooling_mode 

56 

57 logging.info("cooling_mode: %d (prev: %d)", cooling_mode, current_mode) 

58 

59 return {"cooling_mode": cooling_mode} 

60 

61 

62dummy_cooling_mode.prev_mode = 0 

63 

64 

65def judge_cooling_mode(config, sense_data): 

66 logging.info("Judge cooling mode") 

67 

68 try: 

69 cooler_activity = unit_cooler.controller.sensor.get_cooler_activity(sense_data) 

70 except RuntimeError as e: 

71 unit_cooler.util.notify_error(config, e.args[0]) 

72 cooler_activity = {"status": 0, "message": None} 

73 

74 if cooler_activity["status"] == 0: 

75 outdoor_status = {"status": None, "message": None} 

76 cooling_mode = 0 

77 else: 

78 outdoor_status = unit_cooler.controller.sensor.get_outdoor_status(sense_data) 

79 cooling_mode = max(cooler_activity["status"] + outdoor_status["status"], 0) 

80 

81 if cooler_activity["message"] is not None: 

82 logging.info(cooler_activity["message"]) 

83 if outdoor_status["message"] is not None: 

84 logging.info(outdoor_status["message"]) 

85 

86 logging.info( 

87 "cooling_mode: %d (cooler_status: %s, outdoor_status: %s)", 

88 cooling_mode, 

89 cooler_activity["status"], 

90 outdoor_status["status"], 

91 ) 

92 

93 return { 

94 "cooling_mode": cooling_mode, 

95 "cooler_status": cooler_activity, 

96 "outdoor_status": outdoor_status, 

97 "sense_data": sense_data, 

98 } 

99 

100 

101def gen_control_msg(config, dummy_mode=False, speedup=1): 

102 if dummy_mode: 

103 sense_data = {} 

104 mode = dummy_cooling_mode() 

105 else: 

106 sense_data = unit_cooler.controller.sensor.get_sense_data(config) 

107 mode = judge_cooling_mode(config, sense_data) 

108 

109 mode_index = min(mode["cooling_mode"], len(unit_cooler.controller.message.CONTROL_MESSAGE_LIST) - 1) 

110 

111 control_msg = copy.deepcopy(unit_cooler.controller.message.CONTROL_MESSAGE_LIST[mode_index]) 

112 

113 # NOTE: 参考として、どのモードかも通知する 

114 control_msg["mode_index"] = mode_index 

115 # NOTE: メトリクス用に、センサーデータも送る 

116 control_msg["sense_data"] = sense_data 

117 

118 if dummy_mode: 

119 control_msg["duty"]["on_sec"] = max(control_msg["duty"]["on_sec"] / speedup, ON_SEC_MIN) 

120 control_msg["duty"]["off_sec"] = max(control_msg["duty"]["off_sec"] / speedup, OFF_SEC_MIN) 

121 

122 logging.info(control_msg) 

123 

124 return control_msg 

125 

126 

127if __name__ == "__main__": 

128 # TEST Code 

129 import docopt 

130 import my_lib.config 

131 import my_lib.logger 

132 import my_lib.pretty 

133 

134 args = docopt.docopt(__doc__) 

135 

136 config_file = args["-c"] 

137 debug_mode = args["-D"] 

138 

139 my_lib.logger.init("test", level=logging.DEBUG if debug_mode else logging.INFO) 

140 

141 config = my_lib.config.load(config_file) 

142 

143 logging.info(my_lib.pretty.format(gen_control_msg(config)))