27 Commits

Author SHA1 Message Date
John Lancaster
1c1990b632 fixed default off_duration 2024-01-22 18:57:24 -06:00
John Lancaster
826e866a52 moved off_duration to roomconfig 2024-01-22 18:42:13 -06:00
John Lancaster
6ef45830fb current state moved to RoomConfig 2024-01-22 18:32:36 -06:00
John Lancaster
53cb116372 tweaked logs 2024-01-22 07:58:45 -06:00
John Lancaster
e973205b06 made current_scene more explicit 2024-01-21 10:53:55 -06:00
John Lancaster
f00421d273 should be last tweaks to fix sleep mode scenes 2024-01-06 13:33:19 -06:00
John Lancaster
061ad8d322 fixed default off duration 2024-01-05 13:27:40 -06:00
John Lancaster
12ac6c4cc4 added RoomState and RoomConfig dataclasses 2024-01-02 22:56:58 -06:00
John Lancaster
dda9d2b501 small bugfix 2024-01-02 21:54:20 -06:00
John Lancaster
f088a23da7 maybe this time? 2023-12-08 16:36:22 -06:00
John Lancaster
4f11d9bdcc fixed states? 2023-12-08 16:29:25 -06:00
John Lancaster
cf9f0f3244 moved toggle logic 2023-12-08 08:22:57 -06:00
John Lancaster
e0caaedc15 cancelling all callbacks on a state change 2023-12-06 20:58:56 -06:00
John Lancaster
0d70649bb8 fixed async all_off/any_on 2023-12-04 14:17:22 -06:00
John Lancaster
145aeca667 changed on callback to trigger on brightness 2023-12-02 12:48:34 -06:00
John Lancaster
8b00faceb6 improved logging 2023-11-25 22:36:00 -06:00
John Lancaster
148645094a toggle stuff 2023-11-25 22:04:10 -06:00
John Lancaster
e659629c71 logging tweaks 2023-11-25 21:19:59 -06:00
John Lancaster
4886eb29d6 deactivate cause 2023-11-25 21:12:34 -06:00
John Lancaster
7bd46ffc42 deactivate cause 2023-11-25 21:12:24 -06:00
John Lancaster
945abc91c3 removed sync state 2023-11-25 21:12:15 -06:00
John Lancaster
af28cda9a5 logging improvements 2023-11-25 19:29:28 -06:00
John Lancaster
afc5e45642 bug fix from scene detector 2023-11-25 18:41:49 -06:00
John Lancaster
8a5431a72b better use of sync wrapper 2023-11-25 18:00:12 -06:00
John Lancaster
50b79c8d13 streamlined initialize 2023-11-25 17:52:25 -06:00
John Lancaster
5f9218311c removed obsolete 2023-11-25 17:45:58 -06:00
John Lancaster
7de5dfa3a8 tweaked button init 2023-11-25 17:44:27 -06:00
14 changed files with 514 additions and 1020 deletions

50
button.py Normal file
View File

@@ -0,0 +1,50 @@
import asyncio
import json
from appdaemon.plugins.mqtt.mqttapi import Mqtt
from room_control import RoomController
class Button(Mqtt):
async def initialize(self):
self.app: RoomController = await self.get_app(self.args['app'])
self.setup_buttons(self.args['button'])
def setup_buttons(self, buttons):
if isinstance(buttons, list):
for button in buttons:
self.setup_button(button)
else:
self.setup_button(buttons)
def setup_button(self, name: str):
topic = f'zigbee2mqtt/{name}'
self.mqtt_subscribe(topic, namespace='mqtt')
self.listen_event(self.handle_button, "MQTT_MESSAGE", topic=topic, namespace='mqtt', button=name)
self.log(f'"{topic}" controls app {self.app.name}')
async def handle_button(self, event_name, data, kwargs):
try:
payload = json.loads(data['payload'])
action = payload['action']
except json.JSONDecodeError:
self.log(f'Error decoding JSON from {data["payload"]}', level='ERROR')
except KeyError as e:
return
else:
if action != '':
await self.handle_action(action)
async def handle_action(self, action: str):
if action == 'single':
self.log(f' {action.upper()} '.center(50, '='))
state = await self.get_state(self.args['ref_entity'])
kwargs = {
'kwargs': {'cause': f'button single click: toggle while {state}'}
}
if state == 'on':
self.app.deactivate(**kwargs)
else:
await self.app.activate(**kwargs)
else:
pass

8
door.py Normal file
View File

@@ -0,0 +1,8 @@
from appdaemon.plugins.hass.hassapi import Hass
from room_control import RoomController
class Door(Hass):
async def initialize(self):
app: RoomController = await self.get_app(self.args['app'])
await self.listen_state(app.activate_all_off, entity_id=self.args['door'], new='on', cause='door open')

111
motion.py Normal file
View File

