Compare commits
8 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
1c1990b632 | ||
|
|
826e866a52 | ||
|
|
6ef45830fb | ||
|
|
53cb116372 | ||
|
|
e973205b06 | ||
|
|
f00421d273 | ||
|
|
061ad8d322 | ||
|
|
12ac6c4cc4 |
244
room_control.py
244
room_control.py
@@ -1,13 +1,123 @@
|
||||
import asyncio
|
||||
from copy import deepcopy
|
||||
from dataclasses import dataclass, field
|
||||
from datetime import datetime, time, timedelta
|
||||
from pathlib import Path
|
||||
from typing import Dict, List
|
||||
|
||||
import appdaemon.utils as utils
|
||||
import astral
|
||||
import yaml
|
||||
from appdaemon.entity import Entity
|
||||
from appdaemon.plugins.hass.hassapi import Hass
|
||||
from appdaemon.plugins.mqtt.mqttapi import Mqtt
|
||||
from astral import SunDirection
|
||||
|
||||
|
||||
def str_to_timedelta(input_str: str) -> timedelta:
|
||||
try:
|
||||
hours, minutes, seconds = map(int, input_str.split(':'))
|
||||
return timedelta(hours=hours, minutes=minutes, seconds=seconds)
|
||||
except Exception:
|
||||
return timedelta()
|
||||
|
||||
@dataclass
|
||||
class RoomState:
|
||||
scene: Dict[str, Dict[str, str | int]]
|
||||
off_duration: timedelta = None
|
||||
time: time = None
|
||||
time_fmt: List[str] = field(default_factory=lambda : ['%H:%M:%S', '%I:%M:%S %p'], repr=False)
|
||||
elevation: int | float = None
|
||||
direction: SunDirection = None
|
||||
|
||||
def __post_init__(self):
|
||||
if isinstance(self.time, str):
|
||||
for fmt in self.time_fmt:
|
||||
try:
|
||||
self.time = datetime.strptime(self.time, fmt).time()
|
||||
except:
|
||||
continue
|
||||
else:
|
||||
break
|
||||
|
||||
if self.elevation is not None:
|
||||
assert self.direction is not None, f'Elevation setting requires a direction'
|
||||
if self.direction.lower() == 'setting':
|
||||
self.direction = SunDirection.SETTING
|
||||
elif self.direction.lower() == 'rising':
|
||||
self.direction = SunDirection.RISING
|
||||
else:
|
||||
raise ValueError(f'Invalid sun direction: {self.direction}')
|
||||
|
||||
if isinstance(self.elevation, str):
|
||||
self.elevation = float(self.elevation)
|
||||
|
||||
if isinstance(self.off_duration, str):
|
||||
self.off_duration = str_to_timedelta(self.off_duration)
|
||||
|
||||
@classmethod
|
||||
def from_json(cls, json_input):
|
||||
return cls(**json_input)
|
||||
|
||||
|
||||
@dataclass
|
||||
class RoomConfig:
|
||||
states: List[RoomState]
|
||||
off_duration: timedelta = None
|
||||
|
||||
def __post_init__(self):
|
||||
if isinstance(self.off_duration, str):
|
||||
self.off_duration = str_to_timedelta(self.off_duration)
|
||||
|
||||
@classmethod
|
||||
def from_app_config(cls, app_cfg: Dict[str, Dict]):
|
||||
if 'off_duration' in app_cfg:
|
||||
kwargs = {'off_duration': app_cfg['off_duration']}
|
||||
else:
|
||||
kwargs = {}
|
||||
|
||||
self = cls(
|
||||
states=[RoomState.from_json(s) for s in app_cfg['states']],
|
||||
**kwargs
|
||||
)
|
||||
|
||||
return self
|
||||
|
||||
@classmethod
|
||||
def from_yaml(cls, yaml_path: Path, app_name: str):
|
||||
with yaml_path.open('r') as f:
|
||||
cfg: Dict = yaml.load(f, Loader=yaml.SafeLoader)[app_name]
|
||||
return cls.from_app_config(cfg)
|
||||
|
||||
def sort_states(self):
|
||||
"""Should only be called after all the times have been resolved
|
||||
"""
|
||||
assert all(isinstance(state.time, time) for state in self.states), 'Times have not all been resolved yet'
|
||||
self.states = sorted(self.states, key=lambda s: s.time, reverse=True)
|
||||
|
||||
def current_state(self, now: time) -> RoomState:
|
||||
time_fmt = "%I:%M:%S %p"
|
||||
print(now.strftime(time_fmt))
|
||||
|
||||
self.sort_states()
|
||||
for state in self.states:
|
||||
if state.time <= now:
|
||||
return state
|
||||
else:
|
||||
# self.log(f'Defaulting to first state')
|
||||
return self.states[0]
|
||||
|
||||
def current_scene(self, now: time) -> Dict:
|
||||
state = self.current_state(now)
|
||||
return state.scene
|
||||
|
||||
def current_off_duration(self, now: time) -> timedelta:
|
||||
state = self.current_state(now)
|
||||
if state.off_duration is None:
|
||||
if self.off_duration is None:
|
||||
raise ValueError(f'Need an off duration')
|
||||
else:
|
||||
return self.off_duration
|
||||
else:
|
||||
return state.off_duration
|
||||
|
||||
|
||||
class RoomController(Hass, Mqtt):
|
||||
@@ -20,16 +130,21 @@ class RoomController(Hass, Mqtt):
|
||||
- When the light comes on, check if it's attributes match what they should, given the time.
|
||||
"""
|
||||
|
||||
@property
|
||||
def states(self) -> List[RoomState]:
|
||||
return self._room_config.states
|
||||
|
||||
@states.setter
|
||||
def states(self, new: List[RoomState]):
|
||||
assert all(isinstance(s, RoomState) for s in new), f'Invalid: {new}'
|
||||
self._room_config.states = new
|
||||
|
||||
async def initialize(self):
|
||||
self.app_entities = await self.gather_app_entities()
|
||||
# self.log(f'entities: {self.app_entities}')
|
||||
await self.refresh_state_times()
|
||||
await self.run_daily(callback=self.refresh_state_times, start='00:00:00')
|
||||
|
||||
# if (ha_button := self.args.get('ha_button')):
|
||||
# self.log(f'Setting up input button: {self.friendly_name(ha_button)}')
|
||||
# self.listen_state(callback=self.activate_any_on, entity_id=ha_button)
|
||||
|
||||
async def gather_app_entities(self) -> List[str]:
|
||||
"""Returns a list of all the entities involved in any of the states
|
||||
"""
|
||||
@@ -55,80 +170,65 @@ class RoomController(Hass, Mqtt):
|
||||
async def refresh_state_times(self, *args, **kwargs):
|
||||
"""Resets the `self.states` attribute to a newly parsed version of the states.
|
||||
|
||||
Parsed states have an absolute time for a certain day.
|
||||
Parsed states have an absolute time for the current day.
|
||||
"""
|
||||
# re-parse the state strings into times for the current day
|
||||
self.states = await self.parse_states()
|
||||
self._room_config = RoomConfig.from_app_config(self.args)
|
||||
self.log(f'{len(self._room_config.states)} states in the RoomConfig')
|
||||
|
||||
for state in self._room_config.states:
|
||||
if state.time is None and state.elevation is not None:
|
||||
state.time = self.AD.sched.location.time_at_elevation(
|
||||
elevation=state.elevation,
|
||||
direction=state.direction
|
||||
).time()
|
||||
elif isinstance(state.time, str):
|
||||
state.time = await self.parse_time(state.time)
|
||||
|
||||
assert isinstance(state.time, time), f'Invalid time: {state.time}'
|
||||
|
||||
for state in self.states:
|
||||
self.log(f'State: {state.time.strftime("%I:%M:%S %p")} {state.scene}')
|
||||
|
||||
self.states = sorted(self.states, key=lambda s: s.time, reverse=True)
|
||||
|
||||
# schedule the transitions
|
||||
for state in self.states:
|
||||
t: time = state['time']
|
||||
for state in self.states[::-1]:
|
||||
# t: time = state['time']
|
||||
t: time = state.time
|
||||
try:
|
||||
await self.run_at(callback=self.activate_any_on, start=t.strftime('%H:%M:%S'), cause='scheduled transition')
|
||||
await self.run_at(
|
||||
callback=self.activate_any_on,
|
||||
start=t.strftime('%H:%M:%S'),
|
||||
cause='scheduled transition'
|
||||
)
|
||||
except ValueError:
|
||||
# happens when the callback time is in the past
|
||||
pass
|
||||
except Exception as e:
|
||||
self.log(f'Failed with {type(e)}: {e}')
|
||||
else:
|
||||
self.log(f'Scheduled transition at: {t.strftime("%I:%M:%S %p")}')
|
||||
|
||||
async def parse_states(self):
|
||||
async def gen():
|
||||
for state in deepcopy(self.args['states']):
|
||||
if (time := state.get('time')):
|
||||
state['time'] = await self.parse_time(time)
|
||||
|
||||
elif isinstance((elevation := state.get('elevation')), (int, float)):
|
||||
assert 'direction' in state, f'State needs a direction if it has an elevation'
|
||||
|
||||
if state['direction'] == 'rising':
|
||||
dir = astral.SunDirection.RISING
|
||||
elif state['direction'] == 'setting':
|
||||
dir = astral.SunDirection.SETTING
|
||||
else:
|
||||
raise ValueError(f'Invalid sun direction: {state["direction"]}')
|
||||
|
||||
state['time'] = self.AD.sched.location.time_at_elevation(
|
||||
elevation=elevation, direction=dir
|
||||
).time()
|
||||
|
||||
else:
|
||||
raise ValueError(f'Missing time')
|
||||
|
||||
yield state
|
||||
|
||||
states = [s async for s in gen()]
|
||||
states = sorted(states, key=lambda s: s['time'], reverse=True)
|
||||
return states
|
||||
|
||||
async def current_state(self, time: time = None):
|
||||
async def current_state(self, now: time = None) -> RoomState:
|
||||
if (await self.sleep_bool()):
|
||||
self.log(f'sleep: active')
|
||||
if (state := self.args.get('sleep_state')):
|
||||
return state
|
||||
return RoomState.from_json(state)
|
||||
else:
|
||||
return {}
|
||||
return RoomState(scene={})
|
||||
else:
|
||||
# now: datetime = await self.get_now()
|
||||
# self.log(f'Getting state for datetime: {now.strftime("%I:%M:%S %p")}')
|
||||
time = time or (await self.get_now()).time()
|
||||
time_fmt = "%I:%M %p"
|
||||
self.log(f'Getting state before: {time.strftime(time_fmt)}')
|
||||
for state in self.states:
|
||||
time_str = state["time"].strftime(time_fmt)
|
||||
if state['time'] <= time:
|
||||
self.log(f'Selected state from {time_str}')
|
||||
return state
|
||||
else:
|
||||
self.log(f'Not {time_str}')
|
||||
else:
|
||||
self.log(f'Defaulting to first state')
|
||||
return self.states[0]
|
||||
now = now or (await self.get_now()).time()
|
||||
self.log(f'Getting state for {now}', level='DEBUG')
|
||||
|
||||
async def current_scene(self, time: time = None):
|
||||
if (state := (await self.current_state(time=time))) is not None:
|
||||
return state['scene']
|
||||
state = self._room_config.current_state(now)
|
||||
self.log(f'Current state: {state}', level='DEBUG')
|
||||
|
||||
return state
|
||||
|
||||
async def current_scene(self, now: time = None) -> Dict[str, Dict[str, str | int]]:
|
||||
state = await self.current_state(now)
|
||||
assert isinstance(state, RoomState)
|
||||
self.log(f'Current scene: {state}')
|
||||
return state.scene
|
||||
|
||||
async def app_entity_states(self) -> Dict[str, str]:
|
||||
states = {
|
||||
@@ -171,7 +271,7 @@ class RoomController(Hass, Mqtt):
|
||||
# else:
|
||||
# raise ValueError('Sleep variable is undefined')
|
||||
|
||||
async def off_duration(self) -> timedelta:
|
||||
async def off_duration(self, now: time = None) -> timedelta:
|
||||
"""Determines the time that the motion sensor has to be clear before deactivating
|
||||
|
||||
Priority:
|
||||
@@ -181,17 +281,13 @@ class RoomController(Hass, Mqtt):
|
||||
- Sleep - 0
|
||||
|
||||
"""
|
||||
current_state = await self.current_state()
|
||||
duration_str = current_state.get(
|
||||
'off_duration',
|
||||
self.args.get('off_duration', '00:00:00')
|
||||
)
|
||||
|
||||
try:
|
||||
hours, minutes, seconds = map(int, duration_str.split(':'))
|
||||
return timedelta(hours=hours, minutes=minutes, seconds=seconds)
|
||||
except Exception:
|
||||
sleep_mode_active = await self.sleep_bool()
|
||||
if sleep_mode_active:
|
||||
self.log(f'Sleeping mode active: {sleep_mode_active}')
|
||||
return timedelta()
|
||||
else:
|
||||
now = now or (await self.get_now()).time()
|
||||
return self._room_config.current_off_duration(now)
|
||||
|
||||
@utils.sync_wrapper
|
||||
async def activate(self, entity = None, attribute = None, old = None, new = None, kwargs = None):
|
||||
|
||||
Reference in New Issue
Block a user