Compare commits

5 Commits

Author SHA1 Message Date
John Lancaster
1c5edf6cc7 button and door converted 2024-07-27 14:35:07 -05:00
John Lancaster
9ce8432bba added some services 2024-07-27 12:17:34 -05:00
John Lancaster
7f68c8cad2 WIP 2024-07-25 22:57:47 -05:00
John Lancaster
a703fd15fb started entities and custom services in new namespace 2024-07-25 00:25:08 -05:00
John Lancaster
043402ad2f changed datetime import 2024-07-25 00:24:07 -05:00
8 changed files with 222 additions and 234 deletions

View File

@@ -12,17 +12,16 @@ dependencies = [
"ruff>=0.4.2",
]
readme = "README.md"
requires-python = ">= 3.10,<3.13"
requires-python = ">= 3.8,<3.12"
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[tool.setuptools]
include-package-data = true
[tool.hatch.metadata]
allow-direct-references = true
# [tool.setuptools.package-data]
# mypkg = ["*.yaml"]
[tool.hatch.build.targets.wheel]
packages = ["src/room_control"]
[tool.setuptools.data-files]
config = ["config/default_config.yaml"]
[tool.ruff.format]
quote-style = 'single'

View File

@@ -1,6 +1,6 @@
from .room_control import RoomController
from .motion import Motion
from .button import Button
from .door import Door
from .motion import MotionSensor
from .room_control import RoomController
__all__ = ['RoomController', 'MotionSensor', 'Button', 'Door']
__all__ = ['RoomController', 'Motion', 'Button', 'Door']

View File

@@ -1,22 +1,17 @@
import json
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any, Dict
from typing import Any, Dict
from appdaemon.entity import Entity
from . import console
if TYPE_CHECKING:
from .room_control import RoomController
from appdaemon.adapi import ADAPI
@dataclass
class Button:
adapi: 'RoomController'
adapi: ADAPI
button_name: str
def __post_init__(self):
self.logger = console.load_rich_config(self.adapi.name, 'Button')
self.log = self.adapi.log
topic = f'zigbee2mqtt/{self.button_name}'
self.adapi.listen_event(
self.handle_button,
@@ -25,71 +20,20 @@ class Button:
namespace='mqtt',
button=self.button_name,
)
self.logger.info(f'MQTT topic [topic]{topic}[/] controls [room]{self.adapi.name}[/]')
self.log(f'MQTT topic [topic]{topic}[/] controls [room]{self.adapi.name}[/]')
def handle_button(self, event_name: str, data: Dict[str, Any], **kwargs: Dict[str, Any]):
if event_name == 'appd_started':
return
# self.logger.info(f'Button callback: {event_name}, {data}')
def handle_button(self, event_name: str, data: Dict[str, Any], kwargs: Dict[str, Any]):
try:
payload = json.loads(data['payload'])
action = payload['action']
except json.JSONDecodeError:
self.logger.error(f'Error decoding JSON from {data["payload"]}')
self.log(f'Error decoding JSON from {data["payload"]}', level='ERROR')
except KeyError:
return
else:
self.do_action(action)
def do_action(self, action: str):
"""Action can be single, double, or others"""
if isinstance(action, str) and action != '':
self.logger.info(f'Action: [yellow]{action}[/]')
self.log(f'Action: [yellow]{action}[/]')
if action == 'single':
self.adapi.call_service(
f'{self.adapi.name}/toggle', namespace='controller', cause='button'
)
@dataclass
class VirtualButton(Button):
def __post_init__(self):
self.logger = console.load_rich_config(self.adapi.name, 'Button')
friendly_name = self.adapi.name.title().replace('_', ' ') + ' Button'
kwargs = {'entity_id': self.eid, 'friendly_name': friendly_name}
if not self.adapi.entity_exists(self.eid):
self.adapi.set_state(state=self.adapi.get_now(), **kwargs)
self.logger.info(f'Created entity [green]{self.eid}[/]')
else:
self.adapi.set_state(**kwargs)
self.logger.info(f'Set friendly name [green]{self.virtual_entity.friendly_name}[/]')
self.adapi.listen_event(self.handle_virtual_button, entity_id=self.eid)
@property
def eid(self) -> str:
return f'input_button.{self.adapi.name}'
@property
def virtual_entity(self) -> Entity:
return self.adapi.get_entity(self.eid)
def handle_virtual_button(
self, event_name: str, data: Dict[str, Any], **kwargs: Dict[str, Any]
):
if (
event_name == 'call_service'
and data.get('service') == 'press'
and (sd := data.get('service_data'))
and sd.get('entity_id') == self.eid
):
try:
if data['service_data']['entity_id'] == self.eid:
self.logger.info(f'Virtual button press: {event_name}')
# self.virtual_entity.set_state(state=datetime.now())
self.virtual_entity.set_state(state=self.adapi.get_now())
self.do_action('single')
except KeyError as e:
self.logger.error(f'Bad data from {event_name}: {json.dumps(data, indent=4)}')

