40 Commits

Author SHA1 Message Date
John Lancaster
72b4b7d72e pyproject updates 2025-11-16 09:26:56 -06:00
John Lancaster
b1638be692 added entity callbacks 2024-09-12 21:15:30 -05:00
John Lancaster
8b7294b445 broke out virtual button 2024-09-01 21:07:29 -05:00
John Lancaster
e4dada011c added virtual buttons 2024-09-01 20:38:47 -05:00
John Lancaster
e3186f1b5e removed unused import 2024-08-27 20:45:48 -05:00
John Lancaster
2180e544c0 another fix 2024-08-27 20:43:01 -05:00
John Lancaster
82f43cd029 simplifications 2024-08-27 19:24:38 -05:00
John Lancaster
1b3fc4afb7 changes for use_dictionary_unpacking 2024-08-27 00:41:25 -05:00
John Lancaster
0f58ac4cc6 fixed reconstruction of sun direction from state with a new field validator 2024-07-31 23:26:09 -05:00
John Lancaster
b278a3cda1 changed default of load_rich_config 2024-07-27 20:52:08 -05:00
John Lancaster
e4fcd757ce added log_level config per app 2024-07-27 20:48:57 -05:00
John Lancaster
7cde3c75d8 changed to use services 2024-07-27 17:37:33 -05:00
John Lancaster
463a68a27a rich logging config in yaml file in package data 2024-05-07 23:38:42 -05:00
John Lancaster
7b67d062a0 removed rye stuff 2024-05-06 23:36:38 -05:00
John Lancaster
124ff5c227 fixed some import issues 2024-05-02 22:47:18 -05:00
John Lancaster
6c313a64bf formatting 2024-05-02 22:29:57 -05:00
John Lancaster
069e0c0f42 packaged with rye for export_mode 2024-05-02 22:27:46 -05:00
John Lancaster
d42afb1829 obsolete import 2024-04-29 23:42:33 -05:00
John Lancaster
0c97afb0da removed obsolete imports 2024-04-29 23:42:03 -05:00
John Lancaster
fe0b3057ed greatly improved from implementing the "logging endgame" notebook 2024-04-29 23:39:58 -05:00
John Lancaster
866b4a1cfb changed log format 2024-04-28 11:27:34 -05:00
John Lancaster
68edfde755 improved logging 2024-04-27 13:41:29 -05:00
John Lancaster
6250b6b20c simplified rich logging with dictconfig 2024-04-27 10:24:45 -05:00
Jim Lancaster
dc7151549b added manual mode 2024-04-13 11:34:22 -05:00
John Lancaster
02726b0766 ruff formatting 2024-04-02 22:32:59 -05:00
John Lancaster
e421046a04 narrowed console to 100 characters 2024-04-02 22:30:52 -05:00
John Lancaster
7f8d0311ab pydantic work 2024-04-02 22:30:23 -05:00
John Lancaster
c0a22b63f8 obsoleted previous dataclasses 2024-03-10 19:12:40 -05:00
John Lancaster
9845368159 expanded pydantic model 2024-03-10 18:17:14 -05:00
John Lancaster
541475d68b more work 2024-03-10 17:33:22 -05:00
John Lancaster
e90ad5a071 more rich work 2024-03-10 15:44:42 -05:00
John Lancaster
b8e5a65347 more rich formatting 2024-03-10 13:23:28 -05:00
John Lancaster
4e2557e714 seperate rich logging system 2024-03-10 12:31:50 -05:00
John Lancaster
1fa0df360c adding the unmarkupformatter to all handlers of Appdaemon 2024-03-09 20:11:54 -06:00
John Lancaster
96be16e0bb added the UnMarkupFormatter 2024-03-09 19:56:34 -06:00
John Lancaster
17670b983e button work 2024-03-06 20:17:38 -06:00
John Lancaster
0939c21554 added deinit logging 2024-03-03 15:25:39 -06:00
John Lancaster
215d47ea00 added some rich logging 2024-03-03 14:54:11 -06:00
John Lancaster
d8a9d3d83a added toggle_activate 2024-01-28 08:47:32 -06:00
John Lancaster
9bb4df069e dataclasses and bugfixes 2024-01-23 21:03:30 -06:00
13 changed files with 951 additions and 427 deletions

View File

@@ -1,54 +0,0 @@
import asyncio
import json
from appdaemon.plugins.mqtt.mqttapi import Mqtt
from room_control import RoomController
class ButtonController(Mqtt):
def initialize(self):
task = self.get_app(self.args['app'])
self.app: RoomController = asyncio.get_event_loop().run_until_complete(task)
self.setup_buttons(self.args['button'])
# self.log(f'Done')
def setup_buttons(self, buttons):
if isinstance(buttons, list):
for button in buttons:
self.setup_button(button)
else:
self.setup_button(buttons)
def setup_button(self, name: str):
topic = f'zigbee2mqtt/{name}'
self.mqtt_subscribe(topic, namespace='mqtt')
self.listen_event(self.handle_button, "MQTT_MESSAGE", topic=topic, namespace='mqtt', button=name)
self.log(f'"{topic}" controls app {self.app.name}')
async def handle_button(self, event_name, data, kwargs):
topic = data['topic']
self.log(f'Button event for: {topic}')
try:
payload = json.loads(data['payload'])
action = payload['action']
button = kwargs['button']
except json.JSONDecodeError:
self.log(f'Error decoding JSON from {data["payload"]}', level='ERROR')
except KeyError as e:
return
else:
self.log(f'{button}: {action}')
await self.handle_action(action)
async def handle_action(self, action: str):
if action == '':
return
elif action == 'single':
cause = 'button single click'
state = await self.get_state(entity_id=self.args['ref_entity'])
if state == 'on':
self.app.deactivate(cause=cause)
else:
await self.app.activate(cause=cause)
else:
pass

