Compare commits

5 Commits

Author SHA1 Message Date
John Lancaster
1c5edf6cc7 button and door converted 2024-07-27 14:35:07 -05:00
John Lancaster
9ce8432bba added some services 2024-07-27 12:17:34 -05:00
John Lancaster
7f68c8cad2 WIP 2024-07-25 22:57:47 -05:00
John Lancaster
a703fd15fb started entities and custom services in new namespace 2024-07-25 00:25:08 -05:00
John Lancaster
043402ad2f changed datetime import 2024-07-25 00:24:07 -05:00
9 changed files with 407 additions and 488 deletions

View File

@@ -1,4 +1,4 @@
from .controller import RoomController
from .room_control import RoomController
from .motion import Motion
from .button import Button
from .door import Door

View File

@@ -1,150 +0,0 @@
import asyncio
import logging.config
from logging import Handler, LogRecord
from typing import TYPE_CHECKING
import aiohttp
from appdaemon.adapi import ADAPI
from .console import RCHighlighter, console
if TYPE_CHECKING:
from room_control import RoomController
class Singleton(type):
"""
https://en.wikipedia.org/wiki/Singleton_pattern
https://stackoverflow.com/q/6760685
https://realpython.com/python-metaclasses/
https://docs.python.org/3/reference/datamodel.html#metaclasses
"""
_instances = {}
def __call__(cls, *args, **kwargs):
if cls not in cls._instances:
cls._instances[cls] = super(Singleton, cls).__call__(*args, **kwargs)
return cls._instances[cls]
class AsyncLokiHandler(Handler, metaclass=Singleton):
loop: asyncio.BaseEventLoop
loki_url: str
queue: asyncio.Queue
consumer_task: asyncio.Task
def __init__(self, loop: asyncio.BaseEventLoop, loki_url: str):
console.print(f' [bold yellow]{hex(id(self))}[/] '.center(50, '-'))
super().__init__()
self.loop = loop
self.loki_url = loki_url
self.queue = asyncio.Queue()
self.consumer_task = self.loop.create_task(self.log_consumer())
def emit(self, record: LogRecord):
self.loop.create_task(self.send_to_loki(record))
async def send_to_loki(self, record: LogRecord):
labels = {'room': record.room}
if component := getattr(record, 'component', False):
labels['component'] = component
message = self.format(record)
ns = round(record.created * 1_000_000_000)
# https://grafana.com/docs/loki/latest/reference/loki-http-api/#ingest-logs
payload = {'streams': [{'stream': labels, 'values': [[str(ns), message]]}]}
await self.queue.put(payload)
async def log_consumer(self):
async with aiohttp.ClientSession() as session:
try:
while True:
payload = await self.queue.get()
await session.post(self.loki_url, json=payload, timeout=1)
# console.print('[bold yellow]Sent to Loki[/]')
except asyncio.CancelledError:
console.print('[bold red]Cancelled[/]')
class RoomControlBase(ADAPI):
app: 'RoomController'
logger: logging.LoggerAdapter
def initialize(self, room: str, component: str = None):
"""Sets up the logging"""
logger_name = f'Appdaemon.{room}'
extra_attributes = {'room': room}
# all the stuff that has to happen for a component
if component is not None:
self.app: 'RoomController' = self.get_app(self.args['app'])
logger_name += f'.{component}'
extra_attributes['component'] = component
rich_handler_args = {
'console': console,
'highlighter': RCHighlighter(),
'markup': True,
'show_path': False,
'omit_repeated_times': False,
}
logging.config.dictConfig(
{
'version': 1,
'disable_existing_loggers': False,
'filters': {'unmarkup': {'()': 'room_control.console.UnMarkupFilter'}},
'formatters': {
'basic': {'style': '{', 'format': '{message}'},
'rich': {
'style': '{',
'format': '[room]{room}[/] {message}',
'datefmt': '%H:%M:%S.%f',
},
'rich_component': {
'style': '{',
'format': '[room]{room}[/] [component]{component}[/] {message}',
'datefmt': '%H:%M:%S.%f',
},
},
'handlers': {
'rich': {
'formatter': 'rich',
'()': 'rich.logging.RichHandler',
**rich_handler_args,
},
'rich_component': {
'formatter': 'rich_component',
'()': 'rich.logging.RichHandler',
**rich_handler_args,
},
'async_queue': {
'formatter': 'basic',
'filters': ['unmarkup'],
'()': 'room_control.base.AsyncLokiHandler',
'loop': self.AD.loop,
'loki_url': self.args['loki_url'],
},
},
'loggers': {
logger_name: {
'level': 'INFO',
'propagate': False,
'handlers': [
'rich' if component is None else 'rich_component',
'async_queue',
],
}
},
}
)
self.logger = logging.LoggerAdapter(logging.getLogger(logger_name), extra_attributes)
self.handler: AsyncLokiHandler = self.logger.logger.handlers[-1]
def terminate(self):
status: bool = self.handler.consumer_task.cancel()
if status:
self.log('Cancelled consumer task')