@@ -0,0 +1,111 @@
import re
from datetime import timedelta
from appdaemon.entity import Entity
from appdaemon.plugins.hass.hassapi import Hass
from room_control import RoomController
from appdaemon import utils
class Motion(Hass):
@property
def sensor(self) -> Entity:
return self.get_entity(self.args['sensor'])
@property
def sensor_state(self) -> bool:
return self.sensor.state == 'on'
@property
def ref_entity(self) -> Entity:
return self.get_entity(self.args['ref_entity'])
@property
async def ref_entity_state(self) -> bool:
return (await self.ref_entity.get_state()) == 'on'
async def initialize(self):
self.app: RoomController = await self.get_app(self.args['app'])
self.log(f'Connected to app {self.app.name}')
base_kwargs = dict(
entity_id=self.ref_entity.entity_id,
immediate=True, # avoids needing to sync the state
)
# don't need to await these because they'll already get turned into a task by the utils.sync_wrapper decorator
self.listen_state(**base_kwargs, attribute='brightness', callback=self.callback_light_on)
self.listen_state(**base_kwargs, new='off', callback=self.callback_light_off)
async def listen_motion_on(self):
"""Sets up the motion on callback to activate the room
"""
await self.cancel_motion_callback()
await self.listen_state(
callback=self.app.activate_all_off,
entity_id=self.sensor.entity_id,
new='on',
oneshot=True,
cause='motion on'
)
self.log(f'Waiting for motion on {self.sensor.friendly_name}')
async def listen_motion_off(self, duration: timedelta):
"""Sets up the motion off callback to deactivate the room
"""
await self.cancel_motion_callback()
await self.listen_state(
callback=self.app.deactivate,
entity_id=self.sensor.entity_id,
new='off',
duration=duration.total_seconds(),
oneshot=True,
cause='motion off'
)
self.log(f'Waiting for motion to stop on {self.sensor.friendly_name} for {duration}')
@utils.sync_wrapper
async def callback_light_on(self, entity=None, attribute=None, old=None, new=None, kwargs=None):
"""Called when the light turns on
"""
if new is not None:
self.log(f'{entity} turned on')
duration = await self.app.off_duration()
await self.listen_motion_off(duration)
@utils.sync_wrapper
async def callback_light_off(self, entity=None, attribute=None, old=None, new=None, kwargs=None):
"""Called when the light turns off
"""
self.log(f'{entity} turned off')
await self.listen_motion_on()
async def get_app_callbacks(self, name: str = None):
"""Gets all the callbacks associated with the app
"""
name = name or self.name
callbacks = {
handle: info
for app_name, callbacks in (await self.get_callback_entries()).items()
for handle, info in callbacks.items()
if app_name == name
}
return callbacks
async def get_sensor_callbacks(self):
return {
handle: info
for handle, info in (await self.get_app_callbacks()).items()
if info['entity'] == self.sensor.entity_id
}
async def cancel_motion_callback(self):
callbacks = await self.get_sensor_callbacks()
# self.log(f'Found {len(callbacks)} callbacks for {self.sensor.entity_id}')
for handle, info in callbacks.items():
entity = info["entity"]
kwargs = info['kwargs']
if (m := re.match('new=(?P<new>.*?)\s', kwargs)) is not None:
new = m.group('new')
await self.cancel_listen_state(handle)
self.log(f'cancelled callback for sensor {entity} turning {new}')

View File

@@ -1,27 +0,0 @@
[project]
name = "room-control"
version = "0.1.0"
description = "Add your description here"
authors = [
{ name = "John Lancaster", email = "32917998+jsl12@users.noreply.github.com" }
]
dependencies = [
"appdaemon>=4.4.2",
"rich>=13.7.1",
"pydantic>=2.7.1",
"ruff>=0.4.2",
]
readme = "README.md"
requires-python = ">= 3.8,<3.12"
[tool.setuptools]
include-package-data = true
# [tool.setuptools.package-data]
# mypkg = ["*.yaml"]
[tool.setuptools.data-files]
config = ["config/default_config.yaml"]
[tool.ruff.format]
quote-style = 'single'

345
room_control.py Executable file
View File