11
door.py
View File

@@ -1,11 +0,0 @@
from appdaemon.plugins.hass.hassapi import Hass
from room_control import RoomController
class Door(Hass):
async def initialize(self):
await self.listen_state(self.door_open, entity_id=self.args['door'], new='on')
async def door_open(self, entity, attribute, old, new, kwargs):
app: RoomController = await self.get_app(self.args['app'])
await app.activate_all_off()

135
motion.py
View File

@@ -1,135 +0,0 @@
import asyncio
from datetime import timedelta
import re
from appdaemon.entity import Entity
from appdaemon.plugins.hass.hassapi import Hass
from room_control import RoomController
class CustomEventLoopPolicy(asyncio.DefaultEventLoopPolicy):
def get_event_loop(self):
try:
# Try to get the current event loop
loop = super().get_event_loop()
except RuntimeError as ex:
if "There is no current event loop" in str(ex):
# If there's no current loop, create a new one and set it
loop = self.new_event_loop()
self.set_event_loop(loop)
else:
raise
return loop
# Set the custom event loop policy
asyncio.set_event_loop_policy(CustomEventLoopPolicy())
class Motion(Hass):
@property
def sensor(self) -> Entity:
return self.get_entity(self.args['sensor'])
@property
def sensor_state(self) -> bool:
return self.sensor.state == 'on'
@property
def ref_entity(self) -> Entity:
return self.get_entity(self.args['ref_entity'])
@property
def ref_entity_state(self) -> bool:
return self.ref_entity.get_state() == 'on'
def initialize(self):
try:
loop = asyncio.get_event_loop()
except RuntimeError:
loop = asyncio.new_event_loop()
self.app: RoomController = loop.run_until_complete(self.get_app(self.args['app']))
self.log(f'Connected to app {self.app.name}')
self.listen_state(self.callback_light_on, self.ref_entity.entity_id, new='on')
self.listen_state(self.callback_light_off, self.ref_entity.entity_id, new='off')
loop.run_until_complete(self.sync_state())
async def sync_state(self):
"""Synchronizes the callbacks with the state of the light.
Essentially mimics the `state_change` callback based on the current state of the light.
"""
if self.ref_entity_state:
await self.callback_light_on()
else:
await self.callback_light_off()
async def listen_motion_on(self):
"""Sets up the motion on callback to activate the room
"""
self.log(f'Waiting for motion on {self.sensor.friendly_name}')
self.motion_on_handle = await self.listen_state(
callback=self.app.activate_all_off,
entity_id=self.sensor.entity_id,
new='on',
oneshot=True,
cause='motion on'
)
async def listen_motion_off(self, duration: timedelta):
"""Sets up the motion off callback to deactivate the room
"""
self.log(f'Waiting for motion to stop on {self.sensor.friendly_name}')
self.motion_off_handle = await self.listen_state(
callback=self.app.deactivate,
entity_id=self.sensor.entity_id,
new='off',
duration=duration.total_seconds(),
oneshot=True,
cause='motion off'
)
async def callback_light_on(self, entity=None, attribute=None, old=None, new=None, kwargs=None):
"""Called when the light turns on
"""
self.log('Light on callback')
await self.cancel_motion_callback(new='on')
await self.listen_motion_off(await self.app.off_duration())
async def callback_light_off(self, entity=None, attribute=None, old=None, new=None, kwargs=None):
"""Called when the light turns off
"""
self.log('Light off callback')
await self.cancel_motion_callback(new='off')
await self.listen_motion_on()
async def get_app_callbacks(self, name: str = None):
"""Gets all the callbacks associated with the app
"""
name = name or self.name
callbacks = {
handle: info
for app_name, callbacks in (await self.get_callback_entries()).items()
for handle, info in callbacks.items()
if app_name == name
}
return callbacks
async def get_sensor_callbacks(self):
return {
handle: info
for handle, info in (await self.get_app_callbacks()).items()
if info['entity'] == self.sensor.entity_id
}
async def cancel_motion_callback(self, new: str):
callbacks = await self.get_sensor_callbacks()
# self.log(f'Found {len(callbacks)}')
for handle, info in callbacks.items():
entity = info["entity"]
new_match = re.match('new=(?P<new>.*?)\s', info['kwargs'])
# self.log(f'{handle}: {info["entity"]}: {info["kwargs"]}')
if new_match is not None and new_match.group("new") == new:
await self.cancel_listen_state(handle)
self.log(f'cancelled: {await self.friendly_name(entity)}: {new}')

28
pyproject.toml Normal file
View File

@@ -0,0 +1,28 @@
[project]
name = "room-control"
version = "0.1.0"
description = "Add your description here"
authors = [
{ name = "John Lancaster", email = "32917998+jsl12@users.noreply.github.com" }
]
dependencies = [
"appdaemon>=4.4.2",
"rich>=13.7.1",
"pydantic>=2.7.1",
"ruff>=0.4.2",
]
readme = "README.md"
requires-python = ">= 3.10,<3.13"
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[tool.hatch.metadata]
allow-direct-references = true
[tool.hatch.build.targets.wheel]
packages = ["src/room_control"]
[tool.ruff.format]
quote-style = 'single'

View File

