Coverage for src / rasp_shutter / control / webapi / sensor.py: 55%

29 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-02-13 00:10 +0900

1#!/usr/bin/env python3 

2import dataclasses 

3import datetime 

4 

5import flask 

6import flask_cors 

7import my_lib.sensor_data 

8import my_lib.time 

9import my_lib.webapp.config 

10import pysolar.solar 

11 

12import rasp_shutter.config 

13import rasp_shutter.type_defs 

14 

# Flask blueprint that the sensor endpoint in this module registers on;
# its routes are mounted under the web application's shared URL prefix.
blueprint = flask.Blueprint(
    "rasp-shutter-sensor",
    __name__,
    url_prefix=my_lib.webapp.config.URL_PREFIX,
)

16 

17 

def get_solar_altitude(config: rasp_shutter.config.AppConfig) -> rasp_shutter.type_defs.SensorValue:
    """Compute the sun's current altitude at the configured location.

    Args:
        config: Application configuration; ``config.location.latitude`` and
            ``config.location.longitude`` are passed to pysolar.

    Returns:
        A valid ``SensorValue`` whose value is the altitude reported by
        ``pysolar.solar.get_altitude`` and whose time is the instant used
        for the calculation.
    """
    # pysolar.solar.get_altitude() expects a UTC timestamp, so UTC is
    # requested explicitly instead of relying on the local timezone.
    observed_at = datetime.datetime.now(datetime.UTC)
    altitude = pysolar.solar.get_altitude(
        config.location.latitude,
        config.location.longitude,
        observed_at,
    )
    return rasp_shutter.type_defs.SensorValue.create_valid(value=altitude, time=observed_at)

25 

26 

def get_sensor_data(config: rasp_shutter.config.AppConfig) -> rasp_shutter.type_defs.SensorData:
    """Collect the latest lux and solar-radiation readings plus the solar altitude.

    For each of the ``lux`` and ``solar_rad`` fields, the most recent sample
    from the last hour is fetched via ``my_lib.sensor_data.fetch_data``. A
    field whose fetch result is not valid is reported as an invalid
    ``SensorValue`` rather than raising.

    Args:
        config: Application configuration providing ``sensor.influxdb`` and
            the per-field ``measure`` / ``hostname`` settings.

    Returns:
        A ``SensorData`` holding the ``lux``, ``solar_rad`` and ``altitude``
        values.
    """
    local_zone = my_lib.time.get_zoneinfo()

    def read_latest(name: str) -> rasp_shutter.type_defs.SensorValue:
        # Fetch the newest sample for one sensor field and wrap it.
        spec = getattr(config.sensor, name)
        result = my_lib.sensor_data.fetch_data(
            config.sensor.influxdb,
            spec.measure,
            spec.hostname,
            name,
            start="-1h",
            last=True,
        )
        if not result.valid:
            return rasp_shutter.type_defs.SensorValue.create_invalid()

        # NOTE: replace() swaps the tzinfo for the application's zone without
        # shifting the clock reading. NOTE(review): this presumably relies on
        # fetch_data() returning local wall-clock times -- confirm.
        return rasp_shutter.type_defs.SensorValue.create_valid(
            value=result.value[0],
            time=result.time[0].replace(tzinfo=local_zone),
        )

    readings = {name: read_latest(name) for name in ("lux", "solar_rad")}

    return rasp_shutter.type_defs.SensorData(
        lux=readings["lux"],
        solar_rad=readings["solar_rad"],
        altitude=get_solar_altitude(config),
    )

55 

56 

@blueprint.route("/api/sensor", methods=["GET"])
@flask_cors.cross_origin()
def api_sensor_data() -> flask.Response:
    """Handle ``GET /api/sensor`` by returning the current sensor data as JSON.

    The application configuration is read from the Flask app's ``CONFIG``
    entry, the sensor data is gathered with ``get_sensor_data``, and the
    result is converted to a plain dict with ``dataclasses.asdict`` before
    being serialized.
    """
    app_config: rasp_shutter.config.AppConfig = flask.current_app.config["CONFIG"]
    snapshot = get_sensor_data(app_config)
    payload = dataclasses.asdict(snapshot)
    return flask.jsonify(payload)