@@ -0,0 +1,345 @@
from copy import deepcopy
from dataclasses import dataclass, field
from datetime import datetime, time, timedelta
from pathlib import Path
from typing import Dict, List
import appdaemon.utils as utils
import yaml
from appdaemon.entity import Entity
from appdaemon.plugins.hass.hassapi import Hass
from appdaemon.plugins.mqtt.mqttapi import Mqtt
from astral import SunDirection
def str_to_timedelta(input_str: str) -> timedelta:
try:
hours, minutes, seconds = map(int, input_str.split(':'))
return timedelta(hours=hours, minutes=minutes, seconds=seconds)
except Exception:
return timedelta()
@dataclass
class RoomState:
scene: Dict[str, Dict[str, str | int]]
off_duration: timedelta = None
time: time = None
time_fmt: List[str] = field(default_factory=lambda : ['%H:%M:%S', '%I:%M:%S %p'], repr=False)
elevation: int | float = None
direction: SunDirection = None
def __post_init__(self):
if isinstance(self.time, str):
for fmt in self.time_fmt:
try:
self.time = datetime.strptime(self.time, fmt).time()
except:
continue
else:
break
if self.elevation is not None:
assert self.direction is not None, f'Elevation setting requires a direction'
if self.direction.lower() == 'setting':
self.direction = SunDirection.SETTING
elif self.direction.lower() == 'rising':
self.direction = SunDirection.RISING
else:
raise ValueError(f'Invalid sun direction: {self.direction}')
if isinstance(self.elevation, str):
self.elevation = float(self.elevation)
if isinstance(self.off_duration, str):
self.off_duration = str_to_timedelta(self.off_duration)
@classmethod
def from_json(cls, json_input):
return cls(**json_input)
@dataclass
class RoomConfig:
states: List[RoomState]
off_duration: timedelta = None
def __post_init__(self):
if isinstance(self.off_duration, str):
self.off_duration = str_to_timedelta(self.off_duration)
@classmethod
def from_app_config(cls, app_cfg: Dict[str, Dict]):
if 'off_duration' in app_cfg:
kwargs = {'off_duration': app_cfg['off_duration']}
else:
kwargs = {}
self = cls(
states=[RoomState.from_json(s) for s in app_cfg['states']],
**kwargs
)
return self
@classmethod
def from_yaml(cls, yaml_path: Path, app_name: str):
with yaml_path.open('r') as f:
cfg: Dict = yaml.load(f, Loader=yaml.SafeLoader)[app_name]
return cls.from_app_config(cfg)
def sort_states(self):
"""Should only be called after all the times have been resolved
"""
assert all(isinstance(state.time, time) for state in self.states), 'Times have not all been resolved yet'
self.states = sorted(self.states, key=lambda s: s.time, reverse=True)
def current_state(self, now: time) -> RoomState:
time_fmt = "%I:%M:%S %p"
print(now.strftime(time_fmt))
self.sort_states()
for state in self.states:
if state.time <= now:
return state
else:
# self.log(f'Defaulting to first state')
return self.states[0]
def current_scene(self, now: time) -> Dict:
state = self.current_state(now)
return state.scene
def current_off_duration(self, now: time) -> timedelta:
state = self.current_state(now)
if state.off_duration is None:
if self.off_duration is None:
raise ValueError(f'Need an off duration')
else:
return self.off_duration
else:
return state.off_duration
class RoomController(Hass, Mqtt):
"""Class for linking an light with a motion sensor.
- Separate the turning on and turning off functions.
- Use the state change of the light to set up the event for changing to the other state
- `handle_on`
- `handle_off`
- When the light comes on, check if it's attributes match what they should, given the time.
"""
@property
def states(self) -> List[RoomState]:
return self._room_config.states
@states.setter
def states(self, new: List[RoomState]):
assert all(isinstance(s, RoomState) for s in new), f'Invalid: {new}'
self._room_config.states = new
async def initialize(self):
self.app_entities = await self.gather_app_entities()
# self.log(f'entities: {self.app_entities}')
await self.refresh_state_times()
await self.run_daily(callback=self.refresh_state_times, start='00:00:00')
async def gather_app_entities(self) -> List[str]:
"""Returns a list of all the entities involved in any of the states
"""
async def async_generator():
for settings in deepcopy(self.args['states']):
if (scene := settings.get('scene')):
if isinstance(scene, str):
assert scene.startswith('scene.'), f"Scene definition must start with 'scene.' for app {self.name}"
entity: Entity = self.get_entity(scene)
entity_state = await entity.get_state('all')
attributes = entity_state['attributes']
for entity in attributes['entity_id']:
yield entity
else:
for key in scene.keys():
yield key
else:
yield self.args['entity']
entities = [e async for e in async_generator()]
return set(entities)
async def refresh_state_times(self, *args, **kwargs):
"""Resets the `self.states` attribute to a newly parsed version of the states.
Parsed states have an absolute time for the current day.
"""
# re-parse the state strings into times for the current day
self._room_config = RoomConfig.from_app_config(self.args)
self.log(f'{len(self._room_config.states)} states in the RoomConfig')
for state in self._room_config.states:
if state.time is None and state.elevation is not None:
state.time = self.AD.sched.location.time_at_elevation(
elevation=state.elevation,
direction=state.direction
).time()
elif isinstance(state.time, str):
state.time = await self.parse_time(state.time)
assert isinstance(state.time, time), f'Invalid time: {state.time}'
for state in self.states:
self.log(f'State: {state.time.strftime("%I:%M:%S %p")} {state.scene}')
self.states = sorted(self.states, key=lambda s: s.time, reverse=True)
# schedule the transitions
for state in self.states[::-1]:
# t: time = state['time']
t: time = state.time
try:
await self.run_at(
callback=self.activate_any_on,
start=t.strftime('%H:%M:%S'),
cause='scheduled transition'
)
except ValueError:
# happens when the callback time is in the past
pass
except Exception as e:
self.log(f'Failed with {type(e)}: {e}')
async def current_state(self, now: time = None) -> RoomState:
if (await self.sleep_bool()):
self.log(f'sleep: active')
if (state := self.args.get('sleep_state')):
return RoomState.from_json(state)
else:
return RoomState(scene={})
else:
now = now or (await self.get_now()).time()
self.log(f'Getting state for {now}', level='DEBUG')
state = self._room_config.current_state(now)
self.log(f'Current state: {state}', level='DEBUG')
return state
async def current_scene(self, now: time = None) -> Dict[str, Dict[str, str | int]]:
state = await self.current_state(now)
assert isinstance(state, RoomState)
self.log(f'Current scene: {state}')
return state.scene
async def app_entity_states(self) -> Dict[str, str]:
states = {
entity: (await self.get_state(entity))
for entity in self.app_entities
}
return states
async def all_off(self) -> bool:
""""All off" is the logic opposite of "any on"
Returns:
bool: Whether all the lights associated with the app are off
"""
states = await self.app_entity_states()
return all(state != 'on' for entity, state in states.items())
async def any_on(self) -> bool:
""""Any on" is the logic opposite of "all off"
Returns:
bool: Whether any of the lights associated with the app are on
"""
states = await self.app_entity_states()
return any(state == 'on' for entity, state in states.items())
async def sleep_bool(self) -> bool:
if (sleep_var := self.args.get('sleep')):
return (await self.get_state(sleep_var)) == 'on'
else:
return False
# @sleep_bool.setter
# def sleep_bool(self, val) -> bool:
# if (sleep_var := self.args.get('sleep')):
# if isinstance(val, str):
# self.set_state(sleep_var, state=val)
# elif isinstance(val, bool):
# self.set_state(sleep_var, state='on' if val else 'off')
# else:
# raise ValueError('Sleep variable is undefined')
async def off_duration(self, now: time = None) -> timedelta:
"""Determines the time that the motion sensor has to be clear before deactivating
Priority:
- Value in scene definition
- Default value
- Normal - value in app definition
- Sleep - 0
"""
sleep_mode_active = await self.sleep_bool()
if sleep_mode_active:
self.log(f'Sleeping mode active: {sleep_mode_active}')
return timedelta()
else:
now = now or (await self.get_now()).time()
return self._room_config.current_off_duration(now)
@utils.sync_wrapper
async def activate(self, entity = None, attribute = None, old = None, new = None, kwargs = None):
if kwargs is not None:
cause = kwargs.get('cause', 'unknown')
else:
cause = 'unknown'
self.log(f'Activating: {cause}')
scene = await self.current_scene()
if isinstance(scene, str):
self.turn_on(scene)
self.log(f'Turned on scene: {scene}')
elif isinstance(scene, dict):
# makes setting the state to 'on' optional in the yaml definition
for entity, settings in scene.items():
if 'state' not in settings:
scene[entity]['state'] = 'on'
self.call_service('scene/apply', entities=scene, transition=0)
self.log(f'Applied scene: {scene}')
elif scene is None:
self.log(f'No scene, ignoring...')
# Need to act as if the light had just turned off to reset the motion (and maybe other things?)
# self.callback_light_off()
else:
self.log(f'ERROR: unknown scene: {scene}')
@utils.sync_wrapper
async def activate_all_off(self, *args, **kwargs):
"""Activate if all of the entities are off. Args and kwargs are passed directly to self.activate()
"""
if (await self.all_off()):
self.activate(*args, **kwargs)
else:
self.log(f'Skipped activating - everything is not off')
@utils.sync_wrapper
async def activate_any_on(self, *args, **kwargs):
"""Activate if any of the entities are on. Args and kwargs are passed directly to self.activate()
"""
if (await self.any_on()):
self.activate(*args, **kwargs)
else:
self.log(f'Skipped activating - everything is off')
def deactivate(self, entity = None, attribute = None, old = None, new = None, kwargs = None):
cause = kwargs.get('cause', 'unknown')
self.log(f'Deactivating: {cause}')
for e in self.app_entities:
self.turn_off(e)
self.log(f'Turned off {e}')

