import asyncio import logging from dataclasses import dataclass, field from logging import Logger, getLogger from threading import Event, RLock from typing import Callable, Coroutine from appdaemon.models import AppDaemonConfig from context_manager import AppDaemonRunContext @dataclass class ADSubsystem: AD: "AppDaemon" stop: Event lock: RLock = field(default_factory=RLock) logger: Logger = field(init=False) def __post_init__(self) -> None: name = f'_{self.__class__.__name__.lower()}' self.logger = getLogger(f'AppDaemon.{name}') self.AD.starts.append(self.start) @property def stopping(self) -> bool: return self.stop.is_set() def start(self): raise NotImplementedError('Need to implement start for subsystem') @dataclass class Utility(ADSubsystem): loop_rate: float = 1.0 def start(self): self.AD.create_task(self.loop(), 'Utility loop') async def loop(self): while not self.stopping: self.logger.info('Looping...') await asyncio.sleep(self.loop_rate) self.logger.debug('Stopped utility loop') @dataclass class Plugin(ADSubsystem): state: dict[str, int] = field(default_factory=dict) update_rate: float = 30.0 def __post_init__(self) -> None: super().__post_init__() self.state['update_count'] = 0 def start(self): self.AD.create_task(self.periodic_self_udpate(), 'Periodic plugin update') async def periodic_self_udpate(self): while not self.stopping: with self.lock: self.state['update_count'] += 1 self.logger.info(f'Updated self: {self.state["update_count"]}') await asyncio.sleep(self.update_rate) self.logger.debug('Stopped plugin updates') @dataclass class AppDaemon: cfg: AppDaemonConfig context: AppDaemonRunContext utility: Utility = field(init=False) plugins: dict[str, Plugin] = field(default_factory=dict) starts: list[Callable] = field(default_factory=list) def __post_init__(self) -> None: self.utility = Utility(self, self.context.stop_event) self.plugins['dummy'] = Plugin(self, self.context.stop_event) def create_task(self, coro: Coroutine, name: str | None = None): return self.context.loop.create_task(coro, name=name) def start(self): for start in self.starts: start() @dataclass class ADMain: config_file: str cfg: AppDaemonConfig = field(init=False) def __post_init__(self) -> None: raw_cfg = read_config_file(self.config_file) self.cfg = AppDaemonConfig( config_file=self.config_file, **raw_cfg['appdaemon'] ) def run(self): with AppDaemonRunContext() as cm: ad = AppDaemon(self.cfg, cm) ad.start() cm.loop.run_forever() if __name__ == '__main__': import logging.config from appdaemon.utils import read_config_file from rich.console import Console from rich.highlighter import NullHighlighter console = Console() logging.config.dictConfig( { 'version': 1, 'disable_existing_loggers': False, 'formatters': {'basic': {'style': '{', 'format': '[yellow]{name}[/] {message}'}}, 'handlers': { 'rich': { '()': 'rich.logging.RichHandler', 'formatter': 'basic', 'console': console, 'highlighter': NullHighlighter(), 'markup': True } }, 'root': {'level': 'DEBUG', 'handlers': ['rich']}, } ) main = ADMain('/conf/ad-test/conf/appdaemon.yaml') main.run() # config_file = '/conf/ad-test/conf/appdaemon.yaml' # raw_cfg = read_config_file(config_file) # cfg = AppDaemonConfig( # config_file=config_file, # **raw_cfg['appdaemon'] # ) # with AppDaemonRunContext() as cm: # ad = AppDaemon(cfg, cm) # # ad.start() # # cm.loop.run_forever()