@@ -1,227 +0,0 @@
import asyncio
from copy import deepcopy
from datetime import time, timedelta
from typing import List
import appdaemon.utils as utils
import astral
from appdaemon.entity import Entity
from appdaemon.plugins.hass.hassapi import Hass
from appdaemon.plugins.mqtt.mqttapi import Mqtt
class RoomController(Hass, Mqtt):
"""Class for linking an light with a motion sensor.
- Separate the turning on and turning off functions.
- Use the state change of the light to set up the event for changing to the other state
- `handle_on`
- `handle_off`
- When the light comes on, check if it's attributes match what they should, given the time.
"""
async def initialize(self):
self.app_entities = await self.gather_app_entities()
self.log(f'entities: {self.app_entities}')
await self.refresh_state_times()
await self.run_daily(callback=self.refresh_state_times, start='00:00:00')
# if (ha_button := self.args.get('ha_button')):
# self.log(f'Setting up input button: {self.friendly_name(ha_button)}')
# self.listen_state(callback=self.activate_any_on, entity_id=ha_button)
async def gather_app_entities(self) -> List[str]:
"""Returns a list of all the entities involved in any of the states
"""
async def async_generator():
for settings in deepcopy(self.args['states']):
if (scene := settings.get('scene')):
if isinstance(scene, str):
assert scene.startswith('scene.'), f"Scene definition must start with 'scene.' for app {self.name}"
entity: Entity = self.get_entity(scene)
entity_state = await entity.get_state('all')
attributes = entity_state['attributes']
for entity in attributes['entity_id']:
yield entity
else:
for key in scene.keys():
yield key
else:
yield self.args['entity']
entities = [e async for e in async_generator()]
return set(entities)
async def refresh_state_times(self, *args, **kwargs):
"""Resets the `self.states` attribute to a newly parsed version of the states.
Parsed states have an absolute time for a certain day.
"""
# re-parse the state strings into times for the current day
self.states = await self.parse_states()
# schedule the transitions
for state in self.states:
dt = str(state['time'])[:8]
self.log(f'Scheduling transition at: {dt}')
try:
await self.run_at(callback=self.activate_any_on, start=dt)
except ValueError:
# happens when the callback time is in the past
pass
except Exception as e:
self.log(f'Failed with {type(e)}: {e}')
async def parse_states(self):
async def gen():
for state in deepcopy(self.args['states']):
if (time := state.get('time')):
state['time'] = await self.parse_time(time)
elif isinstance((elevation := state.get('elevation')), (int, float)):
assert 'direction' in state, f'State needs a direction if it has an elevation'
if state['direction'] == 'rising':
dir = astral.SunDirection.RISING
elif state['direction'] == 'setting':
dir = astral.SunDirection.SETTING
else:
raise ValueError(f'Invalid sun direction: {state["direction"]}')
state['time'] = self.AD.sched.location.time_at_elevation(
elevation=elevation, direction=dir
).time()
else:
raise ValueError(f'Missing time')
yield state
states = [s async for s in gen()]
states = sorted(states, key=lambda s: s['time'])
return states
async def current_state(self, time: time = None):
if (await self.sleep_bool()):
if (state := self.args.get('sleep_state')):
return state
else:
return {}
else:
now = await self.get_now()
self.log(f'Getting state for datetime: {now}')
time = time or (await self.get_now()).time()
for state in self.states[::-1]:
if state['time'] <= time:
self.log(f'Selected state from {state["time"]}')
return state
else:
return self.states[-1]
async def current_scene(self, time: time = None):
if (state := (await self.current_state(time=time))) is not None:
return state['scene']
@property
def all_off(self) -> bool:
""""All off" is the logic opposite of "any on"
Returns:
bool: Whether all the lights associated with the app are off
"""
return all(self.get_state(entity) != 'on' for entity in self.app_entities)
@property
def any_on(self) -> bool:
""""Any on" is the logic opposite of "all off"
Returns:
bool: Whether any of the lights associated with the app are on
"""
return any(self.get_state(entity) == 'on' for entity in self.app_entities)
async def sleep_bool(self) -> bool:
if (sleep_var := self.args.get('sleep')):
return (await self.get_state(sleep_var)) == 'on'
else:
return False
# @sleep_bool.setter
# def sleep_bool(self, val) -> bool:
# if (sleep_var := self.args.get('sleep')):
# if isinstance(val, str):
# self.set_state(sleep_var, state=val)
# elif isinstance(val, bool):
# self.set_state(sleep_var, state='on' if val else 'off')
# else:
# raise ValueError('Sleep variable is undefined')
async def off_duration(self) -> timedelta:
"""Determines the time that the motion sensor has to be clear before deactivating
Priority:
- Value in scene definition
- Default value
- Normal - value in app definition
- Sleep - 0
"""
current_state = await self.current_state()
duration_str = current_state.get(
'off_duration',
self.args.get('off_duration', '00:00:00')
)
try:
hours, minutes, seconds = map(int, duration_str.split(':'))
return timedelta(hours=hours, minutes=minutes, seconds=seconds)
except Exception:
return timedelta()
async def activate(self, *args, cause: str = 'unknown', **kwargs):
self.log(f'Activating: {cause}')
scene = await self.current_scene()
if isinstance(scene, str):
self.turn_on(scene)
self.log(f'Turned on scene: {scene}')
elif isinstance(scene, dict):
# makes setting the state to 'on' optional in the yaml definition
for entity, settings in scene.items():
if 'state' not in settings:
scene[entity]['state'] = 'on'
self.call_service('scene/apply', entities=scene, transition=0)
self.log(f'Applied scene: {scene}')
elif scene is None:
self.log(f'No scene, ignoring...')
# Need to act as if the light had just turned off to reset the motion (and maybe other things?)
# self.callback_light_off()
else:
self.log(f'ERROR: unknown scene: {scene}')
async def activate_all_off(self, *args, **kwargs):
"""Activate if all of the entities are off
"""
if self.all_off:
self.log(f'Activate all off kwargs: {kwargs}')
await self.activate(*args, **kwargs)
else:
self.log(f'Skipped activating - everything is not off')
async def activate_any_on(self, *args, **kwargs):
"""Activate if any of the entities are on
"""
if self.any_on:
await self.activate(*args, **kwargs)
else:
self.log(f'Skipped activating - everything is off')
def deactivate(self, *args, cause: str = 'unknown', **kwargs):
self.log(f'Deactivating: {cause}')
for entity in self.app_entities:
self.turn_off(entity)
self.log(f'Turned off {entity}')