View File

@@ -1,47 +1,28 @@
import json
from dataclasses import dataclass
from typing import List
from typing import Any, Dict
from appdaemon.plugins.mqtt.mqttapi import Mqtt
from .base import RoomControlBase
from .model import ButtonConfig
from appdaemon.adapi import ADAPI
@dataclass(init=False)
class Button(RoomControlBase, Mqtt):
button: str | List[str]
rich: bool = False
config: ButtonConfig
@dataclass
class Button:
adapi: ADAPI
button_name: str
def initialize(self):
super().initialize(room=self.args['app'], component='Button')
self.config = ButtonConfig(**self.args)
self.log(f'Connected to AD app [room]{self.app.name}[/]', level='DEBUG')
self.button = self.config.button
self.setup_buttons(self.button)
def setup_buttons(self, buttons):
if isinstance(buttons, list):
for button in buttons:
self.setup_button(button)
else:
self.setup_button(buttons)
def setup_button(self, name: str):
topic = f'zigbee2mqtt/{name}'
# self.mqtt_subscribe(topic, namespace='mqtt')
self.listen_event(
def __post_init__(self):
self.log = self.adapi.log
topic = f'zigbee2mqtt/{self.button_name}'
self.adapi.listen_event(
self.handle_button,
'MQTT_MESSAGE',
topic=topic,
namespace='mqtt',
button=name,
button=self.button_name,
)
self.log(f'MQTT topic [topic]{topic}[/] controls app [room]{self.app.name}[/]')
self.log(f'MQTT topic [topic]{topic}[/] controls [room]{self.adapi.name}[/]')
def handle_button(self, event_name, data, kwargs):
def handle_button(self, event_name: str, data: Dict[str, Any], kwargs: Dict[str, Any]):
try:
payload = json.loads(data['payload'])
action = payload['action']
@@ -52,19 +33,7 @@ class Button(RoomControlBase, Mqtt):
else:
if isinstance(action, str) and action != '':
self.log(f'Action: [yellow]{action}[/]')
self.handle_action(action)
def handle_action(self, action: str):
if action == 'single':
state = self.get_state(self.args['ref_entity'])
kwargs = {'kwargs': {'cause': f'button single click: toggle while {state}'}}
if manual_entity := self.args.get('manual_mode'):
self.set_state(entity_id=manual_entity, state='off')
if state == 'on':
self.app.deactivate(**kwargs)
else:
self.app.activate(**kwargs)
else:
pass
if action == 'single':
self.adapi.call_service(
f'{self.adapi.name}/toggle', namespace='controller', cause='button'
)

View File

@@ -48,8 +48,6 @@ class RCHighlighter(RegexHighlighter):
def load_rich_config(
room: str = None, component: str = None, level: str = 'INFO'
) -> logging.LoggerAdapter:
"""Loads the config from the .rich_logging.yaml file, adds some bits, and returns a LoggerAdapter
"""
logger_name = f'Appdaemon.{room}'
if component is not None:
@@ -62,12 +60,6 @@ def load_rich_config(
RICH_CFG['handlers']['rich']['highlighter'] = RCHighlighter()
RICH_CFG['handlers']['rich_component']['console'] = console
RICH_CFG['handlers']['rich_component']['highlighter'] = RCHighlighter()
# RICH_CFG['handlers']['async_queue'] = {
# 'formatter': 'basic',
# '()': 'room_control.loki.AsyncQueueHandler',
# 'loop': self.AD.loop,
# 'queue': self.loki_queue,
# }
RICH_CFG['loggers'] = {
logger_name: {
'handlers': ['rich' if component is None else 'rich_component'],
@@ -87,11 +79,18 @@ def load_rich_config(
return adapter
class ContextSettingFilter(logging.Filter, ABC):
"""Adds all the dataclass fields as attributes to LogRecord objects.
RICH_HANDLER_CFG = {
'()': 'rich.logging.RichHandler',
'markup': True,
'show_path': False,
# 'show_time': False,
'omit_repeated_times': False,
'console': console,
'highlighter': RCHighlighter(),
}
Intended to be inherited from by something that uses the dataclasses.dataclass decorator.
"""
class ContextSettingFilter(logging.Filter, ABC):
def filter(self, record: logging.LogRecord) -> logging.LogRecord:
for name, val in asdict(self).items():
if val is not None:
@@ -99,6 +98,12 @@ class ContextSettingFilter(logging.Filter, ABC):
return record
# @dataclass
# class RoomControllerFilter(ContextSettingFilter):
# room: str
# component: Optional[str] = None
class RoomFilter(logging.Filter):
"""Used to filter out messages that have a component field because they will have already been printed by their respective logger."""
@@ -106,6 +111,11 @@ class RoomFilter(logging.Filter):
return not hasattr(record, 'component')
# class RoomControllerFormatter(logging.Formatter):
# def format(self, record: logging.LogRecord):
# return super().format(record)
class UnMarkupFilter(logging.Filter):
md_regex = re.compile(r'(?P<open>\[.*?\])(?P<text>.*?)(?P<close>\[\/\])')
@@ -119,17 +129,6 @@ class JSONFormatter(logging.Formatter):
return json.dumps(record.__dict__)
RICH_HANDLER_CFG = {
'()': 'rich.logging.RichHandler',
'markup': True,
'show_path': False,
# 'show_time': False,
'omit_repeated_times': False,
'console': console,
'highlighter': RCHighlighter(),
}
## currently unused
def room_logging_config(name: str):
return {
'version': 1,
@@ -187,3 +186,52 @@ def room_logging_config(name: str):
},
},
}
# def component_logging_config(parent_room: str, component: str):
# logger_name = f'AppDaemon.{parent_room}.{component}'
# cfg = load_rich_config()
# LOG_CFG = {
# 'version': 1,
# 'disable_existing_loggers': False,
# 'formatters': {
# 'rich_component': {
# 'style': '{',
# 'format': '[room]{room}[/] [component]{component}[/] {message}',
# 'datefmt': '%H:%M:%S.%f',
# },
# },
# 'handlers': {
# 'rich_component': {
# 'formatter': 'rich_component',
# **RICH_HANDLER_CFG,
# },
# },
# 'loggers': {
# logger_name: {
# # 'level': 'INFO',
# 'propagate': True,
# 'handlers': ['rich_component'],
# }
# },
# }
# return LOG_CFG
# def setup_component_logging(self) -> logging.Logger:
# """Creates a logger for a subcomponent with a RichHandler"""
# component = type(self).__name__
# parent = self.args['app']
# cfg_dict = component_logging_config(parent_room=parent, component=component)
# logger_name = next(iter(cfg_dict['loggers']))
# try:
# logging.config.dictConfig(cfg_dict)
# except Exception:
# console.print_exception()
# else:
# logger = logging.getLogger(logger_name)
# logger = logging.LoggerAdapter(logger, {'room': parent, 'component': component})
# return logger

View File

@@ -1,235 +0,0 @@
import datetime
import json
import logging
import logging.config
from copy import deepcopy
from typing import Dict, List
from appdaemon.entity import Entity
from appdaemon.plugins.hass.hassapi import Hass
from .base import RoomControlBase
from .model import ControllerStateConfig, RoomControllerConfig
logger = logging.getLogger(__name__)
class RoomController(RoomControlBase, Hass):
"""Class for linking room's lights with a motion sensor.
- Separate the turning on and turning off functions.
- Use the state change of the light to set up the event for changing to the other state
- `handle_on`
- `handle_off`
- When the light comes on, check if it's attributes match what they should, given the time.
"""
@property
def states(self) -> List[ControllerStateConfig]:
return self._room_config.states
@states.setter
def states(self, new: List[ControllerStateConfig]):
assert all(isinstance(s, ControllerStateConfig) for s in new), f'Invalid: {new}'
self._room_config.states = new
def initialize(self):
super().initialize(room=self.name)
self.app_entities = self.gather_app_entities()
self.refresh_state_times()
self.run_daily(callback=self.refresh_state_times, start='00:00:00')
self.log(f'Initialized [bold green]{type(self).__name__}[/]')
def terminate(self):
self.log('[bold red]Terminating[/]', level='DEBUG')
def gather_app_entities(self) -> List[str]:
"""Returns a list of all the entities involved in any of the states"""
def generator():
for settings in deepcopy(self.args['states']):
if scene := settings.get('scene'):
if isinstance(scene, str):
assert scene.startswith(
'scene.'
), f"Scene definition must start with 'scene.' for app {self.name}"
entity: Entity = self.get_entity(scene)
entity_state = entity.get_state('all')
attributes = entity_state['attributes']
for entity in attributes['entity_id']:
yield entity
else:
for key in scene.keys():
yield key
else:
yield self.args['entity']
return set(list(generator()))
def refresh_state_times(self, *args, **kwargs):
"""Resets the `self.states` attribute to a newly parsed version of the states.
Parsed states have an absolute time for the current day.
"""
# re-parse the state strings into times for the current day
self._room_config = RoomControllerConfig.model_validate(self.args)
self.log(
f'{len(self._room_config.states)} states in the app configuration',
level='DEBUG',
)
for state in self._room_config.states:
if state.time is None and state.elevation is not None:
state.time = self.AD.sched.location.time_at_elevation(
elevation=state.elevation, direction=state.direction
).time()
elif isinstance(state.time, str):
state.time = self.parse_time(state.time)
assert isinstance(state.time, datetime.time), f'Invalid time: {state.time}'
self.states = sorted(self.states, key=lambda s: s.time, reverse=True)
# schedule the transitions
for state in self.states[::-1]:
# t: datetime.time = state['time']
t: datetime.time = state.time
try:
self.run_at(
callback=self.activate_any_on,
start=t.strftime('%H:%M:%S'),
cause='scheduled transition',
)
except ValueError:
# happens when the callback time is in the past
pass
except Exception as e:
self.log(f'Failed with {type(e)}: {e}')
def current_state(self, now: datetime.time = None) -> ControllerStateConfig:
if self.sleep_bool():
self.log('sleep: active')
if state := self.args.get('sleep_state'):
return ControllerStateConfig(**state)
else:
return ControllerStateConfig(scene={})
else:
now = now or self.get_now().time().replace(microsecond=0)
self.log(f'Getting state for {now.strftime("%I:%M:%S %p")}', level='DEBUG')
state = self._room_config.current_state(now)
self.log(f'Current state: {state.time}', level='DEBUG')
return state
def app_entity_states(self) -> Dict[str, str]:
states = {entity: self.get_state(entity) for entity in self.app_entities}
return states
def all_off(self) -> bool:
""" "All off" is the logic opposite of "any on"
Returns:
bool: Whether all the lights associated with the app are off
"""
states = self.app_entity_states()
return all(state != 'on' for entity, state in states.items())
def any_on(self) -> bool:
""" "Any on" is the logic opposite of "all off"
Returns:
bool: Whether any of the lights associated with the app are on
"""
states = self.app_entity_states()
return any(state == 'on' for entity, state in states.items())
def sleep_bool(self) -> bool:
if sleep_var := self.args.get('sleep'):
return self.get_state(sleep_var) == 'on'
else:
return False
def manual_mode(self) -> bool:
if manual_entity := self.args.get('manual_mode'):
return self.get_state(manual_entity) == 'on'
else:
return False
# @sleep_bool.setter
# def sleep_bool(self, val) -> bool:
# if (sleep_var := self.args.get('sleep')):
# if isinstance(val, str):
# self.set_state(sleep_var, state=val)
# elif isinstance(val, bool):
# self.set_state(sleep_var, state='on' if val else 'off')
# else:
# raise ValueError('Sleep variable is undefined')
def off_duration(self, now: datetime.time = None) -> datetime.timedelta:
"""Determines the time that the motion sensor has to be clear before deactivating
Priority:
- Value in scene definition
- Default value
- Normal - value in app definition
- Sleep - 0
"""
sleep_mode_active = self.sleep_bool()
if sleep_mode_active:
self.log(f'Sleeping mode active: {sleep_mode_active}')
return datetime.timedelta()
else:
now = now or self.get_now().time()
return self._room_config.current_off_duration(now)
def activate(self, entity=None, attribute=None, old=None, new=None, kwargs=None):
if kwargs is not None:
cause = kwargs.get('cause', 'unknown')
else:
cause = 'unknown'
self.log(f'Activating: {cause}')
scene_kwargs = self.current_state().to_apply_kwargs(transition=0)
if isinstance(scene_kwargs, str):
self.turn_on(scene_kwargs)
self.log(f'Turned on scene: {scene_kwargs}')
elif isinstance(scene_kwargs, dict):
self.call_service('scene/apply', **scene_kwargs)
self.log(f'Applied scene:\n{json.dumps(scene_kwargs, indent=2)}', level='DEBUG')
elif scene_kwargs is None:
self.log('No scene, ignoring...')
# Need to act as if the light had just turned off to reset the motion (and maybe other things?)
# self.callback_light_off()
else:
self.log(f'ERROR: unknown scene: {scene_kwargs}')
def activate_all_off(self, *args, **kwargs):
"""Activate if all of the entities are off. Args and kwargs are passed directly to self.activate()"""
if self.all_off():
self.activate(*args, **kwargs)
else:
self.log('Skipped activating - everything is not off')
def activate_any_on(self, *args, **kwargs):
"""Activate if any of the entities are on. Args and kwargs are passed directly to self.activate()"""
if self.any_on() and not self.manual_mode():
self.activate(*args, **kwargs)
else:
self.log('Skipped activating - everything is off')
def toggle_activate(self, *args, **kwargs):
if self.any_on():
self.deactivate(*args, **kwargs)
else:
self.activate(*args, **kwargs)
def deactivate(self, entity=None, attribute=None, old=None, new=None, kwargs=None):
cause = kwargs.get('cause', 'unknown')
self.log(f'Deactivating: {cause}')
for e in self.app_entities:
self.turn_off(e)
self.log(f'Turned off {e}')

View File

@@ -1,17 +1,25 @@
from appdaemon.plugins.hass.hassapi import Hass
from dataclasses import dataclass
from logging import Logger, LoggerAdapter
from .base import RoomControlBase
from appdaemon.adapi import ADAPI
class Door(RoomControlBase, Hass):
def initialize(self):
super().initialize(room=self.args['app'], component='Door')
@dataclass
class Door:
adapi: ADAPI
entity_id: str
self.listen_state(
self.app.activate_all_off,
entity_id=self.args['door'],
def __post_init__(self):
self.logger = LoggerAdapter(
self.adapi.logger.logger.getChild('door'), self.adapi.logger.extra
)
self.adapi.listen_state(
callback=lambda *args, **kwargs: self.call_service(
f'{self.adapi.name}/activate_all_off', namespace='controller'
),
entity_id=self.entity_id,
new='on',
cause='door open',
)
self.log(f'Waiting for door to open: [bold green]{self.args["door"]}[/]')
self.logger.debug(f'Initialized door for [room]{self.adapi.name}[/]')

View File

@@ -1,21 +1,21 @@
from datetime import datetime, time, timedelta
import datetime
from pathlib import Path
from typing import Annotated, Dict, List, Optional, Self
import yaml
from astral import SunDirection
from pydantic import BaseModel, BeforeValidator, Field, model_validator
from pydantic import BaseModel, BeforeValidator, Field, model_validator, root_validator
from pydantic_core import PydanticCustomError
from rich.console import Console, ConsoleOptions, RenderResult
from rich.table import Column, Table
def str_to_timedelta(input_str: str) -> timedelta:
def str_to_timedelta(input_str: str) -> datetime.timedelta:
try:
hours, minutes, seconds = map(int, input_str.split(':'))
return timedelta(hours=hours, minutes=minutes, seconds=seconds)
return datetime.timedelta(hours=hours, minutes=minutes, seconds=seconds)
except Exception:
return timedelta()
return datetime.timedelta()
def str_to_direction(input_str: str) -> SunDirection:
@@ -27,7 +27,7 @@ def str_to_direction(input_str: str) -> SunDirection:
)
OffDuration = Annotated[timedelta, BeforeValidator(str_to_timedelta)]
OffDuration = Annotated[datetime.timedelta, BeforeValidator(str_to_timedelta)]
class State(BaseModel):
@@ -45,23 +45,23 @@ class ApplyKwargs(BaseModel):
class ControllerStateConfig(BaseModel):
time: Optional[str | datetime] = None
time: Optional[str | datetime.time | datetime.datetime] = None
elevation: Optional[float] = None
direction: Optional[Annotated[SunDirection, BeforeValidator(str_to_direction)]] = None
off_duration: Optional[OffDuration] = None
scene: dict[str, State] | str
scene: dict[str, State] | str = Field(default_factory=dict)
@model_validator(mode='before')
def check_args(cls, values):
time, elevation = values.get('time'), values.get('elevation')
if time is not None and elevation is not None:
raise PydanticCustomError('bad_time_spec', 'Only one of time or elevation can be set.')
elif elevation is not None and 'direction' not in values:
# if time is not None and elevation is not None:
# raise PydanticCustomError('bad_time_spec', 'Only one of time or elevation can be set.')
if elevation is not None and 'direction' not in values:
raise PydanticCustomError('no_sun_dir', 'Needs sun direction with elevation')
return values
def to_apply_kwargs(self, **kwargs):
return ApplyKwargs(entities=self.scene, **kwargs).model_dump(exclude_none=True)
def to_apply_kwargs(self, transition: int = None):
return ApplyKwargs(entities=self.scene, transition=transition).model_dump(exclude_none=True)
class RoomControllerConfig(BaseModel):
@@ -102,11 +102,11 @@ class RoomControllerConfig(BaseModel):
def sort_states(self):
"""Should only be called after all the times have been resolved"""
assert all(
isinstance(state.time, time) for state in self.states
isinstance(state.time, datetime.time) for state in self.states
), 'Times have not all been resolved yet'
self.states = sorted(self.states, key=lambda s: s.time, reverse=True)
def current_state(self, now: time) -> ControllerStateConfig:
def current_state(self, now: datetime.time) -> ControllerStateConfig:
self.sort_states()
for state in self.states:
if state.time <= now:
@@ -114,11 +114,11 @@ class RoomControllerConfig(BaseModel):
else:
return self.states[0]
def current_scene(self, now: time) -> Dict:
def current_scene(self, now: datetime.time) -> Dict:
state = self.current_state(now)
return state.scene
def current_off_duration(self, now: time) -> timedelta:
def current_off_duration(self, now: datetime.time) -> datetime.timedelta:
state = self.current_state(now)
if state.off_duration is None:
if self.off_duration is None:

View File

@@ -1,12 +1,16 @@
import re
from datetime import timedelta
from typing import Literal, Optional
from logging import Logger
from typing import TYPE_CHECKING, Literal, Optional
from appdaemon.entity import Entity
from appdaemon.plugins.hass.hassapi import Hass
from pydantic import BaseModel, ValidationError
from .base import RoomControlBase
from . import console
if TYPE_CHECKING:
from room_control import RoomController
class CallbackEntry(BaseModel):
@@ -23,7 +27,10 @@ class CallbackEntry(BaseModel):
Callbacks = dict[str, dict[str, CallbackEntry]]
class Motion(RoomControlBase, Hass):
class Motion(Hass):
logger: Logger
app: 'RoomController'
@property
def sensor(self) -> Entity:
return self.get_entity(self.args['sensor'])
@@ -45,7 +52,8 @@ class Motion(RoomControlBase, Hass):
return self.sensor_state != self.ref_entity_state
def initialize(self):
super().initialize(room=self.args['app'], component='Motion')
self.app: 'RoomController' = self.get_app(self.args['app'])
self.logger = console.load_rich_config(self.app.name, type(self).__name__)
assert self.entity_exists(self.args['sensor'])
assert self.entity_exists(self.args['ref_entity'])
@@ -63,6 +71,7 @@ class Motion(RoomControlBase, Hass):
if self.sensor_state:
self.app.activate(kwargs={'cause': f'Syncing state with {self.sensor.entity_id}'})
# don't need to await these because they'll already get turned into a task by the utils.sync_wrapper decorator
self.listen_state(
**base_kwargs,
attribute='brightness',
@@ -70,6 +79,9 @@ class Motion(RoomControlBase, Hass):
)
self.listen_state(**base_kwargs, new='off', callback=self.callback_light_off)
for handle, cb in self.callbacks():
self.log(f'Handle [yellow]{handle[:4]}[/]: {cb.function}', level='DEBUG')
self.log(f'Initialized [bold green]{type(self).__name__}[/]')
def callbacks(self):

267
src/room_control/room_control.py Executable file
View File

@@ -0,0 +1,267 @@
import datetime
import logging
import logging.config
import traceback
from functools import wraps
from typing import Any, Dict, List, Set
from appdaemon.entity import Entity
from appdaemon.plugins.hass.hassapi import Hass
from astral.location import Location
from . import console
from .button import Button
from .door import Door
from .model import ControllerStateConfig, RoomControllerConfig
logger = logging.getLogger(__name__)
class RoomController(Hass):
"""Class for linking room's lights with a motion sensor.
- Separate the turning on and turning off functions.
- Use the state change of the light to set up the event for changing to the other state
- `handle_on`
- `handle_off`
- When the light comes on, check if it's attributes match what they should, given the time.
## Services
- <name>/activate
- <name>/activate_all_off
- <name>/deactivate
- <name>/toggle
"""
@property
def states(self) -> List[ControllerStateConfig]:
return self._room_config.states
@states.setter
def states(self, new: List[ControllerStateConfig]):
assert all(isinstance(s, ControllerStateConfig) for s in new), f'Invalid: {new}'
self._room_config.states = new
@property
@wraps(Location.time_at_elevation)
def time_at_elevation(self):
return self.AD.sched.location.time_at_elevation
@property
def state_entity(self) -> Entity:
return self.get_entity(f'{self.name}.state', namespace='controller')
def initialize(self):
self.logger = console.load_rich_config(self.name)
self.set_log_level('DEBUG')
self.refresh_state_times()
self.run_daily(callback=self.refresh_state_times, start='00:00:00')
self.register_service(
f'{self.name}/activate', self._service_activate, namespace='controller'
)
self.register_service(
f'{self.name}/activate_all_off', self._service_activate_all_off, namespace='controller'
)
self.register_service(
f'{self.name}/deactivate', self._service_deactivate, namespace='controller'
)
self.register_service(f'{self.name}/toggle', self._service_toggle, namespace='controller')
# This needs to come after this first call of refresh_state_times
self.app_entities = self.get_app_entities()
self.log(f'entities: {self.app_entities}', level='DEBUG')
if button := self.args.get('button'):
if isinstance(button, str):
self.button = Button(self, button_name=button)
if door := self.args.get('door'):
if isinstance(door, str):
self.log('door--')
self.door = Door(self, entity_id=door)
self.log(f'Initialized [bold green]{type(self).__name__}[/]')
def terminate(self):
self.log('[bold red]Terminating[/]', level='DEBUG')
def get_app_entities(self) -> Set[str]:
"""Gets a set of all the entities referenced by any of the state definitions"""
def gen():
for state in self._room_config.states:
if isinstance(state.scene, str):
assert state.scene.startswith(
'scene.'
), "Scene definition must start with 'scene.'"
entities = self.get_state(state.scene, attribute='entity_id')
yield from entities
else:
yield from state.scene.keys()
return set(gen())
def refresh_state_times(self, *args, **kwargs):
"""Resets the `self.states` attribute to a newly parsed version of the states.
Parsed states have an absolute time for the current day.
"""
# re-parse the state strings into times for the current day
self._room_config = RoomControllerConfig.model_validate(self.args)
self.log(
f'{len(self._room_config.states)} states in the app configuration',
level='DEBUG',
)
for state in self._room_config.states:
if state.time is None and state.elevation is not None:
state.time = self.time_at_elevation(
elevation=state.elevation, direction=state.direction
).time()
elif isinstance(state.time, str):
state.time = self.parse_time(state.time)
assert isinstance(state.time, datetime.time), f'Invalid time: {state.time}'
try:
self.run_at(
callback=lambda cb_args: self.set_controller_scene(cb_args['state']),
start=state.time.strftime('%H:%M:%S'),
state=state,
)
except Exception as e:
self.log(f'Failed with {type(e)}: {e}')
def set_controller_scene(self, state: ControllerStateConfig):
try:
self.state_entity.set_state(attributes=state.model_dump())
except Exception:
self.logger.error(traceback.format_exc())
else:
self.log(f'Set controller state of {self.name}: {state.model_dump()}', level='DEBUG')
def current_state(self) -> ControllerStateConfig:
if self.sleep_bool():
self.log('sleep: active', level='DEBUG')
if state := self.args.get('sleep_state'):
return ControllerStateConfig(**state)
else:
return ControllerStateConfig()
else:
try:
attrs = self.state_entity.get_state('all')['attributes']
state = ControllerStateConfig.model_validate(attrs)
except Exception:
state = ControllerStateConfig()
finally:
# self.log(f'Current state: {state.model_dump(exclude_none=True)}', level='DEBUG')
return state
def activate(self, **kwargs):
self.call_service(f'{self.name}/activate', namespace='controller', **kwargs)
def _service_activate(self, namespace: str, domain: str, service: str, kwargs: Dict[str, Any]):
self.log(f'Custom kwargs: {kwargs}', level='DEBUG')
state = self.current_state()
if isinstance(state.scene, str):
self.turn_on(state.scene)
# self.turn_on(state.scene, transition=0)
elif isinstance(state.scene, dict):
scene = state.to_apply_kwargs()
self.call_service('scene/apply', **scene)
# scene = state.to_apply_kwargs(transition=0)
def activate_any_on(self, **kwargs):
"""Activate if any of the entities are on. Args and kwargs are passed directly to self.activate()"""
self.call_service(f'{self.name}/activate_any_on', namespace='controller', **kwargs)
def _service_activate_any_on(
self, namespace: str, domain: str, service: str, kwargs: Dict[str, Any]
):
if self.any_on() and not self.manual_mode():
self.activate(**kwargs)
def activate_all_off(self, **kwargs):
"""Activate if all of the entities are off. Args and kwargs are passed directly to self.activate()"""
self.call_service(f'{self.name}/activate_all_off', namespace='controller', **kwargs)
def _service_activate_all_off(
self, namespace: str, domain: str, service: str, kwargs: Dict[str, Any]
):
if self.all_off() and not self.manual_mode():
self.activate(**kwargs)
def deactivate(self, **kwargs):
self.call_service(f'{self.name}/deactivate', namespace='controller', **kwargs)
def _service_deactivate(
self, namespace: str, domain: str, service: str, kwargs: Dict[str, Any]
):
for e in self.app_entities:
self.turn_off(e)
def toggle(self, **kwargs):
self.call_service(f'{self.name}/toggle', namespace='controller', **kwargs)
def _service_toggle(self, namespace: str, domain: str, service: str, kwargs: Dict[str, Any]):
if self.any_on():
self.deactivate(**kwargs)
else:
self.activate(**kwargs)
def app_entity_states(self) -> Dict[str, str]:
states = {entity: self.get_state(entity) for entity in self.app_entities}
return states
def all_off(self) -> bool:
""" "All off" is the logic opposite of "any on"
Returns:
bool: Whether all the lights associated with the app are off
"""
states = self.app_entity_states()
return all(state != 'on' for entity, state in states.items())
def any_on(self) -> bool:
""" "Any on" is the logic opposite of "all off"
Returns:
bool: Whether any of the lights associated with the app are on
"""
states = self.app_entity_states()
return any(state == 'on' for entity, state in states.items())
def sleep_bool(self) -> bool:
if sleep_var := self.args.get('sleep'):
return self.get_state(sleep_var) == 'on'
else:
return False
def manual_mode(self) -> bool:
if manual_entity := self.args.get('manual_mode'):
return self.get_state(manual_entity) == 'on'
else:
return False
def off_duration(self, now: datetime.time = None) -> datetime.timedelta:
"""Determines the time that the motion sensor has to be clear before deactivating
Priority:
- Value in scene definition
- Default value
- Normal - value in app definition
- Sleep - 0
"""
sleep_mode_active = self.sleep_bool()
if sleep_mode_active:
self.log(f'Sleeping mode active: {sleep_mode_active}')
return datetime.timedelta()
else:
now = now or self.get_now().time()
return self._room_config.current_off_duration(now)