From 8064a73e9f8d4346f5d979880338eeb0519cb1a4 Mon Sep 17 00:00:00 2001 From: John Lancaster <32917998+jsl12@users.noreply.github.com> Date: Sun, 20 Oct 2024 20:56:18 +0000 Subject: [PATCH] context manager and subsystem work --- appdaemon/{appdaemon.py => appdaemon_dev.py} | 0 .../context_manager.py | 79 +++++++--- appdaemon/subsystem.py | 146 ++++++++++++++++++ 3 files changed, 206 insertions(+), 19 deletions(-) rename appdaemon/{appdaemon.py => appdaemon_dev.py} (100%) rename context_manager.py => appdaemon/context_manager.py (54%) create mode 100644 appdaemon/subsystem.py diff --git a/appdaemon/appdaemon.py b/appdaemon/appdaemon_dev.py similarity index 100% rename from appdaemon/appdaemon.py rename to appdaemon/appdaemon_dev.py diff --git a/context_manager.py b/appdaemon/context_manager.py similarity index 54% rename from context_manager.py rename to appdaemon/context_manager.py index 363691e..cba7f13 100644 --- a/context_manager.py +++ b/appdaemon/context_manager.py @@ -4,25 +4,40 @@ import logging import signal from concurrent.futures import ThreadPoolExecutor from contextlib import ExitStack, contextmanager +from threading import Event, Lock from time import sleep from typing import Any, Callable logger = logging.getLogger() -class CustomContextManager: +def handler(signum, frame): + print('Signal handler called with signal', signum) + raise OSError("Couldn't open device!") + + +class AppDaemonRunContext: _stack: ExitStack loop: asyncio.AbstractEventLoop executor: ThreadPoolExecutor + stop_event: Event + shutdown_lock: Lock def __init__(self): self._stack = ExitStack() + self.stop_event = Event() + self.shutdown_lock = Lock() def __enter__(self): self.loop = self._stack.enter_context(self.asyncio_context()) logger.debug("Entered asyncio context") - self.loop.add_signal_handler(signal.SIGINT, self.handle_signal) - self.loop.add_signal_handler(signal.SIGTERM, self.handle_signal) + + signals = (signal.SIGHUP, signal.SIGTERM, signal.SIGINT) + for s in signals: + self.loop.add_signal_handler(s, lambda s=s: self.loop.create_task(self.shutdown(s))) + + # self.loop.add_signal_handler(signal.SIGINT, self.handle_signal, signal.SIGINT) + # self.loop.add_signal_handler(signal.SIGTERM, self.handle_signal) # self.executor = self._stack.enter_context(ThreadPoolExecutor(max_workers=5)) self.executor = self._stack.enter_context(self.thread_context()) logger.debug("Entered threadpool context") @@ -32,29 +47,52 @@ class CustomContextManager: self._stack.__exit__(exc_type, exc_value, traceback) logger.debug("Exited context") + async def shutdown(self, signal): + """Cleanup tasks tied to the service's shutdown. + + https://www.roguelynn.com/words/asyncio-graceful-shutdowns/ + """ + logger.info(f"Received exit signal {signal.name}...") + if not self.stop_event.is_set(): + logger.debug('Setting stop event') + self.stop_event.set() + + tasks = [ + t for t in asyncio.all_tasks() + if t is not asyncio.current_task() + ] + + for task in tasks: + # logger.debug(f"Waiting on task to finish: {task.get_coro().__qualname__}") + logger.debug(f"Cancelling {task.get_name()}: {task.get_coro().__qualname__}") + task.cancel() + + logger.debug('Waiting for tasks to finish...') + try: + await asyncio.gather(*tasks, return_exceptions=True) + except asyncio.CancelledError as e: + logger.debug(f'Cancelled: {e}') + logger.debug("Stopping event loop in context shutdown") + self.loop.stop() + else: + logger.warning('Already started shutdown') + @contextmanager def asyncio_context(self): try: - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) + loop = asyncio.get_event_loop() yield loop finally: loop.close() - logger.info("Closed the event loop.") + if loop.is_closed(): + logger.debug("Closed the event loop.") @contextmanager def thread_context(self): with ThreadPoolExecutor(max_workers=5) as executor: yield executor - logger.debug('Shut down the ThreadPoolExecutor') - - def handle_signal(self, signum=None, frame=None): - logger.info(f'Handle signal: {signum}, {frame}') - # match signum: - # case signal.SIGINT: - # pass - # case signal.SIGTERM: - # pass + if executor._shutdown: + logger.debug('Shut down the ThreadPoolExecutor') async def run_in_executor( self, @@ -97,14 +135,17 @@ if __name__ == "__main__": await asyncio.sleep(delay) logger.info(f'{id_} Done async') - with CustomContextManager() as cm: + with AppDaemonRunContext() as cm: for _ in range(3): logger.info('Submitting random dummy_functions') - cm.executor.submit(dummy_function, random.random() * 3.0) + cm.executor.submit(dummy_function, random.random() * 10.0) cm.loop.create_task(async_dummy_function(random.random() * 5.0)) - logger.info('Running until complete') - cm.loop.run_until_complete(asyncio.gather(*asyncio.all_tasks(cm.loop))) + try: + logger.info('Running until complete') + cm.loop.run_until_complete(asyncio.gather(*asyncio.all_tasks(cm.loop))) + except asyncio.CancelledError: + logger.error('Cancelled') if cm.loop.is_closed(): logger.debug('Loop is closed') diff --git a/appdaemon/subsystem.py b/appdaemon/subsystem.py new file mode 100644 index 0000000..509ff18 --- /dev/null +++ b/appdaemon/subsystem.py @@ -0,0 +1,146 @@ +import asyncio +import logging +from dataclasses import dataclass, field +from logging import Logger, getLogger +from threading import Event, RLock +from typing import Callable, Coroutine + +from appdaemon.models import AppDaemonConfig +from context_manager import AppDaemonRunContext + + +@dataclass +class ADSubsystem: + AD: "AppDaemon" + stop: Event + lock: RLock = field(default_factory=RLock) + logger: Logger = field(init=False) + + def __post_init__(self) -> None: + name = f'_{self.__class__.__name__.lower()}' + self.logger = getLogger(f'AppDaemon.{name}') + self.AD.starts.append(self.start) + + @property + def stopping(self) -> bool: + return self.stop.is_set() + + def start(self): + raise NotImplementedError('Need to implement start for subsystem') + + +@dataclass +class Utility(ADSubsystem): + loop_rate: float = 1.0 + + def start(self): + self.AD.create_task(self.loop(), 'Utility loop') + + async def loop(self): + while not self.stopping: + self.logger.info('Looping...') + await asyncio.sleep(self.loop_rate) + self.logger.debug('Stopped utility loop') + + +@dataclass +class Plugin(ADSubsystem): + state: dict[str, int] = field(default_factory=dict) + update_rate: float = 10.0 + + def __post_init__(self) -> None: + super().__post_init__() + self.state['update_count'] = 0 + + def start(self): + self.AD.create_task(self.periodic_self_udpate(), 'Periodic plugin update') + + async def periodic_self_udpate(self): + while not self.stopping: + with self.lock: + self.state['update_count'] += 1 + self.logger.info(f'Updated self: {self.state["update_count"]}') + await asyncio.sleep(self.update_rate) + self.logger.debug('Stopped plugin updates') + + +@dataclass +class AppDaemon: + cfg: AppDaemonConfig + context: AppDaemonRunContext + utility: Utility = field(init=False) + plugins: dict[str, Plugin] = field(default_factory=dict) + starts: list[Callable] = field(default_factory=list) + + def __post_init__(self) -> None: + self.utility = Utility(self, self.context.stop_event) + self.plugins['dummy'] = Plugin(self, self.context.stop_event) + + def create_task(self, coro: Coroutine, name: str | None = None): + return self.context.loop.create_task(coro, name=name) + + def start(self): + for start in self.starts: + start() + + +@dataclass +class ADMain: + config_file: str + cfg: AppDaemonConfig = field(init=False) + + def __post_init__(self) -> None: + raw_cfg = read_config_file(self.config_file) + self.cfg = AppDaemonConfig( + config_file=self.config_file, + **raw_cfg['appdaemon'] + ) + + def run(self): + with AppDaemonRunContext() as cm: + ad = AppDaemon(self.cfg, cm) + ad.start() + cm.loop.run_forever() + + +if __name__ == '__main__': + import logging.config + + from appdaemon.utils import read_config_file + from rich.console import Console + from rich.highlighter import NullHighlighter + + console = Console() + logging.config.dictConfig( + { + 'version': 1, + 'disable_existing_loggers': False, + 'formatters': {'basic': {'style': '{', 'format': '{message}'}}, + 'handlers': { + 'rich': { + '()': 'rich.logging.RichHandler', + 'formatter': 'basic', + 'console': console, + 'highlighter': NullHighlighter(), + 'markup': True + } + }, + 'root': {'level': 'DEBUG', 'handlers': ['rich']}, + } + ) + + main = ADMain('/conf/ad-test/conf/appdaemon.yaml') + main.run() + + # config_file = '/conf/ad-test/conf/appdaemon.yaml' + # raw_cfg = read_config_file(config_file) + + # cfg = AppDaemonConfig( + # config_file=config_file, + # **raw_cfg['appdaemon'] + # ) + + # with AppDaemonRunContext() as cm: + # ad = AppDaemon(cfg, cm) + # # ad.start() + # # cm.loop.run_forever()