Compare commits
12 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
72b4b7d72e | ||
|
|
b1638be692 | ||
|
|
8b7294b445 | ||
|
|
e4dada011c | ||
|
|
e3186f1b5e | ||
|
|
2180e544c0 | ||
|
|
82f43cd029 | ||
|
|
1b3fc4afb7 | ||
|
|
0f58ac4cc6 | ||
|
|
b278a3cda1 | ||
|
|
e4fcd757ce | ||
|
|
7cde3c75d8 |
@@ -12,16 +12,17 @@ dependencies = [
|
|||||||
"ruff>=0.4.2",
|
"ruff>=0.4.2",
|
||||||
]
|
]
|
||||||
readme = "README.md"
|
readme = "README.md"
|
||||||
requires-python = ">= 3.8,<3.12"
|
requires-python = ">= 3.10,<3.13"
|
||||||
|
|
||||||
[tool.setuptools]
|
[build-system]
|
||||||
include-package-data = true
|
requires = ["hatchling"]
|
||||||
|
build-backend = "hatchling.build"
|
||||||
|
|
||||||
# [tool.setuptools.package-data]
|
[tool.hatch.metadata]
|
||||||
# mypkg = ["*.yaml"]
|
allow-direct-references = true
|
||||||
|
|
||||||
[tool.setuptools.data-files]
|
[tool.hatch.build.targets.wheel]
|
||||||
config = ["config/default_config.yaml"]
|
packages = ["src/room_control"]
|
||||||
|
|
||||||
[tool.ruff.format]
|
[tool.ruff.format]
|
||||||
quote-style = 'single'
|
quote-style = 'single'
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
from .controller import RoomController
|
|
||||||
from .motion import Motion
|
|
||||||
from .button import Button
|
from .button import Button
|
||||||
from .door import Door
|
from .door import Door
|
||||||
|
from .motion import MotionSensor
|
||||||
|
from .room_control import RoomController
|
||||||
|
|
||||||
__all__ = ['RoomController', 'Motion', 'Button', 'Door']
|
__all__ = ['RoomController', 'MotionSensor', 'Button', 'Door']
|
||||||
|
|||||||
@@ -1,150 +0,0 @@
|
|||||||
import asyncio
|
|
||||||
import logging.config
|
|
||||||
from logging import Handler, LogRecord
|
|
||||||
from typing import TYPE_CHECKING
|
|
||||||
|
|
||||||
import aiohttp
|
|
||||||
from appdaemon.adapi import ADAPI
|
|
||||||
|
|
||||||
from .console import RCHighlighter, console
|
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
|
||||||
from room_control import RoomController
|
|
||||||
|
|
||||||
|
|
||||||
class Singleton(type):
|
|
||||||
"""
|
|
||||||
https://en.wikipedia.org/wiki/Singleton_pattern
|
|
||||||
https://stackoverflow.com/q/6760685
|
|
||||||
https://realpython.com/python-metaclasses/
|
|
||||||
https://docs.python.org/3/reference/datamodel.html#metaclasses
|
|
||||||
"""
|
|
||||||
|
|
||||||
_instances = {}
|
|
||||||
|
|
||||||
def __call__(cls, *args, **kwargs):
|
|
||||||
if cls not in cls._instances:
|
|
||||||
cls._instances[cls] = super(Singleton, cls).__call__(*args, **kwargs)
|
|
||||||
return cls._instances[cls]
|
|
||||||
|
|
||||||
|
|
||||||
class AsyncLokiHandler(Handler, metaclass=Singleton):
|
|
||||||
loop: asyncio.BaseEventLoop
|
|
||||||
loki_url: str
|
|
||||||
queue: asyncio.Queue
|
|
||||||
consumer_task: asyncio.Task
|
|
||||||
|
|
||||||
def __init__(self, loop: asyncio.BaseEventLoop, loki_url: str):
|
|
||||||
console.print(f' [bold yellow]{hex(id(self))}[/] '.center(50, '-'))
|
|
||||||
super().__init__()
|
|
||||||
self.loop = loop
|
|
||||||
self.loki_url = loki_url
|
|
||||||
self.queue = asyncio.Queue()
|
|
||||||
self.consumer_task = self.loop.create_task(self.log_consumer())
|
|
||||||
|
|
||||||
def emit(self, record: LogRecord):
|
|
||||||
self.loop.create_task(self.send_to_loki(record))
|
|
||||||
|
|
||||||
async def send_to_loki(self, record: LogRecord):
|
|
||||||
labels = {'room': record.room}
|
|
||||||
if component := getattr(record, 'component', False):
|
|
||||||
labels['component'] = component
|
|
||||||
|
|
||||||
message = self.format(record)
|
|
||||||
ns = round(record.created * 1_000_000_000)
|
|
||||||
|
|
||||||
# https://grafana.com/docs/loki/latest/reference/loki-http-api/#ingest-logs
|
|
||||||
payload = {'streams': [{'stream': labels, 'values': [[str(ns), message]]}]}
|
|
||||||
await self.queue.put(payload)
|
|
||||||
|
|
||||||
async def log_consumer(self):
|
|
||||||
async with aiohttp.ClientSession() as session:
|
|
||||||
try:
|
|
||||||
while True:
|
|
||||||
payload = await self.queue.get()
|
|
||||||
await session.post(self.loki_url, json=payload, timeout=1)
|
|
||||||
# console.print('[bold yellow]Sent to Loki[/]')
|
|
||||||
except asyncio.CancelledError:
|
|
||||||
console.print('[bold red]Cancelled[/]')
|
|
||||||
|
|
||||||
|
|
||||||
class RoomControlBase(ADAPI):
|
|
||||||
app: 'RoomController'
|
|
||||||
logger: logging.LoggerAdapter
|
|
||||||
|
|
||||||
def initialize(self, room: str, component: str = None):
|
|
||||||
"""Sets up the logging"""
|
|
||||||
logger_name = f'Appdaemon.{room}'
|
|
||||||
extra_attributes = {'room': room}
|
|
||||||
|
|
||||||
# all the stuff that has to happen for a component
|
|
||||||
if component is not None:
|
|
||||||
self.app: 'RoomController' = self.get_app(self.args['app'])
|
|
||||||
logger_name += f'.{component}'
|
|
||||||
extra_attributes['component'] = component
|
|
||||||
|
|
||||||
rich_handler_args = {
|
|
||||||
'console': console,
|
|
||||||
'highlighter': RCHighlighter(),
|
|
||||||
'markup': True,
|
|
||||||
'show_path': False,
|
|
||||||
'omit_repeated_times': False,
|
|
||||||
}
|
|
||||||
|
|
||||||
logging.config.dictConfig(
|
|
||||||
{
|
|
||||||
'version': 1,
|
|
||||||
'disable_existing_loggers': False,
|
|
||||||
'filters': {'unmarkup': {'()': 'room_control.console.UnMarkupFilter'}},
|
|
||||||
'formatters': {
|
|
||||||
'basic': {'style': '{', 'format': '{message}'},
|
|
||||||
'rich': {
|
|
||||||
'style': '{',
|
|
||||||
'format': '[room]{room}[/] {message}',
|
|
||||||
'datefmt': '%H:%M:%S.%f',
|
|
||||||
},
|
|
||||||
'rich_component': {
|
|
||||||
'style': '{',
|
|
||||||
'format': '[room]{room}[/] [component]{component}[/] {message}',
|
|
||||||
'datefmt': '%H:%M:%S.%f',
|
|
||||||
},
|
|
||||||
},
|
|
||||||
'handlers': {
|
|
||||||
'rich': {
|
|
||||||
'formatter': 'rich',
|
|
||||||
'()': 'rich.logging.RichHandler',
|
|
||||||
**rich_handler_args,
|
|
||||||
},
|
|
||||||
'rich_component': {
|
|
||||||
'formatter': 'rich_component',
|
|
||||||
'()': 'rich.logging.RichHandler',
|
|
||||||
**rich_handler_args,
|
|
||||||
},
|
|
||||||
'async_queue': {
|
|
||||||
'formatter': 'basic',
|
|
||||||
'filters': ['unmarkup'],
|
|
||||||
'()': 'room_control.base.AsyncLokiHandler',
|
|
||||||
'loop': self.AD.loop,
|
|
||||||
'loki_url': self.args['loki_url'],
|
|
||||||
},
|
|
||||||
},
|
|
||||||
'loggers': {
|
|
||||||
logger_name: {
|
|
||||||
'level': 'INFO',
|
|
||||||
'propagate': False,
|
|
||||||
'handlers': [
|
|
||||||
'rich' if component is None else 'rich_component',
|
|
||||||
'async_queue',
|
|
||||||
],
|
|
||||||
}
|
|
||||||
},
|
|
||||||
}
|
|
||||||
)
|
|
||||||
|
|
||||||
self.logger = logging.LoggerAdapter(logging.getLogger(logger_name), extra_attributes)
|
|
||||||
self.handler: AsyncLokiHandler = self.logger.logger.handlers[-1]
|
|
||||||
|
|
||||||
def terminate(self):
|
|
||||||
status: bool = self.handler.consumer_task.cancel()
|
|
||||||
if status:
|
|
||||||
self.log('Cancelled consumer task')
|
|
||||||
@@ -1,70 +1,95 @@
|
|||||||
import json
|
import json
|
||||||
from dataclasses import dataclass
|
from dataclasses import dataclass
|
||||||
from typing import List
|
from typing import TYPE_CHECKING, Any, Dict
|
||||||
|
|
||||||
from appdaemon.plugins.mqtt.mqttapi import Mqtt
|
from appdaemon.entity import Entity
|
||||||
|
|
||||||
from .base import RoomControlBase
|
from . import console
|
||||||
from .model import ButtonConfig
|
|
||||||
|
if TYPE_CHECKING:
|
||||||
|
from .room_control import RoomController
|
||||||
|
|
||||||
|
|
||||||
@dataclass(init=False)
|
@dataclass
|
||||||
class Button(RoomControlBase, Mqtt):
|
class Button:
|
||||||
button: str | List[str]
|
adapi: 'RoomController'
|
||||||
rich: bool = False
|
button_name: str
|
||||||
config: ButtonConfig
|
|
||||||
|
|
||||||
def initialize(self):
|
def __post_init__(self):
|
||||||
super().initialize(room=self.args['app'], component='Button')
|
self.logger = console.load_rich_config(self.adapi.name, 'Button')
|
||||||
self.config = ButtonConfig(**self.args)
|
topic = f'zigbee2mqtt/{self.button_name}'
|
||||||
self.log(f'Connected to AD app [room]{self.app.name}[/]', level='DEBUG')
|
self.adapi.listen_event(
|
||||||
|
|
||||||
self.button = self.config.button
|
|
||||||
self.setup_buttons(self.button)
|
|
||||||
|
|
||||||
def setup_buttons(self, buttons):
|
|
||||||
if isinstance(buttons, list):
|
|
||||||
for button in buttons:
|
|
||||||
self.setup_button(button)
|
|
||||||
else:
|
|
||||||
self.setup_button(buttons)
|
|
||||||
|
|
||||||
def setup_button(self, name: str):
|
|
||||||
topic = f'zigbee2mqtt/{name}'
|
|
||||||
# self.mqtt_subscribe(topic, namespace='mqtt')
|
|
||||||
self.listen_event(
|
|
||||||
self.handle_button,
|
self.handle_button,
|
||||||
'MQTT_MESSAGE',
|
'MQTT_MESSAGE',
|
||||||
topic=topic,
|
topic=topic,
|
||||||
namespace='mqtt',
|
namespace='mqtt',
|
||||||
button=name,
|
button=self.button_name,
|
||||||
)
|
)
|
||||||
self.log(f'MQTT topic [topic]{topic}[/] controls app [room]{self.app.name}[/]')
|
self.logger.info(f'MQTT topic [topic]{topic}[/] controls [room]{self.adapi.name}[/]')
|
||||||
|
|
||||||
def handle_button(self, event_name, data, kwargs):
|
def handle_button(self, event_name: str, data: Dict[str, Any], **kwargs: Dict[str, Any]):
|
||||||
|
if event_name == 'appd_started':
|
||||||
|
return
|
||||||
|
|
||||||
|
# self.logger.info(f'Button callback: {event_name}, {data}')
|
||||||
try:
|
try:
|
||||||
payload = json.loads(data['payload'])
|
payload = json.loads(data['payload'])
|
||||||
action = payload['action']
|
action = payload['action']
|
||||||
except json.JSONDecodeError:
|
except json.JSONDecodeError:
|
||||||
self.log(f'Error decoding JSON from {data["payload"]}', level='ERROR')
|
self.logger.error(f'Error decoding JSON from {data["payload"]}')
|
||||||
except KeyError:
|
|
||||||
return
|
|
||||||
else:
|
else:
|
||||||
if isinstance(action, str) and action != '':
|
self.do_action(action)
|
||||||
self.log(f'Action: [yellow]{action}[/]')
|
|
||||||
self.handle_action(action)
|
|
||||||
|
|
||||||
def handle_action(self, action: str):
|
def do_action(self, action: str):
|
||||||
if action == 'single':
|
"""Action can be single, double, or others"""
|
||||||
state = self.get_state(self.args['ref_entity'])
|
if isinstance(action, str) and action != '':
|
||||||
kwargs = {'kwargs': {'cause': f'button single click: toggle while {state}'}}
|
self.logger.info(f'Action: [yellow]{action}[/]')
|
||||||
|
if action == 'single':
|
||||||
|
self.adapi.call_service(
|
||||||
|
f'{self.adapi.name}/toggle', namespace='controller', cause='button'
|
||||||
|
)
|
||||||
|
|
||||||
if manual_entity := self.args.get('manual_mode'):
|
|
||||||
self.set_state(entity_id=manual_entity, state='off')
|
|
||||||
|
|
||||||
if state == 'on':
|
@dataclass
|
||||||
self.app.deactivate(**kwargs)
|
class VirtualButton(Button):
|
||||||
else:
|
def __post_init__(self):
|
||||||
self.app.activate(**kwargs)
|
self.logger = console.load_rich_config(self.adapi.name, 'Button')
|
||||||
|
|
||||||
|
friendly_name = self.adapi.name.title().replace('_', ' ') + ' Button'
|
||||||
|
|
||||||
|
kwargs = {'entity_id': self.eid, 'friendly_name': friendly_name}
|
||||||
|
|
||||||
|
if not self.adapi.entity_exists(self.eid):
|
||||||
|
self.adapi.set_state(state=self.adapi.get_now(), **kwargs)
|
||||||
|
self.logger.info(f'Created entity [green]{self.eid}[/]')
|
||||||
else:
|
else:
|
||||||
pass
|
self.adapi.set_state(**kwargs)
|
||||||
|
self.logger.info(f'Set friendly name [green]{self.virtual_entity.friendly_name}[/]')
|
||||||
|
|
||||||
|
self.adapi.listen_event(self.handle_virtual_button, entity_id=self.eid)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def eid(self) -> str:
|
||||||
|
return f'input_button.{self.adapi.name}'
|
||||||
|
|
||||||
|
@property
|
||||||
|
def virtual_entity(self) -> Entity:
|
||||||
|
return self.adapi.get_entity(self.eid)
|
||||||
|
|
||||||
|
def handle_virtual_button(
|
||||||
|
self, event_name: str, data: Dict[str, Any], **kwargs: Dict[str, Any]
|
||||||
|
):
|
||||||
|
if (
|
||||||
|
event_name == 'call_service'
|
||||||
|
and data.get('service') == 'press'
|
||||||
|
and (sd := data.get('service_data'))
|
||||||
|
and sd.get('entity_id') == self.eid
|
||||||
|
):
|
||||||
|
try:
|
||||||
|
if data['service_data']['entity_id'] == self.eid:
|
||||||
|
self.logger.info(f'Virtual button press: {event_name}')
|
||||||
|
# self.virtual_entity.set_state(state=datetime.now())
|
||||||
|
self.virtual_entity.set_state(state=self.adapi.get_now())
|
||||||
|
self.do_action('single')
|
||||||
|
except KeyError as e:
|
||||||
|
self.logger.error(f'Bad data from {event_name}: {json.dumps(data, indent=4)}')
|
||||||
|
|||||||
@@ -46,10 +46,8 @@ class RCHighlighter(RegexHighlighter):
|
|||||||
|
|
||||||
|
|
||||||
def load_rich_config(
|
def load_rich_config(
|
||||||
room: str = None, component: str = None, level: str = 'INFO'
|
room: str = None, component: str = None, level: str = None
|
||||||
) -> logging.LoggerAdapter:
|
) -> logging.LoggerAdapter:
|
||||||
"""Loads the config from the .rich_logging.yaml file, adds some bits, and returns a LoggerAdapter
|
|
||||||
"""
|
|
||||||
logger_name = f'Appdaemon.{room}'
|
logger_name = f'Appdaemon.{room}'
|
||||||
|
|
||||||
if component is not None:
|
if component is not None:
|
||||||
@@ -62,17 +60,10 @@ def load_rich_config(
|
|||||||
RICH_CFG['handlers']['rich']['highlighter'] = RCHighlighter()
|
RICH_CFG['handlers']['rich']['highlighter'] = RCHighlighter()
|
||||||
RICH_CFG['handlers']['rich_component']['console'] = console
|
RICH_CFG['handlers']['rich_component']['console'] = console
|
||||||
RICH_CFG['handlers']['rich_component']['highlighter'] = RCHighlighter()
|
RICH_CFG['handlers']['rich_component']['highlighter'] = RCHighlighter()
|
||||||
# RICH_CFG['handlers']['async_queue'] = {
|
|
||||||
# 'formatter': 'basic',
|
|
||||||
# '()': 'room_control.loki.AsyncQueueHandler',
|
|
||||||
# 'loop': self.AD.loop,
|
|
||||||
# 'queue': self.loki_queue,
|
|
||||||
# }
|
|
||||||
RICH_CFG['loggers'] = {
|
RICH_CFG['loggers'] = {
|
||||||
logger_name: {
|
logger_name: {
|
||||||
'handlers': ['rich' if component is None else 'rich_component'],
|
'handlers': ['rich' if component is None else 'rich_component'],
|
||||||
'propagate': False,
|
'propagate': False,
|
||||||
'level': level,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -81,17 +72,27 @@ def load_rich_config(
|
|||||||
if component is not None:
|
if component is not None:
|
||||||
extra['component'] = component
|
extra['component'] = component
|
||||||
|
|
||||||
|
if level is not None:
|
||||||
|
RICH_CFG['loggers'][logger_name]['level'] = level
|
||||||
|
|
||||||
logging.config.dictConfig(RICH_CFG)
|
logging.config.dictConfig(RICH_CFG)
|
||||||
logger = logging.getLogger(logger_name)
|
logger = logging.getLogger(logger_name)
|
||||||
adapter = logging.LoggerAdapter(logger, extra)
|
adapter = logging.LoggerAdapter(logger, extra)
|
||||||
return adapter
|
return adapter
|
||||||
|
|
||||||
|
|
||||||
class ContextSettingFilter(logging.Filter, ABC):
|
RICH_HANDLER_CFG = {
|
||||||
"""Adds all the dataclass fields as attributes to LogRecord objects.
|
'()': 'rich.logging.RichHandler',
|
||||||
|
'markup': True,
|
||||||
|
'show_path': False,
|
||||||
|
# 'show_time': False,
|
||||||
|
'omit_repeated_times': False,
|
||||||
|
'console': console,
|
||||||
|
'highlighter': RCHighlighter(),
|
||||||
|
}
|
||||||
|
|
||||||
Intended to be inherited from by something that uses the dataclasses.dataclass decorator.
|
|
||||||
"""
|
class ContextSettingFilter(logging.Filter, ABC):
|
||||||
def filter(self, record: logging.LogRecord) -> logging.LogRecord:
|
def filter(self, record: logging.LogRecord) -> logging.LogRecord:
|
||||||
for name, val in asdict(self).items():
|
for name, val in asdict(self).items():
|
||||||
if val is not None:
|
if val is not None:
|
||||||
@@ -99,6 +100,12 @@ class ContextSettingFilter(logging.Filter, ABC):
|
|||||||
return record
|
return record
|
||||||
|
|
||||||
|
|
||||||
|
# @dataclass
|
||||||
|
# class RoomControllerFilter(ContextSettingFilter):
|
||||||
|
# room: str
|
||||||
|
# component: Optional[str] = None
|
||||||
|
|
||||||
|
|
||||||
class RoomFilter(logging.Filter):
|
class RoomFilter(logging.Filter):
|
||||||
"""Used to filter out messages that have a component field because they will have already been printed by their respective logger."""
|
"""Used to filter out messages that have a component field because they will have already been printed by their respective logger."""
|
||||||
|
|
||||||
@@ -106,6 +113,11 @@ class RoomFilter(logging.Filter):
|
|||||||
return not hasattr(record, 'component')
|
return not hasattr(record, 'component')
|
||||||
|
|
||||||
|
|
||||||
|
# class RoomControllerFormatter(logging.Formatter):
|
||||||
|
# def format(self, record: logging.LogRecord):
|
||||||
|
# return super().format(record)
|
||||||
|
|
||||||
|
|
||||||
class UnMarkupFilter(logging.Filter):
|
class UnMarkupFilter(logging.Filter):
|
||||||
md_regex = re.compile(r'(?P<open>\[.*?\])(?P<text>.*?)(?P<close>\[\/\])')
|
md_regex = re.compile(r'(?P<open>\[.*?\])(?P<text>.*?)(?P<close>\[\/\])')
|
||||||
|
|
||||||
@@ -119,17 +131,6 @@ class JSONFormatter(logging.Formatter):
|
|||||||
return json.dumps(record.__dict__)
|
return json.dumps(record.__dict__)
|
||||||
|
|
||||||
|
|
||||||
RICH_HANDLER_CFG = {
|
|
||||||
'()': 'rich.logging.RichHandler',
|
|
||||||
'markup': True,
|
|
||||||
'show_path': False,
|
|
||||||
# 'show_time': False,
|
|
||||||
'omit_repeated_times': False,
|
|
||||||
'console': console,
|
|
||||||
'highlighter': RCHighlighter(),
|
|
||||||
}
|
|
||||||
|
|
||||||
## currently unused
|
|
||||||
def room_logging_config(name: str):
|
def room_logging_config(name: str):
|
||||||
return {
|
return {
|
||||||
'version': 1,
|
'version': 1,
|
||||||
@@ -187,3 +188,52 @@ def room_logging_config(name: str):
|
|||||||
},
|
},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
# def component_logging_config(parent_room: str, component: str):
|
||||||
|
# logger_name = f'AppDaemon.{parent_room}.{component}'
|
||||||
|
|
||||||
|
# cfg = load_rich_config()
|
||||||
|
|
||||||
|
# LOG_CFG = {
|
||||||
|
# 'version': 1,
|
||||||
|
# 'disable_existing_loggers': False,
|
||||||
|
# 'formatters': {
|
||||||
|
# 'rich_component': {
|
||||||
|
# 'style': '{',
|
||||||
|
# 'format': '[room]{room}[/] [component]{component}[/] {message}',
|
||||||
|
# 'datefmt': '%H:%M:%S.%f',
|
||||||
|
# },
|
||||||
|
# },
|
||||||
|
# 'handlers': {
|
||||||
|
# 'rich_component': {
|
||||||
|
# 'formatter': 'rich_component',
|
||||||
|
# **RICH_HANDLER_CFG,
|
||||||
|
# },
|
||||||
|
# },
|
||||||
|
# 'loggers': {
|
||||||
|
# logger_name: {
|
||||||
|
# # 'level': 'INFO',
|
||||||
|
# 'propagate': True,
|
||||||
|
# 'handlers': ['rich_component'],
|
||||||
|
# }
|
||||||
|
# },
|
||||||
|
# }
|
||||||
|
# return LOG_CFG
|
||||||
|
|
||||||
|
|
||||||
|
# def setup_component_logging(self) -> logging.Logger:
|
||||||
|
# """Creates a logger for a subcomponent with a RichHandler"""
|
||||||
|
# component = type(self).__name__
|
||||||
|
# parent = self.args['app']
|
||||||
|
# cfg_dict = component_logging_config(parent_room=parent, component=component)
|
||||||
|
# logger_name = next(iter(cfg_dict['loggers']))
|
||||||
|
|
||||||
|
# try:
|
||||||
|
# logging.config.dictConfig(cfg_dict)
|
||||||
|
# except Exception:
|
||||||
|
# console.print_exception()
|
||||||
|
# else:
|
||||||
|
# logger = logging.getLogger(logger_name)
|
||||||
|
# logger = logging.LoggerAdapter(logger, {'room': parent, 'component': component})
|
||||||
|
# return logger
|
||||||
|
|||||||
@@ -1,235 +0,0 @@
|
|||||||
import datetime
|
|
||||||
import json
|
|
||||||
import logging
|
|
||||||
import logging.config
|
|
||||||
from copy import deepcopy
|
|
||||||
from typing import Dict, List
|
|
||||||
|
|
||||||
from appdaemon.entity import Entity
|
|
||||||
from appdaemon.plugins.hass.hassapi import Hass
|
|
||||||
|
|
||||||
from .base import RoomControlBase
|
|
||||||
from .model import ControllerStateConfig, RoomControllerConfig
|
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
|
||||||
|
|
||||||
|
|
||||||
class RoomController(RoomControlBase, Hass):
|
|
||||||
"""Class for linking room's lights with a motion sensor.
|
|
||||||
|
|
||||||
- Separate the turning on and turning off functions.
|
|
||||||
- Use the state change of the light to set up the event for changing to the other state
|
|
||||||
- `handle_on`
|
|
||||||
- `handle_off`
|
|
||||||
- When the light comes on, check if it's attributes match what they should, given the time.
|
|
||||||
"""
|
|
||||||
|
|
||||||
@property
|
|
||||||
def states(self) -> List[ControllerStateConfig]:
|
|
||||||
return self._room_config.states
|
|
||||||
|
|
||||||
@states.setter
|
|
||||||
def states(self, new: List[ControllerStateConfig]):
|
|
||||||
assert all(isinstance(s, ControllerStateConfig) for s in new), f'Invalid: {new}'
|
|
||||||
self._room_config.states = new
|
|
||||||
|
|
||||||
def initialize(self):
|
|
||||||
super().initialize(room=self.name)
|
|
||||||
self.app_entities = self.gather_app_entities()
|
|
||||||
self.refresh_state_times()
|
|
||||||
self.run_daily(callback=self.refresh_state_times, start='00:00:00')
|
|
||||||
self.log(f'Initialized [bold green]{type(self).__name__}[/]')
|
|
||||||
|
|
||||||
def terminate(self):
|
|
||||||
self.log('[bold red]Terminating[/]', level='DEBUG')
|
|
||||||
|
|
||||||
def gather_app_entities(self) -> List[str]:
|
|
||||||
"""Returns a list of all the entities involved in any of the states"""
|
|
||||||
|
|
||||||
def generator():
|
|
||||||
for settings in deepcopy(self.args['states']):
|
|
||||||
if scene := settings.get('scene'):
|
|
||||||
if isinstance(scene, str):
|
|
||||||
assert scene.startswith(
|
|
||||||
'scene.'
|
|
||||||
), f"Scene definition must start with 'scene.' for app {self.name}"
|
|
||||||
entity: Entity = self.get_entity(scene)
|
|
||||||
entity_state = entity.get_state('all')
|
|
||||||
attributes = entity_state['attributes']
|
|
||||||
for entity in attributes['entity_id']:
|
|
||||||
yield entity
|
|
||||||
else:
|
|
||||||
for key in scene.keys():
|
|
||||||
yield key
|
|
||||||
else:
|
|
||||||
yield self.args['entity']
|
|
||||||
|
|
||||||
return set(list(generator()))
|
|
||||||
|
|
||||||
def refresh_state_times(self, *args, **kwargs):
|
|
||||||
"""Resets the `self.states` attribute to a newly parsed version of the states.
|
|
||||||
|
|
||||||
Parsed states have an absolute time for the current day.
|
|
||||||
"""
|
|
||||||
# re-parse the state strings into times for the current day
|
|
||||||
self._room_config = RoomControllerConfig.model_validate(self.args)
|
|
||||||
self.log(
|
|
||||||
f'{len(self._room_config.states)} states in the app configuration',
|
|
||||||
level='DEBUG',
|
|
||||||
)
|
|
||||||
|
|
||||||
for state in self._room_config.states:
|
|
||||||
if state.time is None and state.elevation is not None:
|
|
||||||
state.time = self.AD.sched.location.time_at_elevation(
|
|
||||||
elevation=state.elevation, direction=state.direction
|
|
||||||
).time()
|
|
||||||
elif isinstance(state.time, str):
|
|
||||||
state.time = self.parse_time(state.time)
|
|
||||||
|
|
||||||
assert isinstance(state.time, datetime.time), f'Invalid time: {state.time}'
|
|
||||||
|
|
||||||
self.states = sorted(self.states, key=lambda s: s.time, reverse=True)
|
|
||||||
|
|
||||||
# schedule the transitions
|
|
||||||
for state in self.states[::-1]:
|
|
||||||
# t: datetime.time = state['time']
|
|
||||||
t: datetime.time = state.time
|
|
||||||
try:
|
|
||||||
self.run_at(
|
|
||||||
callback=self.activate_any_on,
|
|
||||||
start=t.strftime('%H:%M:%S'),
|
|
||||||
cause='scheduled transition',
|
|
||||||
)
|
|
||||||
except ValueError:
|
|
||||||
# happens when the callback time is in the past
|
|
||||||
pass
|
|
||||||
except Exception as e:
|
|
||||||
self.log(f'Failed with {type(e)}: {e}')
|
|
||||||
|
|
||||||
def current_state(self, now: datetime.time = None) -> ControllerStateConfig:
|
|
||||||
if self.sleep_bool():
|
|
||||||
self.log('sleep: active')
|
|
||||||
if state := self.args.get('sleep_state'):
|
|
||||||
return ControllerStateConfig(**state)
|
|
||||||
else:
|
|
||||||
return ControllerStateConfig(scene={})
|
|
||||||
else:
|
|
||||||
now = now or self.get_now().time().replace(microsecond=0)
|
|
||||||
self.log(f'Getting state for {now.strftime("%I:%M:%S %p")}', level='DEBUG')
|
|
||||||
|
|
||||||
state = self._room_config.current_state(now)
|
|
||||||
self.log(f'Current state: {state.time}', level='DEBUG')
|
|
||||||
return state
|
|
||||||
|
|
||||||
def app_entity_states(self) -> Dict[str, str]:
|
|
||||||
states = {entity: self.get_state(entity) for entity in self.app_entities}
|
|
||||||
return states
|
|
||||||
|
|
||||||
def all_off(self) -> bool:
|
|
||||||
""" "All off" is the logic opposite of "any on"
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
bool: Whether all the lights associated with the app are off
|
|
||||||
"""
|
|
||||||
states = self.app_entity_states()
|
|
||||||
return all(state != 'on' for entity, state in states.items())
|
|
||||||
|
|
||||||
def any_on(self) -> bool:
|
|
||||||
""" "Any on" is the logic opposite of "all off"
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
bool: Whether any of the lights associated with the app are on
|
|
||||||
"""
|
|
||||||
states = self.app_entity_states()
|
|
||||||
return any(state == 'on' for entity, state in states.items())
|
|
||||||
|
|
||||||
def sleep_bool(self) -> bool:
|
|
||||||
if sleep_var := self.args.get('sleep'):
|
|
||||||
return self.get_state(sleep_var) == 'on'
|
|
||||||
else:
|
|
||||||
return False
|
|
||||||
|
|
||||||
def manual_mode(self) -> bool:
|
|
||||||
if manual_entity := self.args.get('manual_mode'):
|
|
||||||
return self.get_state(manual_entity) == 'on'
|
|
||||||
else:
|
|
||||||
return False
|
|
||||||
|
|
||||||
# @sleep_bool.setter
|
|
||||||
# def sleep_bool(self, val) -> bool:
|
|
||||||
# if (sleep_var := self.args.get('sleep')):
|
|
||||||
# if isinstance(val, str):
|
|
||||||
# self.set_state(sleep_var, state=val)
|
|
||||||
# elif isinstance(val, bool):
|
|
||||||
# self.set_state(sleep_var, state='on' if val else 'off')
|
|
||||||
# else:
|
|
||||||
# raise ValueError('Sleep variable is undefined')
|
|
||||||
|
|
||||||
def off_duration(self, now: datetime.time = None) -> datetime.timedelta:
|
|
||||||
"""Determines the time that the motion sensor has to be clear before deactivating
|
|
||||||
|
|
||||||
Priority:
|
|
||||||
- Value in scene definition
|
|
||||||
- Default value
|
|
||||||
- Normal - value in app definition
|
|
||||||
- Sleep - 0
|
|
||||||
|
|
||||||
"""
|
|
||||||
sleep_mode_active = self.sleep_bool()
|
|
||||||
if sleep_mode_active:
|
|
||||||
self.log(f'Sleeping mode active: {sleep_mode_active}')
|
|
||||||
return datetime.timedelta()
|
|
||||||
else:
|
|
||||||
now = now or self.get_now().time()
|
|
||||||
return self._room_config.current_off_duration(now)
|
|
||||||
|
|
||||||
def activate(self, entity=None, attribute=None, old=None, new=None, kwargs=None):
|
|
||||||
if kwargs is not None:
|
|
||||||
cause = kwargs.get('cause', 'unknown')
|
|
||||||
else:
|
|
||||||
cause = 'unknown'
|
|
||||||
|
|
||||||
self.log(f'Activating: {cause}')
|
|
||||||
scene_kwargs = self.current_state().to_apply_kwargs(transition=0)
|
|
||||||
|
|
||||||
if isinstance(scene_kwargs, str):
|
|
||||||
self.turn_on(scene_kwargs)
|
|
||||||
self.log(f'Turned on scene: {scene_kwargs}')
|
|
||||||
|
|
||||||
elif isinstance(scene_kwargs, dict):
|
|
||||||
self.call_service('scene/apply', **scene_kwargs)
|
|
||||||
self.log(f'Applied scene:\n{json.dumps(scene_kwargs, indent=2)}', level='DEBUG')
|
|
||||||
|
|
||||||
elif scene_kwargs is None:
|
|
||||||
self.log('No scene, ignoring...')
|
|
||||||
# Need to act as if the light had just turned off to reset the motion (and maybe other things?)
|
|
||||||
# self.callback_light_off()
|
|
||||||
else:
|
|
||||||
self.log(f'ERROR: unknown scene: {scene_kwargs}')
|
|
||||||
|
|
||||||
def activate_all_off(self, *args, **kwargs):
|
|
||||||
"""Activate if all of the entities are off. Args and kwargs are passed directly to self.activate()"""
|
|
||||||
if self.all_off():
|
|
||||||
self.activate(*args, **kwargs)
|
|
||||||
else:
|
|
||||||
self.log('Skipped activating - everything is not off')
|
|
||||||
|
|
||||||
def activate_any_on(self, *args, **kwargs):
|
|
||||||
"""Activate if any of the entities are on. Args and kwargs are passed directly to self.activate()"""
|
|
||||||
if self.any_on() and not self.manual_mode():
|
|
||||||
self.activate(*args, **kwargs)
|
|
||||||
else:
|
|
||||||
self.log('Skipped activating - everything is off')
|
|
||||||
|
|
||||||
def toggle_activate(self, *args, **kwargs):
|
|
||||||
if self.any_on():
|
|
||||||
self.deactivate(*args, **kwargs)
|
|
||||||
else:
|
|
||||||
self.activate(*args, **kwargs)
|
|
||||||
|
|
||||||
def deactivate(self, entity=None, attribute=None, old=None, new=None, kwargs=None):
|
|
||||||
cause = kwargs.get('cause', 'unknown')
|
|
||||||
self.log(f'Deactivating: {cause}')
|
|
||||||
for e in self.app_entities:
|
|
||||||
self.turn_off(e)
|
|
||||||
self.log(f'Turned off {e}')
|
|
||||||
@@ -1,17 +1,23 @@
|
|||||||
from appdaemon.plugins.hass.hassapi import Hass
|
from dataclasses import dataclass
|
||||||
|
from typing import TYPE_CHECKING
|
||||||
|
|
||||||
from .base import RoomControlBase
|
from . import console
|
||||||
|
|
||||||
|
if TYPE_CHECKING:
|
||||||
|
from .room_control import RoomController
|
||||||
|
|
||||||
|
|
||||||
class Door(RoomControlBase, Hass):
|
@dataclass
|
||||||
def initialize(self):
|
class Door:
|
||||||
super().initialize(room=self.args['app'], component='Door')
|
adapi: 'RoomController'
|
||||||
|
entity_id: str
|
||||||
|
|
||||||
self.listen_state(
|
def __post_init__(self):
|
||||||
self.app.activate_all_off,
|
self.logger = console.load_rich_config(self.adapi.name, 'Door')
|
||||||
entity_id=self.args['door'],
|
|
||||||
|
self.adapi.listen_state(
|
||||||
|
lambda *args, **kwargs: self.adapi.activate_all_off(cause='door open'),
|
||||||
|
entity_id=self.entity_id,
|
||||||
new='on',
|
new='on',
|
||||||
cause='door open',
|
|
||||||
)
|
)
|
||||||
|
self.logger.debug(f'Initialized door for [room]{self.adapi.name}[/]')
|
||||||
self.log(f'Waiting for door to open: [bold green]{self.args["door"]}[/]')
|
|
||||||
|
|||||||
@@ -1,21 +1,21 @@
|
|||||||
from datetime import datetime, time, timedelta
|
import datetime
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from typing import Annotated, Dict, List, Optional, Self
|
from typing import Annotated, Dict, List, Optional, Self, Union
|
||||||
|
|
||||||
import yaml
|
import yaml
|
||||||
from astral import SunDirection
|
from astral import SunDirection
|
||||||
from pydantic import BaseModel, BeforeValidator, Field, model_validator
|
from pydantic import BaseModel, BeforeValidator, Field, field_validator, model_validator
|
||||||
from pydantic_core import PydanticCustomError
|
from pydantic_core import PydanticCustomError
|
||||||
from rich.console import Console, ConsoleOptions, RenderResult
|
from rich.console import Console, ConsoleOptions, RenderResult
|
||||||
from rich.table import Column, Table
|
from rich.table import Column, Table
|
||||||
|
|
||||||
|
|
||||||
def str_to_timedelta(input_str: str) -> timedelta:
|
def str_to_timedelta(input_str: str) -> datetime.timedelta:
|
||||||
try:
|
try:
|
||||||
hours, minutes, seconds = map(int, input_str.split(':'))
|
hours, minutes, seconds = map(int, input_str.split(':'))
|
||||||
return timedelta(hours=hours, minutes=minutes, seconds=seconds)
|
return datetime.timedelta(hours=hours, minutes=minutes, seconds=seconds)
|
||||||
except Exception:
|
except Exception:
|
||||||
return timedelta()
|
return datetime.timedelta()
|
||||||
|
|
||||||
|
|
||||||
def str_to_direction(input_str: str) -> SunDirection:
|
def str_to_direction(input_str: str) -> SunDirection:
|
||||||
@@ -27,7 +27,7 @@ def str_to_direction(input_str: str) -> SunDirection:
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
OffDuration = Annotated[timedelta, BeforeValidator(str_to_timedelta)]
|
OffDuration = Annotated[datetime.timedelta, BeforeValidator(str_to_timedelta)]
|
||||||
|
|
||||||
|
|
||||||
class State(BaseModel):
|
class State(BaseModel):
|
||||||
@@ -45,31 +45,47 @@ class ApplyKwargs(BaseModel):
|
|||||||
|
|
||||||
|
|
||||||
class ControllerStateConfig(BaseModel):
|
class ControllerStateConfig(BaseModel):
|
||||||
time: Optional[str | datetime] = None
|
time: Optional[str | datetime.time | datetime.datetime] = None
|
||||||
elevation: Optional[float] = None
|
elevation: Optional[float] = None
|
||||||
direction: Optional[Annotated[SunDirection, BeforeValidator(str_to_direction)]] = None
|
direction: Optional[SunDirection] = None
|
||||||
off_duration: Optional[OffDuration] = None
|
off_duration: Optional[OffDuration] = None
|
||||||
scene: dict[str, State] | str
|
scene: dict[str, State] | str = Field(default_factory=dict)
|
||||||
|
|
||||||
@model_validator(mode='before')
|
@model_validator(mode='before')
|
||||||
def check_args(cls, values):
|
def check_args(cls, values):
|
||||||
time, elevation = values.get('time'), values.get('elevation')
|
if values.get('elevation') is not None and values.get('direction') is None:
|
||||||
if time is not None and elevation is not None:
|
|
||||||
raise PydanticCustomError('bad_time_spec', 'Only one of time or elevation can be set.')
|
|
||||||
elif elevation is not None and 'direction' not in values:
|
|
||||||
raise PydanticCustomError('no_sun_dir', 'Needs sun direction with elevation')
|
raise PydanticCustomError('no_sun_dir', 'Needs sun direction with elevation')
|
||||||
return values
|
return values
|
||||||
|
|
||||||
def to_apply_kwargs(self, **kwargs):
|
@field_validator('direction', mode='before')
|
||||||
return ApplyKwargs(entities=self.scene, **kwargs).model_dump(exclude_none=True)
|
@classmethod
|
||||||
|
def check_sun_dir(cls, val: int | str | SunDirection | None) -> SunDirection:
|
||||||
|
if isinstance(val, str):
|
||||||
|
print(f'Str sun direction: {val}')
|
||||||
|
return str_to_direction(val)
|
||||||
|
elif isinstance(val, int):
|
||||||
|
return SunDirection.SETTING if val < 0 else SunDirection.RISING
|
||||||
|
elif isinstance(val, SunDirection):
|
||||||
|
return val
|
||||||
|
|
||||||
|
def to_apply_kwargs(self, transition: int = None):
|
||||||
|
return ApplyKwargs(entities=self.scene, transition=transition).model_dump(exclude_none=True)
|
||||||
|
|
||||||
|
|
||||||
|
class MotionSensorConfig(BaseModel):
|
||||||
|
sensor: str
|
||||||
|
ref_entity: str
|
||||||
|
|
||||||
|
|
||||||
class RoomControllerConfig(BaseModel):
|
class RoomControllerConfig(BaseModel):
|
||||||
states: List[ControllerStateConfig] = Field(default_factory=list)
|
states: List[ControllerStateConfig] = Field(default_factory=list)
|
||||||
off_duration: Optional[OffDuration] = None
|
off_duration: Optional[OffDuration] = Field(default_factory=datetime.timedelta)
|
||||||
sleep_state: Optional[ControllerStateConfig] = None
|
sleep_state: Optional[ControllerStateConfig] = None
|
||||||
rich: Optional[str] = None
|
rich: Optional[str] = None
|
||||||
manual_mode: Optional[str] = None
|
manual_mode: Optional[str] = None
|
||||||
|
button: Optional[Union[str, List[str]]] = None
|
||||||
|
motion: Optional[MotionSensorConfig] = None
|
||||||
|
log_level: Optional[str] = None
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def from_yaml(cls: Self, yaml_path: Path) -> Self:
|
def from_yaml(cls: Self, yaml_path: Path) -> Self:
|
||||||
@@ -98,38 +114,3 @@ class RoomControllerConfig(BaseModel):
|
|||||||
]
|
]
|
||||||
table.add_row(state.time.strftime('%I:%M:%S %p'), '\n'.join(lines))
|
table.add_row(state.time.strftime('%I:%M:%S %p'), '\n'.join(lines))
|
||||||
yield table
|
yield table
|
||||||
|
|
||||||
def sort_states(self):
|
|
||||||
"""Should only be called after all the times have been resolved"""
|
|
||||||
assert all(
|
|
||||||
isinstance(state.time, time) for state in self.states
|
|
||||||
), 'Times have not all been resolved yet'
|
|
||||||
self.states = sorted(self.states, key=lambda s: s.time, reverse=True)
|
|
||||||
|
|
||||||
def current_state(self, now: time) -> ControllerStateConfig:
|
|
||||||
self.sort_states()
|
|
||||||
for state in self.states:
|
|
||||||
if state.time <= now:
|
|
||||||
return state
|
|
||||||
else:
|
|
||||||
return self.states[0]
|
|
||||||
|
|
||||||
def current_scene(self, now: time) -> Dict:
|
|
||||||
state = self.current_state(now)
|
|
||||||
return state.scene
|
|
||||||
|
|
||||||
def current_off_duration(self, now: time) -> timedelta:
|
|
||||||
state = self.current_state(now)
|
|
||||||
if state.off_duration is None:
|
|
||||||
if self.off_duration is None:
|
|
||||||
raise ValueError('Need an off duration')
|
|
||||||
else:
|
|
||||||
return self.off_duration
|
|
||||||
else:
|
|
||||||
return state.off_duration
|
|
||||||
|
|
||||||
|
|
||||||
class ButtonConfig(BaseModel):
|
|
||||||
app: str
|
|
||||||
button: str | List[str]
|
|
||||||
ref_entity: str
|
|
||||||
|
|||||||
@@ -1,12 +1,14 @@
|
|||||||
import re
|
from dataclasses import dataclass
|
||||||
from datetime import timedelta
|
from datetime import timedelta
|
||||||
from typing import Literal, Optional
|
from typing import TYPE_CHECKING, Literal, Optional
|
||||||
|
|
||||||
from appdaemon.entity import Entity
|
from appdaemon.entity import Entity
|
||||||
from appdaemon.plugins.hass.hassapi import Hass
|
from pydantic import BaseModel
|
||||||
from pydantic import BaseModel, ValidationError
|
|
||||||
|
|
||||||
from .base import RoomControlBase
|
from . import console
|
||||||
|
|
||||||
|
if TYPE_CHECKING:
|
||||||
|
from room_control import RoomController
|
||||||
|
|
||||||
|
|
||||||
class CallbackEntry(BaseModel):
|
class CallbackEntry(BaseModel):
|
||||||
@@ -23,143 +25,103 @@ class CallbackEntry(BaseModel):
|
|||||||
Callbacks = dict[str, dict[str, CallbackEntry]]
|
Callbacks = dict[str, dict[str, CallbackEntry]]
|
||||||
|
|
||||||
|
|
||||||
class Motion(RoomControlBase, Hass):
|
@dataclass
|
||||||
|
class MotionSensor:
|
||||||
|
adapi: 'RoomController'
|
||||||
|
sensor_entity_id: str
|
||||||
|
ref_entity_id: str
|
||||||
|
|
||||||
|
def __post_init__(self):
|
||||||
|
self.logger = console.load_rich_config(self.adapi.name, 'Motion')
|
||||||
|
|
||||||
|
assert self.sensor_entity.exists()
|
||||||
|
assert self.ref_entity.exists()
|
||||||
|
|
||||||
|
self.ref_entity.listen_state(self.light_state_callback, immediate=True)
|
||||||
|
self.match_new_state(new=self.sensor_entity.get_state())
|
||||||
|
self.logger.info('Initialized motion sensor')
|
||||||
|
|
||||||
|
def entity_callbacks(self, entity_id: str | None = None) -> dict[str, dict]:
|
||||||
|
callbacks = self.adapi.get_callback_entries()
|
||||||
|
if self.adapi.name in callbacks:
|
||||||
|
return {
|
||||||
|
handle: cb
|
||||||
|
for handle, cb in callbacks[self.adapi.name].items()
|
||||||
|
if cb.get('entity') == entity_id
|
||||||
|
}
|
||||||
|
else:
|
||||||
|
return {}
|
||||||
|
|
||||||
|
def sensor_callbacks(self) -> dict[str, dict]:
|
||||||
|
return self.entity_callbacks(entity_id=self.sensor_entity_id)
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def sensor(self) -> Entity:
|
def sensor_entity(self) -> Entity:
|
||||||
return self.get_entity(self.args['sensor'])
|
return self.adapi.get_entity(self.sensor_entity_id)
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def sensor_state(self) -> bool:
|
def sensor_state(self) -> bool:
|
||||||
return self.sensor.state == 'on'
|
return self.sensor_entity.get_state() == 'on'
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def ref_entity(self) -> Entity:
|
def ref_entity(self) -> Entity:
|
||||||
return self.get_entity(self.args['ref_entity'])
|
return self.adapi.get_entity(self.ref_entity_id)
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def ref_entity_state(self) -> bool:
|
def ref_state(self) -> bool:
|
||||||
return self.ref_entity.get_state() == 'on'
|
return self.ref_entity.get_state() == 'on'
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def state_mismatch(self) -> bool:
|
def state_mismatch(self) -> bool:
|
||||||
return self.sensor_state != self.ref_entity_state
|
return self.sensor_state != self.ref_state
|
||||||
|
|
||||||
def initialize(self):
|
def light_state_callback(self, entity: str, attribute: str, old: str, new: str, **kwargs: dict):
|
||||||
super().initialize(room=self.args['app'], component='Motion')
|
for handle in self.sensor_callbacks():
|
||||||
|
self.adapi.cancel_listen_state(handle)
|
||||||
|
self.match_new_state(new)
|
||||||
|
|
||||||
assert self.entity_exists(self.args['sensor'])
|
def match_new_state(self, new: Literal['on', 'off']):
|
||||||
assert self.entity_exists(self.args['ref_entity'])
|
match new:
|
||||||
|
case 'on':
|
||||||
base_kwargs = dict(
|
duration = self.adapi.off_duration()
|
||||||
entity_id=self.ref_entity.entity_id,
|
self.listen_motion_off(duration)
|
||||||
immediate=True, # avoids needing to sync the state
|
case 'off':
|
||||||
)
|
self.listen_motion_on()
|
||||||
|
|
||||||
if self.state_mismatch:
|
|
||||||
self.log(
|
|
||||||
f'Sensor is {self.sensor_state} ' f'and light is {self.ref_entity_state}',
|
|
||||||
level='WARNING',
|
|
||||||
)
|
|
||||||
if self.sensor_state:
|
|
||||||
self.app.activate(kwargs={'cause': f'Syncing state with {self.sensor.entity_id}'})
|
|
||||||
|
|
||||||
self.listen_state(
|
|
||||||
**base_kwargs,
|
|
||||||
attribute='brightness',
|
|
||||||
callback=self.callback_light_on,
|
|
||||||
)
|
|
||||||
self.listen_state(**base_kwargs, new='off', callback=self.callback_light_off)
|
|
||||||
|
|
||||||
self.log(f'Initialized [bold green]{type(self).__name__}[/]')
|
|
||||||
|
|
||||||
def callbacks(self):
|
|
||||||
"""Returns a dictionary of validated CallbackEntry objects that are associated with this app"""
|
|
||||||
self_callbacks = self.get_callback_entries().get(self.name, {})
|
|
||||||
for handle, cb_dict in self_callbacks.items():
|
|
||||||
try:
|
|
||||||
yield handle, CallbackEntry.model_validate(cb_dict)
|
|
||||||
except ValidationError as e:
|
|
||||||
self.logger.error('Error parsing callback dictionary')
|
|
||||||
self.logger.error(e)
|
|
||||||
|
|
||||||
def listen_motion_on(self):
|
def listen_motion_on(self):
|
||||||
"""Sets up the motion on callback to activate the room"""
|
"""Sets up the motion on callback to activate the room"""
|
||||||
self.cancel_motion_callback()
|
# self.cancel_motion_callback()
|
||||||
self.listen_state(
|
self.sensor_entity.listen_state(
|
||||||
callback=self.app.activate_all_off,
|
lambda *args, **kwargs: self.adapi.activate_all_off(cause='motion on'),
|
||||||
entity_id=self.sensor.entity_id,
|
|
||||||
new='on',
|
new='on',
|
||||||
oneshot=True,
|
oneshot=True,
|
||||||
cause='motion on',
|
|
||||||
)
|
)
|
||||||
self.log(f'Waiting for sensor motion on [friendly_name]{self.sensor.friendly_name}[/]')
|
self.logger.info(
|
||||||
|
'Waiting for sensor motion on '
|
||||||
|
f'[friendly_name]{self.sensor_entity.friendly_name}[/]'
|
||||||
|
)
|
||||||
if self.sensor_state:
|
if self.sensor_state:
|
||||||
self.log(
|
self.logger.warning(
|
||||||
f'Sensor [friendly_name]{self.sensor.friendly_name}[/] is already on',
|
'Sensor '
|
||||||
level='WARNING',
|
f'[friendly_name]{self.sensor_entity.friendly_name}[/] is already on',
|
||||||
)
|
)
|
||||||
|
|
||||||
def listen_motion_off(self, duration: timedelta):
|
def listen_motion_off(self, duration: timedelta):
|
||||||
"""Sets up the motion off callback to deactivate the room"""
|
"""Sets up the motion off callback to deactivate the room"""
|
||||||
self.cancel_motion_callback()
|
# self.cancel_motion_callback()
|
||||||
self.listen_state(
|
self.sensor_entity.listen_state(
|
||||||
callback=self.app.deactivate,
|
lambda *args, **kwargs: self.adapi.deactivate(cause='motion off'),
|
||||||
entity_id=self.sensor.entity_id,
|
|
||||||
new='off',
|
new='off',
|
||||||
duration=duration.total_seconds(),
|
duration=duration.total_seconds(),
|
||||||
oneshot=True,
|
oneshot=True,
|
||||||
cause='motion off',
|
|
||||||
)
|
)
|
||||||
self.log(
|
self.logger.debug(
|
||||||
f'Waiting for sensor [friendly_name]{self.sensor.friendly_name}[/] to be clear for {duration}'
|
'Waiting for sensor '
|
||||||
|
f'[friendly_name]{self.sensor_entity.friendly_name}[/] '
|
||||||
|
f'to be clear for {duration}'
|
||||||
)
|
)
|
||||||
|
|
||||||
if not self.sensor_state:
|
if not self.sensor_state:
|
||||||
self.log(
|
self.logger.warning(
|
||||||
f'Sensor [friendly_name]{self.sensor.friendly_name}[/] is currently off',
|
f'Sensor [friendly_name]{self.sensor_entity.friendly_name}[/] is currently off',
|
||||||
level='WARNING',
|
|
||||||
)
|
)
|
||||||
|
|
||||||
def callback_light_on(self, entity=None, attribute=None, old=None, new=None, kwargs=None):
|
|
||||||
"""Called when the light turns on"""
|
|
||||||
if new is not None:
|
|
||||||
self.log(f'Detected {entity} turning on', level='DEBUG')
|
|
||||||
duration = self.app.off_duration()
|
|
||||||
self.listen_motion_off(duration)
|
|
||||||
|
|
||||||
def callback_light_off(self, entity=None, attribute=None, old=None, new=None, kwargs=None):
|
|
||||||
"""Called when the light turns off"""
|
|
||||||
self.log(f'Detected {entity} turning off', level='DEBUG')
|
|
||||||
self.listen_motion_on()
|
|
||||||
|
|
||||||
def get_app_callbacks(self, name: str = None):
|
|
||||||
"""Gets all the callbacks associated with the app"""
|
|
||||||
name = name or self.name
|
|
||||||
callbacks = {
|
|
||||||
handle: info
|
|
||||||
for app_name, callbacks in self.get_callback_entries().items()
|
|
||||||
for handle, info in callbacks.items()
|
|
||||||
if app_name == name
|
|
||||||
}
|
|
||||||
return callbacks
|
|
||||||
|
|
||||||
def get_sensor_callbacks(self):
|
|
||||||
return {
|
|
||||||
handle: info
|
|
||||||
for handle, info in self.get_app_callbacks().items()
|
|
||||||
if info['entity'] == self.sensor.entity_id
|
|
||||||
}
|
|
||||||
|
|
||||||
def cancel_motion_callback(self):
|
|
||||||
callbacks = self.get_sensor_callbacks()
|
|
||||||
# self.log(f'Found {len(callbacks)} callbacks for {self.sensor.entity_id}')
|
|
||||||
for handle, info in callbacks.items():
|
|
||||||
entity = info['entity']
|
|
||||||
kwargs = info['kwargs']
|
|
||||||
if (m := re.match('new=(?P<new>.*?)\s', kwargs)) is not None:
|
|
||||||
new = m.group('new')
|
|
||||||
self.cancel_listen_state(handle)
|
|
||||||
self.log(
|
|
||||||
f'cancelled callback for sensor {entity} turning {new}',
|
|
||||||
level='DEBUG',
|
|
||||||
)
|
|
||||||
|
|||||||
291
src/room_control/room_control.py
Executable file
291
src/room_control/room_control.py
Executable file
@@ -0,0 +1,291 @@
|
|||||||
|
import datetime
|
||||||
|
import logging
|
||||||
|
import logging.config
|
||||||
|
import traceback
|
||||||
|
from functools import wraps
|
||||||
|
from typing import Dict, List, Set
|
||||||
|
|
||||||
|
from appdaemon.entity import Entity
|
||||||
|
from appdaemon.plugins.hass.hassapi import Hass
|
||||||
|
from astral.location import Location
|
||||||
|
|
||||||
|
from . import console
|
||||||
|
from .button import Button, VirtualButton
|
||||||
|
from .door import Door
|
||||||
|
from .model import ControllerStateConfig, RoomControllerConfig
|
||||||
|
from .motion import MotionSensor
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class RoomController(Hass):
|
||||||
|
"""Class for linking room's lights with a motion sensor.
|
||||||
|
|
||||||
|
- Separate the turning on and turning off functions.
|
||||||
|
- Use the state change of the light to set up the event for changing to the other state
|
||||||
|
- `handle_on`
|
||||||
|
- `handle_off`
|
||||||
|
- When the light comes on, check if it's attributes match what they should, given the time.
|
||||||
|
|
||||||
|
|
||||||
|
## Services
|
||||||
|
|
||||||
|
- <name>/activate
|
||||||
|
- <name>/activate_all_off
|
||||||
|
- <name>/deactivate
|
||||||
|
- <name>/toggle
|
||||||
|
|
||||||
|
"""
|
||||||
|
|
||||||
|
@property
|
||||||
|
def states(self) -> List[ControllerStateConfig]:
|
||||||
|
return self._room_config.states
|
||||||
|
|
||||||
|
@states.setter
|
||||||
|
def states(self, new: List[ControllerStateConfig]):
|
||||||
|
assert all(isinstance(s, ControllerStateConfig) for s in new), f'Invalid: {new}'
|
||||||
|
self._room_config.states = new
|
||||||
|
|
||||||
|
@property
|
||||||
|
@wraps(Location.time_at_elevation)
|
||||||
|
def time_at_elevation(self):
|
||||||
|
return self.AD.sched.location.time_at_elevation
|
||||||
|
|
||||||
|
@property
|
||||||
|
def state_entity(self) -> Entity:
|
||||||
|
return self.get_entity(f'{self.name}.state', namespace='controller')
|
||||||
|
|
||||||
|
def initialize(self):
|
||||||
|
self.logger = console.load_rich_config(self.name, level=self.args.get('log_level', 'INFO'))
|
||||||
|
|
||||||
|
self.refresh_state_times()
|
||||||
|
self.run_daily(callback=self.refresh_state_times, start='00:00:00')
|
||||||
|
|
||||||
|
self.register_service(
|
||||||
|
f'{self.name}/activate', self._service_activate, namespace='controller'
|
||||||
|
)
|
||||||
|
self.register_service(
|
||||||
|
f'{self.name}/activate_all_off', self._service_activate_all_off, namespace='controller'
|
||||||
|
)
|
||||||
|
self.register_service(
|
||||||
|
f'{self.name}/activate_any_on', self._service_activate_any_on, namespace='controller'
|
||||||
|
)
|
||||||
|
self.register_service(
|
||||||
|
f'{self.name}/deactivate', self._service_deactivate, namespace='controller'
|
||||||
|
)
|
||||||
|
self.register_service(f'{self.name}/toggle', self._service_toggle, namespace='controller')
|
||||||
|
|
||||||
|
# This needs to come after this first call of refresh_state_times
|
||||||
|
self.app_entities = self.get_app_entities()
|
||||||
|
self.log(f'entities: {self.app_entities}', level='DEBUG')
|
||||||
|
|
||||||
|
if button := self.args.get('button'):
|
||||||
|
if isinstance(button, str):
|
||||||
|
Button(self, button_name=button)
|
||||||
|
VirtualButton(self, button_name=button)
|
||||||
|
elif isinstance(button, list) and all(isinstance(b, str) for b in button):
|
||||||
|
for b in button:
|
||||||
|
Button(self, button_name=b)
|
||||||
|
VirtualButton(self, button_name=button)
|
||||||
|
|
||||||
|
if door := self.args.get('door'):
|
||||||
|
if isinstance(door, str):
|
||||||
|
self.door = Door(self, entity_id=door)
|
||||||
|
|
||||||
|
if motion := self.args.get('motion'):
|
||||||
|
self.motion = MotionSensor(
|
||||||
|
self, sensor_entity_id=motion['sensor'], ref_entity_id=motion['ref_entity']
|
||||||
|
)
|
||||||
|
|
||||||
|
state: ControllerStateConfig
|
||||||
|
for state in sorted(self._room_config.states, key=lambda s: s.time, reverse=True):
|
||||||
|
if isinstance(state.time, datetime.datetime):
|
||||||
|
t = state.time.time()
|
||||||
|
else:
|
||||||
|
t = state.time
|
||||||
|
if t < self.get_now().time():
|
||||||
|
self.log(f'Initial state: {state.time}', level='DEBUG')
|
||||||
|
self.set_controller_scene(state)
|
||||||
|
break
|
||||||
|
|
||||||
|
self.log(f'Initialized [bold green]{type(self).__name__}[/]')
|
||||||
|
|
||||||
|
def terminate(self):
|
||||||
|
self.log('[bold red]Terminating[/]', level='DEBUG')
|
||||||
|
|
||||||
|
def get_app_entities(self) -> Set[str]:
|
||||||
|
"""Gets a set of all the entities referenced by any of the state definitions"""
|
||||||
|
|
||||||
|
def gen():
|
||||||
|
for state in self._room_config.states:
|
||||||
|
if isinstance(state.scene, str):
|
||||||
|
assert state.scene.startswith(
|
||||||
|
'scene.'
|
||||||
|
), "Scene definition must start with 'scene.'"
|
||||||
|
entities = self.get_state(state.scene, attribute='entity_id')
|
||||||
|
yield from entities
|
||||||
|
else:
|
||||||
|
yield from state.scene.keys()
|
||||||
|
|
||||||
|
return set(gen())
|
||||||
|
|
||||||
|
def refresh_state_times(self, *args, **kwargs):
|
||||||
|
"""Resets the `self.states` attribute to a newly parsed version of the states.
|
||||||
|
|
||||||
|
Parsed states have an absolute time for the current day.
|
||||||
|
"""
|
||||||
|
# re-parse the state strings into times for the current day
|
||||||
|
self._room_config = RoomControllerConfig.model_validate(self.args)
|
||||||
|
self.log(
|
||||||
|
f'{len(self._room_config.states)} states in the app configuration',
|
||||||
|
level='DEBUG',
|
||||||
|
)
|
||||||
|
|
||||||
|
for state in self._room_config.states:
|
||||||
|
if state.time is None and state.elevation is not None:
|
||||||
|
state.time = self.time_at_elevation(
|
||||||
|
elevation=state.elevation, direction=state.direction
|
||||||
|
).time()
|
||||||
|
elif isinstance(state.time, str):
|
||||||
|
state.time = self.parse_time(state.time)
|
||||||
|
|
||||||
|
assert isinstance(state.time, datetime.time), f'Invalid time: {state.time}'
|
||||||
|
|
||||||
|
try:
|
||||||
|
self.run_at(
|
||||||
|
callback=lambda **kwargs: self.set_controller_scene(kwargs['state']),
|
||||||
|
start=state.time.strftime('%H:%M:%S'),
|
||||||
|
state=state,
|
||||||
|
)
|
||||||
|
except Exception as e:
|
||||||
|
self.log(f'Failed with {type(e)}: {e}')
|
||||||
|
|
||||||
|
self.state_entity.listen_state(
|
||||||
|
lambda *args, **kwargs: self.call_service(
|
||||||
|
f'{self.name}/activate_any_on', namespace='controller', cause='state transition'
|
||||||
|
),
|
||||||
|
attribute='all',
|
||||||
|
)
|
||||||
|
|
||||||
|
def set_controller_scene(self, state: ControllerStateConfig):
|
||||||
|
"""Sets the internal state for the app. Only used by the scheduled transition"""
|
||||||
|
try:
|
||||||
|
self.state_entity.set_state(attributes=state.model_dump())
|
||||||
|
except Exception:
|
||||||
|
self.logger.error(traceback.format_exc())
|
||||||
|
else:
|
||||||
|
self.log(f'Set controller state of {self.name}: {state.model_dump()}', level='DEBUG')
|
||||||
|
|
||||||
|
def current_state(self) -> ControllerStateConfig:
|
||||||
|
if self.sleep_bool():
|
||||||
|
self.log('sleep: active', level='DEBUG')
|
||||||
|
if state := self.args.get('sleep_state'):
|
||||||
|
return ControllerStateConfig(**state)
|
||||||
|
else:
|
||||||
|
return ControllerStateConfig()
|
||||||
|
else:
|
||||||
|
try:
|
||||||
|
attrs = self.state_entity.get_state('all')['attributes']
|
||||||
|
state = ControllerStateConfig.model_validate(attrs)
|
||||||
|
except Exception as e:
|
||||||
|
state = ControllerStateConfig()
|
||||||
|
logger.exception(e)
|
||||||
|
finally:
|
||||||
|
# self.log(f'Current state: {state.model_dump(exclude_none=True)}', level='DEBUG')
|
||||||
|
return state
|
||||||
|
|
||||||
|
def activate(self, **kwargs):
|
||||||
|
self.call_service(f'{self.name}/activate', namespace='controller', **kwargs)
|
||||||
|
|
||||||
|
def _service_activate(self, namespace: str, domain: str, service: str, **kwargs):
|
||||||
|
# self.log(f'Custom kwargs: {kwargs}', level='DEBUG')
|
||||||
|
state = self.current_state()
|
||||||
|
if isinstance(state.scene, str):
|
||||||
|
self.turn_on(state.scene)
|
||||||
|
# self.turn_on(state.scene, transition=0)
|
||||||
|
elif isinstance(state.scene, dict):
|
||||||
|
scene = state.to_apply_kwargs()
|
||||||
|
self.call_service('scene/apply', **scene)
|
||||||
|
# scene = state.to_apply_kwargs(transition=0)
|
||||||
|
|
||||||
|
def activate_any_on(self, **kwargs):
|
||||||
|
"""Activate if any of the entities are on. Args and kwargs are passed directly to self.activate()"""
|
||||||
|
self.call_service(f'{self.name}/activate_any_on', namespace='controller', **kwargs)
|
||||||
|
|
||||||
|
def _service_activate_any_on(self, namespace: str, domain: str, service: str, **kwargs):
|
||||||
|
if self.any_on() and not self.manual_mode():
|
||||||
|
self.activate(**kwargs)
|
||||||
|
|
||||||
|
def activate_all_off(self, **kwargs):
|
||||||
|
"""Activate if all of the entities are off. Args and kwargs are passed directly to self.activate()"""
|
||||||
|
self.call_service(f'{self.name}/activate_all_off', namespace='controller', **kwargs)
|
||||||
|
|
||||||
|
def _service_activate_all_off(self, namespace: str, domain: str, service: str, **kwargs):
|
||||||
|
if self.all_off() and not self.manual_mode():
|
||||||
|
self.activate(**kwargs)
|
||||||
|
|
||||||
|
def deactivate(self, **kwargs):
|
||||||
|
self.call_service(f'{self.name}/deactivate', namespace='controller', **kwargs)
|
||||||
|
|
||||||
|
def _service_deactivate(self, namespace: str, domain: str, service: str, **kwargs):
|
||||||
|
for e in self.app_entities:
|
||||||
|
self.turn_off(e)
|
||||||
|
|
||||||
|
def toggle(self, **kwargs):
|
||||||
|
self.call_service(f'{self.name}/toggle', namespace='controller', **kwargs)
|
||||||
|
|
||||||
|
def _service_toggle(self, namespace: str, domain: str, service: str, **kwargs):
|
||||||
|
if self.any_on():
|
||||||
|
self.deactivate(**kwargs)
|
||||||
|
else:
|
||||||
|
self.activate(**kwargs)
|
||||||
|
|
||||||
|
def app_entity_states(self) -> Dict[str, str]:
|
||||||
|
states = {entity: self.get_state(entity) for entity in self.app_entities}
|
||||||
|
return states
|
||||||
|
|
||||||
|
def all_off(self) -> bool:
|
||||||
|
""" "All off" is the logic opposite of "any on"
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
bool: Whether all the lights associated with the app are off
|
||||||
|
"""
|
||||||
|
states = self.app_entity_states()
|
||||||
|
return all(state != 'on' for entity, state in states.items())
|
||||||
|
|
||||||
|
def any_on(self) -> bool:
|
||||||
|
""" "Any on" is the logic opposite of "all off"
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
bool: Whether any of the lights associated with the app are on
|
||||||
|
"""
|
||||||
|
states = self.app_entity_states()
|
||||||
|
return any(state == 'on' for entity, state in states.items())
|
||||||
|
|
||||||
|
def sleep_bool(self) -> bool:
|
||||||
|
if sleep_var := self.args.get('sleep'):
|
||||||
|
return self.get_state(sleep_var) == 'on'
|
||||||
|
else:
|
||||||
|
return False
|
||||||
|
|
||||||
|
def manual_mode(self) -> bool:
|
||||||
|
if manual_entity := self.args.get('manual_mode'):
|
||||||
|
return self.get_state(manual_entity) == 'on'
|
||||||
|
else:
|
||||||
|
return False
|
||||||
|
|
||||||
|
def off_duration(self, now: datetime.time = None) -> datetime.timedelta:
|
||||||
|
"""Determines the time that the motion sensor has to be clear before deactivating
|
||||||
|
|
||||||
|
Priority:
|
||||||
|
- Value in scene definition
|
||||||
|
- Default value
|
||||||
|
- Normal - value in app definition
|
||||||
|
- Sleep - 0
|
||||||
|
|
||||||
|
"""
|
||||||
|
if sleep_active := self.sleep_bool():
|
||||||
|
self.log(f'Sleeping mode active: {sleep_active}', level='DEBUG')
|
||||||
|
return datetime.timedelta()
|
||||||
|
else:
|
||||||
|
return self.current_state().off_duration or self._room_config.off_duration
|
||||||
Reference in New Issue
Block a user