dataclasses and bugfixes
This commit is contained in:
315
room_control.py
315
room_control.py
@@ -1,13 +1,123 @@
|
||||
import asyncio
|
||||
from copy import deepcopy
|
||||
from datetime import time, timedelta
|
||||
from typing import List
|
||||
from dataclasses import dataclass, field
|
||||
from datetime import datetime, time, timedelta
|
||||
from pathlib import Path
|
||||
from typing import Dict, List
|
||||
|
||||
import appdaemon.utils as utils
|
||||
import astral
|
||||
import yaml
|
||||
from appdaemon.entity import Entity
|
||||
from appdaemon.plugins.hass.hassapi import Hass
|
||||
from appdaemon.plugins.mqtt.mqttapi import Mqtt
|
||||
from astral import SunDirection
|
||||
|
||||
|
||||
def str_to_timedelta(input_str: str) -> timedelta:
|
||||
try:
|
||||
hours, minutes, seconds = map(int, input_str.split(':'))
|
||||
return timedelta(hours=hours, minutes=minutes, seconds=seconds)
|
||||
except Exception:
|
||||
return timedelta()
|
||||
|
||||
@dataclass
|
||||
class RoomState:
|
||||
scene: Dict[str, Dict[str, str | int]]
|
||||
off_duration: timedelta = None
|
||||
time: time = None
|
||||
time_fmt: List[str] = field(default_factory=lambda : ['%H:%M:%S', '%I:%M:%S %p'], repr=False)
|
||||
elevation: int | float = None
|
||||
direction: SunDirection = None
|
||||
|
||||
def __post_init__(self):
|
||||
if isinstance(self.time, str):
|
||||
for fmt in self.time_fmt:
|
||||
try:
|
||||
self.time = datetime.strptime(self.time, fmt).time()
|
||||
except:
|
||||
continue
|
||||
else:
|
||||
break
|
||||
|
||||
if self.elevation is not None:
|
||||
assert self.direction is not None, f'Elevation setting requires a direction'
|
||||
if self.direction.lower() == 'setting':
|
||||
self.direction = SunDirection.SETTING
|
||||
elif self.direction.lower() == 'rising':
|
||||
self.direction = SunDirection.RISING
|
||||
else:
|
||||
raise ValueError(f'Invalid sun direction: {self.direction}')
|
||||
|
||||
if isinstance(self.elevation, str):
|
||||
self.elevation = float(self.elevation)
|
||||
|
||||
if isinstance(self.off_duration, str):
|
||||
self.off_duration = str_to_timedelta(self.off_duration)
|
||||
|
||||
@classmethod
|
||||
def from_json(cls, json_input):
|
||||
return cls(**json_input)
|
||||
|
||||
|
||||
@dataclass
|
||||
class RoomConfig:
|
||||
states: List[RoomState]
|
||||
off_duration: timedelta = None
|
||||
|
||||
def __post_init__(self):
|
||||
if isinstance(self.off_duration, str):
|
||||
self.off_duration = str_to_timedelta(self.off_duration)
|
||||
|
||||
@classmethod
|
||||
def from_app_config(cls, app_cfg: Dict[str, Dict]):
|
||||
if 'off_duration' in app_cfg:
|
||||
kwargs = {'off_duration': app_cfg['off_duration']}
|
||||
else:
|
||||
kwargs = {}
|
||||
|
||||
self = cls(
|
||||
states=[RoomState.from_json(s) for s in app_cfg['states']],
|
||||
**kwargs
|
||||
)
|
||||
|
||||
return self
|
||||
|
||||
@classmethod
|
||||
def from_yaml(cls, yaml_path: Path, app_name: str):
|
||||
with yaml_path.open('r') as f:
|
||||
cfg: Dict = yaml.load(f, Loader=yaml.SafeLoader)[app_name]
|
||||
return cls.from_app_config(cfg)
|
||||
|
||||
def sort_states(self):
|
||||
"""Should only be called after all the times have been resolved
|
||||
"""
|
||||
assert all(isinstance(state.time, time) for state in self.states), 'Times have not all been resolved yet'
|
||||
self.states = sorted(self.states, key=lambda s: s.time, reverse=True)
|
||||
|
||||
def current_state(self, now: time) -> RoomState:
|
||||
time_fmt = "%I:%M:%S %p"
|
||||
print(now.strftime(time_fmt))
|
||||
|
||||
self.sort_states()
|
||||
for state in self.states:
|
||||
if state.time <= now:
|
||||
return state
|
||||
else:
|
||||
# self.log(f'Defaulting to first state')
|
||||
return self.states[0]
|
||||
|
||||
def current_scene(self, now: time) -> Dict:
|
||||
state = self.current_state(now)
|
||||
return state.scene
|
||||
|
||||
def current_off_duration(self, now: time) -> timedelta:
|
||||
state = self.current_state(now)
|
||||
if state.off_duration is None:
|
||||
if self.off_duration is None:
|
||||
raise ValueError(f'Need an off duration')
|
||||
else:
|
||||
return self.off_duration
|
||||
else:
|
||||
return state.off_duration
|
||||
|
||||
|
||||
class RoomController(Hass, Mqtt):
|
||||
@@ -20,26 +130,31 @@ class RoomController(Hass, Mqtt):
|
||||
- When the light comes on, check if it's attributes match what they should, given the time.
|
||||
"""
|
||||
|
||||
async def initialize(self):
|
||||
self.app_entities = await self.gather_app_entities()
|
||||
self.log(f'entities: {self.app_entities}')
|
||||
await self.refresh_state_times()
|
||||
await self.run_daily(callback=self.refresh_state_times, start='00:00:00')
|
||||
@property
|
||||
def states(self) -> List[RoomState]:
|
||||
return self._room_config.states
|
||||
|
||||
@states.setter
|
||||
def states(self, new: List[RoomState]):
|
||||
assert all(isinstance(s, RoomState) for s in new), f'Invalid: {new}'
|
||||
self._room_config.states = new
|
||||
|
||||
# if (ha_button := self.args.get('ha_button')):
|
||||
# self.log(f'Setting up input button: {self.friendly_name(ha_button)}')
|
||||
# self.listen_state(callback=self.activate_any_on, entity_id=ha_button)
|
||||
def initialize(self):
|
||||
self.app_entities = self.gather_app_entities()
|
||||
# self.log(f'entities: {self.app_entities}')
|
||||
self.refresh_state_times()
|
||||
self.run_daily(callback=self.refresh_state_times, start='00:00:00')
|
||||
|
||||
async def gather_app_entities(self) -> List[str]:
|
||||
def gather_app_entities(self) -> List[str]:
|
||||
"""Returns a list of all the entities involved in any of the states
|
||||
"""
|
||||
async def async_generator():
|
||||
def generator():
|
||||
for settings in deepcopy(self.args['states']):
|
||||
if (scene := settings.get('scene')):
|
||||
if isinstance(scene, str):
|
||||
assert scene.startswith('scene.'), f"Scene definition must start with 'scene.' for app {self.name}"
|
||||
entity: Entity = self.get_entity(scene)
|
||||
entity_state = await entity.get_state('all')
|
||||
entity_state = entity.get_state('all')
|
||||
attributes = entity_state['attributes']
|
||||
for entity in attributes['entity_id']:
|
||||
yield entity
|
||||
@@ -49,100 +164,100 @@ class RoomController(Hass, Mqtt):
|
||||
else:
|
||||
yield self.args['entity']
|
||||
|
||||
entities = [e async for e in async_generator()]
|
||||
entities = [e for e in generator()]
|
||||
return set(entities)
|
||||
|
||||
async def refresh_state_times(self, *args, **kwargs):
|
||||
def refresh_state_times(self, *args, **kwargs):
|
||||
"""Resets the `self.states` attribute to a newly parsed version of the states.
|
||||
|
||||
Parsed states have an absolute time for a certain day.
|
||||
Parsed states have an absolute time for the current day.
|
||||
"""
|
||||
# re-parse the state strings into times for the current day
|
||||
self.states = await self.parse_states()
|
||||
self._room_config = RoomConfig.from_app_config(self.args)
|
||||
self.log(f'{len(self._room_config.states)} states in the RoomConfig')
|
||||
|
||||
for state in self._room_config.states:
|
||||
if state.time is None and state.elevation is not None:
|
||||
state.time = self.AD.sched.location.time_at_elevation(
|
||||
elevation=state.elevation,
|
||||
direction=state.direction
|
||||
).time()
|
||||
elif isinstance(state.time, str):
|
||||
state.time = self.parse_time(state.time)
|
||||
|
||||
assert isinstance(state.time, time), f'Invalid time: {state.time}'
|
||||
|
||||
for state in self.states:
|
||||
self.log(f'State: {state.time.strftime("%I:%M:%S %p")} {state.scene}')
|
||||
|
||||
self.states = sorted(self.states, key=lambda s: s.time, reverse=True)
|
||||
|
||||
# schedule the transitions
|
||||
for state in self.states:
|
||||
dt = str(state['time'])[:8]
|
||||
self.log(f'Scheduling transition at: {dt}')
|
||||
for state in self.states[::-1]:
|
||||
# t: time = state['time']
|
||||
t: time = state.time
|
||||
try:
|
||||
await self.run_at(callback=self.activate_any_on, start=dt)
|
||||
self.run_at(
|
||||
callback=self.activate_any_on,
|
||||
start=t.strftime('%H:%M:%S'),
|
||||
cause='scheduled transition'
|
||||
)
|
||||
except ValueError:
|
||||
# happens when the callback time is in the past
|
||||
pass
|
||||
except Exception as e:
|
||||
self.log(f'Failed with {type(e)}: {e}')
|
||||
|
||||
async def parse_states(self):
|
||||
async def gen():
|
||||
for state in deepcopy(self.args['states']):
|
||||
if (time := state.get('time')):
|
||||
state['time'] = await self.parse_time(time)
|
||||
def current_state(self, now: time = None) -> RoomState:
|
||||
if self.sleep_bool():
|
||||
self.log(f'sleep: active')
|
||||
if (state := self.args.get('sleep_state')):
|
||||
return RoomState.from_json(state)
|
||||
else:
|
||||
return RoomState(scene={})
|
||||
else:
|
||||
now = now or self.get_now().time()
|
||||
self.log(f'Getting state for {now}', level='DEBUG')
|
||||
|
||||
elif isinstance((elevation := state.get('elevation')), (int, float)):
|
||||
assert 'direction' in state, f'State needs a direction if it has an elevation'
|
||||
state = self._room_config.current_state(now)
|
||||
self.log(f'Current state: {state}', level='DEBUG')
|
||||
|
||||
if state['direction'] == 'rising':
|
||||
dir = astral.SunDirection.RISING
|
||||
elif state['direction'] == 'setting':
|
||||
dir = astral.SunDirection.SETTING
|
||||
else:
|
||||
raise ValueError(f'Invalid sun direction: {state["direction"]}')
|
||||
return state
|
||||
|
||||
state['time'] = self.AD.sched.location.time_at_elevation(
|
||||
elevation=elevation, direction=dir
|
||||
).time()
|
||||
def current_scene(self, now: time = None) -> Dict[str, Dict[str, str | int]]:
|
||||
state = self.current_state(now)
|
||||
assert isinstance(state, RoomState)
|
||||
self.log(f'Current scene: {state}')
|
||||
return state.scene
|
||||
|
||||
else:
|
||||
raise ValueError(f'Missing time')
|
||||
|
||||
yield state
|
||||
|
||||
states = [s async for s in gen()]
|
||||
states = sorted(states, key=lambda s: s['time'])
|
||||
def app_entity_states(self) -> Dict[str, str]:
|
||||
states = {
|
||||
entity: self.get_state(entity)
|
||||
for entity in self.app_entities
|
||||
}
|
||||
return states
|
||||
|
||||
async def current_state(self, time: time = None):
|
||||
if (await self.sleep_bool()):
|
||||
if (state := self.args.get('sleep_state')):
|
||||
return state
|
||||
else:
|
||||
return {}
|
||||
else:
|
||||
now = await self.get_now()
|
||||
self.log(f'Getting state for datetime: {now}')
|
||||
time = time or (await self.get_now()).time()
|
||||
for state in self.states[::-1]:
|
||||
if state['time'] <= time:
|
||||
self.log(f'Selected state from {state["time"]}')
|
||||
return state
|
||||
else:
|
||||
return self.states[-1]
|
||||
|
||||
async def current_scene(self, time: time = None):
|
||||
if (state := (await self.current_state(time=time))) is not None:
|
||||
return state['scene']
|
||||
|
||||
@property
|
||||
def all_off(self) -> bool:
|
||||
""""All off" is the logic opposite of "any on"
|
||||
|
||||
Returns:
|
||||
bool: Whether all the lights associated with the app are off
|
||||
"""
|
||||
return all(self.get_state(entity) != 'on' for entity in self.app_entities)
|
||||
states = self.app_entity_states()
|
||||
return all(state != 'on' for entity, state in states.items())
|
||||
|
||||
@property
|
||||
def any_on(self) -> bool:
|
||||
""""Any on" is the logic opposite of "all off"
|
||||
|
||||
Returns:
|
||||
bool: Whether any of the lights associated with the app are on
|
||||
"""
|
||||
return any(self.get_state(entity) == 'on' for entity in self.app_entities)
|
||||
states = self.app_entity_states()
|
||||
return any(state == 'on' for entity, state in states.items())
|
||||
|
||||
async def sleep_bool(self) -> bool:
|
||||
def sleep_bool(self) -> bool:
|
||||
if (sleep_var := self.args.get('sleep')):
|
||||
return (await self.get_state(sleep_var)) == 'on'
|
||||
return self.get_state(sleep_var) == 'on'
|
||||
else:
|
||||
return False
|
||||
|
||||
@@ -156,7 +271,7 @@ class RoomController(Hass, Mqtt):
|
||||
# else:
|
||||
# raise ValueError('Sleep variable is undefined')
|
||||
|
||||
async def off_duration(self) -> timedelta:
|
||||
def off_duration(self, now: time = None) -> timedelta:
|
||||
"""Determines the time that the motion sensor has to be clear before deactivating
|
||||
|
||||
Priority:
|
||||
@@ -166,21 +281,22 @@ class RoomController(Hass, Mqtt):
|
||||
- Sleep - 0
|
||||
|
||||
"""
|
||||
current_state = await self.current_state()
|
||||
duration_str = current_state.get(
|
||||
'off_duration',
|
||||
self.args.get('off_duration', '00:00:00')
|
||||
)
|
||||
|
||||
try:
|
||||
hours, minutes, seconds = map(int, duration_str.split(':'))
|
||||
return timedelta(hours=hours, minutes=minutes, seconds=seconds)
|
||||
except Exception:
|
||||
sleep_mode_active = self.sleep_bool()
|
||||
if sleep_mode_active:
|
||||
self.log(f'Sleeping mode active: {sleep_mode_active}')
|
||||
return timedelta()
|
||||
else:
|
||||
now = now or self.get_now().time()
|
||||
return self._room_config.current_off_duration(now)
|
||||
|
||||
async def activate(self, *args, cause: str = 'unknown', **kwargs):
|
||||
def activate(self, entity = None, attribute = None, old = None, new = None, kwargs = None):
|
||||
if kwargs is not None:
|
||||
cause = kwargs.get('cause', 'unknown')
|
||||
else:
|
||||
cause = 'unknown'
|
||||
|
||||
self.log(f'Activating: {cause}')
|
||||
scene = await self.current_scene()
|
||||
scene = self.current_scene()
|
||||
|
||||
if isinstance(scene, str):
|
||||
self.turn_on(scene)
|
||||
@@ -202,26 +318,25 @@ class RoomController(Hass, Mqtt):
|
||||
else:
|
||||
self.log(f'ERROR: unknown scene: {scene}')
|
||||
|
||||
async def activate_all_off(self, *args, **kwargs):
|
||||
"""Activate if all of the entities are off
|
||||
def activate_all_off(self, *args, **kwargs):
|
||||
"""Activate if all of the entities are off. Args and kwargs are passed directly to self.activate()
|
||||
"""
|
||||
if self.all_off:
|
||||
self.log(f'Activate all off kwargs: {kwargs}')
|
||||
await self.activate(*args, **kwargs)
|
||||
if self.all_off():
|
||||
self.activate(*args, **kwargs)
|
||||
else:
|
||||
self.log(f'Skipped activating - everything is not off')
|
||||
|
||||
async def activate_any_on(self, *args, **kwargs):
|
||||
"""Activate if any of the entities are on
|
||||
def activate_any_on(self, *args, **kwargs):
|
||||
"""Activate if any of the entities are on. Args and kwargs are passed directly to self.activate()
|
||||
"""
|
||||
if self.any_on:
|
||||
await self.activate(*args, **kwargs)
|
||||
if self.any_on():
|
||||
self.activate(*args, **kwargs)
|
||||
else:
|
||||
self.log(f'Skipped activating - everything is off')
|
||||
|
||||
def deactivate(self, *args, cause: str = 'unknown', **kwargs):
|
||||
def deactivate(self, entity = None, attribute = None, old = None, new = None, kwargs = None):
|
||||
cause = kwargs.get('cause', 'unknown')
|
||||
self.log(f'Deactivating: {cause}')
|
||||
for entity in self.app_entities:
|
||||
self.turn_off(entity)
|
||||
self.log(f'Turned off {entity}')
|
||||
|
||||
for e in self.app_entities:
|
||||
self.turn_off(e)
|
||||
self.log(f'Turned off {e}')
|
||||
|
||||
Reference in New Issue
Block a user