12 Commits
loki ... main

Author SHA1 Message Date
John Lancaster
72b4b7d72e pyproject updates 2025-11-16 09:26:56 -06:00
John Lancaster
b1638be692 added entity callbacks 2024-09-12 21:15:30 -05:00
John Lancaster
8b7294b445 broke out virtual button 2024-09-01 21:07:29 -05:00
John Lancaster
e4dada011c added virtual buttons 2024-09-01 20:38:47 -05:00
John Lancaster
e3186f1b5e removed unused import 2024-08-27 20:45:48 -05:00
John Lancaster
2180e544c0 another fix 2024-08-27 20:43:01 -05:00
John Lancaster
82f43cd029 simplifications 2024-08-27 19:24:38 -05:00
John Lancaster
1b3fc4afb7 changes for use_dictionary_unpacking 2024-08-27 00:41:25 -05:00
John Lancaster
0f58ac4cc6 fixed reconstruction of sun direction from state with a new field validator 2024-07-31 23:26:09 -05:00
John Lancaster
b278a3cda1 changed default of load_rich_config 2024-07-27 20:52:08 -05:00
John Lancaster
e4fcd757ce added log_level config per app 2024-07-27 20:48:57 -05:00
John Lancaster
7cde3c75d8 changed to use services 2024-07-27 17:37:33 -05:00
8 changed files with 366 additions and 362 deletions

View File

@@ -12,16 +12,17 @@ dependencies = [
"ruff>=0.4.2", "ruff>=0.4.2",
] ]
readme = "README.md" readme = "README.md"
requires-python = ">= 3.8,<3.12" requires-python = ">= 3.10,<3.13"
[tool.setuptools] [build-system]
include-package-data = true requires = ["hatchling"]
build-backend = "hatchling.build"
# [tool.setuptools.package-data] [tool.hatch.metadata]
# mypkg = ["*.yaml"] allow-direct-references = true
[tool.setuptools.data-files] [tool.hatch.build.targets.wheel]
config = ["config/default_config.yaml"] packages = ["src/room_control"]
[tool.ruff.format] [tool.ruff.format]
quote-style = 'single' quote-style = 'single'

View File

@@ -1,6 +1,6 @@
from .room_control import RoomController
from .motion import Motion
from .button import Button from .button import Button
from .door import Door from .door import Door
from .motion import MotionSensor
from .room_control import RoomController
__all__ = ['RoomController', 'Motion', 'Button', 'Door'] __all__ = ['RoomController', 'MotionSensor', 'Button', 'Door']

View File

