made RoomControllerBase
This commit is contained in:
152
src/room_control/base.py
Normal file
152
src/room_control/base.py
Normal file
@@ -0,0 +1,152 @@
|
|||||||
|
import asyncio
|
||||||
|
import logging.config
|
||||||
|
from logging import Handler, LogRecord
|
||||||
|
from typing import TYPE_CHECKING
|
||||||
|
|
||||||
|
import aiohttp
|
||||||
|
from appdaemon.adapi import ADAPI
|
||||||
|
|
||||||
|
from .console import RCHighlighter, console
|
||||||
|
|
||||||
|
if TYPE_CHECKING:
|
||||||
|
from room_control import RoomController
|
||||||
|
|
||||||
|
|
||||||
|
class Singleton(type):
|
||||||
|
"""
|
||||||
|
https://en.wikipedia.org/wiki/Singleton_pattern
|
||||||
|
https://stackoverflow.com/q/6760685
|
||||||
|
https://realpython.com/python-metaclasses/
|
||||||
|
https://docs.python.org/3/reference/datamodel.html#metaclasses
|
||||||
|
"""
|
||||||
|
|
||||||
|
_instances = {}
|
||||||
|
|
||||||
|
def __call__(cls, *args, **kwargs):
|
||||||
|
if cls not in cls._instances:
|
||||||
|
cls._instances[cls] = super(Singleton, cls).__call__(*args, **kwargs)
|
||||||
|
return cls._instances[cls]
|
||||||
|
|
||||||
|
|
||||||
|
class AsyncLokiHandler(Handler, metaclass=Singleton):
|
||||||
|
loop: asyncio.BaseEventLoop
|
||||||
|
loki_url: str
|
||||||
|
queue: asyncio.Queue
|
||||||
|
consumer_task: asyncio.Task
|
||||||
|
|
||||||
|
def __init__(self, loop: asyncio.BaseEventLoop, loki_url: str):
|
||||||
|
console.print(f' [bold yellow]{hex(id(self))}[/] '.center(50, '-'))
|
||||||
|
super().__init__()
|
||||||
|
self.loop = loop
|
||||||
|
self.loki_url = loki_url
|
||||||
|
self.queue = asyncio.Queue()
|
||||||
|
self.consumer_task = self.loop.create_task(self.log_consumer())
|
||||||
|
|
||||||
|
def emit(self, record: LogRecord):
|
||||||
|
self.loop.create_task(self.send_to_loki(record))
|
||||||
|
|
||||||
|
async def send_to_loki(self, record: LogRecord):
|
||||||
|
message = self.format(record)
|
||||||
|
ns = round(record.created * 1_000_000_000)
|
||||||
|
|
||||||
|
# https://grafana.com/docs/loki/latest/reference/loki-http-api/#ingest-logs
|
||||||
|
payload = {
|
||||||
|
'streams': [
|
||||||
|
{
|
||||||
|
'stream': {'app': 'loki'},
|
||||||
|
'values': [[str(ns), message]],
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
|
||||||
|
await self.queue.put(payload)
|
||||||
|
|
||||||
|
async def log_consumer(self):
|
||||||
|
async with aiohttp.ClientSession() as session:
|
||||||
|
try:
|
||||||
|
while True:
|
||||||
|
payload = await self.queue.get()
|
||||||
|
await session.post(self.loki_url, json=payload, timeout=1)
|
||||||
|
# console.print('[bold yellow]Sent to Loki[/]')
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
console.print('[bold red]Cancelled[/]')
|
||||||
|
|
||||||
|
|
||||||
|
class RoomControlBase(ADAPI):
|
||||||
|
app: 'RoomController'
|
||||||
|
logger: logging.LoggerAdapter
|
||||||
|
|
||||||
|
def initialize(self, room: str, component: str = None):
|
||||||
|
"""Sets up the logging"""
|
||||||
|
logger_name = f'Appdaemon.{room}'
|
||||||
|
extra_attributes = {'room': room}
|
||||||
|
|
||||||
|
# all the stuff that has to happen for a component
|
||||||
|
if component is not None:
|
||||||
|
self.app: 'RoomController' = self.get_app(self.args['app'])
|
||||||
|
logger_name += f'.{component}'
|
||||||
|
extra_attributes['component'] = component
|
||||||
|
|
||||||
|
rich_handler_args = {
|
||||||
|
'console': console,
|
||||||
|
'highlighter': RCHighlighter(),
|
||||||
|
'markup': True,
|
||||||
|
'show_path': False,
|
||||||
|
'omit_repeated_times': False,
|
||||||
|
}
|
||||||
|
|
||||||
|
logging.config.dictConfig(
|
||||||
|
{
|
||||||
|
'version': 1,
|
||||||
|
'disable_existing_loggers': False,
|
||||||
|
'formatters': {
|
||||||
|
'basic': {'style': '{', 'format': '{message}'},
|
||||||
|
'rich': {
|
||||||
|
'style': '{',
|
||||||
|
'format': '[room]{room}[/] {message}',
|
||||||
|
'datefmt': '%H:%M:%S.%f',
|
||||||
|
},
|
||||||
|
'rich_component': {
|
||||||
|
'style': '{',
|
||||||
|
'format': '[room]{room}[/] [component]{component}[/] {message}',
|
||||||
|
'datefmt': '%H:%M:%S.%f',
|
||||||
|
},
|
||||||
|
},
|
||||||
|
'handlers': {
|
||||||
|
'rich': {
|
||||||
|
'formatter': 'rich',
|
||||||
|
'()': 'rich.logging.RichHandler',
|
||||||
|
**rich_handler_args,
|
||||||
|
},
|
||||||
|
'rich_component': {
|
||||||
|
'formatter': 'rich_component',
|
||||||
|
'()': 'rich.logging.RichHandler',
|
||||||
|
**rich_handler_args,
|
||||||
|
},
|
||||||
|
'async_queue': {
|
||||||
|
'formatter': 'basic',
|
||||||
|
'()': 'room_control.base.AsyncLokiHandler',
|
||||||
|
'loop': self.AD.loop,
|
||||||
|
'loki_url': self.args['loki_url'],
|
||||||
|
},
|
||||||
|
},
|
||||||
|
'loggers': {
|
||||||
|
logger_name: {
|
||||||
|
'level': 'INFO',
|
||||||
|
'propagate': False,
|
||||||
|
'handlers': [
|
||||||
|
'rich' if component is None else 'rich_component',
|
||||||
|
'async_queue',
|
||||||
|
],
|
||||||
|
}
|
||||||
|
},
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
self.logger = logging.LoggerAdapter(logging.getLogger(logger_name), extra_attributes)
|
||||||
|
self.handler: AsyncLokiHandler = self.logger.logger.handlers[-1]
|
||||||
|
|
||||||
|
def terminate(self):
|
||||||
|
status: bool = self.handler.consumer_task.cancel()
|
||||||
|
if status:
|
||||||
|
self.log('Cancelled consumer task')
|
||||||
@@ -1,43 +1,27 @@
|
|||||||
import json
|
import json
|
||||||
from dataclasses import dataclass
|
from dataclasses import dataclass
|
||||||
from logging import Logger
|
from typing import List
|
||||||
from typing import TYPE_CHECKING, List
|
|
||||||
|
|
||||||
from appdaemon.plugins.mqtt.mqttapi import Mqtt
|
from appdaemon.plugins.mqtt.mqttapi import Mqtt
|
||||||
|
|
||||||
from . import console
|
from .base import RoomControlBase
|
||||||
from .loki import LokiHandler
|
|
||||||
from .model import ButtonConfig
|
from .model import ButtonConfig
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
|
||||||
from room_control import RoomController
|
|
||||||
|
|
||||||
|
|
||||||
@dataclass(init=False)
|
@dataclass(init=False)
|
||||||
class Button(Mqtt):
|
class Button(RoomControlBase, Mqtt):
|
||||||
button: str | List[str]
|
button: str | List[str]
|
||||||
rich: bool = False
|
rich: bool = False
|
||||||
config: ButtonConfig
|
config: ButtonConfig
|
||||||
logger: Logger
|
|
||||||
|
|
||||||
async def initialize(self):
|
def initialize(self):
|
||||||
self.app: 'RoomController' = await self.get_app(self.args['app'])
|
super().initialize(room=self.args['app'], component='Button')
|
||||||
self.configure_logging()
|
|
||||||
self.config = ButtonConfig(**self.args)
|
self.config = ButtonConfig(**self.args)
|
||||||
self.log(f'Connected to AD app [room]{self.app.name}[/]', level='DEBUG')
|
self.log(f'Connected to AD app [room]{self.app.name}[/]', level='DEBUG')
|
||||||
|
|
||||||
self.button = self.config.button
|
self.button = self.config.button
|
||||||
self.setup_buttons(self.button)
|
self.setup_buttons(self.button)
|
||||||
|
|
||||||
def configure_logging(self):
|
|
||||||
self.logger = console.load_rich_config(room=self.app.name, component=type(self).__name__)
|
|
||||||
if url := self.args.get('loki_url'):
|
|
||||||
logger: Logger = self.logger.logger
|
|
||||||
handler = LokiHandler(loop=self.AD.loop, loki_url=url)
|
|
||||||
handler.addFilter(console.UnMarkupFilter())
|
|
||||||
logger.addHandler(handler)
|
|
||||||
self.log('Added LokiHandler')
|
|
||||||
|
|
||||||
def setup_buttons(self, buttons):
|
def setup_buttons(self, buttons):
|
||||||
if isinstance(buttons, list):
|
if isinstance(buttons, list):
|
||||||
for button in buttons:
|
for button in buttons:
|
||||||
|
|||||||
@@ -62,6 +62,12 @@ def load_rich_config(
|
|||||||
RICH_CFG['handlers']['rich']['highlighter'] = RCHighlighter()
|
RICH_CFG['handlers']['rich']['highlighter'] = RCHighlighter()
|
||||||
RICH_CFG['handlers']['rich_component']['console'] = console
|
RICH_CFG['handlers']['rich_component']['console'] = console
|
||||||
RICH_CFG['handlers']['rich_component']['highlighter'] = RCHighlighter()
|
RICH_CFG['handlers']['rich_component']['highlighter'] = RCHighlighter()
|
||||||
|
# RICH_CFG['handlers']['async_queue'] = {
|
||||||
|
# 'formatter': 'basic',
|
||||||
|
# '()': 'room_control.loki.AsyncQueueHandler',
|
||||||
|
# 'loop': self.AD.loop,
|
||||||
|
# 'queue': self.loki_queue,
|
||||||
|
# }
|
||||||
RICH_CFG['loggers'] = {
|
RICH_CFG['loggers'] = {
|
||||||
logger_name: {
|
logger_name: {
|
||||||
'handlers': ['rich' if component is None else 'rich_component'],
|
'handlers': ['rich' if component is None else 'rich_component'],
|
||||||
|
|||||||
@@ -1,36 +1,17 @@
|
|||||||
from logging import Logger
|
|
||||||
from typing import TYPE_CHECKING
|
|
||||||
|
|
||||||
from appdaemon.plugins.hass.hassapi import Hass
|
from appdaemon.plugins.hass.hassapi import Hass
|
||||||
|
|
||||||
from . import console
|
from .base import RoomControlBase
|
||||||
from .loki import LokiHandler
|
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
|
||||||
from room_control import RoomController
|
|
||||||
|
|
||||||
|
|
||||||
class Door(Hass):
|
class Door(RoomControlBase, Hass):
|
||||||
app: 'RoomController'
|
def initialize(self):
|
||||||
logger: Logger
|
super().initialize(room=self.args['app'], component='Door')
|
||||||
|
|
||||||
async def initialize(self):
|
self.listen_state(
|
||||||
self.app: 'RoomController' = await self.get_app(self.args['app'])
|
|
||||||
self.configure_logging()
|
|
||||||
self.log(f'Connected to AD app [room]{self.app.name}[/]', level='DEBUG')
|
|
||||||
|
|
||||||
await self.listen_state(
|
|
||||||
self.app.activate_all_off,
|
self.app.activate_all_off,
|
||||||
entity_id=self.args['door'],
|
entity_id=self.args['door'],
|
||||||
new='on',
|
new='on',
|
||||||
cause='door open',
|
cause='door open',
|
||||||
)
|
)
|
||||||
|
|
||||||
def configure_logging(self):
|
self.log(f'Waiting for door to open: [bold green]{self.args["door"]}[/]')
|
||||||
self.logger = console.load_rich_config(room=self.app.name, component=type(self).__name__)
|
|
||||||
if url := self.args.get('loki_url'):
|
|
||||||
logger: Logger = self.logger.logger
|
|
||||||
handler = LokiHandler(loop=self.AD.loop, loki_url=url)
|
|
||||||
handler.addFilter(console.UnMarkupFilter())
|
|
||||||
logger.addHandler(handler)
|
|
||||||
self.log('Added LokiHandler')
|
|
||||||
|
|||||||
@@ -1,69 +0,0 @@
|
|||||||
import asyncio
|
|
||||||
from logging import Handler, LogRecord
|
|
||||||
|
|
||||||
import aiohttp
|
|
||||||
|
|
||||||
|
|
||||||
class AsyncQueueHandler(Handler):
|
|
||||||
loop: asyncio.BaseEventLoop
|
|
||||||
queue: asyncio.Queue
|
|
||||||
|
|
||||||
def __init__(self, loop: asyncio.BaseEventLoop, queue: asyncio.Queue):
|
|
||||||
super().__init__()
|
|
||||||
self.loop = loop
|
|
||||||
self.queue = queue
|
|
||||||
|
|
||||||
def emit(self, record: LogRecord):
|
|
||||||
self.loop.create_task(self.send_to_loki(record))
|
|
||||||
|
|
||||||
async def send_to_loki(self, record: LogRecord):
|
|
||||||
message = self.format(record)
|
|
||||||
ns = round(record.created * 1_000_000_000)
|
|
||||||
|
|
||||||
# https://grafana.com/docs/loki/latest/reference/loki-http-api/#ingest-logs
|
|
||||||
payload = {
|
|
||||||
'streams': [
|
|
||||||
{
|
|
||||||
'stream': {'app': 'loki'},
|
|
||||||
'values': [[str(ns), message]],
|
|
||||||
}
|
|
||||||
]
|
|
||||||
}
|
|
||||||
|
|
||||||
await self.queue.put(payload)
|
|
||||||
|
|
||||||
|
|
||||||
class LokiHandler(Handler):
|
|
||||||
loop: asyncio.BaseEventLoop
|
|
||||||
loki_url: str
|
|
||||||
level: int | str = 0
|
|
||||||
|
|
||||||
def __init__(self, loop: asyncio.BaseEventLoop, loki_url: str, level: int | str = 0) -> None:
|
|
||||||
self.loop: asyncio.BaseEventLoop = loop
|
|
||||||
self.loki_url: str = loki_url
|
|
||||||
super().__init__(level)
|
|
||||||
|
|
||||||
def emit(self, record: LogRecord) -> None:
|
|
||||||
self.loop.create_task(self.send_to_loki(record))
|
|
||||||
|
|
||||||
async def send_to_loki(self, record: LogRecord):
|
|
||||||
message = self.format(record)
|
|
||||||
ns = round(record.created * 1_000_000_000)
|
|
||||||
|
|
||||||
labels = {'level': record.levelname, 'room': record.room}
|
|
||||||
|
|
||||||
if comp := getattr(record, 'component', None):
|
|
||||||
labels['component'] = comp
|
|
||||||
|
|
||||||
# https://grafana.com/docs/loki/latest/reference/loki-http-api/#ingest-logs
|
|
||||||
payload = {
|
|
||||||
'streams': [
|
|
||||||
{
|
|
||||||
'stream': labels,
|
|
||||||
'values': [[str(ns), message]],
|
|
||||||
}
|
|
||||||
]
|
|
||||||
}
|
|
||||||
|
|
||||||
async with aiohttp.ClientSession() as session:
|
|
||||||
await session.post(self.loki_url, json=payload, timeout=3)
|
|
||||||
@@ -1,17 +1,12 @@
|
|||||||
import re
|
import re
|
||||||
from datetime import timedelta
|
from datetime import timedelta
|
||||||
from logging import Logger
|
from typing import Literal, Optional
|
||||||
from typing import TYPE_CHECKING, Literal, Optional
|
|
||||||
|
|
||||||
from appdaemon.entity import Entity
|
from appdaemon.entity import Entity
|
||||||
from appdaemon.plugins.hass.hassapi import Hass
|
from appdaemon.plugins.hass.hassapi import Hass
|
||||||
from pydantic import BaseModel, ValidationError
|
from pydantic import BaseModel, ValidationError
|
||||||
|
|
||||||
from . import console
|
from .base import RoomControlBase
|
||||||
from .loki import LokiHandler
|
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
|
||||||
from room_control import RoomController
|
|
||||||
|
|
||||||
|
|
||||||
class CallbackEntry(BaseModel):
|
class CallbackEntry(BaseModel):
|
||||||
@@ -28,10 +23,7 @@ class CallbackEntry(BaseModel):
|
|||||||
Callbacks = dict[str, dict[str, CallbackEntry]]
|
Callbacks = dict[str, dict[str, CallbackEntry]]
|
||||||
|
|
||||||
|
|
||||||
class Motion(Hass):
|
class Motion(RoomControlBase, Hass):
|
||||||
logger: Logger
|
|
||||||
app: 'RoomController'
|
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def sensor(self) -> Entity:
|
def sensor(self) -> Entity:
|
||||||
return self.get_entity(self.args['sensor'])
|
return self.get_entity(self.args['sensor'])
|
||||||
@@ -53,8 +45,7 @@ class Motion(Hass):
|
|||||||
return self.sensor_state != self.ref_entity_state
|
return self.sensor_state != self.ref_entity_state
|
||||||
|
|
||||||
def initialize(self):
|
def initialize(self):
|
||||||
self.app: 'RoomController' = self.get_app(self.args['app'])
|
super().initialize(room=self.args['app'], component='Motion')
|
||||||
self.configure_logging()
|
|
||||||
|
|
||||||
assert self.entity_exists(self.args['sensor'])
|
assert self.entity_exists(self.args['sensor'])
|
||||||
assert self.entity_exists(self.args['ref_entity'])
|
assert self.entity_exists(self.args['ref_entity'])
|
||||||
@@ -72,7 +63,6 @@ class Motion(Hass):
|
|||||||
if self.sensor_state:
|
if self.sensor_state:
|
||||||
self.app.activate(kwargs={'cause': f'Syncing state with {self.sensor.entity_id}'})
|
self.app.activate(kwargs={'cause': f'Syncing state with {self.sensor.entity_id}'})
|
||||||
|
|
||||||
# don't need to await these because they'll already get turned into a task by the utils.sync_wrapper decorator
|
|
||||||
self.listen_state(
|
self.listen_state(
|
||||||
**base_kwargs,
|
**base_kwargs,
|
||||||
attribute='brightness',
|
attribute='brightness',
|
||||||
@@ -80,21 +70,8 @@ class Motion(Hass):
|
|||||||
)
|
)
|
||||||
self.listen_state(**base_kwargs, new='off', callback=self.callback_light_off)
|
self.listen_state(**base_kwargs, new='off', callback=self.callback_light_off)
|
||||||
|
|
||||||
for handle, cb in self.callbacks():
|
|
||||||
self.log(f'Handle [yellow]{handle[:4]}[/]: {cb.function}', level='DEBUG')
|
|
||||||
|
|
||||||
self.log(f'Initialized [bold green]{type(self).__name__}[/]')
|
self.log(f'Initialized [bold green]{type(self).__name__}[/]')
|
||||||
|
|
||||||
def configure_logging(self):
|
|
||||||
self.logger = console.load_rich_config(self.app.name, type(self).__name__)
|
|
||||||
|
|
||||||
if url := self.args.get('loki_url'):
|
|
||||||
logger: Logger = self.logger.logger
|
|
||||||
handler = LokiHandler(loop=self.AD.loop, loki_url=url)
|
|
||||||
handler.addFilter(console.UnMarkupFilter())
|
|
||||||
logger.addHandler(handler)
|
|
||||||
self.log('Added LokiHandler')
|
|
||||||
|
|
||||||
def callbacks(self):
|
def callbacks(self):
|
||||||
"""Returns a dictionary of validated CallbackEntry objects that are associated with this app"""
|
"""Returns a dictionary of validated CallbackEntry objects that are associated with this app"""
|
||||||
self_callbacks = self.get_callback_entries().get(self.name, {})
|
self_callbacks = self.get_callback_entries().get(self.name, {})
|
||||||
|
|||||||
@@ -7,16 +7,14 @@ from typing import Dict, List
|
|||||||
|
|
||||||
from appdaemon.entity import Entity
|
from appdaemon.entity import Entity
|
||||||
from appdaemon.plugins.hass.hassapi import Hass
|
from appdaemon.plugins.hass.hassapi import Hass
|
||||||
from appdaemon.plugins.mqtt.mqttapi import Mqtt
|
|
||||||
|
|
||||||
from . import console
|
from .base import RoomControlBase
|
||||||
from .loki import LokiHandler
|
|
||||||
from .model import ControllerStateConfig, RoomControllerConfig
|
from .model import ControllerStateConfig, RoomControllerConfig
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
class RoomController(Hass, Mqtt):
|
class RoomController(RoomControlBase, Hass):
|
||||||
"""Class for linking room's lights with a motion sensor.
|
"""Class for linking room's lights with a motion sensor.
|
||||||
|
|
||||||
- Separate the turning on and turning off functions.
|
- Separate the turning on and turning off functions.
|
||||||
@@ -36,9 +34,8 @@ class RoomController(Hass, Mqtt):
|
|||||||
self._room_config.states = new
|
self._room_config.states = new
|
||||||
|
|
||||||
def initialize(self):
|
def initialize(self):
|
||||||
self.configure_logging()
|
super().initialize(room=self.name)
|
||||||
self.app_entities = self.gather_app_entities()
|
self.app_entities = self.gather_app_entities()
|
||||||
# self.log(f'entities: {self.app_entities}')
|
|
||||||
self.refresh_state_times()
|
self.refresh_state_times()
|
||||||
self.run_daily(callback=self.refresh_state_times, start='00:00:00')
|
self.run_daily(callback=self.refresh_state_times, start='00:00:00')
|
||||||
self.log(f'Initialized [bold green]{type(self).__name__}[/]')
|
self.log(f'Initialized [bold green]{type(self).__name__}[/]')
|
||||||
@@ -46,16 +43,6 @@ class RoomController(Hass, Mqtt):
|
|||||||
def terminate(self):
|
def terminate(self):
|
||||||
self.log('[bold red]Terminating[/]', level='DEBUG')
|
self.log('[bold red]Terminating[/]', level='DEBUG')
|
||||||
|
|
||||||
def configure_logging(self):
|
|
||||||
self.logger = console.load_rich_config(self.name)
|
|
||||||
|
|
||||||
if url := self.args.get('loki_url'):
|
|
||||||
logger: logging.Logger = self.logger.logger
|
|
||||||
handler = LokiHandler(loop=self.AD.loop, loki_url=url)
|
|
||||||
handler.addFilter(console.UnMarkupFilter())
|
|
||||||
logger.addHandler(handler)
|
|
||||||
self.log('Added LokiHandler')
|
|
||||||
|
|
||||||
def gather_app_entities(self) -> List[str]:
|
def gather_app_entities(self) -> List[str]:
|
||||||
"""Returns a list of all the entities involved in any of the states"""
|
"""Returns a list of all the entities involved in any of the states"""
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user