WIP breaking out motion
This commit is contained in:
@@ -1,7 +1,6 @@
|
|||||||
|
|
||||||
import json
|
import json
|
||||||
|
|
||||||
from appdaemon.plugins.hass.hassapi import Hass
|
|
||||||
from appdaemon.plugins.mqtt.mqttapi import Mqtt
|
from appdaemon.plugins.mqtt.mqttapi import Mqtt
|
||||||
from room_control import RoomController
|
from room_control import RoomController
|
||||||
|
|
||||||
|
|||||||
120
motion.py
Normal file
120
motion.py
Normal file
@@ -0,0 +1,120 @@
|
|||||||
|
from datetime import timedelta
|
||||||
|
|
||||||
|
from appdaemon.entity import Entity
|
||||||
|
from appdaemon.plugins.hass.hassapi import Hass
|
||||||
|
from room_control import RoomController
|
||||||
|
|
||||||
|
|
||||||
|
class Motion(Hass):
|
||||||
|
@property
|
||||||
|
def sensor(self) -> Entity:
|
||||||
|
return self.get_entity(self.args['sensor'])
|
||||||
|
|
||||||
|
@property
|
||||||
|
def sensor_state(self) -> bool:
|
||||||
|
return self.sensor.state == 'on'
|
||||||
|
|
||||||
|
@property
|
||||||
|
def off_duration(self) -> timedelta:
|
||||||
|
"""Determines the time that the motion sensor has to be clear before deactivating
|
||||||
|
|
||||||
|
Priority:
|
||||||
|
- Value in scene definition
|
||||||
|
- Default value
|
||||||
|
- Normal - value in app definition
|
||||||
|
- Sleep - 0
|
||||||
|
|
||||||
|
"""
|
||||||
|
|
||||||
|
duration_str = self.args.get('off_duration', '00:00:00')
|
||||||
|
|
||||||
|
try:
|
||||||
|
hours, minutes, seconds = map(int, duration_str.split(':'))
|
||||||
|
return timedelta(hours=hours, minutes=minutes, seconds=seconds)
|
||||||
|
except Exception:
|
||||||
|
return timedelta()
|
||||||
|
|
||||||
|
def initialize(self):
|
||||||
|
self.app: RoomController = self.get_app(self.args['app'])
|
||||||
|
self.log(f'Connected to app {self.app.name}')
|
||||||
|
|
||||||
|
self.listen_state(self.callback_light_on, self.args['entity'], new='on')
|
||||||
|
self.listen_state(self.callback_light_on, self.args['entity'], new='off')
|
||||||
|
|
||||||
|
self.listen_motion_on()
|
||||||
|
self.listen_motion_off(self.off_duration)
|
||||||
|
|
||||||
|
def listen_motion_on(self):
|
||||||
|
"""Sets up the motion on callback to activate the room
|
||||||
|
"""
|
||||||
|
self.log(f'Waiting for motion on {self.sensor.friendly_name}')
|
||||||
|
self.motion_on_handle = self.listen_state(
|
||||||
|
callback=self.app.activate_all_off,
|
||||||
|
entity_id=self.sensor.entity_id,
|
||||||
|
new='on',
|
||||||
|
oneshot=True,
|
||||||
|
cause='motion on'
|
||||||
|
)
|
||||||
|
|
||||||
|
def listen_motion_off(self, duration: timedelta):
|
||||||
|
"""Sets up the motion off callback to deactivate the room
|
||||||
|
"""
|
||||||
|
self.log(f'Waiting for motion to stop on {self.sensor.friendly_name}')
|
||||||
|
self.motion_off_handle = self.listen_state(
|
||||||
|
callback=self.app.deactivate,
|
||||||
|
entity_id=self.sensor.entity_id,
|
||||||
|
new='off',
|
||||||
|
duration=duration.total_seconds(),
|
||||||
|
oneshot=True,
|
||||||
|
cause='motion off'
|
||||||
|
)
|
||||||
|
|
||||||
|
def callback_light_on(self, entity=None, attribute=None, old=None, new=None, kwargs=None):
|
||||||
|
"""Called when the light turns on
|
||||||
|
"""
|
||||||
|
self.log('Light on callback')
|
||||||
|
self.cancel_motion_callback(new='on')
|
||||||
|
self.listen_motion_off(self.off_duration)
|
||||||
|
|
||||||
|
def callback_light_off(self, entity=None, attribute=None, old=None, new=None, kwargs=None):
|
||||||
|
"""Called when the light turns off
|
||||||
|
"""
|
||||||
|
self.log('Light off callback')
|
||||||
|
self.cancel_motion_callback(new='off')
|
||||||
|
self.listen_motion_on()
|
||||||
|
|
||||||
|
def sync_state(self):
|
||||||
|
"""Synchronizes the callbacks with the state of the light.
|
||||||
|
|
||||||
|
Essentially mimics the `state_change` callback based on the current state of the light.
|
||||||
|
"""
|
||||||
|
if self.sensor_state:
|
||||||
|
self.callback_light_on()
|
||||||
|
else:
|
||||||
|
self.callback_light_off()
|
||||||
|
|
||||||
|
def get_app_callbacks(self, name: str = None):
|
||||||
|
"""Gets all the callbacks associated with the app
|
||||||
|
"""
|
||||||
|
name = name or self.name
|
||||||
|
for app_name, callbacks in self.get_callback_entries().items():
|
||||||
|
if app_name == name:
|
||||||
|
return callbacks
|
||||||
|
|
||||||
|
def get_motion_callback(self):
|
||||||
|
app_callbacks = self.get_app_callbacks()
|
||||||
|
if app_callbacks is not None:
|
||||||
|
return {
|
||||||
|
handle: info
|
||||||
|
for handle, info in app_callbacks.items()
|
||||||
|
if info['entity'] == self.sensor
|
||||||
|
}
|
||||||
|
else:
|
||||||
|
return {}
|
||||||
|
|
||||||
|
def cancel_motion_callback(self, new: str):
|
||||||
|
for handle, info in self.get_motion_callback().items():
|
||||||
|
if f'new={new}' in info['kwargs']:
|
||||||
|
self.log(f'Cancelling callback for {info}')
|
||||||
|
self.cancel_listen_state(handle)
|
||||||
|
|
||||||
115
room_control.py
115
room_control.py
@@ -25,44 +25,17 @@ class RoomController(Hass, Mqtt):
|
|||||||
self.run_daily(callback=self.refresh_state_times, start='00:00:00')
|
self.run_daily(callback=self.refresh_state_times, start='00:00:00')
|
||||||
|
|
||||||
# sets up motion callbacks
|
# sets up motion callbacks
|
||||||
self.state_change_handle = self.listen_state(self.handle_state_change, self.entity)
|
# self.state_change_handle = self.listen_state(self.handle_state_change, self.entity)
|
||||||
self.sync_state()
|
# self.sync_state()
|
||||||
|
|
||||||
if (ha_button := self.args.get('ha_button')):
|
if (ha_button := self.args.get('ha_button')):
|
||||||
self.log(f'Setting up input button: {self.friendly_name(ha_button)}')
|
self.log(f'Setting up input button: {self.friendly_name(ha_button)}')
|
||||||
self.listen_state(callback=self.activate_any_on, entity_id=ha_button)
|
self.listen_state(callback=self.activate_any_on, entity_id=ha_button)
|
||||||
|
|
||||||
@property
|
|
||||||
def sensor(self) -> str:
|
|
||||||
return self.args['sensor']
|
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def entity(self) -> str:
|
def entity(self) -> str:
|
||||||
return self.args['entity']
|
return self.args['entity']
|
||||||
|
|
||||||
@property
|
|
||||||
def off_duration(self) -> timedelta:
|
|
||||||
"""Determines the time that the motion sensor has to be clear before deactivating
|
|
||||||
|
|
||||||
Priority:
|
|
||||||
- Value in scene definition
|
|
||||||
- Default value
|
|
||||||
- Normal - value in app definition
|
|
||||||
- Sleep - 0
|
|
||||||
|
|
||||||
"""
|
|
||||||
|
|
||||||
duration_str = self.current_state().get(
|
|
||||||
'off_duration',
|
|
||||||
self.args.get('off_duration', '00:00:00')
|
|
||||||
)
|
|
||||||
|
|
||||||
try:
|
|
||||||
hours, minutes, seconds = map(int, duration_str.split(':'))
|
|
||||||
return timedelta(hours=hours, minutes=minutes, seconds=seconds)
|
|
||||||
except Exception:
|
|
||||||
return timedelta()
|
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def entity_state(self) -> bool:
|
def entity_state(self) -> bool:
|
||||||
return self.get_state(self.entity) == 'on'
|
return self.get_state(self.entity) == 'on'
|
||||||
@@ -101,10 +74,6 @@ class RoomController(Hass, Mqtt):
|
|||||||
else:
|
else:
|
||||||
raise TypeError(f'Invalid type: {type(new)}: {new}')
|
raise TypeError(f'Invalid type: {type(new)}: {new}')
|
||||||
|
|
||||||
@property
|
|
||||||
def is_stateful(self):
|
|
||||||
return 'scene' in self.args and isinstance(self.args['scene'], (list, dict))
|
|
||||||
|
|
||||||
def parse_states(self):
|
def parse_states(self):
|
||||||
def gen():
|
def gen():
|
||||||
for state in deepcopy(self.args['states']):
|
for state in deepcopy(self.args['states']):
|
||||||
@@ -211,65 +180,6 @@ class RoomController(Hass, Mqtt):
|
|||||||
else:
|
else:
|
||||||
raise ValueError('Sleep variable is undefined')
|
raise ValueError('Sleep variable is undefined')
|
||||||
|
|
||||||
def sync_state(self):
|
|
||||||
"""Synchronizes the callbacks with the state of the light.
|
|
||||||
|
|
||||||
Essentially mimics the `state_change` callback based on the current state of the light.
|
|
||||||
"""
|
|
||||||
if self.entity_state:
|
|
||||||
self.callback_light_on()
|
|
||||||
else:
|
|
||||||
self.callback_light_off()
|
|
||||||
|
|
||||||
def listen_motion_on(self):
|
|
||||||
"""Sets up the motion on callback to activate the room
|
|
||||||
"""
|
|
||||||
self.log(f'Waiting for motion on {self.friendly_name(self.sensor)} to turn on {self.friendly_name(self.entity)}')
|
|
||||||
self.motion_on_handle = self.listen_state(
|
|
||||||
callback=self.activate,
|
|
||||||
entity_id=self.sensor,
|
|
||||||
new='on',
|
|
||||||
oneshot=True,
|
|
||||||
cause='motion on'
|
|
||||||
)
|
|
||||||
|
|
||||||
def listen_motion_off(self, duration: timedelta):
|
|
||||||
"""Sets up the motion off callback to deactivate the room
|
|
||||||
"""
|
|
||||||
self.log(f'Waiting for motion to stop on {self.friendly_name(self.sensor)} for {duration} to turn off {self.friendly_name(self.entity)}')
|
|
||||||
self.motion_off_handle = self.listen_state(
|
|
||||||
callback=self.deactivate,
|
|
||||||
entity_id=self.sensor,
|
|
||||||
new='off',
|
|
||||||
duration=duration.total_seconds(),
|
|
||||||
oneshot=True,
|
|
||||||
cause='motion off'
|
|
||||||
)
|
|
||||||
|
|
||||||
def handle_state_change(self, entity=None, attribute=None, old=None, new=None, kwargs=None):
|
|
||||||
"""Callback attached to the state change of the light.
|
|
||||||
"""
|
|
||||||
if new == 'on':
|
|
||||||
self.callback_light_on(entity, attribute, old, new, kwargs)
|
|
||||||
elif new == 'off':
|
|
||||||
self.callback_light_off(entity, attribute, old, new, kwargs)
|
|
||||||
else:
|
|
||||||
self.log(f'Unknown state: {new}')
|
|
||||||
|
|
||||||
def callback_light_on(self, entity=None, attribute=None, old=None, new=None, kwargs=None):
|
|
||||||
"""Called when the light turns on
|
|
||||||
"""
|
|
||||||
self.log('Light on callback')
|
|
||||||
self.cancel_motion_callback(new='on')
|
|
||||||
self.listen_motion_off(self.off_duration)
|
|
||||||
|
|
||||||
def callback_light_off(self, entity=None, attribute=None, old=None, new=None, kwargs=None):
|
|
||||||
"""Called when the light turns off
|
|
||||||
"""
|
|
||||||
self.log('Light off callback')
|
|
||||||
self.cancel_motion_callback(new='off')
|
|
||||||
self.listen_motion_on()
|
|
||||||
|
|
||||||
def activate(self, *args, cause: str = 'unknown', **kwargs):
|
def activate(self, *args, cause: str = 'unknown', **kwargs):
|
||||||
self.log(f'Activating: {cause}')
|
self.log(f'Activating: {cause}')
|
||||||
scene = self.current_scene()
|
scene = self.current_scene()
|
||||||
@@ -295,7 +205,7 @@ class RoomController(Hass, Mqtt):
|
|||||||
self.log(f'ERROR: unknown scene: {scene}')
|
self.log(f'ERROR: unknown scene: {scene}')
|
||||||
|
|
||||||
def activate_all_off(self, *args, **kwargs):
|
def activate_all_off(self, *args, **kwargs):
|
||||||
"""Activate if all of the entities are on
|
"""Activate if all of the entities are off
|
||||||
"""
|
"""
|
||||||
if self.all_off:
|
if self.all_off:
|
||||||
self.activate(*args, **kwargs)
|
self.activate(*args, **kwargs)
|
||||||
@@ -335,22 +245,3 @@ class RoomController(Hass, Mqtt):
|
|||||||
pass
|
pass
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
self.log(f'Failed with {type(e)}: {e}')
|
self.log(f'Failed with {type(e)}: {e}')
|
||||||
|
|
||||||
def get_app_callbacks(self, name: str = None):
|
|
||||||
name = name or self.name
|
|
||||||
for app_name, callbacks in self.get_callback_entries().items():
|
|
||||||
if app_name == name:
|
|
||||||
return callbacks
|
|
||||||
|
|
||||||
def get_motion_callback(self):
|
|
||||||
return {
|
|
||||||
handle: info
|
|
||||||
for handle, info in self.get_app_callbacks().items()
|
|
||||||
if info['entity'] == self.sensor
|
|
||||||
}
|
|
||||||
|
|
||||||
def cancel_motion_callback(self, new: str):
|
|
||||||
for handle, info in self.get_motion_callback().items():
|
|
||||||
if f'new={new}' in info['kwargs']:
|
|
||||||
self.log(f'Cancelling callback for {info}')
|
|
||||||
self.cancel_listen_state(handle)
|
|
||||||
|
|||||||
Reference in New Issue
Block a user