View File

@@ -1,26 +0,0 @@
version: 1
disable_existing_loggers: false
formatters:
rich:
style: "{"
format: "[room]{room}[/] {message}"
# format: "{message}"
datefmt: '%H:%M:%S.%f'
rich_component:
style: "{"
format: "[room]{room}[/] [component]{component}[/] {message}"
# format: "{message}"
datefmt: '%H:%M:%S.%f'
handlers:
rich:
formatter: rich
'()': 'rich.logging.RichHandler'
markup: True
show_path: false
omit_repeated_times: false
rich_component:
formatter: rich_component
'()': 'rich.logging.RichHandler'
markup: True
show_path: false
omit_repeated_times: false

View File

@@ -1,6 +0,0 @@
from .controller import RoomController
from .motion import Motion
from .button import Button
from .door import Door
__all__ = ['RoomController', 'Motion', 'Button', 'Door']

View File

@@ -1,150 +0,0 @@
import asyncio
import logging.config
from logging import Handler, LogRecord
from typing import TYPE_CHECKING
import aiohttp
from appdaemon.adapi import ADAPI
from .console import RCHighlighter, console
if TYPE_CHECKING:
from room_control import RoomController
class Singleton(type):
"""
https://en.wikipedia.org/wiki/Singleton_pattern
https://stackoverflow.com/q/6760685
https://realpython.com/python-metaclasses/
https://docs.python.org/3/reference/datamodel.html#metaclasses
"""
_instances = {}
def __call__(cls, *args, **kwargs):
if cls not in cls._instances:
cls._instances[cls] = super(Singleton, cls).__call__(*args, **kwargs)
return cls._instances[cls]
class AsyncLokiHandler(Handler, metaclass=Singleton):
loop: asyncio.BaseEventLoop
loki_url: str
queue: asyncio.Queue
consumer_task: asyncio.Task
def __init__(self, loop: asyncio.BaseEventLoop, loki_url: str):
console.print(f' [bold yellow]{hex(id(self))}[/] '.center(50, '-'))
super().__init__()
self.loop = loop
self.loki_url = loki_url
self.queue = asyncio.Queue()
self.consumer_task = self.loop.create_task(self.log_consumer())
def emit(self, record: LogRecord):
self.loop.create_task(self.send_to_loki(record))
async def send_to_loki(self, record: LogRecord):
labels = {'room': record.room}
if component := getattr(record, 'component', False):
labels['component'] = component
message = self.format(record)
ns = round(record.created * 1_000_000_000)
# https://grafana.com/docs/loki/latest/reference/loki-http-api/#ingest-logs
payload = {'streams': [{'stream': labels, 'values': [[str(ns), message]]}]}
await self.queue.put(payload)
async def log_consumer(self):
async with aiohttp.ClientSession() as session:
try:
while True:
payload = await self.queue.get()
await session.post(self.loki_url, json=payload, timeout=1)
# console.print('[bold yellow]Sent to Loki[/]')
except asyncio.CancelledError:
console.print('[bold red]Cancelled[/]')
class RoomControlBase(ADAPI):
app: 'RoomController'
logger: logging.LoggerAdapter
def initialize(self, room: str, component: str = None):
"""Sets up the logging"""
logger_name = f'Appdaemon.{room}'
extra_attributes = {'room': room}
# all the stuff that has to happen for a component
if component is not None:
self.app: 'RoomController' = self.get_app(self.args['app'])
logger_name += f'.{component}'
extra_attributes['component'] = component
rich_handler_args = {
'console': console,
'highlighter': RCHighlighter(),
'markup': True,
'show_path': False,
'omit_repeated_times': False,
}
logging.config.dictConfig(
{
'version': 1,
'disable_existing_loggers': False,
'filters': {'unmarkup': {'()': 'room_control.console.UnMarkupFilter'}},
'formatters': {
'basic': {'style': '{', 'format': '{message}'},
'rich': {
'style': '{',
'format': '[room]{room}[/] {message}',
'datefmt': '%H:%M:%S.%f',
},
'rich_component': {
'style': '{',
'format': '[room]{room}[/] [component]{component}[/] {message}',
'datefmt': '%H:%M:%S.%f',
},
},
'handlers': {
'rich': {
'formatter': 'rich',
'()': 'rich.logging.RichHandler',
**rich_handler_args,
},
'rich_component': {
'formatter': 'rich_component',
'()': 'rich.logging.RichHandler',
**rich_handler_args,
},
'async_queue': {
'formatter': 'basic',
'filters': ['unmarkup'],
'()': 'room_control.base.AsyncLokiHandler',
'loop': self.AD.loop,
'loki_url': self.args['loki_url'],
},
},
'loggers': {
logger_name: {
'level': 'INFO',
'propagate': False,
'handlers': [
'rich' if component is None else 'rich_component',
'async_queue',
],
}
},
}
)
self.logger = logging.LoggerAdapter(logging.getLogger(logger_name), extra_attributes)
self.handler: AsyncLokiHandler = self.logger.logger.handlers[-1]
def terminate(self):
status: bool = self.handler.consumer_task.cancel()
if status:
self.log('Cancelled consumer task')

View File

