Coverage for src / rasp_shutter / control / webapi / schedule.py: 96%

91 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-02-13 00:10 +0900

1#!/usr/bin/env python3 

2import json 

3import multiprocessing 

4import threading 

5import urllib.parse 

6 

7import flask 

8import flask_cors 

9import my_lib.flask_util 

10import my_lib.pytest_util 

11import my_lib.webapp.config 

12import my_lib.webapp.event 

13import my_lib.webapp.log 

14from flask_pydantic import validate 

15 

16import rasp_shutter.control.scheduler 

17import rasp_shutter.type_defs 

18from rasp_shutter.schemas import ScheduleCtrlRequest 

19 

# Flask blueprint carrying the schedule API routes; mounted under the
# application's configured URL prefix.
blueprint = flask.Blueprint("rasp-shutter-schedule", __name__, url_prefix=my_lib.webapp.config.URL_PREFIX)

# Per-worker state. Each dict is keyed by the id returned from
# my_lib.pytest_util.get_worker_id(), so every worker id gets its own
# lock, queue and scheduler thread.
_schedule_lock: dict[str, threading.RLock] = {}
_schedule_queue: dict[str, "multiprocessing.Queue[dict[str, bool]]"] = {}
_worker_thread: dict[str, threading.Thread] = {}

# Weekday labels, indexed by position in a schedule entry's "wday" list.
WDAY_STR = ["日", "月", "火", "水", "木", "金", "土"]

27 

28 

def init(config):
    """Set up the schedule worker for the current worker id.

    Thin public entry point; all of the work is done by ``init_impl``.

    Args:
        config: Application configuration, passed through unchanged.
    """
    init_impl(config)

31 

32 

def term():
    """Stop the scheduler and wait for this worker's thread to finish.

    Does nothing when no worker thread is registered for the current
    worker id. Otherwise it asks the scheduler to terminate, joins the
    thread, and removes the thread from the registry.
    """
    thread = get_worker_thread()
    if thread is not None:
        rasp_shutter.control.scheduler.term()
        thread.join()
        # Unregister the thread; init_impl() refuses to run while one is registered.
        _worker_thread.pop(my_lib.pytest_util.get_worker_id())

41 

42 

def init_impl(config):
    """Create the per-worker queue and lock, then start the scheduler thread.

    Args:
        config: Application configuration, handed to the scheduler worker.

    Raises:
        ValueError: If a worker thread is already registered for the
            current worker id.
    """
    if get_worker_thread() is not None:
        raise ValueError("worker should be None")

    worker_id = my_lib.pytest_util.get_worker_id()

    # The queue and lock must be registered before the thread is built,
    # because the thread arguments are looked up through get_schedule_queue().
    _schedule_queue[worker_id] = multiprocessing.Queue()
    _schedule_lock[worker_id] = threading.RLock()
    rasp_shutter.control.scheduler.init()

    thread = threading.Thread(
        target=rasp_shutter.control.scheduler.schedule_worker,
        args=(config, get_schedule_queue()),
    )
    _worker_thread[worker_id] = thread
    thread.start()

61 

62 

def get_schedule_lock():
    """Return the lock registered for the current worker id, or None if there is none."""
    worker_id = my_lib.pytest_util.get_worker_id()
    return _schedule_lock.get(worker_id)

65 

66 

def get_schedule_queue():
    """Return the schedule queue registered for the current worker id, or None if there is none."""
    worker_id = my_lib.pytest_util.get_worker_id()
    return _schedule_queue.get(worker_id)

69 

70 

def get_worker_thread():
    """Return the scheduler thread registered for the current worker id, or None if there is none."""
    worker_id = my_lib.pytest_util.get_worker_id()
    return _worker_thread.get(worker_id)

73 

74 

def wday_str_list(wday_list: list[bool]) -> list[str]:
    """Return the weekday labels whose flag is set.

    Args:
        wday_list: Flags indexed in the same order as ``WDAY_STR``.

    Returns:
        The ``WDAY_STR`` labels at every position whose flag is truthy,
        in their original order.
    """
    labels = WDAY_STR

    selected = []
    for index, enabled in enumerate(wday_list):
        if enabled:
            selected.append(labels[index])
    return selected

79 

80 

