Compare commits

..

7 Commits

Author SHA1 Message Date
John Lancaster
7ea362c5fd added ad_threads 2025-01-04 14:14:14 -06:00
John Lancaster
4b748173c1 moved sleep function 2024-10-21 03:25:57 +00:00
John Lancaster
de496509a9 WIP 2024-10-21 03:15:59 +00:00
John Lancaster
bc2224a919 formatting 2024-10-21 02:47:38 +00:00
John Lancaster
a67b2998b7 made a graceful sleep method 2024-10-21 02:47:22 +00:00
John Lancaster
0abbb9e546 context manager work for startup/shutdown 2024-10-21 02:41:19 +00:00
John Lancaster
502c218c35 more work 2024-10-20 21:48:19 +00:00
4 changed files with 259 additions and 145 deletions

54
ad_threads.py Normal file
View File

@@ -0,0 +1,54 @@
import threading
from collections.abc import Iterable
from concurrent.futures import ThreadPoolExecutor
from functools import partial
from queue import Queue
from time import perf_counter
from typing import Callable
def worker(q: Queue):
thread = threading.current_thread()
print(thread)
while True:
try:
item = q.get()
get_time = perf_counter()
if item is None:
break
elif callable(item):
print(f'Calling {item}')
item()
else:
print(f'{thread.name}: {item}')
except Exception as exc:
print(f'Error: {exc}')
finally:
proc_time = perf_counter() - get_time
q.task_done()
print(f'{proc_time*10**3:.3f}ms')
# print('Broke worker loop')
return f'Ended {thread.name}'
def run_executor(queues: Iterable[Queue]):
with ThreadPoolExecutor(thread_name_prefix='AD-App-Thread') as executor:
for fut in executor.map(worker, queues):
print(fut)
def main(n: int, start: bool = True):
queues = [Queue() for _ in range(n)]
thread = threading.Thread(
target=run_executor,
args=(queues,),
name='AD-ThreadPoolExecutor'
)
if start:
thread.start()
return queues
def dispatch_worker(queue: Queue, func: Callable, *args, **kwargs):
return queue.put_nowait(partial(func, *args, **kwargs))

View File