@@ -1,70 +0,0 @@
import json
from dataclasses import dataclass
from typing import List
from appdaemon.plugins.mqtt.mqttapi import Mqtt
from .base import RoomControlBase
from .model import ButtonConfig
@dataclass(init=False)
class Button(RoomControlBase, Mqtt):
button: str | List[str]
rich: bool = False
config: ButtonConfig
def initialize(self):
super().initialize(room=self.args['app'], component='Button')
self.config = ButtonConfig(**self.args)
self.log(f'Connected to AD app [room]{self.app.name}[/]', level='DEBUG')
self.button = self.config.button
self.setup_buttons(self.button)
def setup_buttons(self, buttons):
if isinstance(buttons, list):
for button in buttons:
self.setup_button(button)
else:
self.setup_button(buttons)
def setup_button(self, name: str):
topic = f'zigbee2mqtt/{name}'
# self.mqtt_subscribe(topic, namespace='mqtt')
self.listen_event(
self.handle_button,
'MQTT_MESSAGE',
topic=topic,
namespace='mqtt',
button=name,
)
self.log(f'MQTT topic [topic]{topic}[/] controls app [room]{self.app.name}[/]')
def handle_button(self, event_name, data, kwargs):
try:
payload = json.loads(data['payload'])
action = payload['action']
except json.JSONDecodeError:
self.log(f'Error decoding JSON from {data["payload"]}', level='ERROR')
except KeyError:
return
else:
if isinstance(action, str) and action != '':
self.log(f'Action: [yellow]{action}[/]')
self.handle_action(action)
def handle_action(self, action: str):
if action == 'single':
state = self.get_state(self.args['ref_entity'])
kwargs = {'kwargs': {'cause': f'button single click: toggle while {state}'}}
if manual_entity := self.args.get('manual_mode'):
self.set_state(entity_id=manual_entity, state='off')
if state == 'on':
self.app.deactivate(**kwargs)
else:
self.app.activate(**kwargs)
else:
pass

View File

@@ -1,189 +0,0 @@
import json
import logging
import logging.config
import re
from abc import ABC
from dataclasses import asdict
from importlib.resources import files
import yaml
from rich.console import Console
from rich.highlighter import RegexHighlighter
from rich.theme import Theme
console = Console(
width=100,
theme=Theme(
{
'log.time': 'none',
# 'logging.level.info': 'none',
'room': 'italic bright_cyan',
'component': 'dark_violet',
'friendly_name': 'yellow',
'light': 'light_slate_blue',
'sensor': 'green',
'time': 'yellow',
'z2m': 'bright_black',
'topic': 'chartreuse2',
'true': 'green',
'false': 'red',
}
),
log_time_format='%Y-%m-%d %I:%M:%S %p',
# highlighter=RCHighlighter(),
)
class RCHighlighter(RegexHighlighter):
highlights = [
r'(?P<light>(light|switch)\.\w+)',
r'(?P<time>\d+:\d+:\d+)',
r'(?P<z2m>zigbee2mqtt/)',
r'(?P<sensor>binary_sensor\.\w+)',
# r"'state': '(?P<on>on)|(?P<off>off)'"
r'(?P<true>True)|(?P<false>False)',
]
def load_rich_config(
room: str = None, component: str = None, level: str = 'INFO'
) -> logging.LoggerAdapter:
"""Loads the config from the .rich_logging.yaml file, adds some bits, and returns a LoggerAdapter
"""
logger_name = f'Appdaemon.{room}'
if component is not None:
logger_name += f'.{component}'
with files('room_control').joinpath('.rich_logging.yaml').open('r') as f:
RICH_CFG = yaml.safe_load(f)
RICH_CFG['handlers']['rich']['console'] = console
RICH_CFG['handlers']['rich']['highlighter'] = RCHighlighter()
RICH_CFG['handlers']['rich_component']['console'] = console
RICH_CFG['handlers']['rich_component']['highlighter'] = RCHighlighter()
# RICH_CFG['handlers']['async_queue'] = {
# 'formatter': 'basic',
# '()': 'room_control.loki.AsyncQueueHandler',
# 'loop': self.AD.loop,
# 'queue': self.loki_queue,
# }
RICH_CFG['loggers'] = {
logger_name: {
'handlers': ['rich' if component is None else 'rich_component'],
'propagate': False,
'level': level,
}
}
extra = {'room': room}
if component is not None:
extra['component'] = component
logging.config.dictConfig(RICH_CFG)
logger = logging.getLogger(logger_name)
adapter = logging.LoggerAdapter(logger, extra)
return adapter
class ContextSettingFilter(logging.Filter, ABC):
"""Adds all the dataclass fields as attributes to LogRecord objects.
Intended to be inherited from by something that uses the dataclasses.dataclass decorator.
"""
def filter(self, record: logging.LogRecord) -> logging.LogRecord:
for name, val in asdict(self).items():
if val is not None:
setattr(record, name, val)
return record
class RoomFilter(logging.Filter):
"""Used to filter out messages that have a component field because they will have already been printed by their respective logger."""
def filter(self, record: logging.LogRecord) -> bool:
return not hasattr(record, 'component')
class UnMarkupFilter(logging.Filter):
md_regex = re.compile(r'(?P<open>\[.*?\])(?P<text>.*?)(?P<close>\[\/\])')
def filter(self, record: logging.LogRecord) -> logging.LogRecord:
record.msg = self.md_regex.sub(r'\g<text>', record.msg)
return record
class JSONFormatter(logging.Formatter):
def format(self, record: logging.LogRecord) -> str:
return json.dumps(record.__dict__)
RICH_HANDLER_CFG = {
'()': 'rich.logging.RichHandler',
'markup': True,
'show_path': False,
# 'show_time': False,
'omit_repeated_times': False,
'console': console,
'highlighter': RCHighlighter(),
}
## currently unused
def room_logging_config(name: str):
return {
'version': 1,
'disable_existing_loggers': False,
'filters': {
'room': {'()': 'room_control.console.RoomFilter'},
'unmarkup': {'()': 'room_control.console.UnMarkupFilter'},
},
'formatters': {
'rich_room': {
'style': '{',
'format': '[room]{room}[/] {message}',
'datefmt': '%H:%M:%S.%f',
},
'file': {
'style': '{',
'format': '{asctime}.{msecs:03.0f} {levelname:8} {name}: {message}',
'datefmt': '%Y-%m-%d %H:%M:%S',
},
'json': {'()': 'room_control.console.JSONFormatter'},
},
'handlers': {
'rich_room': {
'formatter': 'rich_room',
'filters': ['room'],
**RICH_HANDLER_CFG,
},
'file': {
'filters': ['unmarkup'],
'formatter': 'file',
'class': 'logging.handlers.RotatingFileHandler',
# 'class': 'logging.FileHandler',
'filename': f'/logs/{name}.log',
'maxBytes': 1000000,
'backupCount': 3,
},
'json': {
'filters': ['unmarkup'],
'formatter': 'json',
'filename': f'/logs/{name}.jsonl',
'class': 'logging.handlers.RotatingFileHandler',
'maxBytes': 1000000,
'backupCount': 3,
},
},
'loggers': {
f'AppDaemon.{name}': {
'level': 'INFO',
'propagate': False,
'handlers': [
'rich_room',
'file',
'json',
],
},
},
}

