Compare commits
11 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
21e1431001 | ||
|
|
2cd02627d1 | ||
|
|
37c3a134de | ||
|
|
92ddcaa25d | ||
|
|
9af8db9198 | ||
|
|
9dfd3e7f38 | ||
|
|
1c5edf6cc7 | ||
|
|
9ce8432bba | ||
|
|
7f68c8cad2 | ||
|
|
a703fd15fb | ||
|
|
043402ad2f |
@@ -12,17 +12,16 @@ dependencies = [
|
||||
"ruff>=0.4.2",
|
||||
]
|
||||
readme = "README.md"
|
||||
requires-python = ">= 3.10,<3.13"
|
||||
requires-python = ">= 3.8,<3.12"
|
||||
|
||||
[build-system]
|
||||
requires = ["hatchling"]
|
||||
build-backend = "hatchling.build"
|
||||
[tool.setuptools]
|
||||
include-package-data = true
|
||||
|
||||
[tool.hatch.metadata]
|
||||
allow-direct-references = true
|
||||
# [tool.setuptools.package-data]
|
||||
# mypkg = ["*.yaml"]
|
||||
|
||||
[tool.hatch.build.targets.wheel]
|
||||
packages = ["src/room_control"]
|
||||
[tool.setuptools.data-files]
|
||||
config = ["config/default_config.yaml"]
|
||||
|
||||
[tool.ruff.format]
|
||||
quote-style = 'single'
|
||||
|
||||
@@ -2,8 +2,6 @@ import json
|
||||
from dataclasses import dataclass
|
||||
from typing import TYPE_CHECKING, Any, Dict
|
||||
|
||||
from appdaemon.entity import Entity
|
||||
|
||||
from . import console
|
||||
|
||||
if TYPE_CHECKING:
|
||||
@@ -16,7 +14,7 @@ class Button:
|
||||
button_name: str
|
||||
|
||||
def __post_init__(self):
|
||||
self.logger = console.load_rich_config(self.adapi.name, 'Button')
|
||||
self.logger = console.load_rich_config(self.adapi.name, 'Button', 'DEBUG')
|
||||
topic = f'zigbee2mqtt/{self.button_name}'
|
||||
self.adapi.listen_event(
|
||||
self.handle_button,
|
||||
@@ -27,69 +25,18 @@ class Button:
|
||||
)
|
||||
self.logger.info(f'MQTT topic [topic]{topic}[/] controls [room]{self.adapi.name}[/]')
|
||||
|
||||
def handle_button(self, event_name: str, data: Dict[str, Any], **kwargs: Dict[str, Any]):
|
||||
if event_name == 'appd_started':
|
||||
return
|
||||
|
||||
# self.logger.info(f'Button callback: {event_name}, {data}')
|
||||
def handle_button(self, event_name: str, data: Dict[str, Any], kwargs: Dict[str, Any]):
|
||||
try:
|
||||
payload = json.loads(data['payload'])
|
||||
action = payload['action']
|
||||
except json.JSONDecodeError:
|
||||
self.logger.error(f'Error decoding JSON from {data["payload"]}')
|
||||
except KeyError:
|
||||
return
|
||||
else:
|
||||
self.do_action(action)
|
||||
|
||||
def do_action(self, action: str):
|
||||
"""Action can be single, double, or others"""
|
||||
if isinstance(action, str) and action != '':
|
||||
self.logger.info(f'Action: [yellow]{action}[/]')
|
||||
if action == 'single':
|
||||
self.adapi.call_service(
|
||||
f'{self.adapi.name}/toggle', namespace='controller', cause='button'
|
||||
)
|
||||
|
||||
|
||||
@dataclass
|
||||
class VirtualButton(Button):
|
||||
def __post_init__(self):
|
||||
self.logger = console.load_rich_config(self.adapi.name, 'Button')
|
||||
|
||||
friendly_name = self.adapi.name.title().replace('_', ' ') + ' Button'
|
||||
|
||||
kwargs = {'entity_id': self.eid, 'friendly_name': friendly_name}
|
||||
|
||||
if not self.adapi.entity_exists(self.eid):
|
||||
self.adapi.set_state(state=self.adapi.get_now(), **kwargs)
|
||||
self.logger.info(f'Created entity [green]{self.eid}[/]')
|
||||
else:
|
||||
self.adapi.set_state(**kwargs)
|
||||
self.logger.info(f'Set friendly name [green]{self.virtual_entity.friendly_name}[/]')
|
||||
|
||||
self.adapi.listen_event(self.handle_virtual_button, entity_id=self.eid)
|
||||
|
||||
@property
|
||||
def eid(self) -> str:
|
||||
return f'input_button.{self.adapi.name}'
|
||||
|
||||
@property
|
||||
def virtual_entity(self) -> Entity:
|
||||
return self.adapi.get_entity(self.eid)
|
||||
|
||||
def handle_virtual_button(
|
||||
self, event_name: str, data: Dict[str, Any], **kwargs: Dict[str, Any]
|
||||
):
|
||||
if (
|
||||
event_name == 'call_service'
|
||||
and data.get('service') == 'press'
|
||||
and (sd := data.get('service_data'))
|
||||
and sd.get('entity_id') == self.eid
|
||||
):
|
||||
try:
|
||||
if data['service_data']['entity_id'] == self.eid:
|
||||
self.logger.info(f'Virtual button press: {event_name}')
|
||||
# self.virtual_entity.set_state(state=datetime.now())
|
||||
self.virtual_entity.set_state(state=self.adapi.get_now())
|
||||
self.do_action('single')
|
||||
except KeyError as e:
|
||||
self.logger.error(f'Bad data from {event_name}: {json.dumps(data, indent=4)}')
|
||||
|
||||
@@ -46,7 +46,7 @@ class RCHighlighter(RegexHighlighter):
|
||||
|
||||
|
||||
def load_rich_config(
|
||||
room: str = None, component: str = None, level: str = None
|
||||
room: str = None, component: str = None, level: str = 'INFO'
|
||||
) -> logging.LoggerAdapter:
|
||||
logger_name = f'Appdaemon.{room}'
|
||||
|
||||
@@ -64,6 +64,7 @@ def load_rich_config(
|
||||
logger_name: {
|
||||
'handlers': ['rich' if component is None else 'rich_component'],
|
||||
'propagate': False,
|
||||
'level': level,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -72,9 +73,6 @@ def load_rich_config(
|
||||
if component is not None:
|
||||
extra['component'] = component
|
||||
|
||||
if level is not None:
|
||||
RICH_CFG['loggers'][logger_name]['level'] = level
|
||||
|
||||
logging.config.dictConfig(RICH_CFG)
|
||||
logger = logging.getLogger(logger_name)
|
||||
adapter = logging.LoggerAdapter(logger, extra)
|
||||
|
||||
@@ -13,7 +13,7 @@ class Door:
|
||||
entity_id: str
|
||||
|
||||
def __post_init__(self):
|
||||
self.logger = console.load_rich_config(self.adapi.name, 'Door')
|
||||
self.logger = console.load_rich_config(self.adapi.name, 'Door', 'DEBUG')
|
||||
|
||||
self.adapi.listen_state(
|
||||
lambda *args, **kwargs: self.adapi.activate_all_off(cause='door open'),
|
||||
|
||||
@@ -1,10 +1,10 @@
|
||||
import datetime
|
||||
from pathlib import Path
|
||||
from typing import Annotated, Dict, List, Optional, Self, Union
|
||||
from typing import Annotated, Dict, List, Optional, Self
|
||||
|
||||
import yaml
|
||||
from astral import SunDirection
|
||||
from pydantic import BaseModel, BeforeValidator, Field, field_validator, model_validator
|
||||
from pydantic import BaseModel, BeforeValidator, Field, model_validator
|
||||
from pydantic_core import PydanticCustomError
|
||||
from rich.console import Console, ConsoleOptions, RenderResult
|
||||
from rich.table import Column, Table
|
||||
@@ -47,7 +47,7 @@ class ApplyKwargs(BaseModel):
|
||||
class ControllerStateConfig(BaseModel):
|
||||
time: Optional[str | datetime.time | datetime.datetime] = None
|
||||
elevation: Optional[float] = None
|
||||
direction: Optional[SunDirection] = None
|
||||
direction: Optional[Annotated[SunDirection, BeforeValidator(str_to_direction)]] = None
|
||||
off_duration: Optional[OffDuration] = None
|
||||
scene: dict[str, State] | str = Field(default_factory=dict)
|
||||
|
||||
@@ -57,35 +57,16 @@ class ControllerStateConfig(BaseModel):
|
||||
raise PydanticCustomError('no_sun_dir', 'Needs sun direction with elevation')
|
||||
return values
|
||||
|
||||
@field_validator('direction', mode='before')
|
||||
@classmethod
|
||||
def check_sun_dir(cls, val: int | str | SunDirection | None) -> SunDirection:
|
||||
if isinstance(val, str):
|
||||
print(f'Str sun direction: {val}')
|
||||
return str_to_direction(val)
|
||||
elif isinstance(val, int):
|
||||
return SunDirection.SETTING if val < 0 else SunDirection.RISING
|
||||
elif isinstance(val, SunDirection):
|
||||
return val
|
||||
|
||||
def to_apply_kwargs(self, transition: int = None):
|
||||
return ApplyKwargs(entities=self.scene, transition=transition).model_dump(exclude_none=True)
|
||||
|
||||
|
||||
class MotionSensorConfig(BaseModel):
|
||||
sensor: str
|
||||
ref_entity: str
|
||||
|
||||
|
||||
class RoomControllerConfig(BaseModel):
|
||||
states: List[ControllerStateConfig] = Field(default_factory=list)
|
||||
off_duration: Optional[OffDuration] = Field(default_factory=datetime.timedelta)
|
||||
sleep_state: Optional[ControllerStateConfig] = None
|
||||
rich: Optional[str] = None
|
||||
manual_mode: Optional[str] = None
|
||||
button: Optional[Union[str, List[str]]] = None
|
||||
motion: Optional[MotionSensorConfig] = None
|
||||
log_level: Optional[str] = None
|
||||
|
||||
@classmethod
|
||||
def from_yaml(cls: Self, yaml_path: Path) -> Self:
|
||||
@@ -114,3 +95,38 @@ class RoomControllerConfig(BaseModel):
|
||||
]
|
||||
table.add_row(state.time.strftime('%I:%M:%S %p'), '\n'.join(lines))
|
||||
yield table
|
||||
|
||||
def sort_states(self):
|
||||
"""Should only be called after all the times have been resolved"""
|
||||
assert all(
|
||||
isinstance(state.time, datetime.time) for state in self.states
|
||||
), 'Times have not all been resolved yet'
|
||||
self.states = sorted(self.states, key=lambda s: s.time, reverse=True)
|
||||
|
||||
def current_state(self, now: datetime.time) -> ControllerStateConfig:
|
||||
self.sort_states()
|
||||
for state in self.states:
|
||||
if state.time <= now:
|
||||
return state
|
||||
else:
|
||||
return self.states[0]
|
||||
|
||||
def current_scene(self, now: datetime.time) -> Dict:
|
||||
state = self.current_state(now)
|
||||
return state.scene
|
||||
|
||||
def current_off_duration(self, now: datetime.time) -> datetime.timedelta:
|
||||
state = self.current_state(now)
|
||||
if state.off_duration is None:
|
||||
if self.off_duration is None:
|
||||
raise ValueError('Need an off duration')
|
||||
else:
|
||||
return self.off_duration
|
||||
else:
|
||||
return state.off_duration
|
||||
|
||||
|
||||
class ButtonConfig(BaseModel):
|
||||
app: str
|
||||
button: str | List[str]
|
||||
ref_entity: str
|
||||
|
||||
@@ -32,28 +32,17 @@ class MotionSensor:
|
||||
ref_entity_id: str
|
||||
|
||||
def __post_init__(self):
|
||||
self.logger = console.load_rich_config(self.adapi.name, 'Motion')
|
||||
self.logger = console.load_rich_config(self.adapi.name, 'Motion', 'DEBUG')
|
||||
|
||||
assert self.sensor_entity.exists()
|
||||
assert self.ref_entity.exists()
|
||||
|
||||
self.ref_entity.listen_state(self.light_state_callback, immediate=True)
|
||||
self.match_new_state(new=self.sensor_entity.get_state())
|
||||
self.logger.info('Initialized motion sensor')
|
||||
|
||||
def entity_callbacks(self, entity_id: str | None = None) -> dict[str, dict]:
|
||||
callbacks = self.adapi.get_callback_entries()
|
||||
if self.adapi.name in callbacks:
|
||||
return {
|
||||
handle: cb
|
||||
for handle, cb in callbacks[self.adapi.name].items()
|
||||
if cb.get('entity') == entity_id
|
||||
}
|
||||
else:
|
||||
return {}
|
||||
|
||||
def sensor_callbacks(self) -> dict[str, dict]:
|
||||
return self.entity_callbacks(entity_id=self.sensor_entity_id)
|
||||
base_kwargs = dict(
|
||||
entity_id=self.ref_entity_id,
|
||||
immediate=True, # avoids needing to sync the state
|
||||
)
|
||||
self.ref_entity.listen_state(self.callback_light_on, attribute='all', **base_kwargs)
|
||||
self.ref_entity.listen_state(self.callback_light_off, new='off', **base_kwargs)
|
||||
|
||||
@property
|
||||
def sensor_entity(self) -> Entity:
|
||||
@@ -75,50 +64,47 @@ class MotionSensor:
|
||||
def state_mismatch(self) -> bool:
|
||||
return self.sensor_state != self.ref_state
|
||||
|
||||
def light_state_callback(self, entity: str, attribute: str, old: str, new: str, **kwargs: dict):
|
||||
for handle in self.sensor_callbacks():
|
||||
self.adapi.cancel_listen_state(handle)
|
||||
self.match_new_state(new)
|
||||
|
||||
def match_new_state(self, new: Literal['on', 'off']):
|
||||
match new:
|
||||
case 'on':
|
||||
def callback_light_on(self, entity: str, attribute: str, old: str, new: str, kwargs: dict):
|
||||
"""Called when the light turns on"""
|
||||
if new['state'] == 'on':
|
||||
self.logger.debug(f'Detected {entity} turning on')
|
||||
duration = self.adapi.off_duration()
|
||||
self.listen_motion_off(duration)
|
||||
case 'off':
|
||||
|
||||
def callback_light_off(self, entity: str, attribute: str, old: str, new: str, kwargs: dict):
|
||||
"""Called when the light turns off"""
|
||||
self.logger.debug(f'Detected {entity} turning off')
|
||||
self.listen_motion_on()
|
||||
|
||||
def listen_motion_on(self):
|
||||
"""Sets up the motion on callback to activate the room"""
|
||||
# self.cancel_motion_callback()
|
||||
self.sensor_entity.listen_state(
|
||||
self.adapi.listen_state(
|
||||
lambda *args, **kwargs: self.adapi.activate_all_off(cause='motion on'),
|
||||
entity_id=self.sensor_entity_id,
|
||||
new='on',
|
||||
oneshot=True,
|
||||
)
|
||||
self.logger.info(
|
||||
'Waiting for sensor motion on '
|
||||
f'[friendly_name]{self.sensor_entity.friendly_name}[/]'
|
||||
f'Waiting for sensor motion on [friendly_name]{self.sensor_entity.friendly_name}[/]'
|
||||
)
|
||||
if self.sensor_state:
|
||||
self.logger.warning(
|
||||
'Sensor '
|
||||
f'[friendly_name]{self.sensor_entity.friendly_name}[/] is already on',
|
||||
f'Sensor [friendly_name]{self.sensor_entity.friendly_name}[/] is already on',
|
||||
)
|
||||
|
||||
def listen_motion_off(self, duration: timedelta):
|
||||
"""Sets up the motion off callback to deactivate the room"""
|
||||
# self.cancel_motion_callback()
|
||||
self.sensor_entity.listen_state(
|
||||
self.adapi.listen_state(
|
||||
lambda *args, **kwargs: self.adapi.deactivate(cause='motion off'),
|
||||
entity_id=self.sensor_entity_id,
|
||||
new='off',
|
||||
duration=duration.total_seconds(),
|
||||
oneshot=True,
|
||||
)
|
||||
self.logger.debug(
|
||||
'Waiting for sensor '
|
||||
f'[friendly_name]{self.sensor_entity.friendly_name}[/] '
|
||||
f'to be clear for {duration}'
|
||||
f'Waiting for sensor [friendly_name]{self.sensor_entity.friendly_name}[/] to be clear for {duration}'
|
||||
)
|
||||
|
||||
if not self.sensor_state:
|
||||
|
||||
@@ -3,14 +3,14 @@ import logging
|
||||
import logging.config
|
||||
import traceback
|
||||
from functools import wraps
|
||||
from typing import Dict, List, Set
|
||||
from typing import Any, Dict, List, Set
|
||||
|
||||
from appdaemon.entity import Entity
|
||||
from appdaemon.plugins.hass.hassapi import Hass
|
||||
from astral.location import Location
|
||||
|
||||
from . import console
|
||||
from .button import Button, VirtualButton
|
||||
from .button import Button
|
||||
from .door import Door
|
||||
from .model import ControllerStateConfig, RoomControllerConfig
|
||||
from .motion import MotionSensor
|
||||
@@ -56,7 +56,8 @@ class RoomController(Hass):
|
||||
return self.get_entity(f'{self.name}.state', namespace='controller')
|
||||
|
||||
def initialize(self):
|
||||
self.logger = console.load_rich_config(self.name, level=self.args.get('log_level', 'INFO'))
|
||||
self.logger = console.load_rich_config(self.name)
|
||||
self.set_log_level('DEBUG')
|
||||
|
||||
self.refresh_state_times()
|
||||
self.run_daily(callback=self.refresh_state_times, start='00:00:00')
|
||||
@@ -81,15 +82,13 @@ class RoomController(Hass):
|
||||
|
||||
if button := self.args.get('button'):
|
||||
if isinstance(button, str):
|
||||
Button(self, button_name=button)
|
||||
VirtualButton(self, button_name=button)
|
||||
self.button = Button(self, button_name=button)
|
||||
elif isinstance(button, list) and all(isinstance(b, str) for b in button):
|
||||
for b in button:
|
||||
Button(self, button_name=b)
|
||||
VirtualButton(self, button_name=button)
|
||||
self.button = [Button(self, button_name=b) for b in button]
|
||||
|
||||
if door := self.args.get('door'):
|
||||
if isinstance(door, str):
|
||||
self.log('door--')
|
||||
self.door = Door(self, entity_id=door)
|
||||
|
||||
if motion := self.args.get('motion'):
|
||||
@@ -97,7 +96,6 @@ class RoomController(Hass):
|
||||
self, sensor_entity_id=motion['sensor'], ref_entity_id=motion['ref_entity']
|
||||
)
|
||||
|
||||
state: ControllerStateConfig
|
||||
for state in sorted(self._room_config.states, key=lambda s: s.time, reverse=True):
|
||||
if isinstance(state.time, datetime.datetime):
|
||||
t = state.time.time()
|
||||
@@ -153,7 +151,7 @@ class RoomController(Hass):
|
||||
|
||||
try:
|
||||
self.run_at(
|
||||
callback=lambda **kwargs: self.set_controller_scene(kwargs['state']),
|
||||
callback=lambda cb_args: self.set_controller_scene(cb_args['state']),
|
||||
start=state.time.strftime('%H:%M:%S'),
|
||||
state=state,
|
||||
)
|
||||
@@ -187,9 +185,8 @@ class RoomController(Hass):
|
||||
try:
|
||||
attrs = self.state_entity.get_state('all')['attributes']
|
||||
state = ControllerStateConfig.model_validate(attrs)
|
||||
except Exception as e:
|
||||
except Exception:
|
||||
state = ControllerStateConfig()
|
||||
logger.exception(e)
|
||||
finally:
|
||||
# self.log(f'Current state: {state.model_dump(exclude_none=True)}', level='DEBUG')
|
||||
return state
|
||||
@@ -197,7 +194,7 @@ class RoomController(Hass):
|
||||
def activate(self, **kwargs):
|
||||
self.call_service(f'{self.name}/activate', namespace='controller', **kwargs)
|
||||
|
||||
def _service_activate(self, namespace: str, domain: str, service: str, **kwargs):
|
||||
def _service_activate(self, namespace: str, domain: str, service: str, kwargs: Dict[str, Any]):
|
||||
# self.log(f'Custom kwargs: {kwargs}', level='DEBUG')
|
||||
state = self.current_state()
|
||||
if isinstance(state.scene, str):
|
||||
@@ -212,7 +209,9 @@ class RoomController(Hass):
|
||||
"""Activate if any of the entities are on. Args and kwargs are passed directly to self.activate()"""
|
||||
self.call_service(f'{self.name}/activate_any_on', namespace='controller', **kwargs)
|
||||
|
||||
def _service_activate_any_on(self, namespace: str, domain: str, service: str, **kwargs):
|
||||
def _service_activate_any_on(
|
||||
self, namespace: str, domain: str, service: str, kwargs: Dict[str, Any]
|
||||
):
|
||||
if self.any_on() and not self.manual_mode():
|
||||
self.activate(**kwargs)
|
||||
|
||||
@@ -220,21 +219,25 @@ class RoomController(Hass):
|
||||
"""Activate if all of the entities are off. Args and kwargs are passed directly to self.activate()"""
|
||||
self.call_service(f'{self.name}/activate_all_off', namespace='controller', **kwargs)
|
||||
|
||||
def _service_activate_all_off(self, namespace: str, domain: str, service: str, **kwargs):
|
||||
def _service_activate_all_off(
|
||||
self, namespace: str, domain: str, service: str, kwargs: Dict[str, Any]
|
||||
):
|
||||
if self.all_off() and not self.manual_mode():
|
||||
self.activate(**kwargs)
|
||||
|
||||
def deactivate(self, **kwargs):
|
||||
self.call_service(f'{self.name}/deactivate', namespace='controller', **kwargs)
|
||||
|
||||
def _service_deactivate(self, namespace: str, domain: str, service: str, **kwargs):
|
||||
def _service_deactivate(
|
||||
self, namespace: str, domain: str, service: str, kwargs: Dict[str, Any]
|
||||
):
|
||||
for e in self.app_entities:
|
||||
self.turn_off(e)
|
||||
|
||||
def toggle(self, **kwargs):
|
||||
self.call_service(f'{self.name}/toggle', namespace='controller', **kwargs)
|
||||
|
||||
def _service_toggle(self, namespace: str, domain: str, service: str, **kwargs):
|
||||
def _service_toggle(self, namespace: str, domain: str, service: str, kwargs: Dict[str, Any]):
|
||||
if self.any_on():
|
||||
self.deactivate(**kwargs)
|
||||
else:
|
||||
|
||||
Reference in New Issue
Block a user