@@ -1,76 +1,95 @@
import json import json
from dataclasses import dataclass from dataclasses import dataclass
from logging import Logger from typing import TYPE_CHECKING, Any, Dict
from typing import TYPE_CHECKING, List
from appdaemon.plugins.mqtt.mqttapi import Mqtt from appdaemon.entity import Entity
from . import console from . import console
from .model import ButtonConfig
if TYPE_CHECKING: if TYPE_CHECKING:
from room_control import RoomController from .room_control import RoomController
@dataclass(init=False) @dataclass
class Button(Mqtt): class Button:
button: str | List[str] adapi: 'RoomController'
rich: bool = False button_name: str
config: ButtonConfig
logger: Logger
async def initialize(self): def __post_init__(self):
self.app: 'RoomController' = await self.get_app(self.args['app']) self.logger = console.load_rich_config(self.adapi.name, 'Button')
self.logger = console.load_rich_config(self.app.name, type(self).__name__) topic = f'zigbee2mqtt/{self.button_name}'
self.config = ButtonConfig(**self.args) self.adapi.listen_event(
self.log(f'Connected to AD app [room]{self.app.name}[/]', level='DEBUG')
self.button = self.config.button
self.setup_buttons(self.button)
def setup_buttons(self, buttons):
if isinstance(buttons, list):
for button in buttons:
self.setup_button(button)
else:
self.setup_button(buttons)
def setup_button(self, name: str):
topic = f'zigbee2mqtt/{name}'
# self.mqtt_subscribe(topic, namespace='mqtt')
self.listen_event(
self.handle_button, self.handle_button,
'MQTT_MESSAGE', 'MQTT_MESSAGE',
topic=topic, topic=topic,
namespace='mqtt', namespace='mqtt',
button=name, button=self.button_name,
) )
self.log(f'MQTT topic [topic]{topic}[/] controls app [room]{self.app.name}[/]') self.logger.info(f'MQTT topic [topic]{topic}[/] controls [room]{self.adapi.name}[/]')
def handle_button(self, event_name, data, kwargs): def handle_button(self, event_name: str, data: Dict[str, Any], **kwargs: Dict[str, Any]):
if event_name == 'appd_started':
return
# self.logger.info(f'Button callback: {event_name}, {data}')
try: try:
payload = json.loads(data['payload']) payload = json.loads(data['payload'])
action = payload['action'] action = payload['action']
except json.JSONDecodeError: except json.JSONDecodeError:
self.log(f'Error decoding JSON from {data["payload"]}', level='ERROR') self.logger.error(f'Error decoding JSON from {data["payload"]}')
except KeyError:
return
else: else:
if isinstance(action, str) and action != '': self.do_action(action)
self.log(f'Action: [yellow]{action}[/]')
self.handle_action(action)
def handle_action(self, action: str): def do_action(self, action: str):
if action == 'single': """Action can be single, double, or others"""
state = self.get_state(self.args['ref_entity']) if isinstance(action, str) and action != '':
kwargs = {'kwargs': {'cause': f'button single click: toggle while {state}'}} self.logger.info(f'Action: [yellow]{action}[/]')
if action == 'single':
self.adapi.call_service(
f'{self.adapi.name}/toggle', namespace='controller', cause='button'
)
if manual_entity := self.args.get('manual_mode'):
self.set_state(entity_id=manual_entity, state='off')
if state == 'on': @dataclass
self.app.deactivate(**kwargs) class VirtualButton(Button):
else: def __post_init__(self):
self.app.activate(**kwargs) self.logger = console.load_rich_config(self.adapi.name, 'Button')
friendly_name = self.adapi.name.title().replace('_', ' ') + ' Button'
kwargs = {'entity_id': self.eid, 'friendly_name': friendly_name}
if not self.adapi.entity_exists(self.eid):
self.adapi.set_state(state=self.adapi.get_now(), **kwargs)
self.logger.info(f'Created entity [green]{self.eid}[/]')
else: else:
pass self.adapi.set_state(**kwargs)
self.logger.info(f'Set friendly name [green]{self.virtual_entity.friendly_name}[/]')
self.adapi.listen_event(self.handle_virtual_button, entity_id=self.eid)
@property
def eid(self) -> str:
return f'input_button.{self.adapi.name}'
@property
def virtual_entity(self) -> Entity:
return self.adapi.get_entity(self.eid)
def handle_virtual_button(
self, event_name: str, data: Dict[str, Any], **kwargs: Dict[str, Any]
):
if (
event_name == 'call_service'
and data.get('service') == 'press'
and (sd := data.get('service_data'))
and sd.get('entity_id') == self.eid
):
try:
if data['service_data']['entity_id'] == self.eid:
self.logger.info(f'Virtual button press: {event_name}')
# self.virtual_entity.set_state(state=datetime.now())
self.virtual_entity.set_state(state=self.adapi.get_now())
self.do_action('single')
except KeyError as e:
self.logger.error(f'Bad data from {event_name}: {json.dumps(data, indent=4)}')

View File