View File

@@ -0,0 +1,26 @@
version: 1
disable_existing_loggers: false
formatters:
rich:
style: "{"
format: "[room]{room}[/] {message}"
# format: "{message}"
datefmt: '%H:%M:%S.%f'
rich_component:
style: "{"
format: "[room]{room}[/] [component]{component}[/] {message}"
# format: "{message}"
datefmt: '%H:%M:%S.%f'
handlers:
rich:
formatter: rich
'()': 'rich.logging.RichHandler'
markup: True
show_path: false
omit_repeated_times: false
rich_component:
formatter: rich_component
'()': 'rich.logging.RichHandler'
markup: True
show_path: false
omit_repeated_times: false

View File

@@ -0,0 +1,6 @@
from .button import Button
from .door import Door
from .motion import MotionSensor
from .room_control import RoomController
__all__ = ['RoomController', 'MotionSensor', 'Button', 'Door']

View File

@@ -0,0 +1,95 @@
import json
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any, Dict
from appdaemon.entity import Entity
from . import console
if TYPE_CHECKING:
from .room_control import RoomController
@dataclass
class Button:
adapi: 'RoomController'
button_name: str
def __post_init__(self):
self.logger = console.load_rich_config(self.adapi.name, 'Button')
topic = f'zigbee2mqtt/{self.button_name}'
self.adapi.listen_event(
self.handle_button,
'MQTT_MESSAGE',
topic=topic,
namespace='mqtt',
button=self.button_name,
)
self.logger.info(f'MQTT topic [topic]{topic}[/] controls [room]{self.adapi.name}[/]')
def handle_button(self, event_name: str, data: Dict[str, Any], **kwargs: Dict[str, Any]):
if event_name == 'appd_started':
return
# self.logger.info(f'Button callback: {event_name}, {data}')
try:
payload = json.loads(data['payload'])
action = payload['action']
except json.JSONDecodeError:
self.logger.error(f'Error decoding JSON from {data["payload"]}')
else:
self.do_action(action)
def do_action(self, action: str):
"""Action can be single, double, or others"""
if isinstance(action, str) and action != '':
self.logger.info(f'Action: [yellow]{action}[/]')
if action == 'single':
self.adapi.call_service(
f'{self.adapi.name}/toggle', namespace='controller', cause='button'
)
@dataclass
class VirtualButton(Button):
def __post_init__(self):
self.logger = console.load_rich_config(self.adapi.name, 'Button')
friendly_name = self.adapi.name.title().replace('_', ' ') + ' Button'
kwargs = {'entity_id': self.eid, 'friendly_name': friendly_name}
if not self.adapi.entity_exists(self.eid):
self.adapi.set_state(state=self.adapi.get_now(), **kwargs)
self.logger.info(f'Created entity [green]{self.eid}[/]')
else:
self.adapi.set_state(**kwargs)
self.logger.info(f'Set friendly name [green]{self.virtual_entity.friendly_name}[/]')
self.adapi.listen_event(self.handle_virtual_button, entity_id=self.eid)
@property
def eid(self) -> str:
return f'input_button.{self.adapi.name}'
@property
def virtual_entity(self) -> Entity:
return self.adapi.get_entity(self.eid)
def handle_virtual_button(
self, event_name: str, data: Dict[str, Any], **kwargs: Dict[str, Any]
):
if (
event_name == 'call_service'
and data.get('service') == 'press'
and (sd := data.get('service_data'))
and sd.get('entity_id') == self.eid
):
try:
if data['service_data']['entity_id'] == self.eid:
self.logger.info(f'Virtual button press: {event_name}')
# self.virtual_entity.set_state(state=datetime.now())
self.virtual_entity.set_state(state=self.adapi.get_now())
self.do_action('single')
except KeyError as e:
self.logger.error(f'Bad data from {event_name}: {json.dumps(data, indent=4)}')

239
src/room_control/console.py Normal file
View File