View File

@@ -1,235 +0,0 @@
import datetime
import json
import logging
import logging.config
from copy import deepcopy
from typing import Dict, List
from appdaemon.entity import Entity
from appdaemon.plugins.hass.hassapi import Hass
from .base import RoomControlBase
from .model import ControllerStateConfig, RoomControllerConfig
logger = logging.getLogger(__name__)
class RoomController(RoomControlBase, Hass):
"""Class for linking room's lights with a motion sensor.
- Separate the turning on and turning off functions.
- Use the state change of the light to set up the event for changing to the other state
- `handle_on`
- `handle_off`
- When the light comes on, check if it's attributes match what they should, given the time.
"""
@property
def states(self) -> List[ControllerStateConfig]:
return self._room_config.states
@states.setter
def states(self, new: List[ControllerStateConfig]):
assert all(isinstance(s, ControllerStateConfig) for s in new), f'Invalid: {new}'
self._room_config.states = new
def initialize(self):
super().initialize(room=self.name)
self.app_entities = self.gather_app_entities()
self.refresh_state_times()
self.run_daily(callback=self.refresh_state_times, start='00:00:00')
self.log(f'Initialized [bold green]{type(self).__name__}[/]')
def terminate(self):
self.log('[bold red]Terminating[/]', level='DEBUG')
def gather_app_entities(self) -> List[str]:
"""Returns a list of all the entities involved in any of the states"""
def generator():
for settings in deepcopy(self.args['states']):
if scene := settings.get('scene'):
if isinstance(scene, str):
assert scene.startswith(
'scene.'
), f"Scene definition must start with 'scene.' for app {self.name}"
entity: Entity = self.get_entity(scene)
entity_state = entity.get_state('all')
attributes = entity_state['attributes']
for entity in attributes['entity_id']:
yield entity
else:
for key in scene.keys():
yield key
else:
yield self.args['entity']
return set(list(generator()))
def refresh_state_times(self, *args, **kwargs):
"""Resets the `self.states` attribute to a newly parsed version of the states.
Parsed states have an absolute time for the current day.
"""
# re-parse the state strings into times for the current day
self._room_config = RoomControllerConfig.model_validate(self.args)
self.log(
f'{len(self._room_config.states)} states in the app configuration',
level='DEBUG',
)
for state in self._room_config.states:
if state.time is None and state.elevation is not None:
state.time = self.AD.sched.location.time_at_elevation(
elevation=state.elevation, direction=state.direction
).time()
elif isinstance(state.time, str):
state.time = self.parse_time(state.time)
assert isinstance(state.time, datetime.time), f'Invalid time: {state.time}'
self.states = sorted(self.states, key=lambda s: s.time, reverse=True)
# schedule the transitions
for state in self.states[::-1]:
# t: datetime.time = state['time']
t: datetime.time = state.time
try:
self.run_at(
callback=self.activate_any_on,
start=t.strftime('%H:%M:%S'),
cause='scheduled transition',
)
except ValueError:
# happens when the callback time is in the past
pass
except Exception as e:
self.log(f'Failed with {type(e)}: {e}')
def current_state(self, now: datetime.time = None) -> ControllerStateConfig:
if self.sleep_bool():
self.log('sleep: active')
if state := self.args.get('sleep_state'):
return ControllerStateConfig(**state)
else:
return ControllerStateConfig(scene={})
else:
now = now or self.get_now().time().replace(microsecond=0)
self.log(f'Getting state for {now.strftime("%I:%M:%S %p")}', level='DEBUG')
state = self._room_config.current_state(now)
self.log(f'Current state: {state.time}', level='DEBUG')
return state
def app_entity_states(self) -> Dict[str, str]:
states = {entity: self.get_state(entity) for entity in self.app_entities}
return states
def all_off(self) -> bool:
""" "All off" is the logic opposite of "any on"
Returns:
bool: Whether all the lights associated with the app are off
"""
states = self.app_entity_states()
return all(state != 'on' for entity, state in states.items())
def any_on(self) -> bool:
""" "Any on" is the logic opposite of "all off"
Returns:
bool: Whether any of the lights associated with the app are on
"""
states = self.app_entity_states()
return any(state == 'on' for entity, state in states.items())
def sleep_bool(self) -> bool:
if sleep_var := self.args.get('sleep'):
return self.get_state(sleep_var) == 'on'
else:
return False
def manual_mode(self) -> bool:
if manual_entity := self.args.get('manual_mode'):
return self.get_state(manual_entity) == 'on'
else:
return False
# @sleep_bool.setter
# def sleep_bool(self, val) -> bool:
# if (sleep_var := self.args.get('sleep')):
# if isinstance(val, str):
# self.set_state(sleep_var, state=val)
# elif isinstance(val, bool):
# self.set_state(sleep_var, state='on' if val else 'off')
# else:
# raise ValueError('Sleep variable is undefined')
def off_duration(self, now: datetime.time = None) -> datetime.timedelta:
"""Determines the time that the motion sensor has to be clear before deactivating
Priority:
- Value in scene definition
- Default value
- Normal - value in app definition
- Sleep - 0
"""
sleep_mode_active = self.sleep_bool()
if sleep_mode_active:
self.log(f'Sleeping mode active: {sleep_mode_active}')
return datetime.timedelta()
else:
now = now or self.get_now().time()
return self._room_config.current_off_duration(now)
def activate(self, entity=None, attribute=None, old=None, new=None, kwargs=None):
if kwargs is not None:
cause = kwargs.get('cause', 'unknown')
else:
cause = 'unknown'
self.log(f'Activating: {cause}')
scene_kwargs = self.current_state().to_apply_kwargs(transition=0)
if isinstance(scene_kwargs, str):
self.turn_on(scene_kwargs)
self.log(f'Turned on scene: {scene_kwargs}')
elif isinstance(scene_kwargs, dict):
self.call_service('scene/apply', **scene_kwargs)
self.log(f'Applied scene:\n{json.dumps(scene_kwargs, indent=2)}', level='DEBUG')
elif scene_kwargs is None:
self.log('No scene, ignoring...')
# Need to act as if the light had just turned off to reset the motion (and maybe other things?)
# self.callback_light_off()
else:
self.log(f'ERROR: unknown scene: {scene_kwargs}')
def activate_all_off(self, *args, **kwargs):
"""Activate if all of the entities are off. Args and kwargs are passed directly to self.activate()"""
if self.all_off():
self.activate(*args, **kwargs)
else:
self.log('Skipped activating - everything is not off')
def activate_any_on(self, *args, **kwargs):
"""Activate if any of the entities are on. Args and kwargs are passed directly to self.activate()"""
if self.any_on() and not self.manual_mode():
self.activate(*args, **kwargs)
else:
self.log('Skipped activating - everything is off')
def toggle_activate(self, *args, **kwargs):
if self.any_on():
self.deactivate(*args, **kwargs)
else:
self.activate(*args, **kwargs)
def deactivate(self, entity=None, attribute=None, old=None, new=None, kwargs=None):
cause = kwargs.get('cause', 'unknown')
self.log(f'Deactivating: {cause}')
for e in self.app_entities:
self.turn_off(e)
self.log(f'Turned off {e}')