def schedule_entry_str(name: str, entry: rasp_shutter.type_defs.ScheduleEntry) -> str:
    """Format one schedule entry as a single line of log text.

    Args:
        name: Entry name; it is upper-cased in the output.
        entry: Schedule entry providing the "time", "solar_rad", "lux",
            "altitude" and "wday" keys.

    Returns:
        The name, time, thresholds and active weekday labels, separated by spaces.
    """
    # NOTE(review): the unit label reads "W/mm^2"; solar irradiance is usually
    # given in W/m^2 - confirm before changing this log text.
    head = (
        f"{name.upper()} {entry['time']} {entry['solar_rad']} W/mm^2 "
        f"{entry['lux']} LUX {entry['altitude']} deg"
    )
    active_days = ",".join(wday_str_list(entry["wday"]))
    return f"{head} {active_days}"

89 

90 

def schedule_str(schedule_data: dict) -> str:
    """Summarise the active "open" and "close" entries of a schedule.

    Args:
        schedule_data: Mapping with "open" and "close" entries, each of
            which has an "is_active" flag.

    Returns:
        One formatted line per active entry (open first, then close),
        or a fixed "all disabled" message when neither entry is active.
    """
    lines = [
        schedule_entry_str(kind, schedule_data[kind])
        for kind in ("open", "close")
        if schedule_data[kind]["is_active"]
    ]

    if not lines:
        return "∅ 全て無効"

    return "、\n".join(lines)

103 

104 

@blueprint.route("/api/schedule_ctrl", methods=["GET", "POST"])
@flask_cors.cross_origin()
@validate(query=ScheduleCtrlRequest)
def api_schedule_ctrl(query: ScheduleCtrlRequest) -> flask.Response:
    """Update the schedule on request, then return the stored schedule.

    When ``query.cmd`` is "set" and ``query.data`` is present, the data is
    parsed as JSON and validated. A valid schedule has the shutter-control
    endpoint URL added to each entry, is queued for the scheduler worker,
    stored, announced as a SCHEDULE event, and logged. Data that is not
    valid JSON or fails validation is rejected with an error log entry and
    nothing is changed.

    Args:
        query: Validated query parameters of the request.

    Returns:
        A JSON response holding the result of ``schedule_load()``.
    """
    if query.cmd == "set" and query.data is not None:
        invalid_message = "😵 スケジュールの指定が不正です。"

        # query.data is request-supplied text, so malformed JSON is treated
        # like any other invalid schedule instead of raising out of the view.
        try:
            schedule_data = json.loads(query.data)
        except json.JSONDecodeError:
            my_lib.webapp.log.error(invalid_message)
            return flask.jsonify(rasp_shutter.control.scheduler.schedule_load())

        if not rasp_shutter.control.scheduler.schedule_validate(schedule_data):
            my_lib.webapp.log.error(invalid_message)
            return flask.jsonify(rasp_shutter.control.scheduler.schedule_load())

        schedule_lock = get_schedule_lock()
        schedule_queue = get_schedule_queue()
        assert schedule_lock is not None  # noqa: S101
        assert schedule_queue is not None  # noqa: S101

        with schedule_lock:
            # Parsed again from the request text, as the original code did;
            # presumably to work on a copy untouched by schedule_validate()
            # - TODO confirm.
            schedule_data = json.loads(query.data)

            endpoint = urllib.parse.urljoin(
                flask.request.url_root,
                flask.url_for("rasp-shutter-control.api_shutter_ctrl"),
            )

            # Every entry carries the absolute URL of the shutter-control API.
            for entry in schedule_data.values():
                entry["endpoint"] = endpoint
            schedule_queue.put(schedule_data)

            rasp_shutter.control.scheduler.schedule_store(schedule_data)
            my_lib.webapp.event.notify_event(my_lib.webapp.event.EVENT_TYPE.SCHEDULE)

            user = my_lib.flask_util.auth_user(flask.request)
            schedule_text = schedule_str(schedule_data)
            by_text = f"by {user}" if user != "" else ""
            my_lib.webapp.log.info(f"📅 スケジュールを更新しました。\n{schedule_text}\n{by_text}")

    return flask.jsonify(rasp_shutter.control.scheduler.schedule_load())