@@ -0,0 +1,239 @@
import json
import logging
import logging.config
import re
from abc import ABC
from dataclasses import asdict
from importlib.resources import files
import yaml
from rich.console import Console
from rich.highlighter import RegexHighlighter
from rich.theme import Theme
console = Console(
width=100,
theme=Theme(
{
'log.time': 'none',
# 'logging.level.info': 'none',
'room': 'italic bright_cyan',
'component': 'dark_violet',
'friendly_name': 'yellow',
'light': 'light_slate_blue',
'sensor': 'green',
'time': 'yellow',
'z2m': 'bright_black',
'topic': 'chartreuse2',
'true': 'green',
'false': 'red',
}
),
log_time_format='%Y-%m-%d %I:%M:%S %p',
# highlighter=RCHighlighter(),
)
class RCHighlighter(RegexHighlighter):
highlights = [
r'(?P<light>(light|switch)\.\w+)',
r'(?P<time>\d+:\d+:\d+)',
r'(?P<z2m>zigbee2mqtt/)',
r'(?P<sensor>binary_sensor\.\w+)',
# r"'state': '(?P<on>on)|(?P<off>off)'"
r'(?P<true>True)|(?P<false>False)',
]
def load_rich_config(
room: str = None, component: str = None, level: str = None
) -> logging.LoggerAdapter:
logger_name = f'Appdaemon.{room}'
if component is not None:
logger_name += f'.{component}'
with files('room_control').joinpath('.rich_logging.yaml').open('r') as f:
RICH_CFG = yaml.safe_load(f)
RICH_CFG['handlers']['rich']['console'] = console
RICH_CFG['handlers']['rich']['highlighter'] = RCHighlighter()
RICH_CFG['handlers']['rich_component']['console'] = console
RICH_CFG['handlers']['rich_component']['highlighter'] = RCHighlighter()
RICH_CFG['loggers'] = {
logger_name: {
'handlers': ['rich' if component is None else 'rich_component'],
'propagate': False,
}
}
extra = {'room': room}
if component is not None:
extra['component'] = component
if level is not None:
RICH_CFG['loggers'][logger_name]['level'] = level
logging.config.dictConfig(RICH_CFG)
logger = logging.getLogger(logger_name)
adapter = logging.LoggerAdapter(logger, extra)
return adapter
RICH_HANDLER_CFG = {
'()': 'rich.logging.RichHandler',
'markup': True,
'show_path': False,
# 'show_time': False,
'omit_repeated_times': False,
'console': console,
'highlighter': RCHighlighter(),
}
class ContextSettingFilter(logging.Filter, ABC):
def filter(self, record: logging.LogRecord) -> logging.LogRecord:
for name, val in asdict(self).items():
if val is not None:
setattr(record, name, val)
return record
# @dataclass
# class RoomControllerFilter(ContextSettingFilter):
# room: str
# component: Optional[str] = None
class RoomFilter(logging.Filter):
"""Used to filter out messages that have a component field because they will have already been printed by their respective logger."""
def filter(self, record: logging.LogRecord) -> bool:
return not hasattr(record, 'component')
# class RoomControllerFormatter(logging.Formatter):
# def format(self, record: logging.LogRecord):
# return super().format(record)
class UnMarkupFilter(logging.Filter):
md_regex = re.compile(r'(?P<open>\[.*?\])(?P<text>.*?)(?P<close>\[\/\])')
def filter(self, record: logging.LogRecord) -> logging.LogRecord:
record.msg = self.md_regex.sub(r'\g<text>', record.msg)
return record
class JSONFormatter(logging.Formatter):
def format(self, record: logging.LogRecord) -> str:
return json.dumps(record.__dict__)
def room_logging_config(name: str):
return {
'version': 1,
'disable_existing_loggers': False,
'filters': {
'room': {'()': 'room_control.console.RoomFilter'},
'unmarkup': {'()': 'room_control.console.UnMarkupFilter'},
},
'formatters': {
'rich_room': {
'style': '{',
'format': '[room]{room}[/] {message}',
'datefmt': '%H:%M:%S.%f',
},
'file': {
'style': '{',
'format': '{asctime}.{msecs:03.0f} {levelname:8} {name}: {message}',
'datefmt': '%Y-%m-%d %H:%M:%S',
},
'json': {'()': 'room_control.console.JSONFormatter'},
},
'handlers': {
'rich_room': {
'formatter': 'rich_room',
'filters': ['room'],
**RICH_HANDLER_CFG,
},
'file': {
'filters': ['unmarkup'],
'formatter': 'file',
'class': 'logging.handlers.RotatingFileHandler',
# 'class': 'logging.FileHandler',
'filename': f'/logs/{name}.log',
'maxBytes': 1000000,
'backupCount': 3,
},
'json': {
'filters': ['unmarkup'],
'formatter': 'json',
'filename': f'/logs/{name}.jsonl',
'class': 'logging.handlers.RotatingFileHandler',
'maxBytes': 1000000,
'backupCount': 3,
},
},
'loggers': {
f'AppDaemon.{name}': {
'level': 'INFO',
'propagate': False,
'handlers': [
'rich_room',
'file',
'json',
],
},
},
}
# def component_logging_config(parent_room: str, component: str):
# logger_name = f'AppDaemon.{parent_room}.{component}'
# cfg = load_rich_config()
# LOG_CFG = {
# 'version': 1,
# 'disable_existing_loggers': False,
# 'formatters': {
# 'rich_component': {
# 'style': '{',
# 'format': '[room]{room}[/] [component]{component}[/] {message}',
# 'datefmt': '%H:%M:%S.%f',
# },
# },
# 'handlers': {
# 'rich_component': {
# 'formatter': 'rich_component',
# **RICH_HANDLER_CFG,
# },
# },
# 'loggers': {
# logger_name: {
# # 'level': 'INFO',
# 'propagate': True,
# 'handlers': ['rich_component'],
# }
# },
# }
# return LOG_CFG
# def setup_component_logging(self) -> logging.Logger:
# """Creates a logger for a subcomponent with a RichHandler"""
# component = type(self).__name__
# parent = self.args['app']
# cfg_dict = component_logging_config(parent_room=parent, component=component)
# logger_name = next(iter(cfg_dict['loggers']))
# try:
# logging.config.dictConfig(cfg_dict)
# except Exception:
# console.print_exception()
# else:
# logger = logging.getLogger(logger_name)
# logger = logging.LoggerAdapter(logger, {'room': parent, 'component': component})
# return logger

23
src/room_control/door.py Normal file
View File

