Files
appdaemon_snippets/appdaemon/subsystem.py
2024-10-20 20:56:18 +00:00

147 lines
4.0 KiB
Python

import asyncio
import logging
from dataclasses import dataclass, field
from logging import Logger, getLogger
from threading import Event, RLock
from typing import Callable, Coroutine
from appdaemon.models import AppDaemonConfig
from context_manager import AppDaemonRunContext
@dataclass
class ADSubsystem:
    """Common base for the subsystems owned by an ``AppDaemon`` object.

    Constructing a subsystem gives it a logger named
    ``AppDaemon._<lowercased class name>`` and appends its ``start`` method
    to ``AD.starts``. Subclasses are expected to override ``start``.
    """

    # Owning object; must expose a ``starts`` list (appended to below).
    AD: "AppDaemon"
    # Event polled through the ``stopping`` property.
    stop: Event
    # Per-instance re-entrant lock; a fresh one is created by default.
    lock: RLock = field(default_factory=RLock)
    # Assigned in ``__post_init__``; not accepted by the constructor.
    logger: Logger = field(init=False)

    def __post_init__(self) -> None:
        """Create the subsystem logger and register ``start`` with ``AD``."""
        class_name = type(self).__name__
        name = f'_{class_name.lower()}'
        self.logger = getLogger(f'AppDaemon.{name}')
        # Registration: AppDaemon.start() later calls every entry in this list.
        self.AD.starts.append(self.start)

    @property
    def stopping(self) -> bool:
        """Return ``True`` once the ``stop`` event has been set."""
        return self.stop.is_set()

    def start(self):
        """Start the subsystem; the base implementation always raises.

        Raises:
            NotImplementedError: always, until a subclass overrides it.
        """
        raise NotImplementedError('Need to implement start for subsystem')
@dataclass
class Utility(ADSubsystem):
    """Subsystem that runs a simple periodic logging loop."""

    # Seconds slept between iterations of ``loop``.
    loop_rate: float = 1.0

    def start(self):
        """Pass the ``loop`` coroutine to ``AD.create_task`` as 'Utility loop'."""
        task_name = 'Utility loop'
        self.AD.create_task(self.loop(), task_name)

    async def loop(self):
        """Log 'Looping...' every ``loop_rate`` seconds until stopping.

        The stop flag is checked once per iteration, before logging, so the
        loop ends after the sleep that was in progress when ``stop`` was set.
        """
        while True:
            if self.stopping:
                break
            self.logger.info('Looping...')
            await asyncio.sleep(self.loop_rate)
        self.logger.debug('Stopped utility loop')
@dataclass
class Plugin(ADSubsystem):
    """Subsystem that periodically increments a counter in its own state."""

    # Mutable plugin state; ``update_count`` is (re)set to 0 on construction.
    state: dict[str, int] = field(default_factory=dict)
    # Seconds slept between successive self-updates.
    update_rate: float = 10.0

    def __post_init__(self) -> None:
        """Run the base-class setup, then zero the update counter."""
        super().__post_init__()
        self.state['update_count'] = 0

    def start(self):
        """Pass the update coroutine to ``AD.create_task`` as 'Periodic plugin update'."""
        task_name = 'Periodic plugin update'
        self.AD.create_task(self.periodic_self_udpate(), task_name)

    # NOTE(review): "udpate" is a typo for "update"; the name is kept as-is
    # because it is a public method name.
    async def periodic_self_udpate(self):
        """Increment ``state['update_count']`` every ``update_rate`` seconds.

        Runs until the stop event is set. The counter is changed and logged
        while holding ``self.lock``; the lock is released before sleeping.
        """
        while True:
            if self.stopping:
                break
            with self.lock:
                self.state['update_count'] += 1
                self.logger.info(f'Updated self: {self.state["update_count"]}')
            await asyncio.sleep(self.update_rate)
        self.logger.debug('Stopped plugin updates')