View File

@@ -1,17 +0,0 @@
from appdaemon.plugins.hass.hassapi import Hass
from .base import RoomControlBase
class Door(RoomControlBase, Hass):
def initialize(self):
super().initialize(room=self.args['app'], component='Door')
self.listen_state(
self.app.activate_all_off,
entity_id=self.args['door'],
new='on',
cause='door open',
)
self.log(f'Waiting for door to open: [bold green]{self.args["door"]}[/]')

View File

@@ -1,135 +0,0 @@
from datetime import datetime, time, timedelta
from pathlib import Path
from typing import Annotated, Dict, List, Optional, Self
import yaml
from astral import SunDirection
from pydantic import BaseModel, BeforeValidator, Field, model_validator
from pydantic_core import PydanticCustomError
from rich.console import Console, ConsoleOptions, RenderResult
from rich.table import Column, Table
def str_to_timedelta(input_str: str) -> timedelta:
try:
hours, minutes, seconds = map(int, input_str.split(':'))
return timedelta(hours=hours, minutes=minutes, seconds=seconds)
except Exception:
return timedelta()
def str_to_direction(input_str: str) -> SunDirection:
try:
return getattr(SunDirection, input_str.upper())
except AttributeError:
raise PydanticCustomError(
'invalid_dir', 'Invalid sun direction: {dir}', dict(dir=input_str)
)
OffDuration = Annotated[timedelta, BeforeValidator(str_to_timedelta)]
class State(BaseModel):
state: bool = True
brightness: Optional[int] = Field(default=None, ge=1, le=255)
color_temp: Optional[int] = Field(default=None, ge=200, le=650)
rgb_color: Optional[list[int]] = Field(default=None, min_length=3, max_length=3)
class ApplyKwargs(BaseModel):
"""Arguments to call with the 'scene/apply' service"""
entities: Dict[str, State]
transition: Optional[int] = None
class ControllerStateConfig(BaseModel):
time: Optional[str | datetime] = None
elevation: Optional[float] = None
direction: Optional[Annotated[SunDirection, BeforeValidator(str_to_direction)]] = None
off_duration: Optional[OffDuration] = None
scene: dict[str, State] | str
@model_validator(mode='before')
def check_args(cls, values):
time, elevation = values.get('time'), values.get('elevation')
if time is not None and elevation is not None:
raise PydanticCustomError('bad_time_spec', 'Only one of time or elevation can be set.')
elif elevation is not None and 'direction' not in values:
raise PydanticCustomError('no_sun_dir', 'Needs sun direction with elevation')
return values
def to_apply_kwargs(self, **kwargs):
return ApplyKwargs(entities=self.scene, **kwargs).model_dump(exclude_none=True)
class RoomControllerConfig(BaseModel):
states: List[ControllerStateConfig] = Field(default_factory=list)
off_duration: Optional[OffDuration] = None
sleep_state: Optional[ControllerStateConfig] = None
rich: Optional[str] = None
manual_mode: Optional[str] = None
@classmethod
def from_yaml(cls: Self, yaml_path: Path) -> Self:
yaml_path = Path(yaml_path)
with yaml_path.open('r') as f:
for appname, app_cfg in yaml.load(f, Loader=yaml.SafeLoader).items():
if app_cfg['class'] == 'RoomController':
return cls.model_validate(app_cfg)
def __rich_console__(self, console: Console, options: ConsoleOptions) -> RenderResult:
table = Table(
Column('Time', width=15),
Column('Scene'),
highlight=True,
padding=1,
collapse_padding=True,
)
for state in self.states:
if isinstance(state, str):
pass
elif isinstance(state, dict):
scene_json = state.to_apply_kwargs()
lines = [
f'{name:20}{state["state"]} Brightness: {state.get("brightness", ""):<4} Temp: {state.get("color_temp", "")}'
for name, state in scene_json['entities'].items()
]
table.add_row(state.time.strftime('%I:%M:%S %p'), '\n'.join(lines))
yield table
def sort_states(self):
"""Should only be called after all the times have been resolved"""
assert all(
isinstance(state.time, time) for state in self.states
), 'Times have not all been resolved yet'
self.states = sorted(self.states, key=lambda s: s.time, reverse=True)
def current_state(self, now: time) -> ControllerStateConfig:
self.sort_states()
for state in self.states:
if state.time <= now:
return state
else:
return self.states[0]
def current_scene(self, now: time) -> Dict:
state = self.current_state(now)
return state.scene
def current_off_duration(self, now: time) -> timedelta:
state = self.current_state(now)
if state.off_duration is None:
if self.off_duration is None:
raise ValueError('Need an off duration')
else:
return self.off_duration
else:
return state.off_duration
class ButtonConfig(BaseModel):
app: str
button: str | List[str]
ref_entity: str