@@ -0,0 +1,23 @@
from dataclasses import dataclass
from typing import TYPE_CHECKING
from . import console
if TYPE_CHECKING:
from .room_control import RoomController
@dataclass
class Door:
adapi: 'RoomController'
entity_id: str
def __post_init__(self):
self.logger = console.load_rich_config(self.adapi.name, 'Door')
self.adapi.listen_state(
lambda *args, **kwargs: self.adapi.activate_all_off(cause='door open'),
entity_id=self.entity_id,
new='on',
)
self.logger.debug(f'Initialized door for [room]{self.adapi.name}[/]')

116
src/room_control/model.py Normal file
View File

@@ -0,0 +1,116 @@
import datetime
from pathlib import Path
from typing import Annotated, Dict, List, Optional, Self, Union
import yaml
from astral import SunDirection
from pydantic import BaseModel, BeforeValidator, Field, field_validator, model_validator
from pydantic_core import PydanticCustomError
from rich.console import Console, ConsoleOptions, RenderResult
from rich.table import Column, Table
def str_to_timedelta(input_str: str) -> datetime.timedelta:
try:
hours, minutes, seconds = map(int, input_str.split(':'))
return datetime.timedelta(hours=hours, minutes=minutes, seconds=seconds)
except Exception:
return datetime.timedelta()
def str_to_direction(input_str: str) -> SunDirection:
try:
return getattr(SunDirection, input_str.upper())
except AttributeError:
raise PydanticCustomError(
'invalid_dir', 'Invalid sun direction: {dir}', dict(dir=input_str)
)
OffDuration = Annotated[datetime.timedelta, BeforeValidator(str_to_timedelta)]
class State(BaseModel):
state: bool = True
brightness: Optional[int] = Field(default=None, ge=1, le=255)
color_temp: Optional[int] = Field(default=None, ge=200, le=650)
rgb_color: Optional[list[int]] = Field(default=None, min_length=3, max_length=3)
class ApplyKwargs(BaseModel):
"""Arguments to call with the 'scene/apply' service"""
entities: Dict[str, State]
transition: Optional[int] = None
class ControllerStateConfig(BaseModel):
time: Optional[str | datetime.time | datetime.datetime] = None
elevation: Optional[float] = None
direction: Optional[SunDirection] = None
off_duration: Optional[OffDuration] = None
scene: dict[str, State] | str = Field(default_factory=dict)
@model_validator(mode='before')
def check_args(cls, values):
if values.get('elevation') is not None and values.get('direction') is None:
raise PydanticCustomError('no_sun_dir', 'Needs sun direction with elevation')
return values
@field_validator('direction', mode='before')
@classmethod
def check_sun_dir(cls, val: int | str | SunDirection | None) -> SunDirection:
if isinstance(val, str):
print(f'Str sun direction: {val}')
return str_to_direction(val)
elif isinstance(val, int):
return SunDirection.SETTING if val < 0 else SunDirection.RISING
elif isinstance(val, SunDirection):
return val
def to_apply_kwargs(self, transition: int = None):
return ApplyKwargs(entities=self.scene, transition=transition).model_dump(exclude_none=True)
class MotionSensorConfig(BaseModel):
sensor: str
ref_entity: str
class RoomControllerConfig(BaseModel):
states: List[ControllerStateConfig] = Field(default_factory=list)
off_duration: Optional[OffDuration] = Field(default_factory=datetime.timedelta)
sleep_state: Optional[ControllerStateConfig] = None
rich: Optional[str] = None
manual_mode: Optional[str] = None
button: Optional[Union[str, List[str]]] = None
motion: Optional[MotionSensorConfig] = None
log_level: Optional[str] = None
@classmethod
def from_yaml(cls: Self, yaml_path: Path) -> Self:
yaml_path = Path(yaml_path)
with yaml_path.open('r') as f:
for appname, app_cfg in yaml.load(f, Loader=yaml.SafeLoader).items():
if app_cfg['class'] == 'RoomController':
return cls.model_validate(app_cfg)
def __rich_console__(self, console: Console, options: ConsoleOptions) -> RenderResult:
table = Table(
Column('Time', width=15),
Column('Scene'),
highlight=True,
padding=1,
collapse_padding=True,
)
for state in self.states:
if isinstance(state, str):
pass
elif isinstance(state, dict):
scene_json = state.to_apply_kwargs()
lines = [
f'{name:20}{state["state"]} Brightness: {state.get("brightness", ""):<4} Temp: {state.get("color_temp", "")}'
for name, state in scene_json['entities'].items()
]
table.add_row(state.time.strftime('%I:%M:%S %p'), '\n'.join(lines))
yield table

127
src/room_control/motion.py Normal file
View File