@@ -46,7 +46,7 @@ class RCHighlighter(RegexHighlighter):
def load_rich_config( def load_rich_config(
room: str = None, component: str = None, level: str = 'INFO' room: str = None, component: str = None, level: str = None
) -> logging.LoggerAdapter: ) -> logging.LoggerAdapter:
logger_name = f'Appdaemon.{room}' logger_name = f'Appdaemon.{room}'
@@ -64,7 +64,6 @@ def load_rich_config(
logger_name: { logger_name: {
'handlers': ['rich' if component is None else 'rich_component'], 'handlers': ['rich' if component is None else 'rich_component'],
'propagate': False, 'propagate': False,
'level': level,
} }
} }
@@ -73,6 +72,9 @@ def load_rich_config(
if component is not None: if component is not None:
extra['component'] = component extra['component'] = component
if level is not None:
RICH_CFG['loggers'][logger_name]['level'] = level
logging.config.dictConfig(RICH_CFG) logging.config.dictConfig(RICH_CFG)
logger = logging.getLogger(logger_name) logger = logging.getLogger(logger_name)
adapter = logging.LoggerAdapter(logger, extra) adapter = logging.LoggerAdapter(logger, extra)

View File

@@ -1,26 +1,23 @@
from logging import Logger from dataclasses import dataclass
from typing import TYPE_CHECKING from typing import TYPE_CHECKING
from appdaemon.plugins.hass.hassapi import Hass
from . import console from . import console
if TYPE_CHECKING: if TYPE_CHECKING:
from room_control import RoomController from .room_control import RoomController
class Door(Hass): @dataclass
app: 'RoomController' class Door:
logger: Logger adapi: 'RoomController'
entity_id: str
async def initialize(self): def __post_init__(self):
self.app: 'RoomController' = await self.get_app(self.args['app']) self.logger = console.load_rich_config(self.adapi.name, 'Door')
self.logger = console.load_rich_config(room=self.app.name, component=type(self).__name__)
self.log(f'Connected to AD app [room]{self.app.name}[/]', level='DEBUG')
await self.listen_state( self.adapi.listen_state(
self.app.activate_all_off, lambda *args, **kwargs: self.adapi.activate_all_off(cause='door open'),
entity_id=self.args['door'], entity_id=self.entity_id,
new='on', new='on',
cause='door open',
) )
self.logger.debug(f'Initialized door for [room]{self.adapi.name}[/]')

View File

@@ -1,21 +1,21 @@
from datetime import datetime, time, timedelta import datetime
from pathlib import Path from pathlib import Path
from typing import Annotated, Dict, List, Optional, Self from typing import Annotated, Dict, List, Optional, Self, Union
import yaml import yaml
from astral import SunDirection from astral import SunDirection
from pydantic import BaseModel, BeforeValidator, Field, root_validator from pydantic import BaseModel, BeforeValidator, Field, field_validator, model_validator
from pydantic_core import PydanticCustomError from pydantic_core import PydanticCustomError
from rich.console import Console, ConsoleOptions, RenderResult from rich.console import Console, ConsoleOptions, RenderResult
from rich.table import Column, Table from rich.table import Column, Table
def str_to_timedelta(input_str: str) -> timedelta: def str_to_timedelta(input_str: str) -> datetime.timedelta:
try: try:
hours, minutes, seconds = map(int, input_str.split(':')) hours, minutes, seconds = map(int, input_str.split(':'))
return timedelta(hours=hours, minutes=minutes, seconds=seconds) return datetime.timedelta(hours=hours, minutes=minutes, seconds=seconds)
except Exception: except Exception:
return timedelta() return datetime.timedelta()
def str_to_direction(input_str: str) -> SunDirection: def str_to_direction(input_str: str) -> SunDirection:
@@ -27,7 +27,7 @@ def str_to_direction(input_str: str) -> SunDirection:
) )
OffDuration = Annotated[timedelta, BeforeValidator(str_to_timedelta)] OffDuration = Annotated[datetime.timedelta, BeforeValidator(str_to_timedelta)]
class State(BaseModel): class State(BaseModel):
@@ -45,31 +45,47 @@ class ApplyKwargs(BaseModel):
class ControllerStateConfig(BaseModel): class ControllerStateConfig(BaseModel):
time: Optional[str | datetime] = None time: Optional[str | datetime.time | datetime.datetime] = None
elevation: Optional[float] = None elevation: Optional[float] = None
direction: Optional[Annotated[SunDirection, BeforeValidator(str_to_direction)]] = None direction: Optional[SunDirection] = None
off_duration: Optional[OffDuration] = None off_duration: Optional[OffDuration] = None
scene: dict[str, State] | str scene: dict[str, State] | str = Field(default_factory=dict)
@root_validator(pre=True) @model_validator(mode='before')
def check_args(cls, values): def check_args(cls, values):
time, elevation = values.get('time'), values.get('elevation') if values.get('elevation') is not None and values.get('direction') is None:
if time is not None and elevation is not None:
raise PydanticCustomError('bad_time_spec', 'Only one of time or elevation can be set.')
elif elevation is not None and 'direction' not in values:
raise PydanticCustomError('no_sun_dir', 'Needs sun direction with elevation') raise PydanticCustomError('no_sun_dir', 'Needs sun direction with elevation')
return values return values
def to_apply_kwargs(self, **kwargs): @field_validator('direction', mode='before')
return ApplyKwargs(entities=self.scene, **kwargs).model_dump(exclude_none=True) @classmethod
def check_sun_dir(cls, val: int | str | SunDirection | None) -> SunDirection:
if isinstance(val, str):
print(f'Str sun direction: {val}')
return str_to_direction(val)
elif isinstance(val, int):
return SunDirection.SETTING if val < 0 else SunDirection.RISING
elif isinstance(val, SunDirection):
return val
def to_apply_kwargs(self, transition: int = None):
return ApplyKwargs(entities=self.scene, transition=transition).model_dump(exclude_none=True)
class MotionSensorConfig(BaseModel):
sensor: str
ref_entity: str
class RoomControllerConfig(BaseModel): class RoomControllerConfig(BaseModel):
states: List[ControllerStateConfig] = Field(default_factory=list) states: List[ControllerStateConfig] = Field(default_factory=list)
off_duration: Optional[OffDuration] = None off_duration: Optional[OffDuration] = Field(default_factory=datetime.timedelta)
sleep_state: Optional[ControllerStateConfig] = None sleep_state: Optional[ControllerStateConfig] = None
rich: Optional[str] = None rich: Optional[str] = None
manual_mode: Optional[str] = None manual_mode: Optional[str] = None
button: Optional[Union[str, List[str]]] = None
motion: Optional[MotionSensorConfig] = None
log_level: Optional[str] = None
@classmethod @classmethod
def from_yaml(cls: Self, yaml_path: Path) -> Self: def from_yaml(cls: Self, yaml_path: Path) -> Self:
@@ -98,38 +114,3 @@ class RoomControllerConfig(BaseModel):
] ]
table.add_row(state.time.strftime('%I:%M:%S %p'), '\n'.join(lines)) table.add_row(state.time.strftime('%I:%M:%S %p'), '\n'.join(lines))
yield table yield table
def sort_states(self):
"""Should only be called after all the times have been resolved"""
assert all(
isinstance(state.time, time) for state in self.states
), 'Times have not all been resolved yet'
self.states = sorted(self.states, key=lambda s: s.time, reverse=True)
def current_state(self, now: time) -> ControllerStateConfig:
self.sort_states()
for state in self.states:
if state.time <= now:
return state
else:
return self.states[0]
def current_scene(self, now: time) -> Dict:
state = self.current_state(now)
return state.scene
def current_off_duration(self, now: time) -> timedelta:
state = self.current_state(now)
if state.off_duration is None:
if self.off_duration is None:
raise ValueError('Need an off duration')
else:
return self.off_duration
else:
return state.off_duration
class ButtonConfig(BaseModel):
app: str
button: str | List[str]
ref_entity: str

