Coverage for flask/src/rasp_water/webapp_schedule.py: 98%

65 statements  

« prev     ^ index     » next       coverage.py v7.6.8, created at 2024-11-24 13:56 +0900

1#!/usr/bin/env python3 

2import json 

3import threading 

4import urllib.parse 

5from multiprocessing import Queue 

6 

7import flask_cors 

8import my_lib.flask_util 

9import my_lib.webapp.config 

10import my_lib.webapp.event 

11import my_lib.webapp.log 

12import rasp_water.scheduler 

13 

14from flask import Blueprint, jsonify, request, url_for 

15 

# Flask blueprint for the schedule API, mounted under the shared URL prefix.
blueprint = Blueprint(
    "rasp-water-schedule",
    __name__,
    url_prefix=my_lib.webapp.config.URL_PREFIX,
)

# Held by api_schedule_ctrl() while a new schedule is queued and stored.
schedule_lock = threading.Lock()
# Queue passed to the scheduler worker; created by init().
schedule_queue = None
# Scheduler worker thread; None while no worker is running.
worker = None

# Weekday labels (Sunday first), looked up by position in a "wday" flag list.
WDAY_STR = ["日", "月", "火", "水", "木", "金", "土"]

23 

24 

def init(config):
    """Create the schedule queue and start the scheduler worker thread.

    Args:
        config: Passed through unchanged as the first argument of
            ``rasp_water.scheduler.schedule_worker``.

    Raises:
        ValueError: If a worker is already registered, i.e. ``init()`` was
            called again without an intervening ``term()``.
    """
    global worker, schedule_queue  # noqa: PLW0603

    if worker is not None:
        raise ValueError("worker should be None")  # noqa: TRY003, EM101

    schedule_queue = Queue()
    rasp_water.scheduler.init()

    # The worker receives the same queue that api_schedule_ctrl() writes to.
    worker = threading.Thread(
        target=rasp_water.scheduler.schedule_worker,
        args=(config, schedule_queue),
    )
    worker.start()

42 

43 

def term():
    """Stop the scheduler worker thread and wait for it to finish.

    Does nothing when no worker is registered. Otherwise the scheduler's
    ``should_terminate`` event is set, the thread is joined, the module-level
    ``worker`` is reset to ``None`` and the event is cleared again so that a
    later ``init()`` starts from a clean state.
    """
    global worker  # noqa: PLW0603

    if worker is not None:
        rasp_water.scheduler.should_terminate.set()
        worker.join()

        worker = None
        rasp_water.scheduler.should_terminate.clear()

55 

56 

def wday_str_list(wday_list):
    """Return the weekday labels whose flag in *wday_list* is truthy.

    Args:
        wday_list: Sequence of flags; position ``i`` selects ``WDAY_STR[i]``.

    Returns:
        A list of the labels from ``WDAY_STR`` for every truthy position,
        in positional order.

    Raises:
        IndexError: If a truthy flag sits at a position beyond the end of
            ``WDAY_STR``.
    """
    return [WDAY_STR[index] for index, enabled in enumerate(wday_list) if enabled]

60 

61 

def schedule_entry_str(entry):
    """Render one schedule entry as a single human-readable line.

    Args:
        entry: Mapping that provides the ``"time"``, ``"period"`` and
            ``"wday"`` keys; ``"wday"`` is handed to ``wday_str_list``.

    Returns:
        A string of the form ``"<time> 開始 <period> 分間 <weekdays>"`` where
        the weekday labels are joined with commas.
    """
    start_time = entry["time"]
    duration = entry["period"]
    weekdays = ",".join(wday_str_list(entry["wday"]))
    return f"{start_time} 開始 {duration} 分間 {weekdays}"

64 

65 

def schedule_str(schedule):
    """Render the active entries of *schedule* as a multi-line string.

    Args:
        schedule: Iterable of entry mappings; each must provide
            ``"is_active"``, and active ones are formatted by
            ``schedule_entry_str``.

    Returns:
        One line per active entry, joined with ``"、\\n"``, or the fixed
        text ``"∅ 全て無効"`` when no entry is active.
    """
    active_lines = [schedule_entry_str(entry) for entry in schedule if entry["is_active"]]

    if not active_lines:
        return "∅ 全て無効"

    return "、\n".join(active_lines)

77 

78 

@blueprint.route("/api/schedule_ctrl", methods=["GET", "POST"])
@my_lib.flask_util.support_jsonp
@flask_cors.cross_origin()
def api_schedule_ctrl():
    """Return the stored schedule, replacing it first when ``cmd=set``.

    Query parameters:
        cmd: When equal to ``"set"``, ``data`` is applied as the new
            schedule. Any other value (or none) only reads the schedule.
        data: JSON-encoded schedule; only read when ``cmd`` is ``"set"``.

    Returns:
        A JSON response built from ``rasp_water.scheduler.schedule_load()``.
        If ``data`` is missing, is not valid JSON, or fails
        ``schedule_validate``, an error is logged and the currently stored
        schedule is returned unchanged.
    """
    cmd = request.args.get("cmd", None)
    data = request.args.get("data", None)
    if cmd == "set":
        try:
            schedule_data = json.loads(data)
        except (TypeError, json.JSONDecodeError):
            # TypeError: the "data" parameter is absent (None).
            # JSONDecodeError: the parameter is not valid JSON.
            # Treat both like a schedule that fails validation instead of
            # letting the exception escape the view.
            my_lib.webapp.log.error("😵 スケジュールの指定が不正です。")
            return jsonify(rasp_water.scheduler.schedule_load())

        if not rasp_water.scheduler.schedule_validate(schedule_data):
            my_lib.webapp.log.error("😵 スケジュールの指定が不正です。")
            return jsonify(rasp_water.scheduler.schedule_load())

        with schedule_lock:
            # Absolute URL of the valve control API, attached to every entry.
            endpoint = urllib.parse.urljoin(
                request.url_root,
                url_for("rasp-water-valve.api_valve_ctrl"),
            )

            for entry in schedule_data:
                entry["endpoint"] = endpoint
            schedule_queue.put(schedule_data)

            # NOTE: schedule_store() is normally only called from inside
            # schedule_worker, so this call would not be needed, but the
            # response below is produced by schedule_load(), so store the
            # new schedule here as well.
            rasp_water.scheduler.schedule_store(schedule_data)

            my_lib.webapp.event.notify_event(my_lib.webapp.event.EVENT_TYPE.SCHEDULE)

            user = my_lib.flask_util.auth_user(request)
            my_lib.webapp.log.info(
                "📅 スケジュールを更新しました。\n{schedule}\n{by}".format(
                    schedule=schedule_str(schedule_data),
                    by=f"by {user}" if user != "" else "",
                )
            )

    return jsonify(rasp_water.scheduler.schedule_load())