Compare commits
8 Commits
050fe75e71
...
threads
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
f4aabf977e | ||
|
|
7ea362c5fd | ||
|
|
4b748173c1 | ||
|
|
de496509a9 | ||
|
|
bc2224a919 | ||
|
|
a67b2998b7 | ||
|
|
0abbb9e546 | ||
|
|
502c218c35 |
78
ad_threads.py
Normal file
78
ad_threads.py
Normal file
@@ -0,0 +1,78 @@
|
||||
import logging
|
||||
import threading
|
||||
from collections.abc import Iterable
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
from functools import partial
|
||||
from queue import Queue
|
||||
from time import perf_counter
|
||||
from typing import Callable
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def worker(q: Queue):
|
||||
thread = threading.current_thread()
|
||||
logger.info(thread)
|
||||
while True:
|
||||
try:
|
||||
item = q.get()
|
||||
get_time = perf_counter()
|
||||
if item is None:
|
||||
break
|
||||
elif callable(item):
|
||||
logger.info(f'Calling {item}')
|
||||
item()
|
||||
except Exception as exc:
|
||||
logger.info(f'Error: {exc}')
|
||||
finally:
|
||||
q.task_done()
|
||||
proc_time = perf_counter() - get_time
|
||||
logger.info(f'{thread.name}: {proc_time*10**3:.3f}ms {item}')
|
||||
|
||||
# logger.info('Broke worker loop')
|
||||
return f'Ended {thread.name}'
|
||||
|
||||
|
||||
def run_executor(queues: Iterable[Queue]):
|
||||
with ThreadPoolExecutor(thread_name_prefix='AD-App-Thread') as executor:
|
||||
for q in queues:
|
||||
executor.submit(worker, q=q)
|
||||
# for fut in executor.map(worker, queues):
|
||||
# logger.info(fut)
|
||||
|
||||
|
||||
def main(n: int, start: bool = True):
|
||||
queues = [Queue() for _ in range(n)]
|
||||
thread = threading.Thread(
|
||||
target=run_executor,
|
||||
args=(queues,),
|
||||
name='AD-ThreadPoolExecutor'
|
||||
)
|
||||
if start:
|
||||
thread.start()
|
||||
return queues
|
||||
|
||||
|
||||
def dispatch_worker(queue: Queue, func: Callable, *args, **kwargs):
|
||||
return queue.put_nowait(partial(func, *args, **kwargs))
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
from time import sleep
|
||||
|
||||
logging.basicConfig(
|
||||
level='INFO',
|
||||
format='{asctime} [{levelname}] {message}',
|
||||
style='{'
|
||||
)
|
||||
|
||||
queues = main(5)
|
||||
|
||||
logger.info('Starting')
|
||||
|
||||
for i in range(5):
|
||||
for q in queues:
|
||||
q.put_nowait(i)
|
||||
logger.info('Sleeping')
|
||||
sleep(1.0)
|
||||
logger.info('Done')
|
||||
@@ -4,47 +4,46 @@ import logging
|
||||
import signal
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
from contextlib import ExitStack, contextmanager
|
||||
from threading import Event, Lock
|
||||
from time import sleep
|
||||
from dataclasses import dataclass, field
|
||||
from threading import Event
|
||||
from typing import Any, Callable
|
||||
|
||||
logger = logging.getLogger()
|
||||
|
||||
|
||||
def handler(signum, frame):
|
||||
print('Signal handler called with signal', signum)
|
||||
raise OSError("Couldn't open device!")
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@dataclass
|
||||
class AppDaemonRunContext:
|
||||
_stack: ExitStack
|
||||
loop: asyncio.AbstractEventLoop
|
||||
executor: ThreadPoolExecutor
|
||||
stop_event: Event
|
||||
shutdown_lock: Lock
|
||||
_stack: ExitStack = field(default_factory=ExitStack)
|
||||
stop_event: Event = field(default_factory=Event)
|
||||
shutdown_grace_period: float = 0.1
|
||||
|
||||
def __init__(self):
|
||||
self._stack = ExitStack()
|
||||
self.stop_event = Event()
|
||||
self.shutdown_lock = Lock()
|
||||
loop: asyncio.AbstractEventLoop = field(init=False)
|
||||
executor: ThreadPoolExecutor = field(init=False)
|
||||
|
||||
def __enter__(self):
|
||||
self.loop = self._stack.enter_context(self.asyncio_context())
|
||||
logger.debug("Entered asyncio context")
|
||||
logger.debug("Created event loop")
|
||||
|
||||
signals = (signal.SIGHUP, signal.SIGTERM, signal.SIGINT)
|
||||
for s in signals:
|
||||
self.loop.add_signal_handler(s, lambda s=s: self.loop.create_task(self.shutdown(s)))
|
||||
self.loop.add_signal_handler(
|
||||
s, lambda s=s: self.loop.create_task(self.shutdown(s)))
|
||||
|
||||
self.executor = self._stack.enter_context(self.thread_context())
|
||||
logger.debug("Entered threadpool context")
|
||||
logger.debug("Started thread pool")
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc_value, traceback):
|
||||
self._stack.__exit__(exc_type, exc_value, traceback)
|
||||
logger.debug("Exited context")
|
||||
logger.debug(f'Closing context from {self.__class__.__name__}')
|
||||
self._stack.close()
|
||||
|
||||
async def shutdown(self, signal):
|
||||
def get_running_tasks(self) -> list[asyncio.Task]:
|
||||
return [
|
||||
t for t in asyncio.all_tasks(self.loop)
|
||||
if t is not asyncio.current_task()
|
||||
]
|
||||
|
||||
async def shutdown(self, signal=signal.SIGTERM):
|
||||
"""Cleanup tasks tied to the service's shutdown.
|
||||
|
||||
https://www.roguelynn.com/words/asyncio-graceful-shutdowns/
|
||||
@@ -54,15 +53,23 @@ class AppDaemonRunContext:
|
||||
logger.debug('Setting stop event')
|
||||
self.stop_event.set()
|
||||
|
||||
tasks = self.get_running_tasks()
|
||||
|
||||
graceful = (
|
||||
asyncio.wait_for(t, timeout=2.0)
|
||||
for t in asyncio.all_tasks()
|
||||
if t is not asyncio.current_task()
|
||||
asyncio.wait_for(t, timeout=self.shutdown_grace_period)
|
||||
for t in tasks
|
||||
)
|
||||
logger.debug(
|
||||
'Allowing graceful shutdown from stop event for %ss',
|
||||
self.shutdown_grace_period
|
||||
)
|
||||
logger.debug('Allowing graceful shutdown from stop event')
|
||||
await asyncio.gather(*graceful, return_exceptions=True)
|
||||
|
||||
logger.debug("Stopping event loop in context shutdown")
|
||||
for task in tasks:
|
||||
if task.cancelled():
|
||||
logger.warning(f'Cancelled {task.get_name()}')
|
||||
|
||||
logger.info("Calling stop() for asyncio event loop")
|
||||
self.loop.stop()
|
||||
else:
|
||||
logger.warning('Already started shutdown')
|
||||
@@ -75,14 +82,14 @@ class AppDaemonRunContext:
|
||||
finally:
|
||||
loop.close()
|
||||
if loop.is_closed():
|
||||
logger.debug("Closed the event loop.")
|
||||
logger.debug("Gracefully closed event loop.")
|
||||
|
||||
@contextmanager
|
||||
def thread_context(self):
|
||||
with ThreadPoolExecutor(max_workers=5) as executor:
|
||||
yield executor
|
||||
if executor._shutdown:
|
||||
logger.debug('Shut down the ThreadPoolExecutor')
|
||||
logger.debug('Gracefully shut down ThreadPoolExecutor')
|
||||
|
||||
async def run_in_executor(
|
||||
self,
|
||||
@@ -105,39 +112,9 @@ class AppDaemonRunContext:
|
||||
except asyncio.TimeoutError:
|
||||
print('Timed out')
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
import logging
|
||||
import random
|
||||
from uuid import uuid4
|
||||
|
||||
logging.basicConfig(level="DEBUG", format="{levelname:<8} {message}", style="{")
|
||||
|
||||
def dummy_function(delay: float):
|
||||
id_ = uuid4().hex[:4]
|
||||
logger.info(f'{id_} sleeping for {delay:.1f}s')
|
||||
sleep(delay)
|
||||
logger.info(f'{id_} Done')
|
||||
|
||||
async def async_dummy_function(delay: float):
|
||||
id_ = uuid4().hex[:4]
|
||||
logger.info(f'{id_} async sleeping for {delay:.1f}s')
|
||||
await asyncio.sleep(delay)
|
||||
logger.info(f'{id_} Done async')
|
||||
|
||||
with AppDaemonRunContext() as cm:
|
||||
for _ in range(3):
|
||||
logger.info('Submitting random dummy_functions')
|
||||
cm.executor.submit(dummy_function, random.random() * 10.0)
|
||||
cm.loop.create_task(async_dummy_function(random.random() * 5.0))
|
||||
|
||||
async def sleep(self, delay: float):
|
||||
"""Wrapper function for asyncio.sleep that suppresses and logs a task cancellation"""
|
||||
try:
|
||||
logger.info('Running until complete')
|
||||
cm.loop.run_until_complete(asyncio.gather(*asyncio.all_tasks(cm.loop)))
|
||||
await self.run_in_executor(self.stop_event.wait, delay)
|
||||
except asyncio.CancelledError:
|
||||
logger.error('Cancelled')
|
||||
|
||||
if cm.loop.is_closed():
|
||||
logger.debug('Loop is closed')
|
||||
if cm.executor._shutdown:
|
||||
logger.debug('Executor is shut down')
|
||||
logger.debug('Cancelled during sleep')
|
||||
|
||||
79
appdaemon/main.py
Normal file
79
appdaemon/main.py
Normal file
@@ -0,0 +1,79 @@
|
||||
from contextlib import ExitStack
|
||||
from dataclasses import dataclass, field
|
||||
|
||||
from appdaemon.models.ad_config import AppDaemonConfig
|
||||
from context_manager import AppDaemonRunContext
|
||||
from subsystem import AppDaemon
|
||||
|
||||
|
||||
@dataclass
|
||||
class ADMain:
|
||||
"""Class to contain the mechanics to run AppDaemon as module
|
||||
"""
|
||||
config_file: str
|
||||
cfg: AppDaemonConfig = field(init=False)
|
||||
_stack: ExitStack = field(default_factory=ExitStack)
|
||||
run_context: AppDaemonRunContext = field(init=False)
|
||||
ad: AppDaemon = field(init=False)
|
||||
|
||||
def __post_init__(self) -> None:
|
||||
raw_cfg = read_config_file(self.config_file)
|
||||
self.cfg = AppDaemonConfig(
|
||||
config_file=self.config_file,
|
||||
**raw_cfg['appdaemon']
|
||||
)
|
||||
|
||||
def __enter__(self):
|
||||
"""Used to start the asyncio loop, thread pool, and AppDaemon"""
|
||||
# Use the ExitStack from ADMain instead of creating a new one
|
||||
self.run_context = AppDaemonRunContext(_stack=self._stack)
|
||||
self._stack.enter_context(self.run_context)
|
||||
|
||||
# Start AppDaemon by entering it's context
|
||||
self.ad = AppDaemon(self.cfg, self.run_context)
|
||||
self._stack.enter_context(self.ad)
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc_value, traceback):
|
||||
self.ad.logger.debug(f'Closing context from {self.__class__.__name__}')
|
||||
self._stack.close()
|
||||
self.ad.logger.info('Stopped main()')
|
||||
|
||||
def run(self):
|
||||
if hasattr(self, 'ad'):
|
||||
self.ad.logger.info('Running asyncio event loop indefinitely...')
|
||||
self.run_context.loop.run_forever()
|
||||
self.ad.logger.debug('Gracefully stopped event loop')
|
||||
else:
|
||||
logging.error('Running ADMain without context manager')
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
import logging.config
|
||||
|
||||
from appdaemon.utils import read_config_file
|
||||
from rich.console import Console
|
||||
from rich.highlighter import NullHighlighter
|
||||
|
||||
console = Console()
|
||||
logging.config.dictConfig(
|
||||
{
|
||||
'version': 1,
|
||||
'disable_existing_loggers': False,
|
||||
'formatters': {'basic': {'style': '{', 'format': '[yellow]{name:20}[/] {message}'}},
|
||||
'handlers': {
|
||||
'rich': {
|
||||
'()': 'rich.logging.RichHandler',
|
||||
'formatter': 'basic',
|
||||
'console': console,
|
||||
'highlighter': NullHighlighter(),
|
||||
'markup': True
|
||||
}
|
||||
},
|
||||
'root': {'level': 'DEBUG', 'handlers': ['rich']},
|
||||
}
|
||||
)
|
||||
|
||||
with ADMain('/conf/ad-test/conf/appdaemon.yaml') as main:
|
||||
main.run()
|
||||
main.ad.logger.debug('Exiting main context from with statement')
|
||||
@@ -1,9 +1,15 @@
|
||||
import asyncio
|
||||
import logging
|
||||
import os
|
||||
import signal
|
||||
import traceback
|
||||
from contextlib import ExitStack
|
||||
from dataclasses import dataclass, field
|
||||
from logging import Logger, getLogger
|
||||
from random import random
|
||||
from threading import Event, RLock
|
||||
from typing import Callable, Coroutine
|
||||
from time import perf_counter
|
||||
from typing import Coroutine
|
||||
|
||||
from appdaemon.models import AppDaemonConfig
|
||||
from context_manager import AppDaemonRunContext
|
||||
@@ -13,134 +19,132 @@ from context_manager import AppDaemonRunContext
|
||||
class ADSubsystem:
|
||||
AD: "AppDaemon"
|
||||
stop: Event
|
||||
"""An thread event for the subsystem to use to shutdown gracefully"""
|
||||
lock: RLock = field(default_factory=RLock)
|
||||
"""A threadsafe re-entrant lock to protect any internal data while it's being modified"""
|
||||
logger: Logger = field(init=False)
|
||||
tasks: list[asyncio.Task] = field(default_factory=list)
|
||||
|
||||
def __post_init__(self) -> None:
|
||||
name = f'_{self.__class__.__name__.lower()}'
|
||||
self.logger = getLogger(f'AppDaemon.{name}')
|
||||
self.AD.starts.append(self.start)
|
||||
self.create_task = self.AD.create_task
|
||||
self.sleep = self.AD.context.sleep
|
||||
|
||||
def __enter__(self):
|
||||
self.logger.debug(f'Starting {self.__class__.__name__}')
|
||||
|
||||
def __exit__(self, exc_type, exc_value, traceback):
|
||||
pass
|
||||
# self.logger.debug(f'Exiting {self.__class__.__name__}')
|
||||
|
||||
@property
|
||||
def stopping(self) -> bool:
|
||||
return self.stop.is_set()
|
||||
|
||||
def start(self):
|
||||
raise NotImplementedError('Need to implement start for subsystem')
|
||||
|
||||
|
||||
@dataclass
|
||||
class Utility(ADSubsystem):
|
||||
loop_rate: float = 1.0
|
||||
|
||||
def start(self):
|
||||
self.AD.create_task(self.loop(), 'Utility loop')
|
||||
def __enter__(self):
|
||||
super().__enter__()
|
||||
self.create_task(self.loop(), 'utility loop', critical=True)
|
||||
return self
|
||||
|
||||
async def loop(self):
|
||||
while not self.stopping:
|
||||
self.logger.info('Looping...')
|
||||
await asyncio.sleep(self.loop_rate)
|
||||
self.logger.debug('Stopped utility loop')
|
||||
self.logger.debug('Looping...')
|
||||
await self.sleep(self.loop_rate)
|
||||
|
||||
task_name = asyncio.current_task().get_name()
|
||||
self.logger.debug(f'Gracefully stopped {task_name} task')
|
||||
|
||||
|
||||
@dataclass
|
||||
class Plugin(ADSubsystem):
|
||||
state: dict[str, int] = field(default_factory=dict)
|
||||
update_rate: float = 30.0
|
||||
update_rate: float = 5.0
|
||||
|
||||
def __post_init__(self) -> None:
|
||||
super().__post_init__()
|
||||
self.state['update_count'] = 0
|
||||
|
||||
def start(self):
|
||||
self.AD.create_task(self.periodic_self_udpate(), 'Periodic plugin update')
|
||||
def __enter__(self):
|
||||
super().__enter__()
|
||||
self.create_task(
|
||||
self.periodic_self_udpate(),
|
||||
name='plugin periodic update',
|
||||
critical=True
|
||||
)
|
||||
return self
|
||||
|
||||
async def periodic_self_udpate(self):
|
||||
loop_time = perf_counter()
|
||||
while not self.stopping:
|
||||
with self.lock:
|
||||
self.state['update_count'] += 1
|
||||
self.logger.info(f'Updated self: {self.state["update_count"]}')
|
||||
await asyncio.sleep(self.update_rate)
|
||||
self.logger.debug('Stopped plugin updates')
|
||||
# self.logger.debug('Long plugin update...')
|
||||
# await asyncio.sleep(random())
|
||||
self.logger.debug(
|
||||
'Plugin self update: %s %s',
|
||||
self.state["update_count"],
|
||||
f'{perf_counter()-loop_time:.3f}s'
|
||||
)
|
||||
loop_time = perf_counter()
|
||||
|
||||
if self.state['update_count'] == 2:
|
||||
raise ValueError('fake error')
|
||||
|
||||
await self.sleep(self.update_rate)
|
||||
|
||||
task_name = asyncio.current_task().get_name()
|
||||
self.logger.debug(f'Gracefully stopped {task_name} task')
|
||||
|
||||
|
||||
@dataclass
|
||||
class AppDaemon:
|
||||
cfg: AppDaemonConfig
|
||||
context: AppDaemonRunContext
|
||||
_stack: ExitStack = field(default_factory=ExitStack)
|
||||
utility: Utility = field(init=False)
|
||||
plugins: dict[str, Plugin] = field(default_factory=dict)
|
||||
starts: list[Callable] = field(default_factory=list)
|
||||
|
||||
def __post_init__(self) -> None:
|
||||
self.logger = logging.getLogger('AppDaemon')
|
||||
self.utility = Utility(self, self.context.stop_event)
|
||||
self.plugins['dummy'] = Plugin(self, self.context.stop_event)
|
||||
|
||||
def create_task(self, coro: Coroutine, name: str | None = None):
|
||||
return self.context.loop.create_task(coro, name=name)
|
||||
def __enter__(self):
|
||||
self.logger.info('Starting AppDaemon')
|
||||
self._stack.enter_context(self.utility)
|
||||
for plugin in self.plugins.values():
|
||||
self._stack.enter_context(plugin)
|
||||
return self
|
||||
|
||||
def start(self):
|
||||
for start in self.starts:
|
||||
start()
|
||||
def __exit__(self, exc_type, exc_value, traceback):
|
||||
self._stack.__exit__(exc_type, exc_value, traceback)
|
||||
|
||||
def create_task(self, coro: Coroutine, name: str | None = None, critical: bool = False):
|
||||
"""Creates an async task and adds exception callbacks"""
|
||||
task = self.context.loop.create_task(coro, name=name)
|
||||
task.add_done_callback(self.check_task_exception)
|
||||
if critical:
|
||||
task.add_done_callback(self.critical_exception)
|
||||
return task
|
||||
|
||||
@dataclass
|
||||
class ADMain:
|
||||
config_file: str
|
||||
cfg: AppDaemonConfig = field(init=False)
|
||||
def check_task_exception(self, task: asyncio.Task):
|
||||
if (exc := task.exception()) and not isinstance(exc, asyncio.CancelledError):
|
||||
self.logger.error('\n'.join(traceback.format_exception(exc)))
|
||||
|
||||
def __post_init__(self) -> None:
|
||||
raw_cfg = read_config_file(self.config_file)
|
||||
self.cfg = AppDaemonConfig(
|
||||
config_file=self.config_file,
|
||||
**raw_cfg['appdaemon']
|
||||
)
|
||||
def critical_exception(self, task: asyncio.Task):
|
||||
if task.exception():
|
||||
self.logger.critical(
|
||||
'Critical error in %s, forcing shutdown',
|
||||
task.get_name()
|
||||
)
|
||||
self.shutdown()
|
||||
|
||||
def run(self):
|
||||
with AppDaemonRunContext() as cm:
|
||||
ad = AppDaemon(self.cfg, cm)
|
||||
ad.start()
|
||||
cm.loop.run_forever()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
import logging.config
|
||||
|
||||
from appdaemon.utils import read_config_file
|
||||
from rich.console import Console
|
||||
from rich.highlighter import NullHighlighter
|
||||
|
||||
console = Console()
|
||||
logging.config.dictConfig(
|
||||
{
|
||||
'version': 1,
|
||||
'disable_existing_loggers': False,
|
||||
'formatters': {'basic': {'style': '{', 'format': '[yellow]{name}[/] {message}'}},
|
||||
'handlers': {
|
||||
'rich': {
|
||||
'()': 'rich.logging.RichHandler',
|
||||
'formatter': 'basic',
|
||||
'console': console,
|
||||
'highlighter': NullHighlighter(),
|
||||
'markup': True
|
||||
}
|
||||
},
|
||||
'root': {'level': 'DEBUG', 'handlers': ['rich']},
|
||||
}
|
||||
)
|
||||
|
||||
main = ADMain('/conf/ad-test/conf/appdaemon.yaml')
|
||||
main.run()
|
||||
|
||||
# config_file = '/conf/ad-test/conf/appdaemon.yaml'
|
||||
# raw_cfg = read_config_file(config_file)
|
||||
|
||||
# cfg = AppDaemonConfig(
|
||||
# config_file=config_file,
|
||||
# **raw_cfg['appdaemon']
|
||||
# )
|
||||
|
||||
# with AppDaemonRunContext() as cm:
|
||||
# ad = AppDaemon(cfg, cm)
|
||||
# # ad.start()
|
||||
# # cm.loop.run_forever()
|
||||
def shutdown(self):
|
||||
self.logger.debug('Shutting down by sending SIGTERM')
|
||||
os.kill(os.getpid(), signal.SIGTERM)
|
||||
|
||||
Reference in New Issue
Block a user