View File

@@ -1,11 +1,9 @@
import re from dataclasses import dataclass
from datetime import timedelta from datetime import timedelta
from logging import Logger
from typing import TYPE_CHECKING, Literal, Optional from typing import TYPE_CHECKING, Literal, Optional
from appdaemon.entity import Entity from appdaemon.entity import Entity
from appdaemon.plugins.hass.hassapi import Hass from pydantic import BaseModel
from pydantic import BaseModel, ValidationError
from . import console from . import console
@@ -27,151 +25,103 @@ class CallbackEntry(BaseModel):
Callbacks = dict[str, dict[str, CallbackEntry]] Callbacks = dict[str, dict[str, CallbackEntry]]
class Motion(Hass): @dataclass
logger: Logger class MotionSensor:
app: 'RoomController' adapi: 'RoomController'
sensor_entity_id: str
ref_entity_id: str
def __post_init__(self):
self.logger = console.load_rich_config(self.adapi.name, 'Motion')
assert self.sensor_entity.exists()
assert self.ref_entity.exists()
self.ref_entity.listen_state(self.light_state_callback, immediate=True)
self.match_new_state(new=self.sensor_entity.get_state())
self.logger.info('Initialized motion sensor')
def entity_callbacks(self, entity_id: str | None = None) -> dict[str, dict]:
callbacks = self.adapi.get_callback_entries()
if self.adapi.name in callbacks:
return {
handle: cb
for handle, cb in callbacks[self.adapi.name].items()
if cb.get('entity') == entity_id
}
else:
return {}
def sensor_callbacks(self) -> dict[str, dict]:
return self.entity_callbacks(entity_id=self.sensor_entity_id)
@property @property
def sensor(self) -> Entity: def sensor_entity(self) -> Entity:
return self.get_entity(self.args['sensor']) return self.adapi.get_entity(self.sensor_entity_id)
@property @property
def sensor_state(self) -> bool: def sensor_state(self) -> bool:
return self.sensor.state == 'on' return self.sensor_entity.get_state() == 'on'
@property @property
def ref_entity(self) -> Entity: def ref_entity(self) -> Entity:
return self.get_entity(self.args['ref_entity']) return self.adapi.get_entity(self.ref_entity_id)
@property @property
def ref_entity_state(self) -> bool: def ref_state(self) -> bool:
return self.ref_entity.get_state() == 'on' return self.ref_entity.get_state() == 'on'
@property @property
def state_mismatch(self) -> bool: def state_mismatch(self) -> bool:
return self.sensor_state != self.ref_entity_state return self.sensor_state != self.ref_state
def initialize(self): def light_state_callback(self, entity: str, attribute: str, old: str, new: str, **kwargs: dict):
self.app: 'RoomController' = self.get_app(self.args['app']) for handle in self.sensor_callbacks():
self.logger = console.load_rich_config(self.app.name, type(self).__name__) self.adapi.cancel_listen_state(handle)
self.match_new_state(new)
assert self.entity_exists(self.args['sensor']) def match_new_state(self, new: Literal['on', 'off']):
assert self.entity_exists(self.args['ref_entity']) match new:
case 'on':
base_kwargs = dict( duration = self.adapi.off_duration()
entity_id=self.ref_entity.entity_id, self.listen_motion_off(duration)
immediate=True, # avoids needing to sync the state case 'off':
) self.listen_motion_on()
if self.state_mismatch:
self.log(
f'Sensor is {self.sensor_state} ' f'and light is {self.ref_entity_state}',
level='WARNING',
)
if self.sensor_state:
self.app.activate(kwargs={'cause': f'Syncing state with {self.sensor.entity_id}'})
# don't need to await these because they'll already get turned into a task by the utils.sync_wrapper decorator
self.listen_state(
**base_kwargs,
attribute='brightness',
callback=self.callback_light_on,
)
self.listen_state(**base_kwargs, new='off', callback=self.callback_light_off)
for handle, cb in self.callbacks():
self.log(f'Handle [yellow]{handle[:4]}[/]: {cb.function}', level='DEBUG')
self.log(f'Initialized [bold green]{type(self).__name__}[/]')
def callbacks(self):
"""Returns a dictionary of validated CallbackEntry objects that are associated with this app"""
self_callbacks = self.get_callback_entries().get(self.name, {})
for handle, cb_dict in self_callbacks.items():
try:
yield handle, CallbackEntry.model_validate(cb_dict)
except ValidationError as e:
self.logger.error('Error parsing callback dictionary')
self.logger.error(e)
def listen_motion_on(self): def listen_motion_on(self):
"""Sets up the motion on callback to activate the room""" """Sets up the motion on callback to activate the room"""
self.cancel_motion_callback() # self.cancel_motion_callback()
self.listen_state( self.sensor_entity.listen_state(
callback=self.app.activate_all_off, lambda *args, **kwargs: self.adapi.activate_all_off(cause='motion on'),
entity_id=self.sensor.entity_id,
new='on', new='on',
oneshot=True, oneshot=True,
cause='motion on',
) )
self.log(f'Waiting for sensor motion on [friendly_name]{self.sensor.friendly_name}[/]') self.logger.info(
'Waiting for sensor motion on '
f'[friendly_name]{self.sensor_entity.friendly_name}[/]'
)
if self.sensor_state: if self.sensor_state:
self.log( self.logger.warning(
f'Sensor [friendly_name]{self.sensor.friendly_name}[/] is already on', 'Sensor '
level='WARNING', f'[friendly_name]{self.sensor_entity.friendly_name}[/] is already on',
) )
def listen_motion_off(self, duration: timedelta): def listen_motion_off(self, duration: timedelta):
"""Sets up the motion off callback to deactivate the room""" """Sets up the motion off callback to deactivate the room"""
self.cancel_motion_callback() # self.cancel_motion_callback()
self.listen_state( self.sensor_entity.listen_state(
callback=self.app.deactivate, lambda *args, **kwargs: self.adapi.deactivate(cause='motion off'),
entity_id=self.sensor.entity_id,
new='off', new='off',
duration=duration.total_seconds(), duration=duration.total_seconds(),
oneshot=True, oneshot=True,
cause='motion off',
) )
self.log( self.logger.debug(
f'Waiting for sensor [friendly_name]{self.sensor.friendly_name}[/] to be clear for {duration}' 'Waiting for sensor '
f'[friendly_name]{self.sensor_entity.friendly_name}[/] '
f'to be clear for {duration}'
) )
if not self.sensor_state: if not self.sensor_state:
self.log( self.logger.warning(
f'Sensor [friendly_name]{self.sensor.friendly_name}[/] is currently off', f'Sensor [friendly_name]{self.sensor_entity.friendly_name}[/] is currently off',
level='WARNING',
) )
def callback_light_on(self, entity=None, attribute=None, old=None, new=None, kwargs=None):
"""Called when the light turns on"""
if new is not None:
self.log(f'Detected {entity} turning on', level='DEBUG')
duration = self.app.off_duration()
self.listen_motion_off(duration)
def callback_light_off(self, entity=None, attribute=None, old=None, new=None, kwargs=None):
"""Called when the light turns off"""
self.log(f'Detected {entity} turning off', level='DEBUG')
self.listen_motion_on()
def get_app_callbacks(self, name: str = None):
"""Gets all the callbacks associated with the app"""
name = name or self.name
callbacks = {
handle: info
for app_name, callbacks in self.get_callback_entries().items()
for handle, info in callbacks.items()
if app_name == name
}
return callbacks
def get_sensor_callbacks(self):
return {
handle: info
for handle, info in self.get_app_callbacks().items()
if info['entity'] == self.sensor.entity_id
}
def cancel_motion_callback(self):
callbacks = self.get_sensor_callbacks()
# self.log(f'Found {len(callbacks)} callbacks for {self.sensor.entity_id}')
for handle, info in callbacks.items():
entity = info['entity']
kwargs = info['kwargs']
if (m := re.match('new=(?P<new>.*?)\s', kwargs)) is not None:
new = m.group('new')
self.cancel_listen_state(handle)
self.log(
f'cancelled callback for sensor {entity} turning {new}',
level='DEBUG',
)

