Compare commits
8 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
1c1990b632 | ||
|
|
826e866a52 | ||
|
|
6ef45830fb | ||
|
|
53cb116372 | ||
|
|
e973205b06 | ||
|
|
f00421d273 | ||
|
|
061ad8d322 | ||
|
|
12ac6c4cc4 |
244
room_control.py
244
room_control.py
@@ -1,13 +1,123 @@
|
|||||||
import asyncio
|
|
||||||
from copy import deepcopy
|
from copy import deepcopy
|
||||||
|
from dataclasses import dataclass, field
|
||||||
from datetime import datetime, time, timedelta
|
from datetime import datetime, time, timedelta
|
||||||
|
from pathlib import Path
|
||||||
from typing import Dict, List
|
from typing import Dict, List
|
||||||
|
|
||||||
import appdaemon.utils as utils
|
import appdaemon.utils as utils
|
||||||
import astral
|
import yaml
|
||||||
from appdaemon.entity import Entity
|
from appdaemon.entity import Entity
|
||||||
from appdaemon.plugins.hass.hassapi import Hass
|
from appdaemon.plugins.hass.hassapi import Hass
|
||||||
from appdaemon.plugins.mqtt.mqttapi import Mqtt
|
from appdaemon.plugins.mqtt.mqttapi import Mqtt
|
||||||
|
from astral import SunDirection
|
||||||
|
|
||||||
|
|
||||||
|
def str_to_timedelta(input_str: str) -> timedelta:
|
||||||
|
try:
|
||||||
|
hours, minutes, seconds = map(int, input_str.split(':'))
|
||||||
|
return timedelta(hours=hours, minutes=minutes, seconds=seconds)
|
||||||
|
except Exception:
|
||||||
|
return timedelta()
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class RoomState:
|
||||||
|
scene: Dict[str, Dict[str, str | int]]
|
||||||
|
off_duration: timedelta = None
|
||||||
|
time: time = None
|
||||||
|
time_fmt: List[str] = field(default_factory=lambda : ['%H:%M:%S', '%I:%M:%S %p'], repr=False)
|
||||||
|
elevation: int | float = None
|
||||||
|
direction: SunDirection = None
|
||||||
|
|
||||||
|
def __post_init__(self):
|
||||||
|
if isinstance(self.time, str):
|
||||||
|
for fmt in self.time_fmt:
|
||||||
|
try:
|
||||||
|
self.time = datetime.strptime(self.time, fmt).time()
|
||||||
|
except:
|
||||||
|
continue
|
||||||
|
else:
|
||||||
|
break
|
||||||
|
|
||||||
|
if self.elevation is not None:
|
||||||
|
assert self.direction is not None, f'Elevation setting requires a direction'
|
||||||
|
if self.direction.lower() == 'setting':
|
||||||
|
self.direction = SunDirection.SETTING
|
||||||
|
elif self.direction.lower() == 'rising':
|
||||||
|
self.direction = SunDirection.RISING
|
||||||
|
else:
|
||||||
|
raise ValueError(f'Invalid sun direction: {self.direction}')
|
||||||
|
|
||||||
|
if isinstance(self.elevation, str):
|
||||||
|
self.elevation = float(self.elevation)
|
||||||
|
|
||||||
|
if isinstance(self.off_duration, str):
|
||||||
|
self.off_duration = str_to_timedelta(self.off_duration)
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def from_json(cls, json_input):
|
||||||
|
return cls(**json_input)
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class RoomConfig:
|
||||||
|
states: List[RoomState]
|
||||||
|
off_duration: timedelta = None
|
||||||
|
|
||||||
|
def __post_init__(self):
|
||||||
|
if isinstance(self.off_duration, str):
|
||||||
|
self.off_duration = str_to_timedelta(self.off_duration)
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def from_app_config(cls, app_cfg: Dict[str, Dict]):
|
||||||
|
if 'off_duration' in app_cfg:
|
||||||
|
kwargs = {'off_duration': app_cfg['off_duration']}
|
||||||
|
else:
|
||||||
|
kwargs = {}
|
||||||
|
|
||||||
|
self = cls(
|
||||||
|
states=[RoomState.from_json(s) for s in app_cfg['states']],
|
||||||
|
**kwargs
|
||||||
|
)
|
||||||
|
|
||||||
|
return self
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def from_yaml(cls, yaml_path: Path, app_name: str):
|
||||||
|
with yaml_path.open('r') as f:
|
||||||
|
cfg: Dict = yaml.load(f, Loader=yaml.SafeLoader)[app_name]
|
||||||
|
return cls.from_app_config(cfg)
|
||||||
|
|
||||||
|
def sort_states(self):
|
||||||
|
"""Should only be called after all the times have been resolved
|
||||||
|
"""
|
||||||
|
assert all(isinstance(state.time, time) for state in self.states), 'Times have not all been resolved yet'
|
||||||
|
self.states = sorted(self.states, key=lambda s: s.time, reverse=True)
|
||||||
|
|
||||||
|
def current_state(self, now: time) -> RoomState:
|
||||||
|
time_fmt = "%I:%M:%S %p"
|
||||||
|
print(now.strftime(time_fmt))
|
||||||
|
|
||||||
|
self.sort_states()
|
||||||
|
for state in self.states:
|
||||||
|
if state.time <= now:
|
||||||
|
return state
|
||||||
|
else:
|
||||||
|
# self.log(f'Defaulting to first state')
|
||||||
|
return self.states[0]
|
||||||
|
|
||||||
|
def current_scene(self, now: time) -> Dict:
|
||||||
|
state = self.current_state(now)
|
||||||
|
return state.scene
|
||||||
|
|
||||||
|
def current_off_duration(self, now: time) -> timedelta:
|
||||||
|
state = self.current_state(now)
|
||||||
|
if state.off_duration is None:
|
||||||
|
if self.off_duration is None:
|
||||||
|
raise ValueError(f'Need an off duration')
|
||||||
|
else:
|
||||||
|
return self.off_duration
|
||||||
|
else:
|
||||||
|
return state.off_duration
|
||||||
|
|
||||||
|
|
||||||
class RoomController(Hass, Mqtt):
|
class RoomController(Hass, Mqtt):
|
||||||
@@ -20,16 +130,21 @@ class RoomController(Hass, Mqtt):
|
|||||||
- When the light comes on, check if it's attributes match what they should, given the time.
|
- When the light comes on, check if it's attributes match what they should, given the time.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
@property
|
||||||
|
def states(self) -> List[RoomState]:
|
||||||
|
return self._room_config.states
|
||||||
|
|
||||||
|
@states.setter
|
||||||
|
def states(self, new: List[RoomState]):
|
||||||
|
assert all(isinstance(s, RoomState) for s in new), f'Invalid: {new}'
|
||||||
|
self._room_config.states = new
|
||||||
|
|
||||||
async def initialize(self):
|
async def initialize(self):
|
||||||
self.app_entities = await self.gather_app_entities()
|
self.app_entities = await self.gather_app_entities()
|
||||||
# self.log(f'entities: {self.app_entities}')
|
# self.log(f'entities: {self.app_entities}')
|
||||||
await self.refresh_state_times()
|
await self.refresh_state_times()
|
||||||
await self.run_daily(callback=self.refresh_state_times, start='00:00:00')
|
await self.run_daily(callback=self.refresh_state_times, start='00:00:00')
|
||||||
|
|
||||||
# if (ha_button := self.args.get('ha_button')):
|
|
||||||
# self.log(f'Setting up input button: {self.friendly_name(ha_button)}')
|
|
||||||
# self.listen_state(callback=self.activate_any_on, entity_id=ha_button)
|
|
||||||
|
|
||||||
async def gather_app_entities(self) -> List[str]:
|
async def gather_app_entities(self) -> List[str]:
|
||||||
"""Returns a list of all the entities involved in any of the states
|
"""Returns a list of all the entities involved in any of the states
|
||||||
"""
|
"""
|
||||||
@@ -55,80 +170,65 @@ class RoomController(Hass, Mqtt):
|
|||||||
async def refresh_state_times(self, *args, **kwargs):
|
async def refresh_state_times(self, *args, **kwargs):
|
||||||
"""Resets the `self.states` attribute to a newly parsed version of the states.
|
"""Resets the `self.states` attribute to a newly parsed version of the states.
|
||||||
|
|
||||||
Parsed states have an absolute time for a certain day.
|
Parsed states have an absolute time for the current day.
|
||||||
"""
|
"""
|
||||||
# re-parse the state strings into times for the current day
|
# re-parse the state strings into times for the current day
|
||||||
self.states = await self.parse_states()
|
self._room_config = RoomConfig.from_app_config(self.args)
|
||||||
|
self.log(f'{len(self._room_config.states)} states in the RoomConfig')
|
||||||
|
|
||||||
|
for state in self._room_config.states:
|
||||||
|
if state.time is None and state.elevation is not None:
|
||||||
|
state.time = self.AD.sched.location.time_at_elevation(
|
||||||
|
elevation=state.elevation,
|
||||||
|
direction=state.direction
|
||||||
|
).time()
|
||||||
|
elif isinstance(state.time, str):
|
||||||
|
state.time = await self.parse_time(state.time)
|
||||||
|
|
||||||
|
assert isinstance(state.time, time), f'Invalid time: {state.time}'
|
||||||
|
|
||||||
|
for state in self.states:
|
||||||
|
self.log(f'State: {state.time.strftime("%I:%M:%S %p")} {state.scene}')
|
||||||
|
|
||||||
|
self.states = sorted(self.states, key=lambda s: s.time, reverse=True)
|
||||||
|
|
||||||
# schedule the transitions
|
# schedule the transitions
|
||||||
for state in self.states:
|
for state in self.states[::-1]:
|
||||||
t: time = state['time']
|
# t: time = state['time']
|
||||||
|
t: time = state.time
|
||||||
try:
|
try:
|
||||||
await self.run_at(callback=self.activate_any_on, start=t.strftime('%H:%M:%S'), cause='scheduled transition')
|
await self.run_at(
|
||||||
|
callback=self.activate_any_on,
|
||||||
|
start=t.strftime('%H:%M:%S'),
|
||||||
|
cause='scheduled transition'
|
||||||
|
)
|
||||||
except ValueError:
|
except ValueError:
|
||||||
# happens when the callback time is in the past
|
# happens when the callback time is in the past
|
||||||
pass
|
pass
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
self.log(f'Failed with {type(e)}: {e}')
|
self.log(f'Failed with {type(e)}: {e}')
|
||||||
else:
|
|
||||||
self.log(f'Scheduled transition at: {t.strftime("%I:%M:%S %p")}')
|
|
||||||
|
|
||||||
async def parse_states(self):
|
async def current_state(self, now: time = None) -> RoomState:
|
||||||
async def gen():
|
|
||||||
for state in deepcopy(self.args['states']):
|
|
||||||
if (time := state.get('time')):
|
|
||||||
state['time'] = await self.parse_time(time)
|
|
||||||
|
|
||||||
elif isinstance((elevation := state.get('elevation')), (int, float)):
|
|
||||||
assert 'direction' in state, f'State needs a direction if it has an elevation'
|
|
||||||
|
|
||||||
if state['direction'] == 'rising':
|
|
||||||
dir = astral.SunDirection.RISING
|
|
||||||
elif state['direction'] == 'setting':
|
|
||||||
dir = astral.SunDirection.SETTING
|
|
||||||
else:
|
|
||||||
raise ValueError(f'Invalid sun direction: {state["direction"]}')
|
|
||||||
|
|
||||||
state['time'] = self.AD.sched.location.time_at_elevation(
|
|
||||||
elevation=elevation, direction=dir
|
|
||||||
).time()
|
|
||||||
|
|
||||||
else:
|
|
||||||
raise ValueError(f'Missing time')
|
|
||||||
|
|
||||||
yield state
|
|
||||||
|
|
||||||
states = [s async for s in gen()]
|
|
||||||
states = sorted(states, key=lambda s: s['time'], reverse=True)
|
|
||||||
return states
|
|
||||||
|
|
||||||
async def current_state(self, time: time = None):
|
|
||||||
if (await self.sleep_bool()):
|
if (await self.sleep_bool()):
|
||||||
self.log(f'sleep: active')
|
self.log(f'sleep: active')
|
||||||
if (state := self.args.get('sleep_state')):
|
if (state := self.args.get('sleep_state')):
|
||||||
return state
|
return RoomState.from_json(state)
|
||||||
else:
|
else:
|
||||||
return {}
|
return RoomState(scene={})
|
||||||
else:
|
else:
|
||||||
# now: datetime = await self.get_now()
|
now = now or (await self.get_now()).time()
|
||||||
# self.log(f'Getting state for datetime: {now.strftime("%I:%M:%S %p")}')
|
self.log(f'Getting state for {now}', level='DEBUG')
|
||||||
time = time or (await self.get_now()).time()
|
|
||||||
time_fmt = "%I:%M %p"
|
|
||||||
self.log(f'Getting state before: {time.strftime(time_fmt)}')
|
|
||||||
for state in self.states:
|
|
||||||
time_str = state["time"].strftime(time_fmt)
|
|
||||||
if state['time'] <= time:
|
|
||||||
self.log(f'Selected state from {time_str}')
|
|
||||||
return state
|
|
||||||
else:
|
|
||||||
self.log(f'Not {time_str}')
|
|
||||||
else:
|
|
||||||
self.log(f'Defaulting to first state')
|
|
||||||
return self.states[0]
|
|
||||||
|
|
||||||
async def current_scene(self, time: time = None):
|
state = self._room_config.current_state(now)
|
||||||
if (state := (await self.current_state(time=time))) is not None:
|
self.log(f'Current state: {state}', level='DEBUG')
|
||||||
return state['scene']
|
|
||||||
|
return state
|
||||||
|
|
||||||
|
async def current_scene(self, now: time = None) -> Dict[str, Dict[str, str | int]]:
|
||||||
|
state = await self.current_state(now)
|
||||||
|
assert isinstance(state, RoomState)
|
||||||
|
self.log(f'Current scene: {state}')
|
||||||
|
return state.scene
|
||||||
|
|
||||||
async def app_entity_states(self) -> Dict[str, str]:
|
async def app_entity_states(self) -> Dict[str, str]:
|
||||||
states = {
|
states = {
|
||||||
@@ -171,7 +271,7 @@ class RoomController(Hass, Mqtt):
|
|||||||
# else:
|
# else:
|
||||||
# raise ValueError('Sleep variable is undefined')
|
# raise ValueError('Sleep variable is undefined')
|
||||||
|
|
||||||
async def off_duration(self) -> timedelta:
|
async def off_duration(self, now: time = None) -> timedelta:
|
||||||
"""Determines the time that the motion sensor has to be clear before deactivating
|
"""Determines the time that the motion sensor has to be clear before deactivating
|
||||||
|
|
||||||
Priority:
|
Priority:
|
||||||
@@ -181,17 +281,13 @@ class RoomController(Hass, Mqtt):
|
|||||||
- Sleep - 0
|
- Sleep - 0
|
||||||
|
|
||||||
"""
|
"""
|
||||||
current_state = await self.current_state()
|
sleep_mode_active = await self.sleep_bool()
|
||||||
duration_str = current_state.get(
|
if sleep_mode_active:
|
||||||
'off_duration',
|
self.log(f'Sleeping mode active: {sleep_mode_active}')
|
||||||
self.args.get('off_duration', '00:00:00')
|
|
||||||
)
|
|
||||||
|
|
||||||
try:
|
|
||||||
hours, minutes, seconds = map(int, duration_str.split(':'))
|
|
||||||
return timedelta(hours=hours, minutes=minutes, seconds=seconds)
|
|
||||||
except Exception:
|
|
||||||
return timedelta()
|
return timedelta()
|
||||||
|
else:
|
||||||
|
now = now or (await self.get_now()).time()
|
||||||
|
return self._room_config.current_off_duration(now)
|
||||||
|
|
||||||
@utils.sync_wrapper
|
@utils.sync_wrapper
|
||||||
async def activate(self, entity = None, attribute = None, old = None, new = None, kwargs = None):
|
async def activate(self, entity = None, attribute = None, old = None, new = None, kwargs = None):
|
||||||
|
|||||||
Reference in New Issue
Block a user