@dataclass
class AppDaemon:
    """Top-level object that owns the utility subsystem and the plugins.

    On construction it creates one ``Utility`` and one ``Plugin`` (stored
    under the key ``'dummy'``), both sharing ``context.stop_event``. Each
    subsystem registers its ``start`` method in ``starts`` while it is being
    constructed.
    """

    cfg: AppDaemonConfig
    # Run context; this class reads ``context.stop_event`` and ``context.loop``.
    context: AppDaemonRunContext
    # Assigned in ``__post_init__``; not accepted by the constructor.
    utility: Utility = field(init=False)
    plugins: dict[str, Plugin] = field(default_factory=dict)
    # Start callables registered by subsystems; invoked in order by ``start``.
    starts: list[Callable] = field(default_factory=list)
    # Strong references to tasks that have not finished yet. The asyncio docs
    # note that the event loop only keeps weak references to tasks, and the
    # subsystems discard the value returned by ``create_task``.
    _tasks: set[asyncio.Task] = field(
        default_factory=set, init=False, repr=False, compare=False
    )

    def __post_init__(self) -> None:
        """Create the built-in subsystems, which self-register in ``starts``."""
        self.utility = Utility(self, self.context.stop_event)
        self.plugins['dummy'] = Plugin(self, self.context.stop_event)

    def create_task(self, coro: Coroutine, name: str | None = None):
        """Schedule ``coro`` on ``context.loop`` and keep it referenced.

        Args:
            coro: Coroutine to schedule.
            name: Optional task name, forwarded to the loop's ``create_task``.

        Returns:
            The task object returned by ``context.loop.create_task``
            (presumably an ``asyncio.Task`` - confirm against the context
            manager's loop type).
        """
        task = self.context.loop.create_task(coro, name=name)
        # Hold the task until it completes, then drop the reference.
        self._tasks.add(task)
        task.add_done_callback(self._tasks.discard)
        return task

    def start(self):
        """Call every registered start callable in registration order."""
        for start in self.starts:
            start()
@dataclass
class ADMain:
    """Entry-point helper: load the configuration, then run AppDaemon.

    Construction reads ``config_file`` and builds an ``AppDaemonConfig`` from
    its ``'appdaemon'`` section. ``run`` starts everything inside an
    ``AppDaemonRunContext``.
    """

    # Path of the configuration file handed to ``read_config_file``.
    config_file: str
    # Assigned in ``__post_init__``; not accepted by the constructor.
    cfg: AppDaemonConfig = field(init=False)

    def __post_init__(self) -> None:
        """Read the config file and build ``cfg``.

        Raises:
            KeyError: if the parsed file has no ``'appdaemon'`` key.
        """
        # Imported here because the module otherwise imports this name only
        # under the ``__main__`` guard, so using ADMain from an importing
        # module would fail with NameError.
        from appdaemon.utils import read_config_file

        raw_cfg = read_config_file(self.config_file)
        self.cfg = AppDaemonConfig(
            config_file=self.config_file,
            **raw_cfg['appdaemon']
        )

    def run(self):
        """Create and start AppDaemon, then run the context's loop forever."""
        with AppDaemonRunContext() as cm:
            ad = AppDaemon(self.cfg, cm)
            ad.start()
            cm.loop.run_forever()
if __name__ == '__main__':
    import logging.config

    from appdaemon.utils import read_config_file
    from rich.console import Console
    from rich.highlighter import NullHighlighter

    console = Console()

    # Handler entry for dictConfig; the '()' key names the factory that
    # dictConfig calls with the remaining keys to build the handler.
    rich_handler = {
        '()': 'rich.logging.RichHandler',
        'formatter': 'basic',
        'console': console,
        'highlighter': NullHighlighter(),
        'markup': True
    }

    # Route everything from DEBUG upwards on the root logger through rich,
    # emitting only the bare message text.
    logging_setup = {
        'version': 1,
        'disable_existing_loggers': False,
        'formatters': {'basic': {'style': '{', 'format': '{message}'}},
        'handlers': {'rich': rich_handler},
        'root': {'level': 'DEBUG', 'handlers': ['rich']},
    }
    logging.config.dictConfig(logging_setup)

    main = ADMain('/conf/ad-test/conf/appdaemon.yaml')
    main.run()
# config_file = '/conf/ad-test/conf/appdaemon.yaml'
# raw_cfg = read_config_file(config_file)
# cfg = AppDaemonConfig(
# config_file=config_file,
# **raw_cfg['appdaemon']
# )
# with AppDaemonRunContext() as cm:
# ad = AppDaemon(cfg, cm)
# # ad.start()
# # cm.loop.run_forever()