From 6005608e89603cddf64bcf10a6af714b9688924d Mon Sep 17 00:00:00 2001 From: John Lancaster <32917998+jsl12@users.noreply.github.com> Date: Fri, 24 Nov 2023 23:54:27 -0600 Subject: [PATCH] WIP breaking out motion --- button.py | 1 - motion.py | 120 ++++++++++++++++++++++++++++++++++++++++++++++++ room_control.py | 117 ++-------------------------------------------- 3 files changed, 124 insertions(+), 114 deletions(-) create mode 100644 motion.py diff --git a/button.py b/button.py index ca9f2c3..b52fbb8 100644 --- a/button.py +++ b/button.py @@ -1,7 +1,6 @@ import json -from appdaemon.plugins.hass.hassapi import Hass from appdaemon.plugins.mqtt.mqttapi import Mqtt from room_control import RoomController diff --git a/motion.py b/motion.py new file mode 100644 index 0000000..912be78 --- /dev/null +++ b/motion.py @@ -0,0 +1,120 @@ +from datetime import timedelta + +from appdaemon.entity import Entity +from appdaemon.plugins.hass.hassapi import Hass +from room_control import RoomController + + +class Motion(Hass): + @property + def sensor(self) -> Entity: + return self.get_entity(self.args['sensor']) + + @property + def sensor_state(self) -> bool: + return self.sensor.state == 'on' + + @property + def off_duration(self) -> timedelta: + """Determines the time that the motion sensor has to be clear before deactivating + + Priority: + - Value in scene definition + - Default value + - Normal - value in app definition + - Sleep - 0 + + """ + + duration_str = self.args.get('off_duration', '00:00:00') + + try: + hours, minutes, seconds = map(int, duration_str.split(':')) + return timedelta(hours=hours, minutes=minutes, seconds=seconds) + except Exception: + return timedelta() + + def initialize(self): + self.app: RoomController = self.get_app(self.args['app']) + self.log(f'Connected to app {self.app.name}') + + self.listen_state(self.callback_light_on, self.args['entity'], new='on') + self.listen_state(self.callback_light_on, self.args['entity'], new='off') + + self.listen_motion_on() + self.listen_motion_off(self.off_duration) + + def listen_motion_on(self): + """Sets up the motion on callback to activate the room + """ + self.log(f'Waiting for motion on {self.sensor.friendly_name}') + self.motion_on_handle = self.listen_state( + callback=self.app.activate_all_off, + entity_id=self.sensor.entity_id, + new='on', + oneshot=True, + cause='motion on' + ) + + def listen_motion_off(self, duration: timedelta): + """Sets up the motion off callback to deactivate the room + """ + self.log(f'Waiting for motion to stop on {self.sensor.friendly_name}') + self.motion_off_handle = self.listen_state( + callback=self.app.deactivate, + entity_id=self.sensor.entity_id, + new='off', + duration=duration.total_seconds(), + oneshot=True, + cause='motion off' + ) + + def callback_light_on(self, entity=None, attribute=None, old=None, new=None, kwargs=None): + """Called when the light turns on + """ + self.log('Light on callback') + self.cancel_motion_callback(new='on') + self.listen_motion_off(self.off_duration) + + def callback_light_off(self, entity=None, attribute=None, old=None, new=None, kwargs=None): + """Called when the light turns off + """ + self.log('Light off callback') + self.cancel_motion_callback(new='off') + self.listen_motion_on() + + def sync_state(self): + """Synchronizes the callbacks with the state of the light. + + Essentially mimics the `state_change` callback based on the current state of the light. + """ + if self.sensor_state: + self.callback_light_on() + else: + self.callback_light_off() + + def get_app_callbacks(self, name: str = None): + """Gets all the callbacks associated with the app + """ + name = name or self.name + for app_name, callbacks in self.get_callback_entries().items(): + if app_name == name: + return callbacks + + def get_motion_callback(self): + app_callbacks = self.get_app_callbacks() + if app_callbacks is not None: + return { + handle: info + for handle, info in app_callbacks.items() + if info['entity'] == self.sensor + } + else: + return {} + + def cancel_motion_callback(self, new: str): + for handle, info in self.get_motion_callback().items(): + if f'new={new}' in info['kwargs']: + self.log(f'Cancelling callback for {info}') + self.cancel_listen_state(handle) + diff --git a/room_control.py b/room_control.py index 6f697d6..903c5d0 100755 --- a/room_control.py +++ b/room_control.py @@ -25,44 +25,17 @@ class RoomController(Hass, Mqtt): self.run_daily(callback=self.refresh_state_times, start='00:00:00') # sets up motion callbacks - self.state_change_handle = self.listen_state(self.handle_state_change, self.entity) - self.sync_state() + # self.state_change_handle = self.listen_state(self.handle_state_change, self.entity) + # self.sync_state() if (ha_button := self.args.get('ha_button')): self.log(f'Setting up input button: {self.friendly_name(ha_button)}') self.listen_state(callback=self.activate_any_on, entity_id=ha_button) - - @property - def sensor(self) -> str: - return self.args['sensor'] - + @property def entity(self) -> str: return self.args['entity'] - @property - def off_duration(self) -> timedelta: - """Determines the time that the motion sensor has to be clear before deactivating - - Priority: - - Value in scene definition - - Default value - - Normal - value in app definition - - Sleep - 0 - - """ - - duration_str = self.current_state().get( - 'off_duration', - self.args.get('off_duration', '00:00:00') - ) - - try: - hours, minutes, seconds = map(int, duration_str.split(':')) - return timedelta(hours=hours, minutes=minutes, seconds=seconds) - except Exception: - return timedelta() - @property def entity_state(self) -> bool: return self.get_state(self.entity) == 'on' @@ -101,10 +74,6 @@ class RoomController(Hass, Mqtt): else: raise TypeError(f'Invalid type: {type(new)}: {new}') - @property - def is_stateful(self): - return 'scene' in self.args and isinstance(self.args['scene'], (list, dict)) - def parse_states(self): def gen(): for state in deepcopy(self.args['states']): @@ -211,65 +180,6 @@ class RoomController(Hass, Mqtt): else: raise ValueError('Sleep variable is undefined') - def sync_state(self): - """Synchronizes the callbacks with the state of the light. - - Essentially mimics the `state_change` callback based on the current state of the light. - """ - if self.entity_state: - self.callback_light_on() - else: - self.callback_light_off() - - def listen_motion_on(self): - """Sets up the motion on callback to activate the room - """ - self.log(f'Waiting for motion on {self.friendly_name(self.sensor)} to turn on {self.friendly_name(self.entity)}') - self.motion_on_handle = self.listen_state( - callback=self.activate, - entity_id=self.sensor, - new='on', - oneshot=True, - cause='motion on' - ) - - def listen_motion_off(self, duration: timedelta): - """Sets up the motion off callback to deactivate the room - """ - self.log(f'Waiting for motion to stop on {self.friendly_name(self.sensor)} for {duration} to turn off {self.friendly_name(self.entity)}') - self.motion_off_handle = self.listen_state( - callback=self.deactivate, - entity_id=self.sensor, - new='off', - duration=duration.total_seconds(), - oneshot=True, - cause='motion off' - ) - - def handle_state_change(self, entity=None, attribute=None, old=None, new=None, kwargs=None): - """Callback attached to the state change of the light. - """ - if new == 'on': - self.callback_light_on(entity, attribute, old, new, kwargs) - elif new == 'off': - self.callback_light_off(entity, attribute, old, new, kwargs) - else: - self.log(f'Unknown state: {new}') - - def callback_light_on(self, entity=None, attribute=None, old=None, new=None, kwargs=None): - """Called when the light turns on - """ - self.log('Light on callback') - self.cancel_motion_callback(new='on') - self.listen_motion_off(self.off_duration) - - def callback_light_off(self, entity=None, attribute=None, old=None, new=None, kwargs=None): - """Called when the light turns off - """ - self.log('Light off callback') - self.cancel_motion_callback(new='off') - self.listen_motion_on() - def activate(self, *args, cause: str = 'unknown', **kwargs): self.log(f'Activating: {cause}') scene = self.current_scene() @@ -295,7 +205,7 @@ class RoomController(Hass, Mqtt): self.log(f'ERROR: unknown scene: {scene}') def activate_all_off(self, *args, **kwargs): - """Activate if all of the entities are on + """Activate if all of the entities are off """ if self.all_off: self.activate(*args, **kwargs) @@ -335,22 +245,3 @@ class RoomController(Hass, Mqtt): pass except Exception as e: self.log(f'Failed with {type(e)}: {e}') - - def get_app_callbacks(self, name: str = None): - name = name or self.name - for app_name, callbacks in self.get_callback_entries().items(): - if app_name == name: - return callbacks - - def get_motion_callback(self): - return { - handle: info - for handle, info in self.get_app_callbacks().items() - if info['entity'] == self.sensor - } - - def cancel_motion_callback(self, new: str): - for handle, info in self.get_motion_callback().items(): - if f'new={new}' in info['kwargs']: - self.log(f'Cancelling callback for {info}') - self.cancel_listen_state(handle)