View File

@@ -46,7 +46,7 @@ class RCHighlighter(RegexHighlighter):
def load_rich_config(
room: str = None, component: str = None, level: str = None
room: str = None, component: str = None, level: str = 'INFO'
) -> logging.LoggerAdapter:
logger_name = f'Appdaemon.{room}'
@@ -64,6 +64,7 @@ def load_rich_config(
logger_name: {
'handlers': ['rich' if component is None else 'rich_component'],
'propagate': False,
'level': level,
}
}
@@ -72,9 +73,6 @@ def load_rich_config(
if component is not None:
extra['component'] = component
if level is not None:
RICH_CFG['loggers'][logger_name]['level'] = level
logging.config.dictConfig(RICH_CFG)
logger = logging.getLogger(logger_name)
adapter = logging.LoggerAdapter(logger, extra)

View File

@@ -1,23 +1,25 @@
from dataclasses import dataclass
from typing import TYPE_CHECKING
from logging import Logger, LoggerAdapter
from . import console
if TYPE_CHECKING:
from .room_control import RoomController
from appdaemon.adapi import ADAPI
@dataclass
class Door:
adapi: 'RoomController'
adapi: ADAPI
entity_id: str
def __post_init__(self):
self.logger = console.load_rich_config(self.adapi.name, 'Door')
self.logger = LoggerAdapter(
self.adapi.logger.logger.getChild('door'), self.adapi.logger.extra
)
self.adapi.listen_state(
lambda *args, **kwargs: self.adapi.activate_all_off(cause='door open'),
callback=lambda *args, **kwargs: self.call_service(
f'{self.adapi.name}/activate_all_off', namespace='controller'
),
entity_id=self.entity_id,
new='on',
cause='door open',
)
self.logger.debug(f'Initialized door for [room]{self.adapi.name}[/]')

View File

@@ -1,10 +1,10 @@
import datetime
from pathlib import Path
from typing import Annotated, Dict, List, Optional, Self, Union
from typing import Annotated, Dict, List, Optional, Self
import yaml
from astral import SunDirection
from pydantic import BaseModel, BeforeValidator, Field, field_validator, model_validator
from pydantic import BaseModel, BeforeValidator, Field, model_validator, root_validator
from pydantic_core import PydanticCustomError
from rich.console import Console, ConsoleOptions, RenderResult
from rich.table import Column, Table
@@ -47,45 +47,29 @@ class ApplyKwargs(BaseModel):
class ControllerStateConfig(BaseModel):
time: Optional[str | datetime.time | datetime.datetime] = None
elevation: Optional[float] = None
direction: Optional[SunDirection] = None
direction: Optional[Annotated[SunDirection, BeforeValidator(str_to_direction)]] = None
off_duration: Optional[OffDuration] = None
scene: dict[str, State] | str = Field(default_factory=dict)
@model_validator(mode='before')
def check_args(cls, values):
if values.get('elevation') is not None and values.get('direction') is None:
time, elevation = values.get('time'), values.get('elevation')
# if time is not None and elevation is not None:
# raise PydanticCustomError('bad_time_spec', 'Only one of time or elevation can be set.')
if elevation is not None and 'direction' not in values:
raise PydanticCustomError('no_sun_dir', 'Needs sun direction with elevation')
return values
@field_validator('direction', mode='before')
@classmethod
def check_sun_dir(cls, val: int | str | SunDirection | None) -> SunDirection:
if isinstance(val, str):
print(f'Str sun direction: {val}')
return str_to_direction(val)
elif isinstance(val, int):
return SunDirection.SETTING if val < 0 else SunDirection.RISING
elif isinstance(val, SunDirection):
return val
def to_apply_kwargs(self, transition: int = None):
return ApplyKwargs(entities=self.scene, transition=transition).model_dump(exclude_none=True)
class MotionSensorConfig(BaseModel):
sensor: str
ref_entity: str
class RoomControllerConfig(BaseModel):
states: List[ControllerStateConfig] = Field(default_factory=list)
off_duration: Optional[OffDuration] = Field(default_factory=datetime.timedelta)
off_duration: Optional[OffDuration] = None
sleep_state: Optional[ControllerStateConfig] = None
rich: Optional[str] = None
manual_mode: Optional[str] = None
button: Optional[Union[str, List[str]]] = None
motion: Optional[MotionSensorConfig] = None
log_level: Optional[str] = None
@classmethod
def from_yaml(cls: Self, yaml_path: Path) -> Self:
@@ -114,3 +98,38 @@ class RoomControllerConfig(BaseModel):
]
table.add_row(state.time.strftime('%I:%M:%S %p'), '\n'.join(lines))
yield table
def sort_states(self):
"""Should only be called after all the times have been resolved"""
assert all(
isinstance(state.time, datetime.time) for state in self.states
), 'Times have not all been resolved yet'
self.states = sorted(self.states, key=lambda s: s.time, reverse=True)
def current_state(self, now: datetime.time) -> ControllerStateConfig:
self.sort_states()
for state in self.states:
if state.time <= now:
return state
else:
return self.states[0]
def current_scene(self, now: datetime.time) -> Dict:
state = self.current_state(now)
return state.scene
def current_off_duration(self, now: datetime.time) -> datetime.timedelta:
state = self.current_state(now)
if state.off_duration is None:
if self.off_duration is None:
raise ValueError('Need an off duration')
else:
return self.off_duration
else:
return state.off_duration
class ButtonConfig(BaseModel):
app: str
button: str | List[str]
ref_entity: str

