Compare commits

..

3 Commits

Author SHA1 Message Date
John Lancaster
050fe75e71 more context manager work 2024-10-20 21:27:51 +00:00
John Lancaster
8064a73e9f context manager and subsystem work 2024-10-20 20:56:18 +00:00
John Lancaster
9121d1ef04 added entity callback cancellation 2024-10-16 03:57:40 +00:00
4 changed files with 223 additions and 20 deletions

View File

@@ -4,26 +4,38 @@ import logging
import signal import signal
from concurrent.futures import ThreadPoolExecutor from concurrent.futures import ThreadPoolExecutor
from contextlib import ExitStack, contextmanager from contextlib import ExitStack, contextmanager
from threading import Event, Lock
from time import sleep from time import sleep
from typing import Any, Callable from typing import Any, Callable
logger = logging.getLogger() logger = logging.getLogger()
class CustomContextManager: def handler(signum, frame):
print('Signal handler called with signal', signum)
raise OSError("Couldn't open device!")
class AppDaemonRunContext:
_stack: ExitStack _stack: ExitStack
loop: asyncio.AbstractEventLoop loop: asyncio.AbstractEventLoop
executor: ThreadPoolExecutor executor: ThreadPoolExecutor
stop_event: Event
shutdown_lock: Lock
def __init__(self): def __init__(self):
self._stack = ExitStack() self._stack = ExitStack()
self.stop_event = Event()
self.shutdown_lock = Lock()
def __enter__(self): def __enter__(self):
self.loop = self._stack.enter_context(self.asyncio_context()) self.loop = self._stack.enter_context(self.asyncio_context())
logger.debug("Entered asyncio context") logger.debug("Entered asyncio context")
self.loop.add_signal_handler(signal.SIGINT, self.handle_signal)
self.loop.add_signal_handler(signal.SIGTERM, self.handle_signal) signals = (signal.SIGHUP, signal.SIGTERM, signal.SIGINT)
# self.executor = self._stack.enter_context(ThreadPoolExecutor(max_workers=5)) for s in signals:
self.loop.add_signal_handler(s, lambda s=s: self.loop.create_task(self.shutdown(s)))
self.executor = self._stack.enter_context(self.thread_context()) self.executor = self._stack.enter_context(self.thread_context())
logger.debug("Entered threadpool context") logger.debug("Entered threadpool context")
return self return self
@@ -32,30 +44,46 @@ class CustomContextManager:
self._stack.__exit__(exc_type, exc_value, traceback) self._stack.__exit__(exc_type, exc_value, traceback)
logger.debug("Exited context") logger.debug("Exited context")
async def shutdown(self, signal):
"""Cleanup tasks tied to the service's shutdown.
https://www.roguelynn.com/words/asyncio-graceful-shutdowns/
"""
logger.info(f"Received exit signal {signal.name}...")
if not self.stop_event.is_set():
logger.debug('Setting stop event')
self.stop_event.set()
graceful = (
asyncio.wait_for(t, timeout=2.0)
for t in asyncio.all_tasks()
if t is not asyncio.current_task()
)
logger.debug('Allowing graceful shutdown from stop event')
await asyncio.gather(*graceful, return_exceptions=True)
logger.debug("Stopping event loop in context shutdown")
self.loop.stop()
else:
logger.warning('Already started shutdown')
@contextmanager @contextmanager
def asyncio_context(self): def asyncio_context(self):
try: try:
loop = asyncio.new_event_loop() loop = asyncio.get_event_loop()
asyncio.set_event_loop(loop)
yield loop yield loop
finally: finally:
loop.close() loop.close()
logger.info("Closed the event loop.") if loop.is_closed():
logger.debug("Closed the event loop.")
@contextmanager @contextmanager
def thread_context(self): def thread_context(self):
with ThreadPoolExecutor(max_workers=5) as executor: with ThreadPoolExecutor(max_workers=5) as executor:
yield executor yield executor
if executor._shutdown:
logger.debug('Shut down the ThreadPoolExecutor') logger.debug('Shut down the ThreadPoolExecutor')
def handle_signal(self, signum=None, frame=None):
logger.info(f'Handle signal: {signum}, {frame}')
# match signum:
# case signal.SIGINT:
# pass
# case signal.SIGTERM:
# pass
async def run_in_executor( async def run_in_executor(
self, self,
func: Callable, func: Callable,
@@ -97,14 +125,17 @@ if __name__ == "__main__":
await asyncio.sleep(delay) await asyncio.sleep(delay)
logger.info(f'{id_} Done async') logger.info(f'{id_} Done async')
with CustomContextManager() as cm: with AppDaemonRunContext() as cm:
for _ in range(3): for _ in range(3):
logger.info('Submitting random dummy_functions') logger.info('Submitting random dummy_functions')
cm.executor.submit(dummy_function, random.random() * 3.0) cm.executor.submit(dummy_function, random.random() * 10.0)
cm.loop.create_task(async_dummy_function(random.random() * 5.0)) cm.loop.create_task(async_dummy_function(random.random() * 5.0))
try:
logger.info('Running until complete') logger.info('Running until complete')
cm.loop.run_until_complete(asyncio.gather(*asyncio.all_tasks(cm.loop))) cm.loop.run_until_complete(asyncio.gather(*asyncio.all_tasks(cm.loop)))
except asyncio.CancelledError:
logger.error('Cancelled')
if cm.loop.is_closed(): if cm.loop.is_closed():
logger.debug('Loop is closed') logger.debug('Loop is closed')

26
appdaemon/entity.py Normal file
View File

@@ -0,0 +1,26 @@
from appdaemon import utils
class Entity:
@utils.sync_decorator
async def get_callbacks(self, limit_to_app: str = None) -> dict[str, dict[str, Any]]:
async with self.AD.callbacks.callbacks_lock:
return {
handle: {
'app_name': cb_info['name'],
'function': cb_info['function'].__name__,
**cb_info["kwargs"]
}
for app_name, app_callbacks in self.AD.callbacks.callbacks.items()
if limit_to_app is None or app_name == limit_to_app
for handle, cb_info in app_callbacks.items()
if self.namespace == cb_info['namespace']
and cb_info["type"] == "state"
and cb_info.get('entity') == self.entity_id
}
@utils.sync_decorator
async def cancel_callbacks(self, all: bool = False):
for handle, info in (await self.get_callbacks()).items():
if all or self.name == info['app_name']:
await self.AD.state.cancel_state_callback(handle, info['app_name'])

146
appdaemon/subsystem.py Normal file
View File

@@ -0,0 +1,146 @@
import asyncio
import logging
from dataclasses import dataclass, field
from logging import Logger, getLogger
from threading import Event, RLock
from typing import Callable, Coroutine
from appdaemon.models import AppDaemonConfig
from context_manager import AppDaemonRunContext
@dataclass
class ADSubsystem:
AD: "AppDaemon"
stop: Event
lock: RLock = field(default_factory=RLock)
logger: Logger = field(init=False)
def __post_init__(self) -> None:
name = f'_{self.__class__.__name__.lower()}'
self.logger = getLogger(f'AppDaemon.{name}')
self.AD.starts.append(self.start)
@property
def stopping(self) -> bool:
return self.stop.is_set()
def start(self):
raise NotImplementedError('Need to implement start for subsystem')
@dataclass
class Utility(ADSubsystem):
loop_rate: float = 1.0
def start(self):
self.AD.create_task(self.loop(), 'Utility loop')
async def loop(self):
while not self.stopping:
self.logger.info('Looping...')
await asyncio.sleep(self.loop_rate)
self.logger.debug('Stopped utility loop')
@dataclass
class Plugin(ADSubsystem):
state: dict[str, int] = field(default_factory=dict)
update_rate: float = 30.0
def __post_init__(self) -> None:
super().__post_init__()
self.state['update_count'] = 0
def start(self):
self.AD.create_task(self.periodic_self_udpate(), 'Periodic plugin update')
async def periodic_self_udpate(self):
while not self.stopping:
with self.lock:
self.state['update_count'] += 1
self.logger.info(f'Updated self: {self.state["update_count"]}')
await asyncio.sleep(self.update_rate)
self.logger.debug('Stopped plugin updates')
@dataclass
class AppDaemon:
cfg: AppDaemonConfig
context: AppDaemonRunContext
utility: Utility = field(init=False)
plugins: dict[str, Plugin] = field(default_factory=dict)
starts: list[Callable] = field(default_factory=list)
def __post_init__(self) -> None:
self.utility = Utility(self, self.context.stop_event)
self.plugins['dummy'] = Plugin(self, self.context.stop_event)
def create_task(self, coro: Coroutine, name: str | None = None):
return self.context.loop.create_task(coro, name=name)
def start(self):
for start in self.starts:
start()
@dataclass
class ADMain:
config_file: str
cfg: AppDaemonConfig = field(init=False)
def __post_init__(self) -> None:
raw_cfg = read_config_file(self.config_file)
self.cfg = AppDaemonConfig(
config_file=self.config_file,
**raw_cfg['appdaemon']
)
def run(self):
with AppDaemonRunContext() as cm:
ad = AppDaemon(self.cfg, cm)
ad.start()
cm.loop.run_forever()
if __name__ == '__main__':
import logging.config
from appdaemon.utils import read_config_file
from rich.console import Console
from rich.highlighter import NullHighlighter
console = Console()
logging.config.dictConfig(
{
'version': 1,
'disable_existing_loggers': False,
'formatters': {'basic': {'style': '{', 'format': '[yellow]{name}[/] {message}'}},
'handlers': {
'rich': {
'()': 'rich.logging.RichHandler',
'formatter': 'basic',
'console': console,
'highlighter': NullHighlighter(),
'markup': True
}
},
'root': {'level': 'DEBUG', 'handlers': ['rich']},
}
)
main = ADMain('/conf/ad-test/conf/appdaemon.yaml')
main.run()
# config_file = '/conf/ad-test/conf/appdaemon.yaml'
# raw_cfg = read_config_file(config_file)
# cfg = AppDaemonConfig(
# config_file=config_file,
# **raw_cfg['appdaemon']
# )
# with AppDaemonRunContext() as cm:
# ad = AppDaemon(cfg, cm)
# # ad.start()
# # cm.loop.run_forever()