@@ -4,49 +4,48 @@ import logging
import signal
from concurrent.futures import ThreadPoolExecutor
from contextlib import ExitStack, contextmanager
from threading import Event, Lock
from time import sleep
from dataclasses import dataclass, field
from threading import Event
from typing import Any, Callable
logger = logging.getLogger()
def handler(signum, frame):
print('Signal handler called with signal', signum)
raise OSError("Couldn't open device!")
logger = logging.getLogger(__name__)
@dataclass
class AppDaemonRunContext:
_stack: ExitStack
loop: asyncio.AbstractEventLoop
executor: ThreadPoolExecutor
stop_event: Event
shutdown_lock: Lock
_stack: ExitStack = field(default_factory=ExitStack)
stop_event: Event = field(default_factory=Event)
shutdown_grace_period: float = 0.1
def __init__(self):
self._stack = ExitStack()
self.stop_event = Event()
self.shutdown_lock = Lock()
loop: asyncio.AbstractEventLoop = field(init=False)
executor: ThreadPoolExecutor = field(init=False)
def __enter__(self):
self.loop = self._stack.enter_context(self.asyncio_context())
logger.debug("Entered asyncio context")
logger.debug("Created event loop")
signals = (signal.SIGHUP, signal.SIGTERM, signal.SIGINT)
for s in signals:
self.loop.add_signal_handler(s, lambda s=s: self.loop.create_task(self.shutdown(s)))
self.loop.add_signal_handler(
s, lambda s=s: self.loop.create_task(self.shutdown(s)))
self.executor = self._stack.enter_context(self.thread_context())
logger.debug("Entered threadpool context")
logger.debug("Started thread pool")
return self
def __exit__(self, exc_type, exc_value, traceback):
self._stack.__exit__(exc_type, exc_value, traceback)
logger.debug("Exited context")
logger.debug(f'Closing context from {self.__class__.__name__}')
self._stack.close()
async def shutdown(self, signal):
def get_running_tasks(self) -> list[asyncio.Task]:
return [
t for t in asyncio.all_tasks(self.loop)
if t is not asyncio.current_task()
]
async def shutdown(self, signal=signal.SIGTERM):
"""Cleanup tasks tied to the service's shutdown.
https://www.roguelynn.com/words/asyncio-graceful-shutdowns/
"""
logger.info(f"Received exit signal {signal.name}...")
@@ -54,15 +53,23 @@ class AppDaemonRunContext:
logger.debug('Setting stop event')
self.stop_event.set()
tasks = self.get_running_tasks()
graceful = (
asyncio.wait_for(t, timeout=2.0)
for t in asyncio.all_tasks()
if t is not asyncio.current_task()
asyncio.wait_for(t, timeout=self.shutdown_grace_period)
for t in tasks
)
logger.debug(
'Allowing graceful shutdown from stop event for %ss',
self.shutdown_grace_period
)
logger.debug('Allowing graceful shutdown from stop event')
await asyncio.gather(*graceful, return_exceptions=True)
logger.debug("Stopping event loop in context shutdown")
for task in tasks:
if task.cancelled():
logger.warning(f'Cancelled {task.get_name()}')
logger.info("Calling stop() for asyncio event loop")
self.loop.stop()
else:
logger.warning('Already started shutdown')
@@ -75,14 +82,14 @@ class AppDaemonRunContext:
finally:
loop.close()
if loop.is_closed():
logger.debug("Closed the event loop.")
logger.debug("Gracefully closed event loop.")
@contextmanager
def thread_context(self):
with ThreadPoolExecutor(max_workers=5) as executor:
yield executor
if executor._shutdown:
logger.debug('Shut down the ThreadPoolExecutor')
logger.debug('Gracefully shut down ThreadPoolExecutor')
async def run_in_executor(
self,
@@ -105,39 +112,9 @@ class AppDaemonRunContext:
except asyncio.TimeoutError:
print('Timed out')
if __name__ == "__main__":
import logging
import random
from uuid import uuid4
logging.basicConfig(level="DEBUG", format="{levelname:<8} {message}", style="{")
def dummy_function(delay: float):
id_ = uuid4().hex[:4]
logger.info(f'{id_} sleeping for {delay:.1f}s')
sleep(delay)
logger.info(f'{id_} Done')
async def async_dummy_function(delay: float):
id_ = uuid4().hex[:4]
logger.info(f'{id_} async sleeping for {delay:.1f}s')
await asyncio.sleep(delay)
logger.info(f'{id_} Done async')
with AppDaemonRunContext() as cm:
for _ in range(3):
logger.info('Submitting random dummy_functions')
cm.executor.submit(dummy_function, random.random() * 10.0)
cm.loop.create_task(async_dummy_function(random.random() * 5.0))
async def sleep(self, delay: float):
"""Wrapper function for asyncio.sleep that suppresses and logs a task cancellation"""
try:
logger.info('Running until complete')
cm.loop.run_until_complete(asyncio.gather(*asyncio.all_tasks(cm.loop)))
await self.run_in_executor(self.stop_event.wait, delay)
except asyncio.CancelledError:
logger.error('Cancelled')
if cm.loop.is_closed():
logger.debug('Loop is closed')
if cm.executor._shutdown:
logger.debug('Executor is shut down')
logger.debug('Cancelled during sleep')

79
appdaemon/main.py Normal file
View File

@@ -0,0 +1,79 @@
from contextlib import ExitStack
from dataclasses import dataclass, field
from appdaemon.models.ad_config import AppDaemonConfig
from context_manager import AppDaemonRunContext
from subsystem import AppDaemon
@dataclass
class ADMain:
"""Class to contain the mechanics to run AppDaemon as module
"""
config_file: str
cfg: AppDaemonConfig = field(init=False)
_stack: ExitStack = field(default_factory=ExitStack)
run_context: AppDaemonRunContext = field(init=False)
ad: AppDaemon = field(init=False)
def __post_init__(self) -> None:
raw_cfg = read_config_file(self.config_file)
self.cfg = AppDaemonConfig(
config_file=self.config_file,
**raw_cfg['appdaemon']
)
def __enter__(self):
"""Used to start the asyncio loop, thread pool, and AppDaemon"""
# Use the ExitStack from ADMain instead of creating a new one
self.run_context = AppDaemonRunContext(_stack=self._stack)
self._stack.enter_context(self.run_context)
# Start AppDaemon by entering it's context
self.ad = AppDaemon(self.cfg, self.run_context)
self._stack.enter_context(self.ad)
return self
def __exit__(self, exc_type, exc_value, traceback):
self.ad.logger.debug(f'Closing context from {self.__class__.__name__}')
self._stack.close()
self.ad.logger.info('Stopped main()')
def run(self):
if hasattr(self, 'ad'):
self.ad.logger.info('Running asyncio event loop indefinitely...')
self.run_context.loop.run_forever()
self.ad.logger.debug('Gracefully stopped event loop')
else:
logging.error('Running ADMain without context manager')
if __name__ == '__main__':
import logging.config
from appdaemon.utils import read_config_file
from rich.console import Console
from rich.highlighter import NullHighlighter
console = Console()
logging.config.dictConfig(
{
'version': 1,
'disable_existing_loggers': False,
'formatters': {'basic': {'style': '{', 'format': '[yellow]{name:20}[/] {message}'}},
'handlers': {
'rich': {
'()': 'rich.logging.RichHandler',
'formatter': 'basic',
'console': console,
'highlighter': NullHighlighter(),
'markup': True
}
},
'root': {'level': 'DEBUG', 'handlers': ['rich']},
}
)
with ADMain('/conf/ad-test/conf/appdaemon.yaml') as main:
main.run()
main.ad.logger.debug('Exiting main context from with statement')

View File

@@ -1,9 +1,15 @@
import asyncio
import logging
import os
import signal
import traceback
from contextlib import ExitStack
from dataclasses import dataclass, field
from logging import Logger, getLogger
from random import random
from threading import Event, RLock
from typing import Callable, Coroutine
from time import perf_counter
from typing import Coroutine
from appdaemon.models import AppDaemonConfig
from context_manager import AppDaemonRunContext
@@ -13,134 +19,132 @@ from context_manager import AppDaemonRunContext
class ADSubsystem:
AD: "AppDaemon"
stop: Event
"""An thread event for the subsystem to use to shutdown gracefully"""
lock: RLock = field(default_factory=RLock)
"""A threadsafe re-entrant lock to protect any internal data while it's being modified"""
logger: Logger = field(init=False)
tasks: list[asyncio.Task] = field(default_factory=list)
def __post_init__(self) -> None:
name = f'_{self.__class__.__name__.lower()}'
self.logger = getLogger(f'AppDaemon.{name}')
self.AD.starts.append(self.start)
self.create_task = self.AD.create_task
self.sleep = self.AD.context.sleep
def __enter__(self):
self.logger.debug(f'Starting {self.__class__.__name__}')
def __exit__(self, exc_type, exc_value, traceback):
pass
# self.logger.debug(f'Exiting {self.__class__.__name__}')
@property
def stopping(self) -> bool:
return self.stop.is_set()
def start(self):
raise NotImplementedError('Need to implement start for subsystem')
@dataclass
class Utility(ADSubsystem):
loop_rate: float = 1.0
def start(self):
self.AD.create_task(self.loop(), 'Utility loop')
def __enter__(self):
super().__enter__()
self.create_task(self.loop(), 'utility loop', critical=True)
return self
async def loop(self):
while not self.stopping:
self.logger.info('Looping...')
await asyncio.sleep(self.loop_rate)
self.logger.debug('Stopped utility loop')
self.logger.debug('Looping...')
await self.sleep(self.loop_rate)
task_name = asyncio.current_task().get_name()
self.logger.debug(f'Gracefully stopped {task_name} task')
@dataclass
class Plugin(ADSubsystem):
state: dict[str, int] = field(default_factory=dict)
update_rate: float = 30.0
update_rate: float = 5.0
def __post_init__(self) -> None:
super().__post_init__()
self.state['update_count'] = 0
def start(self):
self.AD.create_task(self.periodic_self_udpate(), 'Periodic plugin update')
def __enter__(self):
super().__enter__()
self.create_task(
self.periodic_self_udpate(),
name='plugin periodic update',
critical=True
)
return self
async def periodic_self_udpate(self):
loop_time = perf_counter()
while not self.stopping:
with self.lock:
self.state['update_count'] += 1
self.logger.info(f'Updated self: {self.state["update_count"]}')
await asyncio.sleep(self.update_rate)
self.logger.debug('Stopped plugin updates')
# self.logger.debug('Long plugin update...')
# await asyncio.sleep(random())
self.logger.debug(
'Plugin self update: %s %s',
self.state["update_count"],
f'{perf_counter()-loop_time:.3f}s'
)
loop_time = perf_counter()
if self.state['update_count'] == 2:
raise ValueError('fake error')
await self.sleep(self.update_rate)
task_name = asyncio.current_task().get_name()
self.logger.debug(f'Gracefully stopped {task_name} task')
@dataclass
class AppDaemon:
cfg: AppDaemonConfig
context: AppDaemonRunContext
_stack: ExitStack = field(default_factory=ExitStack)
utility: Utility = field(init=False)
plugins: dict[str, Plugin] = field(default_factory=dict)
starts: list[Callable] = field(default_factory=list)
def __post_init__(self) -> None:
self.logger = logging.getLogger('AppDaemon')
self.utility = Utility(self, self.context.stop_event)
self.plugins['dummy'] = Plugin(self, self.context.stop_event)
def create_task(self, coro: Coroutine, name: str | None = None):
return self.context.loop.create_task(coro, name=name)
def __enter__(self):
self.logger.info('Starting AppDaemon')
self._stack.enter_context(self.utility)
for plugin in self.plugins.values():
self._stack.enter_context(plugin)
return self
def start(self):
for start in self.starts:
start()
def __exit__(self, exc_type, exc_value, traceback):
self._stack.__exit__(exc_type, exc_value, traceback)
def create_task(self, coro: Coroutine, name: str | None = None, critical: bool = False):
"""Creates an async task and adds exception callbacks"""
task = self.context.loop.create_task(coro, name=name)
task.add_done_callback(self.check_task_exception)
if critical:
task.add_done_callback(self.critical_exception)
return task
@dataclass
class ADMain:
config_file: str
cfg: AppDaemonConfig = field(init=False)
def check_task_exception(self, task: asyncio.Task):
if (exc := task.exception()) and not isinstance(exc, asyncio.CancelledError):
self.logger.error('\n'.join(traceback.format_exception(exc)))
def __post_init__(self) -> None:
raw_cfg = read_config_file(self.config_file)
self.cfg = AppDaemonConfig(
config_file=self.config_file,
**raw_cfg['appdaemon']
)
def critical_exception(self, task: asyncio.Task):
if task.exception():
self.logger.critical(
'Critical error in %s, forcing shutdown',
task.get_name()
)
self.shutdown()
def run(self):
with AppDaemonRunContext() as cm:
ad = AppDaemon(self.cfg, cm)
ad.start()
cm.loop.run_forever()
if __name__ == '__main__':
import logging.config
from appdaemon.utils import read_config_file
from rich.console import Console
from rich.highlighter import NullHighlighter
console = Console()
logging.config.dictConfig(
{
'version': 1,
'disable_existing_loggers': False,
'formatters': {'basic': {'style': '{', 'format': '[yellow]{name}[/] {message}'}},
'handlers': {
'rich': {
'()': 'rich.logging.RichHandler',
'formatter': 'basic',
'console': console,
'highlighter': NullHighlighter(),
'markup': True
}
},
'root': {'level': 'DEBUG', 'handlers': ['rich']},
}
)
main = ADMain('/conf/ad-test/conf/appdaemon.yaml')
main.run()
# config_file = '/conf/ad-test/conf/appdaemon.yaml'
# raw_cfg = read_config_file(config_file)
# cfg = AppDaemonConfig(
# config_file=config_file,
# **raw_cfg['appdaemon']
# )
# with AppDaemonRunContext() as cm:
# ad = AppDaemon(cfg, cm)
# # ad.start()
# # cm.loop.run_forever()
def shutdown(self):
self.logger.debug('Shutting down by sending SIGTERM')
os.kill(os.getpid(), signal.SIGTERM)