@@ -0,0 +1,127 @@
from dataclasses import dataclass
from datetime import timedelta
from typing import TYPE_CHECKING, Literal, Optional
from appdaemon.entity import Entity
from pydantic import BaseModel
from . import console
if TYPE_CHECKING:
from room_control import RoomController
class CallbackEntry(BaseModel):
entity: str
event: Optional[str] = None
type: Literal['state', 'event']
kwargs: str
function: str
name: str
pin_app: bool
pin_thread: int
Callbacks = dict[str, dict[str, CallbackEntry]]
@dataclass
class MotionSensor:
adapi: 'RoomController'
sensor_entity_id: str
ref_entity_id: str
def __post_init__(self):
self.logger = console.load_rich_config(self.adapi.name, 'Motion')
assert self.sensor_entity.exists()
assert self.ref_entity.exists()
self.ref_entity.listen_state(self.light_state_callback, immediate=True)
self.match_new_state(new=self.sensor_entity.get_state())
self.logger.info('Initialized motion sensor')
def entity_callbacks(self, entity_id: str | None = None) -> dict[str, dict]:
callbacks = self.adapi.get_callback_entries()
if self.adapi.name in callbacks:
return {
handle: cb
for handle, cb in callbacks[self.adapi.name].items()
if cb.get('entity') == entity_id
}
else:
return {}
def sensor_callbacks(self) -> dict[str, dict]:
return self.entity_callbacks(entity_id=self.sensor_entity_id)
@property
def sensor_entity(self) -> Entity:
return self.adapi.get_entity(self.sensor_entity_id)
@property
def sensor_state(self) -> bool:
return self.sensor_entity.get_state() == 'on'
@property
def ref_entity(self) -> Entity:
return self.adapi.get_entity(self.ref_entity_id)
@property
def ref_state(self) -> bool:
return self.ref_entity.get_state() == 'on'
@property
def state_mismatch(self) -> bool:
return self.sensor_state != self.ref_state
def light_state_callback(self, entity: str, attribute: str, old: str, new: str, **kwargs: dict):
for handle in self.sensor_callbacks():
self.adapi.cancel_listen_state(handle)
self.match_new_state(new)
def match_new_state(self, new: Literal['on', 'off']):
match new:
case 'on':
duration = self.adapi.off_duration()
self.listen_motion_off(duration)
case 'off':
self.listen_motion_on()
def listen_motion_on(self):
"""Sets up the motion on callback to activate the room"""
# self.cancel_motion_callback()
self.sensor_entity.listen_state(
lambda *args, **kwargs: self.adapi.activate_all_off(cause='motion on'),
new='on',
oneshot=True,
)
self.logger.info(
'Waiting for sensor motion on '
f'[friendly_name]{self.sensor_entity.friendly_name}[/]'
)
if self.sensor_state:
self.logger.warning(
'Sensor '
f'[friendly_name]{self.sensor_entity.friendly_name}[/] is already on',
)
def listen_motion_off(self, duration: timedelta):
"""Sets up the motion off callback to deactivate the room"""
# self.cancel_motion_callback()
self.sensor_entity.listen_state(
lambda *args, **kwargs: self.adapi.deactivate(cause='motion off'),
new='off',
duration=duration.total_seconds(),
oneshot=True,
)
self.logger.debug(
'Waiting for sensor '
f'[friendly_name]{self.sensor_entity.friendly_name}[/] '
f'to be clear for {duration}'
)
if not self.sensor_state:
self.logger.warning(
f'Sensor [friendly_name]{self.sensor_entity.friendly_name}[/] is currently off',
)

291
src/room_control/room_control.py Executable file
View File