View File

@@ -1,21 +1,24 @@
import datetime import datetime
import json
import logging import logging
import logging.config import logging.config
from copy import deepcopy import traceback
from typing import Dict, List from functools import wraps
from typing import Dict, List, Set
from appdaemon.entity import Entity from appdaemon.entity import Entity
from appdaemon.plugins.hass.hassapi import Hass from appdaemon.plugins.hass.hassapi import Hass
from appdaemon.plugins.mqtt.mqttapi import Mqtt from astral.location import Location
from . import console from . import console
from .button import Button, VirtualButton
from .door import Door
from .model import ControllerStateConfig, RoomControllerConfig from .model import ControllerStateConfig, RoomControllerConfig
from .motion import MotionSensor
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class RoomController(Hass, Mqtt): class RoomController(Hass):
"""Class for linking room's lights with a motion sensor. """Class for linking room's lights with a motion sensor.
- Separate the turning on and turning off functions. - Separate the turning on and turning off functions.
@@ -23,6 +26,15 @@ class RoomController(Hass, Mqtt):
- `handle_on` - `handle_on`
- `handle_off` - `handle_off`
- When the light comes on, check if it's attributes match what they should, given the time. - When the light comes on, check if it's attributes match what they should, given the time.
## Services
- <name>/activate
- <name>/activate_all_off
- <name>/deactivate
- <name>/toggle
""" """
@property @property
@@ -34,39 +46,88 @@ class RoomController(Hass, Mqtt):
assert all(isinstance(s, ControllerStateConfig) for s in new), f'Invalid: {new}' assert all(isinstance(s, ControllerStateConfig) for s in new), f'Invalid: {new}'
self._room_config.states = new self._room_config.states = new
@property
@wraps(Location.time_at_elevation)
def time_at_elevation(self):
return self.AD.sched.location.time_at_elevation
@property
def state_entity(self) -> Entity:
return self.get_entity(f'{self.name}.state', namespace='controller')
def initialize(self): def initialize(self):
self.logger = console.load_rich_config(self.name) self.logger = console.load_rich_config(self.name, level=self.args.get('log_level', 'INFO'))
self.app_entities = self.gather_app_entities()
# self.log(f'entities: {self.app_entities}')
self.refresh_state_times() self.refresh_state_times()
self.run_daily(callback=self.refresh_state_times, start='00:00:00') self.run_daily(callback=self.refresh_state_times, start='00:00:00')
self.register_service(
f'{self.name}/activate', self._service_activate, namespace='controller'
)
self.register_service(
f'{self.name}/activate_all_off', self._service_activate_all_off, namespace='controller'
)
self.register_service(
f'{self.name}/activate_any_on', self._service_activate_any_on, namespace='controller'
)
self.register_service(
f'{self.name}/deactivate', self._service_deactivate, namespace='controller'
)
self.register_service(f'{self.name}/toggle', self._service_toggle, namespace='controller')
# This needs to come after this first call of refresh_state_times
self.app_entities = self.get_app_entities()
self.log(f'entities: {self.app_entities}', level='DEBUG')
if button := self.args.get('button'):
if isinstance(button, str):
Button(self, button_name=button)
VirtualButton(self, button_name=button)
elif isinstance(button, list) and all(isinstance(b, str) for b in button):
for b in button:
Button(self, button_name=b)
VirtualButton(self, button_name=button)
if door := self.args.get('door'):
if isinstance(door, str):
self.door = Door(self, entity_id=door)
if motion := self.args.get('motion'):
self.motion = MotionSensor(
self, sensor_entity_id=motion['sensor'], ref_entity_id=motion['ref_entity']
)
state: ControllerStateConfig
for state in sorted(self._room_config.states, key=lambda s: s.time, reverse=True):
if isinstance(state.time, datetime.datetime):
t = state.time.time()
else:
t = state.time
if t < self.get_now().time():
self.log(f'Initial state: {state.time}', level='DEBUG')
self.set_controller_scene(state)
break
self.log(f'Initialized [bold green]{type(self).__name__}[/]') self.log(f'Initialized [bold green]{type(self).__name__}[/]')
def terminate(self): def terminate(self):
self.log('[bold red]Terminating[/]', level='DEBUG') self.log('[bold red]Terminating[/]', level='DEBUG')
def gather_app_entities(self) -> List[str]: def get_app_entities(self) -> Set[str]:
"""Returns a list of all the entities involved in any of the states""" """Gets a set of all the entities referenced by any of the state definitions"""
def generator(): def gen():
for settings in deepcopy(self.args['states']): for state in self._room_config.states:
if scene := settings.get('scene'): if isinstance(state.scene, str):
if isinstance(scene, str): assert state.scene.startswith(
assert scene.startswith( 'scene.'
'scene.' ), "Scene definition must start with 'scene.'"
), f"Scene definition must start with 'scene.' for app {self.name}" entities = self.get_state(state.scene, attribute='entity_id')
entity: Entity = self.get_entity(scene) yield from entities
entity_state = entity.get_state('all')
attributes = entity_state['attributes']
for entity in attributes['entity_id']:
yield entity
else:
for key in scene.keys():
yield key
else: else:
yield self.args['entity'] yield from state.scene.keys()
return set(list(generator())) return set(gen())
def refresh_state_times(self, *args, **kwargs): def refresh_state_times(self, *args, **kwargs):
"""Resets the `self.states` attribute to a newly parsed version of the states. """Resets the `self.states` attribute to a newly parsed version of the states.
@@ -82,7 +143,7 @@ class RoomController(Hass, Mqtt):
for state in self._room_config.states: for state in self._room_config.states:
if state.time is None and state.elevation is not None: if state.time is None and state.elevation is not None:
state.time = self.AD.sched.location.time_at_elevation( state.time = self.time_at_elevation(
elevation=state.elevation, direction=state.direction elevation=state.elevation, direction=state.direction
).time() ).time()
elif isinstance(state.time, str): elif isinstance(state.time, str):
@@ -90,38 +151,94 @@ class RoomController(Hass, Mqtt):
assert isinstance(state.time, datetime.time), f'Invalid time: {state.time}' assert isinstance(state.time, datetime.time), f'Invalid time: {state.time}'
self.states = sorted(self.states, key=lambda s: s.time, reverse=True)
# schedule the transitions
for state in self.states[::-1]:
# t: datetime.time = state['time']
t: datetime.time = state.time
try: try:
self.run_at( self.run_at(
callback=self.activate_any_on, callback=lambda **kwargs: self.set_controller_scene(kwargs['state']),
start=t.strftime('%H:%M:%S'), start=state.time.strftime('%H:%M:%S'),
cause='scheduled transition', state=state,
) )
except ValueError:
# happens when the callback time is in the past
pass
except Exception as e: except Exception as e:
self.log(f'Failed with {type(e)}: {e}') self.log(f'Failed with {type(e)}: {e}')
def current_state(self, now: datetime.time = None) -> ControllerStateConfig: self.state_entity.listen_state(
lambda *args, **kwargs: self.call_service(
f'{self.name}/activate_any_on', namespace='controller', cause='state transition'
),
attribute='all',
)
def set_controller_scene(self, state: ControllerStateConfig):
"""Sets the internal state for the app. Only used by the scheduled transition"""
try:
self.state_entity.set_state(attributes=state.model_dump())
except Exception:
self.logger.error(traceback.format_exc())
else:
self.log(f'Set controller state of {self.name}: {state.model_dump()}', level='DEBUG')
def current_state(self) -> ControllerStateConfig:
if self.sleep_bool(): if self.sleep_bool():
self.log('sleep: active') self.log('sleep: active', level='DEBUG')
if state := self.args.get('sleep_state'): if state := self.args.get('sleep_state'):
return ControllerStateConfig(**state) return ControllerStateConfig(**state)
else: else:
return ControllerStateConfig(scene={}) return ControllerStateConfig()
else: else:
now = now or self.get_now().time().replace(microsecond=0) try:
self.log(f'Getting state for {now.strftime("%I:%M:%S %p")}', level='DEBUG') attrs = self.state_entity.get_state('all')['attributes']
state = ControllerStateConfig.model_validate(attrs)
except Exception as e:
state = ControllerStateConfig()
logger.exception(e)
finally:
# self.log(f'Current state: {state.model_dump(exclude_none=True)}', level='DEBUG')
return state
state = self._room_config.current_state(now) def activate(self, **kwargs):
self.log(f'Current state: {state.time}', level='DEBUG') self.call_service(f'{self.name}/activate', namespace='controller', **kwargs)
return state
def _service_activate(self, namespace: str, domain: str, service: str, **kwargs):
# self.log(f'Custom kwargs: {kwargs}', level='DEBUG')
state = self.current_state()
if isinstance(state.scene, str):
self.turn_on(state.scene)
# self.turn_on(state.scene, transition=0)
elif isinstance(state.scene, dict):
scene = state.to_apply_kwargs()
self.call_service('scene/apply', **scene)
# scene = state.to_apply_kwargs(transition=0)
def activate_any_on(self, **kwargs):
"""Activate if any of the entities are on. Args and kwargs are passed directly to self.activate()"""
self.call_service(f'{self.name}/activate_any_on', namespace='controller', **kwargs)
def _service_activate_any_on(self, namespace: str, domain: str, service: str, **kwargs):
if self.any_on() and not self.manual_mode():
self.activate(**kwargs)
def activate_all_off(self, **kwargs):
"""Activate if all of the entities are off. Args and kwargs are passed directly to self.activate()"""
self.call_service(f'{self.name}/activate_all_off', namespace='controller', **kwargs)
def _service_activate_all_off(self, namespace: str, domain: str, service: str, **kwargs):
if self.all_off() and not self.manual_mode():
self.activate(**kwargs)
def deactivate(self, **kwargs):
self.call_service(f'{self.name}/deactivate', namespace='controller', **kwargs)
def _service_deactivate(self, namespace: str, domain: str, service: str, **kwargs):
for e in self.app_entities:
self.turn_off(e)
def toggle(self, **kwargs):
self.call_service(f'{self.name}/toggle', namespace='controller', **kwargs)
def _service_toggle(self, namespace: str, domain: str, service: str, **kwargs):
if self.any_on():
self.deactivate(**kwargs)
else:
self.activate(**kwargs)
def app_entity_states(self) -> Dict[str, str]: def app_entity_states(self) -> Dict[str, str]:
states = {entity: self.get_state(entity) for entity in self.app_entities} states = {entity: self.get_state(entity) for entity in self.app_entities}
@@ -157,16 +274,6 @@ class RoomController(Hass, Mqtt):
else: else:
return False return False
# @sleep_bool.setter
# def sleep_bool(self, val) -> bool:
# if (sleep_var := self.args.get('sleep')):
# if isinstance(val, str):
# self.set_state(sleep_var, state=val)
# elif isinstance(val, bool):
# self.set_state(sleep_var, state='on' if val else 'off')
# else:
# raise ValueError('Sleep variable is undefined')
def off_duration(self, now: datetime.time = None) -> datetime.timedelta: def off_duration(self, now: datetime.time = None) -> datetime.timedelta:
"""Determines the time that the motion sensor has to be clear before deactivating """Determines the time that the motion sensor has to be clear before deactivating
@@ -177,61 +284,8 @@ class RoomController(Hass, Mqtt):
- Sleep - 0 - Sleep - 0
""" """
sleep_mode_active = self.sleep_bool() if sleep_active := self.sleep_bool():
if sleep_mode_active: self.log(f'Sleeping mode active: {sleep_active}', level='DEBUG')
self.log(f'Sleeping mode active: {sleep_mode_active}')
return datetime.timedelta() return datetime.timedelta()
else: else:
now = now or self.get_now().time() return self.current_state().off_duration or self._room_config.off_duration
return self._room_config.current_off_duration(now)
def activate(self, entity=None, attribute=None, old=None, new=None, kwargs=None):
if kwargs is not None:
cause = kwargs.get('cause', 'unknown')
else:
cause = 'unknown'
self.log(f'Activating: {cause}')
scene_kwargs = self.current_state().to_apply_kwargs(transition=0)
if isinstance(scene_kwargs, str):
self.turn_on(scene_kwargs)
self.log(f'Turned on scene: {scene_kwargs}')
elif isinstance(scene_kwargs, dict):
self.call_service('scene/apply', **scene_kwargs)
self.log(f'Applied scene:\n{json.dumps(scene_kwargs, indent=2)}', level='DEBUG')
elif scene_kwargs is None:
self.log('No scene, ignoring...')
# Need to act as if the light had just turned off to reset the motion (and maybe other things?)
# self.callback_light_off()
else:
self.log(f'ERROR: unknown scene: {scene_kwargs}')
def activate_all_off(self, *args, **kwargs):
"""Activate if all of the entities are off. Args and kwargs are passed directly to self.activate()"""
if self.all_off():
self.activate(*args, **kwargs)
else:
self.log('Skipped activating - everything is not off')
def activate_any_on(self, *args, **kwargs):
"""Activate if any of the entities are on. Args and kwargs are passed directly to self.activate()"""
if self.any_on() and not self.manual_mode():
self.activate(*args, **kwargs)
else:
self.log('Skipped activating - everything is off')
def toggle_activate(self, *args, **kwargs):
if self.any_on():
self.deactivate(*args, **kwargs)
else:
self.activate(*args, **kwargs)
def deactivate(self, entity=None, attribute=None, old=None, new=None, kwargs=None):
cause = kwargs.get('cause', 'unknown')
self.log(f'Deactivating: {cause}')
for e in self.app_entities:
self.turn_off(e)
self.log(f'Turned off {e}')