View File

@@ -1,9 +1,11 @@
from dataclasses import dataclass
import re
from datetime import timedelta
from logging import Logger
from typing import TYPE_CHECKING, Literal, Optional
from appdaemon.entity import Entity
from pydantic import BaseModel
from appdaemon.plugins.hass.hassapi import Hass
from pydantic import BaseModel, ValidationError
from . import console
@@ -25,103 +27,151 @@ class CallbackEntry(BaseModel):
Callbacks = dict[str, dict[str, CallbackEntry]]
@dataclass
class MotionSensor:
adapi: 'RoomController'
sensor_entity_id: str
ref_entity_id: str
def __post_init__(self):
self.logger = console.load_rich_config(self.adapi.name, 'Motion')
assert self.sensor_entity.exists()
assert self.ref_entity.exists()
self.ref_entity.listen_state(self.light_state_callback, immediate=True)
self.match_new_state(new=self.sensor_entity.get_state())
self.logger.info('Initialized motion sensor')
def entity_callbacks(self, entity_id: str | None = None) -> dict[str, dict]:
callbacks = self.adapi.get_callback_entries()
if self.adapi.name in callbacks:
return {
handle: cb
for handle, cb in callbacks[self.adapi.name].items()
if cb.get('entity') == entity_id
}
else:
return {}
def sensor_callbacks(self) -> dict[str, dict]:
return self.entity_callbacks(entity_id=self.sensor_entity_id)
class Motion(Hass):
logger: Logger
app: 'RoomController'
@property
def sensor_entity(self) -> Entity:
return self.adapi.get_entity(self.sensor_entity_id)
def sensor(self) -> Entity:
return self.get_entity(self.args['sensor'])
@property
def sensor_state(self) -> bool:
return self.sensor_entity.get_state() == 'on'
return self.sensor.state == 'on'
@property
def ref_entity(self) -> Entity:
return self.adapi.get_entity(self.ref_entity_id)
return self.get_entity(self.args['ref_entity'])
@property
def ref_state(self) -> bool:
def ref_entity_state(self) -> bool:
return self.ref_entity.get_state() == 'on'
@property
def state_mismatch(self) -> bool:
return self.sensor_state != self.ref_state
return self.sensor_state != self.ref_entity_state
def light_state_callback(self, entity: str, attribute: str, old: str, new: str, **kwargs: dict):
for handle in self.sensor_callbacks():
self.adapi.cancel_listen_state(handle)
self.match_new_state(new)
def initialize(self):
self.app: 'RoomController' = self.get_app(self.args['app'])
self.logger = console.load_rich_config(self.app.name, type(self).__name__)
def match_new_state(self, new: Literal['on', 'off']):
match new:
case 'on':
duration = self.adapi.off_duration()
self.listen_motion_off(duration)
case 'off':
self.listen_motion_on()
assert self.entity_exists(self.args['sensor'])
assert self.entity_exists(self.args['ref_entity'])
base_kwargs = dict(
entity_id=self.ref_entity.entity_id,
immediate=True, # avoids needing to sync the state
)
if self.state_mismatch:
self.log(
f'Sensor is {self.sensor_state} ' f'and light is {self.ref_entity_state}',
level='WARNING',
)
if self.sensor_state:
self.app.activate(kwargs={'cause': f'Syncing state with {self.sensor.entity_id}'})
# don't need to await these because they'll already get turned into a task by the utils.sync_wrapper decorator
self.listen_state(
**base_kwargs,
attribute='brightness',
callback=self.callback_light_on,
)
self.listen_state(**base_kwargs, new='off', callback=self.callback_light_off)
for handle, cb in self.callbacks():
self.log(f'Handle [yellow]{handle[:4]}[/]: {cb.function}', level='DEBUG')
self.log(f'Initialized [bold green]{type(self).__name__}[/]')
def callbacks(self):
"""Returns a dictionary of validated CallbackEntry objects that are associated with this app"""
self_callbacks = self.get_callback_entries().get(self.name, {})
for handle, cb_dict in self_callbacks.items():
try:
yield handle, CallbackEntry.model_validate(cb_dict)
except ValidationError as e:
self.logger.error('Error parsing callback dictionary')
self.logger.error(e)
def listen_motion_on(self):
"""Sets up the motion on callback to activate the room"""
# self.cancel_motion_callback()
self.sensor_entity.listen_state(
lambda *args, **kwargs: self.adapi.activate_all_off(cause='motion on'),
self.cancel_motion_callback()
self.listen_state(
callback=self.app.activate_all_off,
entity_id=self.sensor.entity_id,
new='on',
oneshot=True,
cause='motion on',
)
self.logger.info(
'Waiting for sensor motion on '
f'[friendly_name]{self.sensor_entity.friendly_name}[/]'
)
self.log(f'Waiting for sensor motion on [friendly_name]{self.sensor.friendly_name}[/]')
if self.sensor_state:
self.logger.warning(
'Sensor '
f'[friendly_name]{self.sensor_entity.friendly_name}[/] is already on',
self.log(
f'Sensor [friendly_name]{self.sensor.friendly_name}[/] is already on',
level='WARNING',
)
def listen_motion_off(self, duration: timedelta):
"""Sets up the motion off callback to deactivate the room"""
# self.cancel_motion_callback()
self.sensor_entity.listen_state(
lambda *args, **kwargs: self.adapi.deactivate(cause='motion off'),
self.cancel_motion_callback()
self.listen_state(
callback=self.app.deactivate,
entity_id=self.sensor.entity_id,
new='off',
duration=duration.total_seconds(),
oneshot=True,
cause='motion off',
)
self.logger.debug(
'Waiting for sensor '
f'[friendly_name]{self.sensor_entity.friendly_name}[/] '
f'to be clear for {duration}'
self.log(
f'Waiting for sensor [friendly_name]{self.sensor.friendly_name}[/] to be clear for {duration}'
)
if not self.sensor_state:
self.logger.warning(
f'Sensor [friendly_name]{self.sensor_entity.friendly_name}[/] is currently off',
self.log(
f'Sensor [friendly_name]{self.sensor.friendly_name}[/] is currently off',
level='WARNING',
)
def callback_light_on(self, entity=None, attribute=None, old=None, new=None, kwargs=None):
"""Called when the light turns on"""
if new is not None:
self.log(f'Detected {entity} turning on', level='DEBUG')
duration = self.app.off_duration()
self.listen_motion_off(duration)
def callback_light_off(self, entity=None, attribute=None, old=None, new=None, kwargs=None):
"""Called when the light turns off"""
self.log(f'Detected {entity} turning off', level='DEBUG')
self.listen_motion_on()
def get_app_callbacks(self, name: str = None):
"""Gets all the callbacks associated with the app"""
name = name or self.name
callbacks = {
handle: info
for app_name, callbacks in self.get_callback_entries().items()
for handle, info in callbacks.items()
if app_name == name
}
return callbacks
def get_sensor_callbacks(self):
return {
handle: info
for handle, info in self.get_app_callbacks().items()
if info['entity'] == self.sensor.entity_id
}
def cancel_motion_callback(self):
callbacks = self.get_sensor_callbacks()
# self.log(f'Found {len(callbacks)} callbacks for {self.sensor.entity_id}')
for handle, info in callbacks.items():
entity = info['entity']
kwargs = info['kwargs']
if (m := re.match('new=(?P<new>.*?)\s', kwargs)) is not None:
new = m.group('new')
self.cancel_listen_state(handle)
self.log(
f'cancelled callback for sensor {entity} turning {new}',
level='DEBUG',
)

View File

@@ -3,17 +3,16 @@ import logging
import logging.config
import traceback
from functools import wraps
from typing import Dict, List, Set
from typing import Any, Dict, List, Set
from appdaemon.entity import Entity
from appdaemon.plugins.hass.hassapi import Hass
from astral.location import Location
from . import console
from .button import Button, VirtualButton
from .button import Button
from .door import Door
from .model import ControllerStateConfig, RoomControllerConfig
from .motion import MotionSensor
logger = logging.getLogger(__name__)
@@ -56,7 +55,8 @@ class RoomController(Hass):
return self.get_entity(f'{self.name}.state', namespace='controller')
def initialize(self):
self.logger = console.load_rich_config(self.name, level=self.args.get('log_level', 'INFO'))
self.logger = console.load_rich_config(self.name)
self.set_log_level('DEBUG')
self.refresh_state_times()
self.run_daily(callback=self.refresh_state_times, start='00:00:00')
@@ -67,9 +67,6 @@ class RoomController(Hass):
self.register_service(
f'{self.name}/activate_all_off', self._service_activate_all_off, namespace='controller'
)
self.register_service(
f'{self.name}/activate_any_on', self._service_activate_any_on, namespace='controller'
)
self.register_service(
f'{self.name}/deactivate', self._service_deactivate, namespace='controller'
)
@@ -81,33 +78,13 @@ class RoomController(Hass):
if button := self.args.get('button'):
if isinstance(button, str):
Button(self, button_name=button)
VirtualButton(self, button_name=button)
elif isinstance(button, list) and all(isinstance(b, str) for b in button):
for b in button:
Button(self, button_name=b)
VirtualButton(self, button_name=button)
self.button = Button(self, button_name=button)
if door := self.args.get('door'):
if isinstance(door, str):
self.log('door--')
self.door = Door(self, entity_id=door)
if motion := self.args.get('motion'):
self.motion = MotionSensor(
self, sensor_entity_id=motion['sensor'], ref_entity_id=motion['ref_entity']
)
state: ControllerStateConfig
for state in sorted(self._room_config.states, key=lambda s: s.time, reverse=True):
if isinstance(state.time, datetime.datetime):
t = state.time.time()
else:
t = state.time
if t < self.get_now().time():
self.log(f'Initial state: {state.time}', level='DEBUG')
self.set_controller_scene(state)
break
self.log(f'Initialized [bold green]{type(self).__name__}[/]')
def terminate(self):
@@ -153,22 +130,14 @@ class RoomController(Hass):
try:
self.run_at(
callback=lambda **kwargs: self.set_controller_scene(kwargs['state']),
callback=lambda cb_args: self.set_controller_scene(cb_args['state']),
start=state.time.strftime('%H:%M:%S'),
state=state,
)
except Exception as e:
self.log(f'Failed with {type(e)}: {e}')
self.state_entity.listen_state(
lambda *args, **kwargs: self.call_service(
f'{self.name}/activate_any_on', namespace='controller', cause='state transition'
),
attribute='all',
)
def set_controller_scene(self, state: ControllerStateConfig):
"""Sets the internal state for the app. Only used by the scheduled transition"""
try:
self.state_entity.set_state(attributes=state.model_dump())
except Exception:
@@ -187,9 +156,8 @@ class RoomController(Hass):
try:
attrs = self.state_entity.get_state('all')['attributes']
state = ControllerStateConfig.model_validate(attrs)
except Exception as e:
except Exception:
state = ControllerStateConfig()
logger.exception(e)
finally:
# self.log(f'Current state: {state.model_dump(exclude_none=True)}', level='DEBUG')
return state
@@ -197,8 +165,8 @@ class RoomController(Hass):
def activate(self, **kwargs):
self.call_service(f'{self.name}/activate', namespace='controller', **kwargs)
def _service_activate(self, namespace: str, domain: str, service: str, **kwargs):
# self.log(f'Custom kwargs: {kwargs}', level='DEBUG')
def _service_activate(self, namespace: str, domain: str, service: str, kwargs: Dict[str, Any]):
self.log(f'Custom kwargs: {kwargs}', level='DEBUG')
state = self.current_state()
if isinstance(state.scene, str):
self.turn_on(state.scene)
@@ -212,7 +180,9 @@ class RoomController(Hass):
"""Activate if any of the entities are on. Args and kwargs are passed directly to self.activate()"""
self.call_service(f'{self.name}/activate_any_on', namespace='controller', **kwargs)
def _service_activate_any_on(self, namespace: str, domain: str, service: str, **kwargs):
def _service_activate_any_on(
self, namespace: str, domain: str, service: str, kwargs: Dict[str, Any]
):
if self.any_on() and not self.manual_mode():
self.activate(**kwargs)
@@ -220,21 +190,25 @@ class RoomController(Hass):
"""Activate if all of the entities are off. Args and kwargs are passed directly to self.activate()"""
self.call_service(f'{self.name}/activate_all_off', namespace='controller', **kwargs)
def _service_activate_all_off(self, namespace: str, domain: str, service: str, **kwargs):
def _service_activate_all_off(
self, namespace: str, domain: str, service: str, kwargs: Dict[str, Any]
):
if self.all_off() and not self.manual_mode():
self.activate(**kwargs)
def deactivate(self, **kwargs):
self.call_service(f'{self.name}/deactivate', namespace='controller', **kwargs)
def _service_deactivate(self, namespace: str, domain: str, service: str, **kwargs):
def _service_deactivate(
self, namespace: str, domain: str, service: str, kwargs: Dict[str, Any]
):
for e in self.app_entities:
self.turn_off(e)
def toggle(self, **kwargs):
self.call_service(f'{self.name}/toggle', namespace='controller', **kwargs)
def _service_toggle(self, namespace: str, domain: str, service: str, **kwargs):
def _service_toggle(self, namespace: str, domain: str, service: str, kwargs: Dict[str, Any]):
if self.any_on():
self.deactivate(**kwargs)
else:
@@ -284,8 +258,10 @@ class RoomController(Hass):
- Sleep - 0
"""
if sleep_active := self.sleep_bool():
self.log(f'Sleeping mode active: {sleep_active}', level='DEBUG')
sleep_mode_active = self.sleep_bool()
if sleep_mode_active:
self.log(f'Sleeping mode active: {sleep_mode_active}')
return datetime.timedelta()
else:
return self.current_state().off_duration or self._room_config.off_duration
now = now or self.get_now().time()
return self._room_config.current_off_duration(now)