@@ -0,0 +1,291 @@
import datetime
import logging
import logging.config
import traceback
from functools import wraps
from typing import Dict, List, Set
from appdaemon.entity import Entity
from appdaemon.plugins.hass.hassapi import Hass
from astral.location import Location
from . import console
from .button import Button, VirtualButton
from .door import Door
from .model import ControllerStateConfig, RoomControllerConfig
from .motion import MotionSensor
logger = logging.getLogger(__name__)
class RoomController(Hass):
"""Class for linking room's lights with a motion sensor.
- Separate the turning on and turning off functions.
- Use the state change of the light to set up the event for changing to the other state
- `handle_on`
- `handle_off`
- When the light comes on, check if it's attributes match what they should, given the time.
## Services
- <name>/activate
- <name>/activate_all_off
- <name>/deactivate
- <name>/toggle
"""
@property
def states(self) -> List[ControllerStateConfig]:
return self._room_config.states
@states.setter
def states(self, new: List[ControllerStateConfig]):
assert all(isinstance(s, ControllerStateConfig) for s in new), f'Invalid: {new}'
self._room_config.states = new
@property
@wraps(Location.time_at_elevation)
def time_at_elevation(self):
return self.AD.sched.location.time_at_elevation
@property
def state_entity(self) -> Entity:
return self.get_entity(f'{self.name}.state', namespace='controller')
def initialize(self):
self.logger = console.load_rich_config(self.name, level=self.args.get('log_level', 'INFO'))
self.refresh_state_times()
self.run_daily(callback=self.refresh_state_times, start='00:00:00')
self.register_service(
f'{self.name}/activate', self._service_activate, namespace='controller'
)
self.register_service(
f'{self.name}/activate_all_off', self._service_activate_all_off, namespace='controller'
)
self.register_service(
f'{self.name}/activate_any_on', self._service_activate_any_on, namespace='controller'
)
self.register_service(
f'{self.name}/deactivate', self._service_deactivate, namespace='controller'
)
self.register_service(f'{self.name}/toggle', self._service_toggle, namespace='controller')
# This needs to come after this first call of refresh_state_times
self.app_entities = self.get_app_entities()
self.log(f'entities: {self.app_entities}', level='DEBUG')
if button := self.args.get('button'):
if isinstance(button, str):
Button(self, button_name=button)
VirtualButton(self, button_name=button)
elif isinstance(button, list) and all(isinstance(b, str) for b in button):
for b in button:
Button(self, button_name=b)
VirtualButton(self, button_name=button)
if door := self.args.get('door'):
if isinstance(door, str):
self.door = Door(self, entity_id=door)
if motion := self.args.get('motion'):
self.motion = MotionSensor(
self, sensor_entity_id=motion['sensor'], ref_entity_id=motion['ref_entity']
)
state: ControllerStateConfig
for state in sorted(self._room_config.states, key=lambda s: s.time, reverse=True):
if isinstance(state.time, datetime.datetime):
t = state.time.time()
else:
t = state.time
if t < self.get_now().time():
self.log(f'Initial state: {state.time}', level='DEBUG')
self.set_controller_scene(state)
break
self.log(f'Initialized [bold green]{type(self).__name__}[/]')
def terminate(self):
self.log('[bold red]Terminating[/]', level='DEBUG')
def get_app_entities(self) -> Set[str]:
"""Gets a set of all the entities referenced by any of the state definitions"""
def gen():
for state in self._room_config.states:
if isinstance(state.scene, str):
assert state.scene.startswith(
'scene.'
), "Scene definition must start with 'scene.'"
entities = self.get_state(state.scene, attribute='entity_id')
yield from entities
else:
yield from state.scene.keys()
return set(gen())
def refresh_state_times(self, *args, **kwargs):
"""Resets the `self.states` attribute to a newly parsed version of the states.
Parsed states have an absolute time for the current day.
"""
# re-parse the state strings into times for the current day
self._room_config = RoomControllerConfig.model_validate(self.args)
self.log(
f'{len(self._room_config.states)} states in the app configuration',
level='DEBUG',
)
for state in self._room_config.states:
if state.time is None and state.elevation is not None:
state.time = self.time_at_elevation(
elevation=state.elevation, direction=state.direction
).time()
elif isinstance(state.time, str):
state.time = self.parse_time(state.time)
assert isinstance(state.time, datetime.time), f'Invalid time: {state.time}'
try:
self.run_at(
callback=lambda **kwargs: self.set_controller_scene(kwargs['state']),
start=state.time.strftime('%H:%M:%S'),
state=state,
)
except Exception as e:
self.log(f'Failed with {type(e)}: {e}')
self.state_entity.listen_state(
lambda *args, **kwargs: self.call_service(
f'{self.name}/activate_any_on', namespace='controller', cause='state transition'
),
attribute='all',
)
def set_controller_scene(self, state: ControllerStateConfig):
"""Sets the internal state for the app. Only used by the scheduled transition"""
try:
self.state_entity.set_state(attributes=state.model_dump())
except Exception:
self.logger.error(traceback.format_exc())
else:
self.log(f'Set controller state of {self.name}: {state.model_dump()}', level='DEBUG')
def current_state(self) -> ControllerStateConfig:
if self.sleep_bool():
self.log('sleep: active', level='DEBUG')
if state := self.args.get('sleep_state'):
return ControllerStateConfig(**state)
else:
return ControllerStateConfig()
else:
try:
attrs = self.state_entity.get_state('all')['attributes']
state = ControllerStateConfig.model_validate(attrs)
except Exception as e:
state = ControllerStateConfig()
logger.exception(e)
finally:
# self.log(f'Current state: {state.model_dump(exclude_none=True)}', level='DEBUG')
return state
def activate(self, **kwargs):
self.call_service(f'{self.name}/activate', namespace='controller', **kwargs)
def _service_activate(self, namespace: str, domain: str, service: str, **kwargs):
# self.log(f'Custom kwargs: {kwargs}', level='DEBUG')
state = self.current_state()
if isinstance(state.scene, str):
self.turn_on(state.scene)
# self.turn_on(state.scene, transition=0)
elif isinstance(state.scene, dict):
scene = state.to_apply_kwargs()
self.call_service('scene/apply', **scene)
# scene = state.to_apply_kwargs(transition=0)
def activate_any_on(self, **kwargs):
"""Activate if any of the entities are on. Args and kwargs are passed directly to self.activate()"""
self.call_service(f'{self.name}/activate_any_on', namespace='controller', **kwargs)
def _service_activate_any_on(self, namespace: str, domain: str, service: str, **kwargs):
if self.any_on() and not self.manual_mode():
self.activate(**kwargs)
def activate_all_off(self, **kwargs):
"""Activate if all of the entities are off. Args and kwargs are passed directly to self.activate()"""
self.call_service(f'{self.name}/activate_all_off', namespace='controller', **kwargs)
def _service_activate_all_off(self, namespace: str, domain: str, service: str, **kwargs):
if self.all_off() and not self.manual_mode():
self.activate(**kwargs)
def deactivate(self, **kwargs):
self.call_service(f'{self.name}/deactivate', namespace='controller', **kwargs)
def _service_deactivate(self, namespace: str, domain: str, service: str, **kwargs):
for e in self.app_entities:
self.turn_off(e)
def toggle(self, **kwargs):
self.call_service(f'{self.name}/toggle', namespace='controller', **kwargs)
def _service_toggle(self, namespace: str, domain: str, service: str, **kwargs):
if self.any_on():
self.deactivate(**kwargs)
else:
self.activate(**kwargs)
def app_entity_states(self) -> Dict[str, str]:
states = {entity: self.get_state(entity) for entity in self.app_entities}
return states
def all_off(self) -> bool:
""" "All off" is the logic opposite of "any on"
Returns:
bool: Whether all the lights associated with the app are off
"""
states = self.app_entity_states()
return all(state != 'on' for entity, state in states.items())
def any_on(self) -> bool:
""" "Any on" is the logic opposite of "all off"
Returns:
bool: Whether any of the lights associated with the app are on
"""
states = self.app_entity_states()
return any(state == 'on' for entity, state in states.items())
def sleep_bool(self) -> bool:
if sleep_var := self.args.get('sleep'):
return self.get_state(sleep_var) == 'on'
else:
return False
def manual_mode(self) -> bool:
if manual_entity := self.args.get('manual_mode'):
return self.get_state(manual_entity) == 'on'
else:
return False
def off_duration(self, now: datetime.time = None) -> datetime.timedelta:
"""Determines the time that the motion sensor has to be clear before deactivating
Priority:
- Value in scene definition
- Default value
- Normal - value in app definition
- Sleep - 0
"""
if sleep_active := self.sleep_bool():
self.log(f'Sleeping mode active: {sleep_active}', level='DEBUG')
return datetime.timedelta()
else:
return self.current_state().off_duration or self._room_config.off_duration