View File

@@ -1,165 +0,0 @@
import re
from datetime import timedelta
from typing import Literal, Optional
from appdaemon.entity import Entity
from appdaemon.plugins.hass.hassapi import Hass
from pydantic import BaseModel, ValidationError
from .base import RoomControlBase
class CallbackEntry(BaseModel):
entity: str
event: Optional[str] = None
type: Literal['state', 'event']
kwargs: str
function: str
name: str
pin_app: bool
pin_thread: int
Callbacks = dict[str, dict[str, CallbackEntry]]
class Motion(RoomControlBase, Hass):
@property
def sensor(self) -> Entity:
return self.get_entity(self.args['sensor'])
@property
def sensor_state(self) -> bool:
return self.sensor.state == 'on'
@property
def ref_entity(self) -> Entity:
return self.get_entity(self.args['ref_entity'])
@property
def ref_entity_state(self) -> bool:
return self.ref_entity.get_state() == 'on'
@property
def state_mismatch(self) -> bool:
return self.sensor_state != self.ref_entity_state
def initialize(self):
super().initialize(room=self.args['app'], component='Motion')
assert self.entity_exists(self.args['sensor'])
assert self.entity_exists(self.args['ref_entity'])
base_kwargs = dict(
entity_id=self.ref_entity.entity_id,
immediate=True, # avoids needing to sync the state
)
if self.state_mismatch:
self.log(
f'Sensor is {self.sensor_state} ' f'and light is {self.ref_entity_state}',
level='WARNING',
)
if self.sensor_state:
self.app.activate(kwargs={'cause': f'Syncing state with {self.sensor.entity_id}'})
self.listen_state(
**base_kwargs,
attribute='brightness',
callback=self.callback_light_on,
)
self.listen_state(**base_kwargs, new='off', callback=self.callback_light_off)
self.log(f'Initialized [bold green]{type(self).__name__}[/]')
def callbacks(self):
"""Returns a dictionary of validated CallbackEntry objects that are associated with this app"""
self_callbacks = self.get_callback_entries().get(self.name, {})
for handle, cb_dict in self_callbacks.items():
try:
yield handle, CallbackEntry.model_validate(cb_dict)
except ValidationError as e:
self.logger.error('Error parsing callback dictionary')
self.logger.error(e)
def listen_motion_on(self):
"""Sets up the motion on callback to activate the room"""
self.cancel_motion_callback()
self.listen_state(
callback=self.app.activate_all_off,
entity_id=self.sensor.entity_id,
new='on',
oneshot=True,
cause='motion on',
)
self.log(f'Waiting for sensor motion on [friendly_name]{self.sensor.friendly_name}[/]')
if self.sensor_state:
self.log(
f'Sensor [friendly_name]{self.sensor.friendly_name}[/] is already on',
level='WARNING',
)
def listen_motion_off(self, duration: timedelta):
"""Sets up the motion off callback to deactivate the room"""
self.cancel_motion_callback()
self.listen_state(
callback=self.app.deactivate,
entity_id=self.sensor.entity_id,
new='off',
duration=duration.total_seconds(),
oneshot=True,
cause='motion off',
)
self.log(
f'Waiting for sensor [friendly_name]{self.sensor.friendly_name}[/] to be clear for {duration}'
)
if not self.sensor_state:
self.log(
f'Sensor [friendly_name]{self.sensor.friendly_name}[/] is currently off',
level='WARNING',
)
def callback_light_on(self, entity=None, attribute=None, old=None, new=None, kwargs=None):
"""Called when the light turns on"""
if new is not None:
self.log(f'Detected {entity} turning on', level='DEBUG')
duration = self.app.off_duration()
self.listen_motion_off(duration)
def callback_light_off(self, entity=None, attribute=None, old=None, new=None, kwargs=None):
"""Called when the light turns off"""
self.log(f'Detected {entity} turning off', level='DEBUG')
self.listen_motion_on()
def get_app_callbacks(self, name: str = None):
"""Gets all the callbacks associated with the app"""
name = name or self.name
callbacks = {
handle: info
for app_name, callbacks in self.get_callback_entries().items()
for handle, info in callbacks.items()
if app_name == name
}
return callbacks
def get_sensor_callbacks(self):
return {
handle: info
for handle, info in self.get_app_callbacks().items()
if info['entity'] == self.sensor.entity_id
}
def cancel_motion_callback(self):
callbacks = self.get_sensor_callbacks()
# self.log(f'Found {len(callbacks)} callbacks for {self.sensor.entity_id}')
for handle, info in callbacks.items():
entity = info['entity']
kwargs = info['kwargs']
if (m := re.match('new=(?P<new>.*?)\s', kwargs)) is not None:
new = m.group('new')
self.cancel_listen_state(handle)
self.log(
f'cancelled callback for sensor {entity} turning {new}',
level='DEBUG',
)