Compare commits
27 Commits
1c5edf6cc7
...
dataclass
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
1c1990b632 | ||
|
|
826e866a52 | ||
|
|
6ef45830fb | ||
|
|
53cb116372 | ||
|
|
e973205b06 | ||
|
|
f00421d273 | ||
|
|
061ad8d322 | ||
|
|
12ac6c4cc4 | ||
|
|
dda9d2b501 | ||
|
|
f088a23da7 | ||
|
|
4f11d9bdcc | ||
|
|
cf9f0f3244 | ||
|
|
e0caaedc15 | ||
|
|
0d70649bb8 | ||
|
|
145aeca667 | ||
|
|
8b00faceb6 | ||
|
|
148645094a | ||
|
|
e659629c71 | ||
|
|
4886eb29d6 | ||
|
|
7bd46ffc42 | ||
|
|
945abc91c3 | ||
|
|
af28cda9a5 | ||
|
|
afc5e45642 | ||
|
|
8a5431a72b | ||
|
|
50b79c8d13 | ||
|
|
5f9218311c | ||
|
|
7de5dfa3a8 |
50
button.py
Normal file
50
button.py
Normal file
@@ -0,0 +1,50 @@
|
||||
import asyncio
|
||||
import json
|
||||
|
||||
from appdaemon.plugins.mqtt.mqttapi import Mqtt
|
||||
from room_control import RoomController
|
||||
|
||||
|
||||
class Button(Mqtt):
|
||||
async def initialize(self):
|
||||
self.app: RoomController = await self.get_app(self.args['app'])
|
||||
self.setup_buttons(self.args['button'])
|
||||
|
||||
def setup_buttons(self, buttons):
|
||||
if isinstance(buttons, list):
|
||||
for button in buttons:
|
||||
self.setup_button(button)
|
||||
else:
|
||||
self.setup_button(buttons)
|
||||
|
||||
def setup_button(self, name: str):
|
||||
topic = f'zigbee2mqtt/{name}'
|
||||
self.mqtt_subscribe(topic, namespace='mqtt')
|
||||
self.listen_event(self.handle_button, "MQTT_MESSAGE", topic=topic, namespace='mqtt', button=name)
|
||||
self.log(f'"{topic}" controls app {self.app.name}')
|
||||
|
||||
async def handle_button(self, event_name, data, kwargs):
|
||||
try:
|
||||
payload = json.loads(data['payload'])
|
||||
action = payload['action']
|
||||
except json.JSONDecodeError:
|
||||
self.log(f'Error decoding JSON from {data["payload"]}', level='ERROR')
|
||||
except KeyError as e:
|
||||
return
|
||||
else:
|
||||
if action != '':
|
||||
await self.handle_action(action)
|
||||
|
||||
async def handle_action(self, action: str):
|
||||
if action == 'single':
|
||||
self.log(f' {action.upper()} '.center(50, '='))
|
||||
state = await self.get_state(self.args['ref_entity'])
|
||||
kwargs = {
|
||||
'kwargs': {'cause': f'button single click: toggle while {state}'}
|
||||
}
|
||||
if state == 'on':
|
||||
self.app.deactivate(**kwargs)
|
||||
else:
|
||||
await self.app.activate(**kwargs)
|
||||
else:
|
||||
pass
|
||||
8
door.py
Normal file
8
door.py
Normal file
@@ -0,0 +1,8 @@
|
||||
from appdaemon.plugins.hass.hassapi import Hass
|
||||
from room_control import RoomController
|
||||
|
||||
|
||||
class Door(Hass):
|
||||
async def initialize(self):
|
||||
app: RoomController = await self.get_app(self.args['app'])
|
||||
await self.listen_state(app.activate_all_off, entity_id=self.args['door'], new='on', cause='door open')
|
||||
111
motion.py
Normal file
111
motion.py
Normal file
@@ -0,0 +1,111 @@
|
||||
import re
|
||||
from datetime import timedelta
|
||||
|
||||
from appdaemon.entity import Entity
|
||||
from appdaemon.plugins.hass.hassapi import Hass
|
||||
from room_control import RoomController
|
||||
|
||||
from appdaemon import utils
|
||||
|
||||
|
||||
class Motion(Hass):
|
||||
@property
|
||||
def sensor(self) -> Entity:
|
||||
return self.get_entity(self.args['sensor'])
|
||||
|
||||
@property
|
||||
def sensor_state(self) -> bool:
|
||||
return self.sensor.state == 'on'
|
||||
|
||||
@property
|
||||
def ref_entity(self) -> Entity:
|
||||
return self.get_entity(self.args['ref_entity'])
|
||||
|
||||
@property
|
||||
async def ref_entity_state(self) -> bool:
|
||||
return (await self.ref_entity.get_state()) == 'on'
|
||||
|
||||
async def initialize(self):
|
||||
self.app: RoomController = await self.get_app(self.args['app'])
|
||||
self.log(f'Connected to app {self.app.name}')
|
||||
|
||||
base_kwargs = dict(
|
||||
entity_id=self.ref_entity.entity_id,
|
||||
immediate=True, # avoids needing to sync the state
|
||||
)
|
||||
# don't need to await these because they'll already get turned into a task by the utils.sync_wrapper decorator
|
||||
self.listen_state(**base_kwargs, attribute='brightness', callback=self.callback_light_on)
|
||||
self.listen_state(**base_kwargs, new='off', callback=self.callback_light_off)
|
||||
|
||||
async def listen_motion_on(self):
|
||||
"""Sets up the motion on callback to activate the room
|
||||
"""
|
||||
await self.cancel_motion_callback()
|
||||
await self.listen_state(
|
||||
callback=self.app.activate_all_off,
|
||||
entity_id=self.sensor.entity_id,
|
||||
new='on',
|
||||
oneshot=True,
|
||||
cause='motion on'
|
||||
)
|
||||
self.log(f'Waiting for motion on {self.sensor.friendly_name}')
|
||||
|
||||
async def listen_motion_off(self, duration: timedelta):
|
||||
"""Sets up the motion off callback to deactivate the room
|
||||
"""
|
||||
await self.cancel_motion_callback()
|
||||
await self.listen_state(
|
||||
callback=self.app.deactivate,
|
||||
entity_id=self.sensor.entity_id,
|
||||
new='off',
|
||||
duration=duration.total_seconds(),
|
||||
oneshot=True,
|
||||
cause='motion off'
|
||||
)
|
||||
self.log(f'Waiting for motion to stop on {self.sensor.friendly_name} for {duration}')
|
||||
|
||||
@utils.sync_wrapper
|
||||
async def callback_light_on(self, entity=None, attribute=None, old=None, new=None, kwargs=None):
|
||||
"""Called when the light turns on
|
||||
"""
|
||||
if new is not None:
|
||||
self.log(f'{entity} turned on')
|
||||
duration = await self.app.off_duration()
|
||||
await self.listen_motion_off(duration)
|
||||
|
||||
@utils.sync_wrapper
|
||||
async def callback_light_off(self, entity=None, attribute=None, old=None, new=None, kwargs=None):
|
||||
"""Called when the light turns off
|
||||
"""
|
||||
self.log(f'{entity} turned off')
|
||||
await self.listen_motion_on()
|
||||
|
||||
async def get_app_callbacks(self, name: str = None):
|
||||
"""Gets all the callbacks associated with the app
|
||||
"""
|
||||
name = name or self.name
|
||||
callbacks = {
|
||||
handle: info
|
||||
for app_name, callbacks in (await self.get_callback_entries()).items()
|
||||
for handle, info in callbacks.items()
|
||||
if app_name == name
|
||||
}
|
||||
return callbacks
|
||||
|
||||
async def get_sensor_callbacks(self):
|
||||
return {
|
||||
handle: info
|
||||
for handle, info in (await self.get_app_callbacks()).items()
|
||||
if info['entity'] == self.sensor.entity_id
|
||||
}
|
||||
|
||||
async def cancel_motion_callback(self):
|
||||
callbacks = await self.get_sensor_callbacks()
|
||||
# self.log(f'Found {len(callbacks)} callbacks for {self.sensor.entity_id}')
|
||||
for handle, info in callbacks.items():
|
||||
entity = info["entity"]
|
||||
kwargs = info['kwargs']
|
||||
if (m := re.match('new=(?P<new>.*?)\s', kwargs)) is not None:
|
||||
new = m.group('new')
|
||||
await self.cancel_listen_state(handle)
|
||||
self.log(f'cancelled callback for sensor {entity} turning {new}')
|
||||
@@ -1,27 +0,0 @@
|
||||
[project]
|
||||
name = "room-control"
|
||||
version = "0.1.0"
|
||||
description = "Add your description here"
|
||||
authors = [
|
||||
{ name = "John Lancaster", email = "32917998+jsl12@users.noreply.github.com" }
|
||||
]
|
||||
dependencies = [
|
||||
"appdaemon>=4.4.2",
|
||||
"rich>=13.7.1",
|
||||
"pydantic>=2.7.1",
|
||||
"ruff>=0.4.2",
|
||||
]
|
||||
readme = "README.md"
|
||||
requires-python = ">= 3.8,<3.12"
|
||||
|
||||
[tool.setuptools]
|
||||
include-package-data = true
|
||||
|
||||
# [tool.setuptools.package-data]
|
||||
# mypkg = ["*.yaml"]
|
||||
|
||||
[tool.setuptools.data-files]
|
||||
config = ["config/default_config.yaml"]
|
||||
|
||||
[tool.ruff.format]
|
||||
quote-style = 'single'
|
||||
345
room_control.py
Executable file
345
room_control.py
Executable file
@@ -0,0 +1,345 @@
|
||||
from copy import deepcopy
|
||||
from dataclasses import dataclass, field
|
||||
from datetime import datetime, time, timedelta
|
||||
from pathlib import Path
|
||||
from typing import Dict, List
|
||||
|
||||
import appdaemon.utils as utils
|
||||
import yaml
|
||||
from appdaemon.entity import Entity
|
||||
from appdaemon.plugins.hass.hassapi import Hass
|
||||
from appdaemon.plugins.mqtt.mqttapi import Mqtt
|
||||
from astral import SunDirection
|
||||
|
||||
|
||||
def str_to_timedelta(input_str: str) -> timedelta:
|
||||
try:
|
||||
hours, minutes, seconds = map(int, input_str.split(':'))
|
||||
return timedelta(hours=hours, minutes=minutes, seconds=seconds)
|
||||
except Exception:
|
||||
return timedelta()
|
||||
|
||||
@dataclass
|
||||
class RoomState:
|
||||
scene: Dict[str, Dict[str, str | int]]
|
||||
off_duration: timedelta = None
|
||||
time: time = None
|
||||
time_fmt: List[str] = field(default_factory=lambda : ['%H:%M:%S', '%I:%M:%S %p'], repr=False)
|
||||
elevation: int | float = None
|
||||
direction: SunDirection = None
|
||||
|
||||
def __post_init__(self):
|
||||
if isinstance(self.time, str):
|
||||
for fmt in self.time_fmt:
|
||||
try:
|
||||
self.time = datetime.strptime(self.time, fmt).time()
|
||||
except:
|
||||
continue
|
||||
else:
|
||||
break
|
||||
|
||||
if self.elevation is not None:
|
||||
assert self.direction is not None, f'Elevation setting requires a direction'
|
||||
if self.direction.lower() == 'setting':
|
||||
self.direction = SunDirection.SETTING
|
||||
elif self.direction.lower() == 'rising':
|
||||
self.direction = SunDirection.RISING
|
||||
else:
|
||||
raise ValueError(f'Invalid sun direction: {self.direction}')
|
||||
|
||||
if isinstance(self.elevation, str):
|
||||
self.elevation = float(self.elevation)
|
||||
|
||||
if isinstance(self.off_duration, str):
|
||||
self.off_duration = str_to_timedelta(self.off_duration)
|
||||
|
||||
@classmethod
|
||||
def from_json(cls, json_input):
|
||||
return cls(**json_input)
|
||||
|
||||
|
||||
@dataclass
|
||||
class RoomConfig:
|
||||
states: List[RoomState]
|
||||
off_duration: timedelta = None
|
||||
|
||||
def __post_init__(self):
|
||||
if isinstance(self.off_duration, str):
|
||||
self.off_duration = str_to_timedelta(self.off_duration)
|
||||
|
||||
@classmethod
|
||||
def from_app_config(cls, app_cfg: Dict[str, Dict]):
|
||||
if 'off_duration' in app_cfg:
|
||||
kwargs = {'off_duration': app_cfg['off_duration']}
|
||||
else:
|
||||
kwargs = {}
|
||||
|
||||
self = cls(
|
||||
states=[RoomState.from_json(s) for s in app_cfg['states']],
|
||||
**kwargs
|
||||
)
|
||||
|
||||
return self
|
||||
|
||||
@classmethod
|
||||
def from_yaml(cls, yaml_path: Path, app_name: str):
|
||||
with yaml_path.open('r') as f:
|
||||
cfg: Dict = yaml.load(f, Loader=yaml.SafeLoader)[app_name]
|
||||
return cls.from_app_config(cfg)
|
||||
|
||||
def sort_states(self):
|
||||
"""Should only be called after all the times have been resolved
|
||||
"""
|
||||
assert all(isinstance(state.time, time) for state in self.states), 'Times have not all been resolved yet'
|
||||
self.states = sorted(self.states, key=lambda s: s.time, reverse=True)
|
||||
|
||||
def current_state(self, now: time) -> RoomState:
|
||||
time_fmt = "%I:%M:%S %p"
|
||||
print(now.strftime(time_fmt))
|
||||
|
||||
self.sort_states()
|
||||
for state in self.states:
|
||||
if state.time <= now:
|
||||
return state
|
||||
else:
|
||||
# self.log(f'Defaulting to first state')
|
||||
return self.states[0]
|
||||
|
||||
def current_scene(self, now: time) -> Dict:
|
||||
state = self.current_state(now)
|
||||
return state.scene
|
||||
|
||||
def current_off_duration(self, now: time) -> timedelta:
|
||||
state = self.current_state(now)
|
||||
if state.off_duration is None:
|
||||
if self.off_duration is None:
|
||||
raise ValueError(f'Need an off duration')
|
||||
else:
|
||||
return self.off_duration
|
||||
else:
|
||||
return state.off_duration
|
||||
|
||||
|
||||
class RoomController(Hass, Mqtt):
|
||||
"""Class for linking an light with a motion sensor.
|
||||
|
||||
- Separate the turning on and turning off functions.
|
||||
- Use the state change of the light to set up the event for changing to the other state
|
||||
- `handle_on`
|
||||
- `handle_off`
|
||||
- When the light comes on, check if it's attributes match what they should, given the time.
|
||||
"""
|
||||
|
||||
@property
|
||||
def states(self) -> List[RoomState]:
|
||||
return self._room_config.states
|
||||
|
||||
@states.setter
|
||||
def states(self, new: List[RoomState]):
|
||||
assert all(isinstance(s, RoomState) for s in new), f'Invalid: {new}'
|
||||
self._room_config.states = new
|
||||
|
||||
async def initialize(self):
|
||||
self.app_entities = await self.gather_app_entities()
|
||||
# self.log(f'entities: {self.app_entities}')
|
||||
await self.refresh_state_times()
|
||||
await self.run_daily(callback=self.refresh_state_times, start='00:00:00')
|
||||
|
||||
async def gather_app_entities(self) -> List[str]:
|
||||
"""Returns a list of all the entities involved in any of the states
|
||||
"""
|
||||
async def async_generator():
|
||||
for settings in deepcopy(self.args['states']):
|
||||
if (scene := settings.get('scene')):
|
||||
if isinstance(scene, str):
|
||||
assert scene.startswith('scene.'), f"Scene definition must start with 'scene.' for app {self.name}"
|
||||
entity: Entity = self.get_entity(scene)
|
||||
entity_state = await entity.get_state('all')
|
||||
attributes = entity_state['attributes']
|
||||
for entity in attributes['entity_id']:
|
||||
yield entity
|
||||
else:
|
||||
for key in scene.keys():
|
||||
yield key
|
||||
else:
|
||||
yield self.args['entity']
|
||||
|
||||
entities = [e async for e in async_generator()]
|
||||
return set(entities)
|
||||
|
||||
async def refresh_state_times(self, *args, **kwargs):
|
||||
"""Resets the `self.states` attribute to a newly parsed version of the states.
|
||||
|
||||
Parsed states have an absolute time for the current day.
|
||||
"""
|
||||
# re-parse the state strings into times for the current day
|
||||
self._room_config = RoomConfig.from_app_config(self.args)
|
||||
self.log(f'{len(self._room_config.states)} states in the RoomConfig')
|
||||
|
||||
for state in self._room_config.states:
|
||||
if state.time is None and state.elevation is not None:
|
||||
state.time = self.AD.sched.location.time_at_elevation(
|
||||
elevation=state.elevation,
|
||||
direction=state.direction
|
||||
).time()
|
||||
elif isinstance(state.time, str):
|
||||
state.time = await self.parse_time(state.time)
|
||||
|
||||
assert isinstance(state.time, time), f'Invalid time: {state.time}'
|
||||
|
||||
for state in self.states:
|
||||
self.log(f'State: {state.time.strftime("%I:%M:%S %p")} {state.scene}')
|
||||
|
||||
self.states = sorted(self.states, key=lambda s: s.time, reverse=True)
|
||||
|
||||
# schedule the transitions
|
||||
for state in self.states[::-1]:
|
||||
# t: time = state['time']
|
||||
t: time = state.time
|
||||
try:
|
||||
await self.run_at(
|
||||
callback=self.activate_any_on,
|
||||
start=t.strftime('%H:%M:%S'),
|
||||
cause='scheduled transition'
|
||||
)
|
||||
except ValueError:
|
||||
# happens when the callback time is in the past
|
||||
pass
|
||||
except Exception as e:
|
||||
self.log(f'Failed with {type(e)}: {e}')
|
||||
|
||||
async def current_state(self, now: time = None) -> RoomState:
|
||||
if (await self.sleep_bool()):
|
||||
self.log(f'sleep: active')
|
||||
if (state := self.args.get('sleep_state')):
|
||||
return RoomState.from_json(state)
|
||||
else:
|
||||
return RoomState(scene={})
|
||||
else:
|
||||
now = now or (await self.get_now()).time()
|
||||
self.log(f'Getting state for {now}', level='DEBUG')
|
||||
|
||||
state = self._room_config.current_state(now)
|
||||
self.log(f'Current state: {state}', level='DEBUG')
|
||||
|
||||
return state
|
||||
|
||||
async def current_scene(self, now: time = None) -> Dict[str, Dict[str, str | int]]:
|
||||
state = await self.current_state(now)
|
||||
assert isinstance(state, RoomState)
|
||||
self.log(f'Current scene: {state}')
|
||||
return state.scene
|
||||
|
||||
async def app_entity_states(self) -> Dict[str, str]:
|
||||
states = {
|
||||
entity: (await self.get_state(entity))
|
||||
for entity in self.app_entities
|
||||
}
|
||||
return states
|
||||
|
||||
async def all_off(self) -> bool:
|
||||
""""All off" is the logic opposite of "any on"
|
||||
|
||||
Returns:
|
||||
bool: Whether all the lights associated with the app are off
|
||||
"""
|
||||
states = await self.app_entity_states()
|
||||
return all(state != 'on' for entity, state in states.items())
|
||||
|
||||
async def any_on(self) -> bool:
|
||||
""""Any on" is the logic opposite of "all off"
|
||||
|
||||
Returns:
|
||||
bool: Whether any of the lights associated with the app are on
|
||||
"""
|
||||
states = await self.app_entity_states()
|
||||
return any(state == 'on' for entity, state in states.items())
|
||||
|
||||
async def sleep_bool(self) -> bool:
|
||||
if (sleep_var := self.args.get('sleep')):
|
||||
return (await self.get_state(sleep_var)) == 'on'
|
||||
else:
|
||||
return False
|
||||
|
||||
# @sleep_bool.setter
|
||||
# def sleep_bool(self, val) -> bool:
|
||||
# if (sleep_var := self.args.get('sleep')):
|
||||
# if isinstance(val, str):
|
||||
# self.set_state(sleep_var, state=val)
|
||||
# elif isinstance(val, bool):
|
||||
# self.set_state(sleep_var, state='on' if val else 'off')
|
||||
# else:
|
||||
# raise ValueError('Sleep variable is undefined')
|
||||
|
||||
async def off_duration(self, now: time = None) -> timedelta:
|
||||
"""Determines the time that the motion sensor has to be clear before deactivating
|
||||
|
||||
Priority:
|
||||
- Value in scene definition
|
||||
- Default value
|
||||
- Normal - value in app definition
|
||||
- Sleep - 0
|
||||
|
||||
"""
|
||||
sleep_mode_active = await self.sleep_bool()
|
||||
if sleep_mode_active:
|
||||
self.log(f'Sleeping mode active: {sleep_mode_active}')
|
||||
return timedelta()
|
||||
else:
|
||||
now = now or (await self.get_now()).time()
|
||||
return self._room_config.current_off_duration(now)
|
||||
|
||||
@utils.sync_wrapper
|
||||
async def activate(self, entity = None, attribute = None, old = None, new = None, kwargs = None):
|
||||
if kwargs is not None:
|
||||
cause = kwargs.get('cause', 'unknown')
|
||||
else:
|
||||
cause = 'unknown'
|
||||
|
||||
self.log(f'Activating: {cause}')
|
||||
scene = await self.current_scene()
|
||||
|
||||
if isinstance(scene, str):
|
||||
self.turn_on(scene)
|
||||
self.log(f'Turned on scene: {scene}')
|
||||
|
||||
elif isinstance(scene, dict):
|
||||
# makes setting the state to 'on' optional in the yaml definition
|
||||
for entity, settings in scene.items():
|
||||
if 'state' not in settings:
|
||||
scene[entity]['state'] = 'on'
|
||||
|
||||
self.call_service('scene/apply', entities=scene, transition=0)
|
||||
self.log(f'Applied scene: {scene}')
|
||||
|
||||
elif scene is None:
|
||||
self.log(f'No scene, ignoring...')
|
||||
# Need to act as if the light had just turned off to reset the motion (and maybe other things?)
|
||||
# self.callback_light_off()
|
||||
else:
|
||||
self.log(f'ERROR: unknown scene: {scene}')
|
||||
|
||||
@utils.sync_wrapper
|
||||
async def activate_all_off(self, *args, **kwargs):
|
||||
"""Activate if all of the entities are off. Args and kwargs are passed directly to self.activate()
|
||||
"""
|
||||
if (await self.all_off()):
|
||||
self.activate(*args, **kwargs)
|
||||
else:
|
||||
self.log(f'Skipped activating - everything is not off')
|
||||
|
||||
@utils.sync_wrapper
|
||||
async def activate_any_on(self, *args, **kwargs):
|
||||
"""Activate if any of the entities are on. Args and kwargs are passed directly to self.activate()
|
||||
"""
|
||||
if (await self.any_on()):
|
||||
self.activate(*args, **kwargs)
|
||||
else:
|
||||
self.log(f'Skipped activating - everything is off')
|
||||
|
||||
def deactivate(self, entity = None, attribute = None, old = None, new = None, kwargs = None):
|
||||
cause = kwargs.get('cause', 'unknown')
|
||||
self.log(f'Deactivating: {cause}')
|
||||
for e in self.app_entities:
|
||||
self.turn_off(e)
|
||||
self.log(f'Turned off {e}')
|
||||
@@ -1,26 +0,0 @@
|
||||
version: 1
|
||||
disable_existing_loggers: false
|
||||
formatters:
|
||||
rich:
|
||||
style: "{"
|
||||
format: "[room]{room}[/] {message}"
|
||||
# format: "{message}"
|
||||
datefmt: '%H:%M:%S.%f'
|
||||
rich_component:
|
||||
style: "{"
|
||||
format: "[room]{room}[/] [component]{component}[/] {message}"
|
||||
# format: "{message}"
|
||||
datefmt: '%H:%M:%S.%f'
|
||||
handlers:
|
||||
rich:
|
||||
formatter: rich
|
||||
'()': 'rich.logging.RichHandler'
|
||||
markup: True
|
||||
show_path: false
|
||||
omit_repeated_times: false
|
||||
rich_component:
|
||||
formatter: rich_component
|
||||
'()': 'rich.logging.RichHandler'
|
||||
markup: True
|
||||
show_path: false
|
||||
omit_repeated_times: false
|
||||
@@ -1,6 +0,0 @@
|
||||
from .room_control import RoomController
|
||||
from .motion import Motion
|
||||
from .button import Button
|
||||
from .door import Door
|
||||
|
||||
__all__ = ['RoomController', 'Motion', 'Button', 'Door']
|
||||
@@ -1,39 +0,0 @@
|
||||
import json
|
||||
from dataclasses import dataclass
|
||||
from typing import Any, Dict
|
||||
|
||||
from appdaemon.adapi import ADAPI
|
||||
|
||||
|
||||
@dataclass
|
||||
class Button:
|
||||
adapi: ADAPI
|
||||
button_name: str
|
||||
|
||||
def __post_init__(self):
|
||||
self.log = self.adapi.log
|
||||
topic = f'zigbee2mqtt/{self.button_name}'
|
||||
self.adapi.listen_event(
|
||||
self.handle_button,
|
||||
'MQTT_MESSAGE',
|
||||
topic=topic,
|
||||
namespace='mqtt',
|
||||
button=self.button_name,
|
||||
)
|
||||
self.log(f'MQTT topic [topic]{topic}[/] controls [room]{self.adapi.name}[/]')
|
||||
|
||||
def handle_button(self, event_name: str, data: Dict[str, Any], kwargs: Dict[str, Any]):
|
||||
try:
|
||||
payload = json.loads(data['payload'])
|
||||
action = payload['action']
|
||||
except json.JSONDecodeError:
|
||||
self.log(f'Error decoding JSON from {data["payload"]}', level='ERROR')
|
||||
except KeyError:
|
||||
return
|
||||
else:
|
||||
if isinstance(action, str) and action != '':
|
||||
self.log(f'Action: [yellow]{action}[/]')
|
||||
if action == 'single':
|
||||
self.adapi.call_service(
|
||||
f'{self.adapi.name}/toggle', namespace='controller', cause='button'
|
||||
)
|
||||
@@ -1,237 +0,0 @@
|
||||
import json
|
||||
import logging
|
||||
import logging.config
|
||||
import re
|
||||
from abc import ABC
|
||||
from dataclasses import asdict
|
||||
from importlib.resources import files
|
||||
|
||||
import yaml
|
||||
from rich.console import Console
|
||||
from rich.highlighter import RegexHighlighter
|
||||
from rich.theme import Theme
|
||||
|
||||
console = Console(
|
||||
width=100,
|
||||
theme=Theme(
|
||||
{
|
||||
'log.time': 'none',
|
||||
# 'logging.level.info': 'none',
|
||||
'room': 'italic bright_cyan',
|
||||
'component': 'dark_violet',
|
||||
'friendly_name': 'yellow',
|
||||
'light': 'light_slate_blue',
|
||||
'sensor': 'green',
|
||||
'time': 'yellow',
|
||||
'z2m': 'bright_black',
|
||||
'topic': 'chartreuse2',
|
||||
'true': 'green',
|
||||
'false': 'red',
|
||||
}
|
||||
),
|
||||
log_time_format='%Y-%m-%d %I:%M:%S %p',
|
||||
# highlighter=RCHighlighter(),
|
||||
)
|
||||
|
||||
|
||||
class RCHighlighter(RegexHighlighter):
|
||||
highlights = [
|
||||
r'(?P<light>(light|switch)\.\w+)',
|
||||
r'(?P<time>\d+:\d+:\d+)',
|
||||
r'(?P<z2m>zigbee2mqtt/)',
|
||||
r'(?P<sensor>binary_sensor\.\w+)',
|
||||
# r"'state': '(?P<on>on)|(?P<off>off)'"
|
||||
r'(?P<true>True)|(?P<false>False)',
|
||||
]
|
||||
|
||||
|
||||
def load_rich_config(
|
||||
room: str = None, component: str = None, level: str = 'INFO'
|
||||
) -> logging.LoggerAdapter:
|
||||
logger_name = f'Appdaemon.{room}'
|
||||
|
||||
if component is not None:
|
||||
logger_name += f'.{component}'
|
||||
|
||||
with files('room_control').joinpath('.rich_logging.yaml').open('r') as f:
|
||||
RICH_CFG = yaml.safe_load(f)
|
||||
|
||||
RICH_CFG['handlers']['rich']['console'] = console
|
||||
RICH_CFG['handlers']['rich']['highlighter'] = RCHighlighter()
|
||||
RICH_CFG['handlers']['rich_component']['console'] = console
|
||||
RICH_CFG['handlers']['rich_component']['highlighter'] = RCHighlighter()
|
||||
RICH_CFG['loggers'] = {
|
||||
logger_name: {
|
||||
'handlers': ['rich' if component is None else 'rich_component'],
|
||||
'propagate': False,
|
||||
'level': level,
|
||||
}
|
||||
}
|
||||
|
||||
extra = {'room': room}
|
||||
|
||||
if component is not None:
|
||||
extra['component'] = component
|
||||
|
||||
logging.config.dictConfig(RICH_CFG)
|
||||
logger = logging.getLogger(logger_name)
|
||||
adapter = logging.LoggerAdapter(logger, extra)
|
||||
return adapter
|
||||
|
||||
|
||||
RICH_HANDLER_CFG = {
|
||||
'()': 'rich.logging.RichHandler',
|
||||
'markup': True,
|
||||
'show_path': False,
|
||||
# 'show_time': False,
|
||||
'omit_repeated_times': False,
|
||||
'console': console,
|
||||
'highlighter': RCHighlighter(),
|
||||
}
|
||||
|
||||
|
||||
class ContextSettingFilter(logging.Filter, ABC):
|
||||
def filter(self, record: logging.LogRecord) -> logging.LogRecord:
|
||||
for name, val in asdict(self).items():
|
||||
if val is not None:
|
||||
setattr(record, name, val)
|
||||
return record
|
||||
|
||||
|
||||
# @dataclass
|
||||
# class RoomControllerFilter(ContextSettingFilter):
|
||||
# room: str
|
||||
# component: Optional[str] = None
|
||||
|
||||
|
||||
class RoomFilter(logging.Filter):
|
||||
"""Used to filter out messages that have a component field because they will have already been printed by their respective logger."""
|
||||
|
||||
def filter(self, record: logging.LogRecord) -> bool:
|
||||
return not hasattr(record, 'component')
|
||||
|
||||
|
||||
# class RoomControllerFormatter(logging.Formatter):
|
||||
# def format(self, record: logging.LogRecord):
|
||||
# return super().format(record)
|
||||
|
||||
|
||||
class UnMarkupFilter(logging.Filter):
|
||||
md_regex = re.compile(r'(?P<open>\[.*?\])(?P<text>.*?)(?P<close>\[\/\])')
|
||||
|
||||
def filter(self, record: logging.LogRecord) -> logging.LogRecord:
|
||||
record.msg = self.md_regex.sub(r'\g<text>', record.msg)
|
||||
return record
|
||||
|
||||
|
||||
class JSONFormatter(logging.Formatter):
|
||||
def format(self, record: logging.LogRecord) -> str:
|
||||
return json.dumps(record.__dict__)
|
||||
|
||||
|
||||
def room_logging_config(name: str):
|
||||
return {
|
||||
'version': 1,
|
||||
'disable_existing_loggers': False,
|
||||
'filters': {
|
||||
'room': {'()': 'room_control.console.RoomFilter'},
|
||||
'unmarkup': {'()': 'room_control.console.UnMarkupFilter'},
|
||||
},
|
||||
'formatters': {
|
||||
'rich_room': {
|
||||
'style': '{',
|
||||
'format': '[room]{room}[/] {message}',
|
||||
'datefmt': '%H:%M:%S.%f',
|
||||
},
|
||||
'file': {
|
||||
'style': '{',
|
||||
'format': '{asctime}.{msecs:03.0f} {levelname:8} {name}: {message}',
|
||||
'datefmt': '%Y-%m-%d %H:%M:%S',
|
||||
},
|
||||
'json': {'()': 'room_control.console.JSONFormatter'},
|
||||
},
|
||||
'handlers': {
|
||||
'rich_room': {
|
||||
'formatter': 'rich_room',
|
||||
'filters': ['room'],
|
||||
**RICH_HANDLER_CFG,
|
||||
},
|
||||
'file': {
|
||||
'filters': ['unmarkup'],
|
||||
'formatter': 'file',
|
||||
'class': 'logging.handlers.RotatingFileHandler',
|
||||
# 'class': 'logging.FileHandler',
|
||||
'filename': f'/logs/{name}.log',
|
||||
'maxBytes': 1000000,
|
||||
'backupCount': 3,
|
||||
},
|
||||
'json': {
|
||||
'filters': ['unmarkup'],
|
||||
'formatter': 'json',
|
||||
'filename': f'/logs/{name}.jsonl',
|
||||
'class': 'logging.handlers.RotatingFileHandler',
|
||||
'maxBytes': 1000000,
|
||||
'backupCount': 3,
|
||||
},
|
||||
},
|
||||
'loggers': {
|
||||
f'AppDaemon.{name}': {
|
||||
'level': 'INFO',
|
||||
'propagate': False,
|
||||
'handlers': [
|
||||
'rich_room',
|
||||
'file',
|
||||
'json',
|
||||
],
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
# def component_logging_config(parent_room: str, component: str):
|
||||
# logger_name = f'AppDaemon.{parent_room}.{component}'
|
||||
|
||||
# cfg = load_rich_config()
|
||||
|
||||
# LOG_CFG = {
|
||||
# 'version': 1,
|
||||
# 'disable_existing_loggers': False,
|
||||
# 'formatters': {
|
||||
# 'rich_component': {
|
||||
# 'style': '{',
|
||||
# 'format': '[room]{room}[/] [component]{component}[/] {message}',
|
||||
# 'datefmt': '%H:%M:%S.%f',
|
||||
# },
|
||||
# },
|
||||
# 'handlers': {
|
||||
# 'rich_component': {
|
||||
# 'formatter': 'rich_component',
|
||||
# **RICH_HANDLER_CFG,
|
||||
# },
|
||||
# },
|
||||
# 'loggers': {
|
||||
# logger_name: {
|
||||
# # 'level': 'INFO',
|
||||
# 'propagate': True,
|
||||
# 'handlers': ['rich_component'],
|
||||
# }
|
||||
# },
|
||||
# }
|
||||
# return LOG_CFG
|
||||
|
||||
|
||||
# def setup_component_logging(self) -> logging.Logger:
|
||||
# """Creates a logger for a subcomponent with a RichHandler"""
|
||||
# component = type(self).__name__
|
||||
# parent = self.args['app']
|
||||
# cfg_dict = component_logging_config(parent_room=parent, component=component)
|
||||
# logger_name = next(iter(cfg_dict['loggers']))
|
||||
|
||||
# try:
|
||||
# logging.config.dictConfig(cfg_dict)
|
||||
# except Exception:
|
||||
# console.print_exception()
|
||||
# else:
|
||||
# logger = logging.getLogger(logger_name)
|
||||
# logger = logging.LoggerAdapter(logger, {'room': parent, 'component': component})
|
||||
# return logger
|
||||
@@ -1,25 +0,0 @@
|
||||
from dataclasses import dataclass
|
||||
from logging import Logger, LoggerAdapter
|
||||
|
||||
from appdaemon.adapi import ADAPI
|
||||
|
||||
|
||||
@dataclass
|
||||
class Door:
|
||||
adapi: ADAPI
|
||||
entity_id: str
|
||||
|
||||
def __post_init__(self):
|
||||
self.logger = LoggerAdapter(
|
||||
self.adapi.logger.logger.getChild('door'), self.adapi.logger.extra
|
||||
)
|
||||
|
||||
self.adapi.listen_state(
|
||||
callback=lambda *args, **kwargs: self.call_service(
|
||||
f'{self.adapi.name}/activate_all_off', namespace='controller'
|
||||
),
|
||||
entity_id=self.entity_id,
|
||||
new='on',
|
||||
cause='door open',
|
||||
)
|
||||
self.logger.debug(f'Initialized door for [room]{self.adapi.name}[/]')
|
||||
@@ -1,135 +0,0 @@
|
||||
import datetime
|
||||
from pathlib import Path
|
||||
from typing import Annotated, Dict, List, Optional, Self
|
||||
|
||||
import yaml
|
||||
from astral import SunDirection
|
||||
from pydantic import BaseModel, BeforeValidator, Field, model_validator, root_validator
|
||||
from pydantic_core import PydanticCustomError
|
||||
from rich.console import Console, ConsoleOptions, RenderResult
|
||||
from rich.table import Column, Table
|
||||
|
||||
|
||||
def str_to_timedelta(input_str: str) -> datetime.timedelta:
|
||||
try:
|
||||
hours, minutes, seconds = map(int, input_str.split(':'))
|
||||
return datetime.timedelta(hours=hours, minutes=minutes, seconds=seconds)
|
||||
except Exception:
|
||||
return datetime.timedelta()
|
||||
|
||||
|
||||
def str_to_direction(input_str: str) -> SunDirection:
|
||||
try:
|
||||
return getattr(SunDirection, input_str.upper())
|
||||
except AttributeError:
|
||||
raise PydanticCustomError(
|
||||
'invalid_dir', 'Invalid sun direction: {dir}', dict(dir=input_str)
|
||||
)
|
||||
|
||||
|
||||
OffDuration = Annotated[datetime.timedelta, BeforeValidator(str_to_timedelta)]
|
||||
|
||||
|
||||
class State(BaseModel):
|
||||
state: bool = True
|
||||
brightness: Optional[int] = Field(default=None, ge=1, le=255)
|
||||
color_temp: Optional[int] = Field(default=None, ge=200, le=650)
|
||||
rgb_color: Optional[list[int]] = Field(default=None, min_length=3, max_length=3)
|
||||
|
||||
|
||||
class ApplyKwargs(BaseModel):
|
||||
"""Arguments to call with the 'scene/apply' service"""
|
||||
|
||||
entities: Dict[str, State]
|
||||
transition: Optional[int] = None
|
||||
|
||||
|
||||
class ControllerStateConfig(BaseModel):
|
||||
time: Optional[str | datetime.time | datetime.datetime] = None
|
||||
elevation: Optional[float] = None
|
||||
direction: Optional[Annotated[SunDirection, BeforeValidator(str_to_direction)]] = None
|
||||
off_duration: Optional[OffDuration] = None
|
||||
scene: dict[str, State] | str = Field(default_factory=dict)
|
||||
|
||||
@model_validator(mode='before')
|
||||
def check_args(cls, values):
|
||||
time, elevation = values.get('time'), values.get('elevation')
|
||||
# if time is not None and elevation is not None:
|
||||
# raise PydanticCustomError('bad_time_spec', 'Only one of time or elevation can be set.')
|
||||
if elevation is not None and 'direction' not in values:
|
||||
raise PydanticCustomError('no_sun_dir', 'Needs sun direction with elevation')
|
||||
return values
|
||||
|
||||
def to_apply_kwargs(self, transition: int = None):
|
||||
return ApplyKwargs(entities=self.scene, transition=transition).model_dump(exclude_none=True)
|
||||
|
||||
|
||||
class RoomControllerConfig(BaseModel):
|
||||
states: List[ControllerStateConfig] = Field(default_factory=list)
|
||||
off_duration: Optional[OffDuration] = None
|
||||
sleep_state: Optional[ControllerStateConfig] = None
|
||||
rich: Optional[str] = None
|
||||
manual_mode: Optional[str] = None
|
||||
|
||||
@classmethod
|
||||
def from_yaml(cls: Self, yaml_path: Path) -> Self:
|
||||
yaml_path = Path(yaml_path)
|
||||
with yaml_path.open('r') as f:
|
||||
for appname, app_cfg in yaml.load(f, Loader=yaml.SafeLoader).items():
|
||||
if app_cfg['class'] == 'RoomController':
|
||||
return cls.model_validate(app_cfg)
|
||||
|
||||
def __rich_console__(self, console: Console, options: ConsoleOptions) -> RenderResult:
|
||||
table = Table(
|
||||
Column('Time', width=15),
|
||||
Column('Scene'),
|
||||
highlight=True,
|
||||
padding=1,
|
||||
collapse_padding=True,
|
||||
)
|
||||
for state in self.states:
|
||||
if isinstance(state, str):
|
||||
pass
|
||||
elif isinstance(state, dict):
|
||||
scene_json = state.to_apply_kwargs()
|
||||
lines = [
|
||||
f'{name:20}{state["state"]} Brightness: {state.get("brightness", ""):<4} Temp: {state.get("color_temp", "")}'
|
||||
for name, state in scene_json['entities'].items()
|
||||
]
|
||||
table.add_row(state.time.strftime('%I:%M:%S %p'), '\n'.join(lines))
|
||||
yield table
|
||||
|
||||
def sort_states(self):
|
||||
"""Should only be called after all the times have been resolved"""
|
||||
assert all(
|
||||
isinstance(state.time, datetime.time) for state in self.states
|
||||
), 'Times have not all been resolved yet'
|
||||
self.states = sorted(self.states, key=lambda s: s.time, reverse=True)
|
||||
|
||||
def current_state(self, now: datetime.time) -> ControllerStateConfig:
|
||||
self.sort_states()
|
||||
for state in self.states:
|
||||
if state.time <= now:
|
||||
return state
|
||||
else:
|
||||
return self.states[0]
|
||||
|
||||
def current_scene(self, now: datetime.time) -> Dict:
|
||||
state = self.current_state(now)
|
||||
return state.scene
|
||||
|
||||
def current_off_duration(self, now: datetime.time) -> datetime.timedelta:
|
||||
state = self.current_state(now)
|
||||
if state.off_duration is None:
|
||||
if self.off_duration is None:
|
||||
raise ValueError('Need an off duration')
|
||||
else:
|
||||
return self.off_duration
|
||||
else:
|
||||
return state.off_duration
|
||||
|
||||
|
||||
class ButtonConfig(BaseModel):
|
||||
app: str
|
||||
button: str | List[str]
|
||||
ref_entity: str
|
||||
@@ -1,177 +0,0 @@
|
||||
import re
|
||||
from datetime import timedelta
|
||||
from logging import Logger
|
||||
from typing import TYPE_CHECKING, Literal, Optional
|
||||
|
||||
from appdaemon.entity import Entity
|
||||
from appdaemon.plugins.hass.hassapi import Hass
|
||||
from pydantic import BaseModel, ValidationError
|
||||
|
||||
from . import console
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from room_control import RoomController
|
||||
|
||||
|
||||
class CallbackEntry(BaseModel):
|
||||
entity: str
|
||||
event: Optional[str] = None
|
||||
type: Literal['state', 'event']
|
||||
kwargs: str
|
||||
function: str
|
||||
name: str
|
||||
pin_app: bool
|
||||
pin_thread: int
|
||||
|
||||
|
||||
Callbacks = dict[str, dict[str, CallbackEntry]]
|
||||
|
||||
|
||||
class Motion(Hass):
|
||||
logger: Logger
|
||||
app: 'RoomController'
|
||||
|
||||
@property
|
||||
def sensor(self) -> Entity:
|
||||
return self.get_entity(self.args['sensor'])
|
||||
|
||||
@property
|
||||
def sensor_state(self) -> bool:
|
||||
return self.sensor.state == 'on'
|
||||
|
||||
@property
|
||||
def ref_entity(self) -> Entity:
|
||||
return self.get_entity(self.args['ref_entity'])
|
||||
|
||||
@property
|
||||
def ref_entity_state(self) -> bool:
|
||||
return self.ref_entity.get_state() == 'on'
|
||||
|
||||
@property
|
||||
def state_mismatch(self) -> bool:
|
||||
return self.sensor_state != self.ref_entity_state
|
||||
|
||||
def initialize(self):
|
||||
self.app: 'RoomController' = self.get_app(self.args['app'])
|
||||
self.logger = console.load_rich_config(self.app.name, type(self).__name__)
|
||||
|
||||
assert self.entity_exists(self.args['sensor'])
|
||||
assert self.entity_exists(self.args['ref_entity'])
|
||||
|
||||
base_kwargs = dict(
|
||||
entity_id=self.ref_entity.entity_id,
|
||||
immediate=True, # avoids needing to sync the state
|
||||
)
|
||||
|
||||
if self.state_mismatch:
|
||||
self.log(
|
||||
f'Sensor is {self.sensor_state} ' f'and light is {self.ref_entity_state}',
|
||||
level='WARNING',
|
||||
)
|
||||
if self.sensor_state:
|
||||
self.app.activate(kwargs={'cause': f'Syncing state with {self.sensor.entity_id}'})
|
||||
|
||||
# don't need to await these because they'll already get turned into a task by the utils.sync_wrapper decorator
|
||||
self.listen_state(
|
||||
**base_kwargs,
|
||||
attribute='brightness',
|
||||
callback=self.callback_light_on,
|
||||
)
|
||||
self.listen_state(**base_kwargs, new='off', callback=self.callback_light_off)
|
||||
|
||||
for handle, cb in self.callbacks():
|
||||
self.log(f'Handle [yellow]{handle[:4]}[/]: {cb.function}', level='DEBUG')
|
||||
|
||||
self.log(f'Initialized [bold green]{type(self).__name__}[/]')
|
||||
|
||||
def callbacks(self):
|
||||
"""Returns a dictionary of validated CallbackEntry objects that are associated with this app"""
|
||||
self_callbacks = self.get_callback_entries().get(self.name, {})
|
||||
for handle, cb_dict in self_callbacks.items():
|
||||
try:
|
||||
yield handle, CallbackEntry.model_validate(cb_dict)
|
||||
except ValidationError as e:
|
||||
self.logger.error('Error parsing callback dictionary')
|
||||
self.logger.error(e)
|
||||
|
||||
def listen_motion_on(self):
|
||||
"""Sets up the motion on callback to activate the room"""
|
||||
self.cancel_motion_callback()
|
||||
self.listen_state(
|
||||
callback=self.app.activate_all_off,
|
||||
entity_id=self.sensor.entity_id,
|
||||
new='on',
|
||||
oneshot=True,
|
||||
cause='motion on',
|
||||
)
|
||||
self.log(f'Waiting for sensor motion on [friendly_name]{self.sensor.friendly_name}[/]')
|
||||
if self.sensor_state:
|
||||
self.log(
|
||||
f'Sensor [friendly_name]{self.sensor.friendly_name}[/] is already on',
|
||||
level='WARNING',
|
||||
)
|
||||
|
||||
def listen_motion_off(self, duration: timedelta):
|
||||
"""Sets up the motion off callback to deactivate the room"""
|
||||
self.cancel_motion_callback()
|
||||
self.listen_state(
|
||||
callback=self.app.deactivate,
|
||||
entity_id=self.sensor.entity_id,
|
||||
new='off',
|
||||
duration=duration.total_seconds(),
|
||||
oneshot=True,
|
||||
cause='motion off',
|
||||
)
|
||||
self.log(
|
||||
f'Waiting for sensor [friendly_name]{self.sensor.friendly_name}[/] to be clear for {duration}'
|
||||
)
|
||||
|
||||
if not self.sensor_state:
|
||||
self.log(
|
||||
f'Sensor [friendly_name]{self.sensor.friendly_name}[/] is currently off',
|
||||
level='WARNING',
|
||||
)
|
||||
|
||||
def callback_light_on(self, entity=None, attribute=None, old=None, new=None, kwargs=None):
|
||||
"""Called when the light turns on"""
|
||||
if new is not None:
|
||||
self.log(f'Detected {entity} turning on', level='DEBUG')
|
||||
duration = self.app.off_duration()
|
||||
self.listen_motion_off(duration)
|
||||
|
||||
def callback_light_off(self, entity=None, attribute=None, old=None, new=None, kwargs=None):
|
||||
"""Called when the light turns off"""
|
||||
self.log(f'Detected {entity} turning off', level='DEBUG')
|
||||
self.listen_motion_on()
|
||||
|
||||
def get_app_callbacks(self, name: str = None):
|
||||
"""Gets all the callbacks associated with the app"""
|
||||
name = name or self.name
|
||||
callbacks = {
|
||||
handle: info
|
||||
for app_name, callbacks in self.get_callback_entries().items()
|
||||
for handle, info in callbacks.items()
|
||||
if app_name == name
|
||||
}
|
||||
return callbacks
|
||||
|
||||
def get_sensor_callbacks(self):
|
||||
return {
|
||||
handle: info
|
||||
for handle, info in self.get_app_callbacks().items()
|
||||
if info['entity'] == self.sensor.entity_id
|
||||
}
|
||||
|
||||
def cancel_motion_callback(self):
|
||||
callbacks = self.get_sensor_callbacks()
|
||||
# self.log(f'Found {len(callbacks)} callbacks for {self.sensor.entity_id}')
|
||||
for handle, info in callbacks.items():
|
||||
entity = info['entity']
|
||||
kwargs = info['kwargs']
|
||||
if (m := re.match('new=(?P<new>.*?)\s', kwargs)) is not None:
|
||||
new = m.group('new')
|
||||
self.cancel_listen_state(handle)
|
||||
self.log(
|
||||
f'cancelled callback for sensor {entity} turning {new}',
|
||||
level='DEBUG',
|
||||
)
|
||||
@@ -1,267 +0,0 @@
|
||||
import datetime
|
||||
import logging
|
||||
import logging.config
|
||||
import traceback
|
||||
from functools import wraps
|
||||
from typing import Any, Dict, List, Set
|
||||
|
||||
from appdaemon.entity import Entity
|
||||
from appdaemon.plugins.hass.hassapi import Hass
|
||||
from astral.location import Location
|
||||
|
||||
from . import console
|
||||
from .button import Button
|
||||
from .door import Door
|
||||
from .model import ControllerStateConfig, RoomControllerConfig
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class RoomController(Hass):
|
||||
"""Class for linking room's lights with a motion sensor.
|
||||
|
||||
- Separate the turning on and turning off functions.
|
||||
- Use the state change of the light to set up the event for changing to the other state
|
||||
- `handle_on`
|
||||
- `handle_off`
|
||||
- When the light comes on, check if it's attributes match what they should, given the time.
|
||||
|
||||
|
||||
## Services
|
||||
|
||||
- <name>/activate
|
||||
- <name>/activate_all_off
|
||||
- <name>/deactivate
|
||||
- <name>/toggle
|
||||
|
||||
"""
|
||||
|
||||
@property
|
||||
def states(self) -> List[ControllerStateConfig]:
|
||||
return self._room_config.states
|
||||
|
||||
@states.setter
|
||||
def states(self, new: List[ControllerStateConfig]):
|
||||
assert all(isinstance(s, ControllerStateConfig) for s in new), f'Invalid: {new}'
|
||||
self._room_config.states = new
|
||||
|
||||
@property
|
||||
@wraps(Location.time_at_elevation)
|
||||
def time_at_elevation(self):
|
||||
return self.AD.sched.location.time_at_elevation
|
||||
|
||||
@property
|
||||
def state_entity(self) -> Entity:
|
||||
return self.get_entity(f'{self.name}.state', namespace='controller')
|
||||
|
||||
def initialize(self):
|
||||
self.logger = console.load_rich_config(self.name)
|
||||
self.set_log_level('DEBUG')
|
||||
|
||||
self.refresh_state_times()
|
||||
self.run_daily(callback=self.refresh_state_times, start='00:00:00')
|
||||
|
||||
self.register_service(
|
||||
f'{self.name}/activate', self._service_activate, namespace='controller'
|
||||
)
|
||||
self.register_service(
|
||||
f'{self.name}/activate_all_off', self._service_activate_all_off, namespace='controller'
|
||||
)
|
||||
self.register_service(
|
||||
f'{self.name}/deactivate', self._service_deactivate, namespace='controller'
|
||||
)
|
||||
self.register_service(f'{self.name}/toggle', self._service_toggle, namespace='controller')
|
||||
|
||||
# This needs to come after this first call of refresh_state_times
|
||||
self.app_entities = self.get_app_entities()
|
||||
self.log(f'entities: {self.app_entities}', level='DEBUG')
|
||||
|
||||
if button := self.args.get('button'):
|
||||
if isinstance(button, str):
|
||||
self.button = Button(self, button_name=button)
|
||||
|
||||
if door := self.args.get('door'):
|
||||
if isinstance(door, str):
|
||||
self.log('door--')
|
||||
self.door = Door(self, entity_id=door)
|
||||
|
||||
self.log(f'Initialized [bold green]{type(self).__name__}[/]')
|
||||
|
||||
def terminate(self):
|
||||
self.log('[bold red]Terminating[/]', level='DEBUG')
|
||||
|
||||
def get_app_entities(self) -> Set[str]:
|
||||
"""Gets a set of all the entities referenced by any of the state definitions"""
|
||||
|
||||
def gen():
|
||||
for state in self._room_config.states:
|
||||
if isinstance(state.scene, str):
|
||||
assert state.scene.startswith(
|
||||
'scene.'
|
||||
), "Scene definition must start with 'scene.'"
|
||||
entities = self.get_state(state.scene, attribute='entity_id')
|
||||
yield from entities
|
||||
else:
|
||||
yield from state.scene.keys()
|
||||
|
||||
return set(gen())
|
||||
|
||||
def refresh_state_times(self, *args, **kwargs):
|
||||
"""Resets the `self.states` attribute to a newly parsed version of the states.
|
||||
|
||||
Parsed states have an absolute time for the current day.
|
||||
"""
|
||||
# re-parse the state strings into times for the current day
|
||||
self._room_config = RoomControllerConfig.model_validate(self.args)
|
||||
self.log(
|
||||
f'{len(self._room_config.states)} states in the app configuration',
|
||||
level='DEBUG',
|
||||
)
|
||||
|
||||
for state in self._room_config.states:
|
||||
if state.time is None and state.elevation is not None:
|
||||
state.time = self.time_at_elevation(
|
||||
elevation=state.elevation, direction=state.direction
|
||||
).time()
|
||||
elif isinstance(state.time, str):
|
||||
state.time = self.parse_time(state.time)
|
||||
|
||||
assert isinstance(state.time, datetime.time), f'Invalid time: {state.time}'
|
||||
|
||||
try:
|
||||
self.run_at(
|
||||
callback=lambda cb_args: self.set_controller_scene(cb_args['state']),
|
||||
start=state.time.strftime('%H:%M:%S'),
|
||||
state=state,
|
||||
)
|
||||
except Exception as e:
|
||||
self.log(f'Failed with {type(e)}: {e}')
|
||||
|
||||
def set_controller_scene(self, state: ControllerStateConfig):
|
||||
try:
|
||||
self.state_entity.set_state(attributes=state.model_dump())
|
||||
except Exception:
|
||||
self.logger.error(traceback.format_exc())
|
||||
else:
|
||||
self.log(f'Set controller state of {self.name}: {state.model_dump()}', level='DEBUG')
|
||||
|
||||
def current_state(self) -> ControllerStateConfig:
|
||||
if self.sleep_bool():
|
||||
self.log('sleep: active', level='DEBUG')
|
||||
if state := self.args.get('sleep_state'):
|
||||
return ControllerStateConfig(**state)
|
||||
else:
|
||||
return ControllerStateConfig()
|
||||
else:
|
||||
try:
|
||||
attrs = self.state_entity.get_state('all')['attributes']
|
||||
state = ControllerStateConfig.model_validate(attrs)
|
||||
except Exception:
|
||||
state = ControllerStateConfig()
|
||||
finally:
|
||||
# self.log(f'Current state: {state.model_dump(exclude_none=True)}', level='DEBUG')
|
||||
return state
|
||||
|
||||
def activate(self, **kwargs):
|
||||
self.call_service(f'{self.name}/activate', namespace='controller', **kwargs)
|
||||
|
||||
def _service_activate(self, namespace: str, domain: str, service: str, kwargs: Dict[str, Any]):
|
||||
self.log(f'Custom kwargs: {kwargs}', level='DEBUG')
|
||||
state = self.current_state()
|
||||
if isinstance(state.scene, str):
|
||||
self.turn_on(state.scene)
|
||||
# self.turn_on(state.scene, transition=0)
|
||||
elif isinstance(state.scene, dict):
|
||||
scene = state.to_apply_kwargs()
|
||||
self.call_service('scene/apply', **scene)
|
||||
# scene = state.to_apply_kwargs(transition=0)
|
||||
|
||||
def activate_any_on(self, **kwargs):
|
||||
"""Activate if any of the entities are on. Args and kwargs are passed directly to self.activate()"""
|
||||
self.call_service(f'{self.name}/activate_any_on', namespace='controller', **kwargs)
|
||||
|
||||
def _service_activate_any_on(
|
||||
self, namespace: str, domain: str, service: str, kwargs: Dict[str, Any]
|
||||
):
|
||||
if self.any_on() and not self.manual_mode():
|
||||
self.activate(**kwargs)
|
||||
|
||||
def activate_all_off(self, **kwargs):
|
||||
"""Activate if all of the entities are off. Args and kwargs are passed directly to self.activate()"""
|
||||
self.call_service(f'{self.name}/activate_all_off', namespace='controller', **kwargs)
|
||||
|
||||
def _service_activate_all_off(
|
||||
self, namespace: str, domain: str, service: str, kwargs: Dict[str, Any]
|
||||
):
|
||||
if self.all_off() and not self.manual_mode():
|
||||
self.activate(**kwargs)
|
||||
|
||||
def deactivate(self, **kwargs):
|
||||
self.call_service(f'{self.name}/deactivate', namespace='controller', **kwargs)
|
||||
|
||||
def _service_deactivate(
|
||||
self, namespace: str, domain: str, service: str, kwargs: Dict[str, Any]
|
||||
):
|
||||
for e in self.app_entities:
|
||||
self.turn_off(e)
|
||||
|
||||
def toggle(self, **kwargs):
|
||||
self.call_service(f'{self.name}/toggle', namespace='controller', **kwargs)
|
||||
|
||||
def _service_toggle(self, namespace: str, domain: str, service: str, kwargs: Dict[str, Any]):
|
||||
if self.any_on():
|
||||
self.deactivate(**kwargs)
|
||||
else:
|
||||
self.activate(**kwargs)
|
||||
|
||||
def app_entity_states(self) -> Dict[str, str]:
|
||||
states = {entity: self.get_state(entity) for entity in self.app_entities}
|
||||
return states
|
||||
|
||||
def all_off(self) -> bool:
|
||||
""" "All off" is the logic opposite of "any on"
|
||||
|
||||
Returns:
|
||||
bool: Whether all the lights associated with the app are off
|
||||
"""
|
||||
states = self.app_entity_states()
|
||||
return all(state != 'on' for entity, state in states.items())
|
||||
|
||||
def any_on(self) -> bool:
|
||||
""" "Any on" is the logic opposite of "all off"
|
||||
|
||||
Returns:
|
||||
bool: Whether any of the lights associated with the app are on
|
||||
"""
|
||||
states = self.app_entity_states()
|
||||
return any(state == 'on' for entity, state in states.items())
|
||||
|
||||
def sleep_bool(self) -> bool:
|
||||
if sleep_var := self.args.get('sleep'):
|
||||
return self.get_state(sleep_var) == 'on'
|
||||
else:
|
||||
return False
|
||||
|
||||
def manual_mode(self) -> bool:
|
||||
if manual_entity := self.args.get('manual_mode'):
|
||||
return self.get_state(manual_entity) == 'on'
|
||||
else:
|
||||
return False
|
||||
|
||||
def off_duration(self, now: datetime.time = None) -> datetime.timedelta:
|
||||
"""Determines the time that the motion sensor has to be clear before deactivating
|
||||
|
||||
Priority:
|
||||
- Value in scene definition
|
||||
- Default value
|
||||
- Normal - value in app definition
|
||||
- Sleep - 0
|
||||
|
||||
"""
|
||||
sleep_mode_active = self.sleep_bool()
|
||||
if sleep_mode_active:
|
||||
self.log(f'Sleeping mode active: {sleep_mode_active}')
|
||||
return datetime.timedelta()
|
||||
else:
|
||||
now = now or self.get_now().time()
|
||||
return self._room_config.current_off_duration(now)
|
||||
Reference in New Issue
Block a user