diff --git a/button.py b/button.py index 5e84c98..eb90e9a 100644 --- a/button.py +++ b/button.py @@ -1,16 +1,13 @@ -import asyncio import json from appdaemon.plugins.mqtt.mqttapi import Mqtt from room_control import RoomController -class ButtonController(Mqtt): - def initialize(self): - task = self.get_app(self.args['app']) - self.app: RoomController = asyncio.get_event_loop().run_until_complete(task) +class Button(Mqtt): + async def initialize(self): + self.app: RoomController = await self.get_app(self.args['app']) self.setup_buttons(self.args['button']) - # self.log(f'Done') def setup_buttons(self, buttons): if isinstance(buttons, list): @@ -25,30 +22,28 @@ class ButtonController(Mqtt): self.listen_event(self.handle_button, "MQTT_MESSAGE", topic=topic, namespace='mqtt', button=name) self.log(f'"{topic}" controls app {self.app.name}') - async def handle_button(self, event_name, data, kwargs): - topic = data['topic'] - self.log(f'Button event for: {topic}') + def handle_button(self, event_name, data, kwargs): try: payload = json.loads(data['payload']) action = payload['action'] - button = kwargs['button'] except json.JSONDecodeError: self.log(f'Error decoding JSON from {data["payload"]}', level='ERROR') except KeyError as e: return else: - self.log(f'{button}: {action}') - await self.handle_action(action) + if action != '': + self.handle_action(action) - async def handle_action(self, action: str): - if action == '': - return - elif action == 'single': - cause = 'button single click' - state = await self.get_state(entity_id=self.args['ref_entity']) + def handle_action(self, action: str): + if action == 'single': + self.log(f' {action.upper()} '.center(50, '=')) + state = self.get_state(self.args['ref_entity']) + kwargs = { + 'kwargs': {'cause': f'button single click: toggle while {state}'} + } if state == 'on': - self.app.deactivate(cause=cause) + self.app.deactivate(**kwargs) else: - await self.app.activate(cause=cause) + self.app.activate(**kwargs) else: pass \ No newline at end of file diff --git a/door.py b/door.py index 8b7a76e..50ca1c1 100644 --- a/door.py +++ b/door.py @@ -4,8 +4,5 @@ from room_control import RoomController class Door(Hass): async def initialize(self): - await self.listen_state(self.door_open, entity_id=self.args['door'], new='on') - - async def door_open(self, entity, attribute, old, new, kwargs): app: RoomController = await self.get_app(self.args['app']) - await app.activate_all_off() + await self.listen_state(app.activate_all_off, entity_id=self.args['door'], new='on', cause='door open') diff --git a/motion.py b/motion.py index 33bf7eb..e8b4f95 100644 --- a/motion.py +++ b/motion.py @@ -1,28 +1,12 @@ -import asyncio -from datetime import timedelta import re +from datetime import timedelta from appdaemon.entity import Entity from appdaemon.plugins.hass.hassapi import Hass from room_control import RoomController +from appdaemon import utils -class CustomEventLoopPolicy(asyncio.DefaultEventLoopPolicy): - def get_event_loop(self): - try: - # Try to get the current event loop - loop = super().get_event_loop() - except RuntimeError as ex: - if "There is no current event loop" in str(ex): - # If there's no current loop, create a new one and set it - loop = self.new_event_loop() - self.set_event_loop(loop) - else: - raise - return loop - -# Set the custom event loop policy -asyncio.set_event_loop_policy(CustomEventLoopPolicy()) class Motion(Hass): @property @@ -42,46 +26,35 @@ class Motion(Hass): return self.ref_entity.get_state() == 'on' def initialize(self): - try: - loop = asyncio.get_event_loop() - except RuntimeError: - loop = asyncio.new_event_loop() - - self.app: RoomController = loop.run_until_complete(self.get_app(self.args['app'])) + self.app: RoomController = self.get_app(self.args['app']) self.log(f'Connected to app {self.app.name}') - self.listen_state(self.callback_light_on, self.ref_entity.entity_id, new='on') - self.listen_state(self.callback_light_off, self.ref_entity.entity_id, new='off') + base_kwargs = dict( + entity_id=self.ref_entity.entity_id, + immediate=True, # avoids needing to sync the state + ) + # don't need to await these because they'll already get turned into a task by the utils.sync_wrapper decorator + self.listen_state(**base_kwargs, attribute='brightness', callback=self.callback_light_on) + self.listen_state(**base_kwargs, new='off', callback=self.callback_light_off) - loop.run_until_complete(self.sync_state()) - - async def sync_state(self): - """Synchronizes the callbacks with the state of the light. - - Essentially mimics the `state_change` callback based on the current state of the light. - """ - if self.ref_entity_state: - await self.callback_light_on() - else: - await self.callback_light_off() - - async def listen_motion_on(self): + def listen_motion_on(self): """Sets up the motion on callback to activate the room """ - self.log(f'Waiting for motion on {self.sensor.friendly_name}') - self.motion_on_handle = await self.listen_state( + self.cancel_motion_callback() + self.listen_state( callback=self.app.activate_all_off, entity_id=self.sensor.entity_id, new='on', oneshot=True, cause='motion on' ) + self.log(f'Waiting for motion on {self.sensor.friendly_name}') - async def listen_motion_off(self, duration: timedelta): + def listen_motion_off(self, duration: timedelta): """Sets up the motion off callback to deactivate the room """ - self.log(f'Waiting for motion to stop on {self.sensor.friendly_name}') - self.motion_off_handle = await self.listen_state( + self.cancel_motion_callback() + self.listen_state( callback=self.app.deactivate, entity_id=self.sensor.entity_id, new='off', @@ -89,47 +62,48 @@ class Motion(Hass): oneshot=True, cause='motion off' ) + self.log(f'Waiting for motion to stop on {self.sensor.friendly_name} for {duration}') - async def callback_light_on(self, entity=None, attribute=None, old=None, new=None, kwargs=None): + def callback_light_on(self, entity=None, attribute=None, old=None, new=None, kwargs=None): """Called when the light turns on """ - self.log('Light on callback') - await self.cancel_motion_callback(new='on') - await self.listen_motion_off(await self.app.off_duration()) + if new is not None: + self.log(f'{entity} turned on') + duration = self.app.off_duration() + self.listen_motion_off(duration) - async def callback_light_off(self, entity=None, attribute=None, old=None, new=None, kwargs=None): + def callback_light_off(self, entity=None, attribute=None, old=None, new=None, kwargs=None): """Called when the light turns off """ - self.log('Light off callback') - await self.cancel_motion_callback(new='off') - await self.listen_motion_on() + self.log(f'{entity} turned off') + self.listen_motion_on() - async def get_app_callbacks(self, name: str = None): + def get_app_callbacks(self, name: str = None): """Gets all the callbacks associated with the app """ name = name or self.name callbacks = { handle: info - for app_name, callbacks in (await self.get_callback_entries()).items() + for app_name, callbacks in self.get_callback_entries().items() for handle, info in callbacks.items() if app_name == name } return callbacks - async def get_sensor_callbacks(self): + def get_sensor_callbacks(self): return { handle: info - for handle, info in (await self.get_app_callbacks()).items() + for handle, info in self.get_app_callbacks().items() if info['entity'] == self.sensor.entity_id } - async def cancel_motion_callback(self, new: str): - callbacks = await self.get_sensor_callbacks() - # self.log(f'Found {len(callbacks)}') + def cancel_motion_callback(self): + callbacks = self.get_sensor_callbacks() + # self.log(f'Found {len(callbacks)} callbacks for {self.sensor.entity_id}') for handle, info in callbacks.items(): entity = info["entity"] - new_match = re.match('new=(?P.*?)\s', info['kwargs']) - # self.log(f'{handle}: {info["entity"]}: {info["kwargs"]}') - if new_match is not None and new_match.group("new") == new: - await self.cancel_listen_state(handle) - self.log(f'cancelled: {await self.friendly_name(entity)}: {new}') + kwargs = info['kwargs'] + if (m := re.match('new=(?P.*?)\s', kwargs)) is not None: + new = m.group('new') + self.cancel_listen_state(handle) + self.log(f'cancelled callback for sensor {entity} turning {new}') diff --git a/room_control.py b/room_control.py index 21e48b1..f2333bf 100755 --- a/room_control.py +++ b/room_control.py @@ -1,13 +1,123 @@ -import asyncio from copy import deepcopy -from datetime import time, timedelta -from typing import List +from dataclasses import dataclass, field +from datetime import datetime, time, timedelta +from pathlib import Path +from typing import Dict, List import appdaemon.utils as utils -import astral +import yaml from appdaemon.entity import Entity from appdaemon.plugins.hass.hassapi import Hass from appdaemon.plugins.mqtt.mqttapi import Mqtt +from astral import SunDirection + + +def str_to_timedelta(input_str: str) -> timedelta: + try: + hours, minutes, seconds = map(int, input_str.split(':')) + return timedelta(hours=hours, minutes=minutes, seconds=seconds) + except Exception: + return timedelta() + +@dataclass +class RoomState: + scene: Dict[str, Dict[str, str | int]] + off_duration: timedelta = None + time: time = None + time_fmt: List[str] = field(default_factory=lambda : ['%H:%M:%S', '%I:%M:%S %p'], repr=False) + elevation: int | float = None + direction: SunDirection = None + + def __post_init__(self): + if isinstance(self.time, str): + for fmt in self.time_fmt: + try: + self.time = datetime.strptime(self.time, fmt).time() + except: + continue + else: + break + + if self.elevation is not None: + assert self.direction is not None, f'Elevation setting requires a direction' + if self.direction.lower() == 'setting': + self.direction = SunDirection.SETTING + elif self.direction.lower() == 'rising': + self.direction = SunDirection.RISING + else: + raise ValueError(f'Invalid sun direction: {self.direction}') + + if isinstance(self.elevation, str): + self.elevation = float(self.elevation) + + if isinstance(self.off_duration, str): + self.off_duration = str_to_timedelta(self.off_duration) + + @classmethod + def from_json(cls, json_input): + return cls(**json_input) + + +@dataclass +class RoomConfig: + states: List[RoomState] + off_duration: timedelta = None + + def __post_init__(self): + if isinstance(self.off_duration, str): + self.off_duration = str_to_timedelta(self.off_duration) + + @classmethod + def from_app_config(cls, app_cfg: Dict[str, Dict]): + if 'off_duration' in app_cfg: + kwargs = {'off_duration': app_cfg['off_duration']} + else: + kwargs = {} + + self = cls( + states=[RoomState.from_json(s) for s in app_cfg['states']], + **kwargs + ) + + return self + + @classmethod + def from_yaml(cls, yaml_path: Path, app_name: str): + with yaml_path.open('r') as f: + cfg: Dict = yaml.load(f, Loader=yaml.SafeLoader)[app_name] + return cls.from_app_config(cfg) + + def sort_states(self): + """Should only be called after all the times have been resolved + """ + assert all(isinstance(state.time, time) for state in self.states), 'Times have not all been resolved yet' + self.states = sorted(self.states, key=lambda s: s.time, reverse=True) + + def current_state(self, now: time) -> RoomState: + time_fmt = "%I:%M:%S %p" + print(now.strftime(time_fmt)) + + self.sort_states() + for state in self.states: + if state.time <= now: + return state + else: + # self.log(f'Defaulting to first state') + return self.states[0] + + def current_scene(self, now: time) -> Dict: + state = self.current_state(now) + return state.scene + + def current_off_duration(self, now: time) -> timedelta: + state = self.current_state(now) + if state.off_duration is None: + if self.off_duration is None: + raise ValueError(f'Need an off duration') + else: + return self.off_duration + else: + return state.off_duration class RoomController(Hass, Mqtt): @@ -20,26 +130,31 @@ class RoomController(Hass, Mqtt): - When the light comes on, check if it's attributes match what they should, given the time. """ - async def initialize(self): - self.app_entities = await self.gather_app_entities() - self.log(f'entities: {self.app_entities}') - await self.refresh_state_times() - await self.run_daily(callback=self.refresh_state_times, start='00:00:00') + @property + def states(self) -> List[RoomState]: + return self._room_config.states + + @states.setter + def states(self, new: List[RoomState]): + assert all(isinstance(s, RoomState) for s in new), f'Invalid: {new}' + self._room_config.states = new - # if (ha_button := self.args.get('ha_button')): - # self.log(f'Setting up input button: {self.friendly_name(ha_button)}') - # self.listen_state(callback=self.activate_any_on, entity_id=ha_button) + def initialize(self): + self.app_entities = self.gather_app_entities() + # self.log(f'entities: {self.app_entities}') + self.refresh_state_times() + self.run_daily(callback=self.refresh_state_times, start='00:00:00') - async def gather_app_entities(self) -> List[str]: + def gather_app_entities(self) -> List[str]: """Returns a list of all the entities involved in any of the states """ - async def async_generator(): + def generator(): for settings in deepcopy(self.args['states']): if (scene := settings.get('scene')): if isinstance(scene, str): assert scene.startswith('scene.'), f"Scene definition must start with 'scene.' for app {self.name}" entity: Entity = self.get_entity(scene) - entity_state = await entity.get_state('all') + entity_state = entity.get_state('all') attributes = entity_state['attributes'] for entity in attributes['entity_id']: yield entity @@ -49,100 +164,100 @@ class RoomController(Hass, Mqtt): else: yield self.args['entity'] - entities = [e async for e in async_generator()] + entities = [e for e in generator()] return set(entities) - async def refresh_state_times(self, *args, **kwargs): + def refresh_state_times(self, *args, **kwargs): """Resets the `self.states` attribute to a newly parsed version of the states. - Parsed states have an absolute time for a certain day. + Parsed states have an absolute time for the current day. """ # re-parse the state strings into times for the current day - self.states = await self.parse_states() + self._room_config = RoomConfig.from_app_config(self.args) + self.log(f'{len(self._room_config.states)} states in the RoomConfig') + + for state in self._room_config.states: + if state.time is None and state.elevation is not None: + state.time = self.AD.sched.location.time_at_elevation( + elevation=state.elevation, + direction=state.direction + ).time() + elif isinstance(state.time, str): + state.time = self.parse_time(state.time) + + assert isinstance(state.time, time), f'Invalid time: {state.time}' + + for state in self.states: + self.log(f'State: {state.time.strftime("%I:%M:%S %p")} {state.scene}') + + self.states = sorted(self.states, key=lambda s: s.time, reverse=True) # schedule the transitions - for state in self.states: - dt = str(state['time'])[:8] - self.log(f'Scheduling transition at: {dt}') + for state in self.states[::-1]: + # t: time = state['time'] + t: time = state.time try: - await self.run_at(callback=self.activate_any_on, start=dt) + self.run_at( + callback=self.activate_any_on, + start=t.strftime('%H:%M:%S'), + cause='scheduled transition' + ) except ValueError: # happens when the callback time is in the past pass except Exception as e: self.log(f'Failed with {type(e)}: {e}') - async def parse_states(self): - async def gen(): - for state in deepcopy(self.args['states']): - if (time := state.get('time')): - state['time'] = await self.parse_time(time) + def current_state(self, now: time = None) -> RoomState: + if self.sleep_bool(): + self.log(f'sleep: active') + if (state := self.args.get('sleep_state')): + return RoomState.from_json(state) + else: + return RoomState(scene={}) + else: + now = now or self.get_now().time() + self.log(f'Getting state for {now}', level='DEBUG') - elif isinstance((elevation := state.get('elevation')), (int, float)): - assert 'direction' in state, f'State needs a direction if it has an elevation' + state = self._room_config.current_state(now) + self.log(f'Current state: {state}', level='DEBUG') - if state['direction'] == 'rising': - dir = astral.SunDirection.RISING - elif state['direction'] == 'setting': - dir = astral.SunDirection.SETTING - else: - raise ValueError(f'Invalid sun direction: {state["direction"]}') + return state - state['time'] = self.AD.sched.location.time_at_elevation( - elevation=elevation, direction=dir - ).time() + def current_scene(self, now: time = None) -> Dict[str, Dict[str, str | int]]: + state = self.current_state(now) + assert isinstance(state, RoomState) + self.log(f'Current scene: {state}') + return state.scene - else: - raise ValueError(f'Missing time') - - yield state - - states = [s async for s in gen()] - states = sorted(states, key=lambda s: s['time']) + def app_entity_states(self) -> Dict[str, str]: + states = { + entity: self.get_state(entity) + for entity in self.app_entities + } return states - async def current_state(self, time: time = None): - if (await self.sleep_bool()): - if (state := self.args.get('sleep_state')): - return state - else: - return {} - else: - now = await self.get_now() - self.log(f'Getting state for datetime: {now}') - time = time or (await self.get_now()).time() - for state in self.states[::-1]: - if state['time'] <= time: - self.log(f'Selected state from {state["time"]}') - return state - else: - return self.states[-1] - - async def current_scene(self, time: time = None): - if (state := (await self.current_state(time=time))) is not None: - return state['scene'] - - @property def all_off(self) -> bool: """"All off" is the logic opposite of "any on" Returns: bool: Whether all the lights associated with the app are off """ - return all(self.get_state(entity) != 'on' for entity in self.app_entities) + states = self.app_entity_states() + return all(state != 'on' for entity, state in states.items()) - @property def any_on(self) -> bool: """"Any on" is the logic opposite of "all off" Returns: bool: Whether any of the lights associated with the app are on """ - return any(self.get_state(entity) == 'on' for entity in self.app_entities) + states = self.app_entity_states() + return any(state == 'on' for entity, state in states.items()) - async def sleep_bool(self) -> bool: + def sleep_bool(self) -> bool: if (sleep_var := self.args.get('sleep')): - return (await self.get_state(sleep_var)) == 'on' + return self.get_state(sleep_var) == 'on' else: return False @@ -156,7 +271,7 @@ class RoomController(Hass, Mqtt): # else: # raise ValueError('Sleep variable is undefined') - async def off_duration(self) -> timedelta: + def off_duration(self, now: time = None) -> timedelta: """Determines the time that the motion sensor has to be clear before deactivating Priority: @@ -166,21 +281,22 @@ class RoomController(Hass, Mqtt): - Sleep - 0 """ - current_state = await self.current_state() - duration_str = current_state.get( - 'off_duration', - self.args.get('off_duration', '00:00:00') - ) - - try: - hours, minutes, seconds = map(int, duration_str.split(':')) - return timedelta(hours=hours, minutes=minutes, seconds=seconds) - except Exception: + sleep_mode_active = self.sleep_bool() + if sleep_mode_active: + self.log(f'Sleeping mode active: {sleep_mode_active}') return timedelta() + else: + now = now or self.get_now().time() + return self._room_config.current_off_duration(now) - async def activate(self, *args, cause: str = 'unknown', **kwargs): + def activate(self, entity = None, attribute = None, old = None, new = None, kwargs = None): + if kwargs is not None: + cause = kwargs.get('cause', 'unknown') + else: + cause = 'unknown' + self.log(f'Activating: {cause}') - scene = await self.current_scene() + scene = self.current_scene() if isinstance(scene, str): self.turn_on(scene) @@ -202,26 +318,25 @@ class RoomController(Hass, Mqtt): else: self.log(f'ERROR: unknown scene: {scene}') - async def activate_all_off(self, *args, **kwargs): - """Activate if all of the entities are off + def activate_all_off(self, *args, **kwargs): + """Activate if all of the entities are off. Args and kwargs are passed directly to self.activate() """ - if self.all_off: - self.log(f'Activate all off kwargs: {kwargs}') - await self.activate(*args, **kwargs) + if self.all_off(): + self.activate(*args, **kwargs) else: self.log(f'Skipped activating - everything is not off') - async def activate_any_on(self, *args, **kwargs): - """Activate if any of the entities are on + def activate_any_on(self, *args, **kwargs): + """Activate if any of the entities are on. Args and kwargs are passed directly to self.activate() """ - if self.any_on: - await self.activate(*args, **kwargs) + if self.any_on(): + self.activate(*args, **kwargs) else: self.log(f'Skipped activating - everything is off') - def deactivate(self, *args, cause: str = 'unknown', **kwargs): + def deactivate(self, entity = None, attribute = None, old = None, new = None, kwargs = None): + cause = kwargs.get('cause', 'unknown') self.log(f'Deactivating: {cause}') - for entity in self.app_entities: - self.turn_off(entity) - self.log(f'Turned off {entity}') - + for e in self.app_entities: + self.turn_off(e) + self.log(f'Turned off {e}')