Compare commits

...

3 Commits

Author SHA1 Message Date
John Lancaster
7ea362c5fd added ad_threads 2025-01-04 14:14:14 -06:00
John Lancaster
4b748173c1 moved sleep function 2024-10-21 03:25:57 +00:00
John Lancaster
de496509a9 WIP 2024-10-21 03:15:59 +00:00
4 changed files with 87 additions and 23 deletions

54
ad_threads.py Normal file
View File

@@ -0,0 +1,54 @@
import threading
from collections.abc import Iterable
from concurrent.futures import ThreadPoolExecutor
from functools import partial
from queue import Queue
from time import perf_counter
from typing import Callable
def worker(q: Queue):
thread = threading.current_thread()
print(thread)
while True:
try:
item = q.get()
get_time = perf_counter()
if item is None:
break
elif callable(item):
print(f'Calling {item}')
item()
else:
print(f'{thread.name}: {item}')
except Exception as exc:
print(f'Error: {exc}')
finally:
proc_time = perf_counter() - get_time
q.task_done()
print(f'{proc_time*10**3:.3f}ms')
# print('Broke worker loop')
return f'Ended {thread.name}'
def run_executor(queues: Iterable[Queue]):
with ThreadPoolExecutor(thread_name_prefix='AD-App-Thread') as executor:
for fut in executor.map(worker, queues):
print(fut)
def main(n: int, start: bool = True):
queues = [Queue() for _ in range(n)]
thread = threading.Thread(
target=run_executor,
args=(queues,),
name='AD-ThreadPoolExecutor'
)
if start:
thread.start()
return queues
def dispatch_worker(queue: Queue, func: Callable, *args, **kwargs):
return queue.put_nowait(partial(func, *args, **kwargs))

View File

@@ -15,7 +15,7 @@ logger = logging.getLogger(__name__)
class AppDaemonRunContext:
_stack: ExitStack = field(default_factory=ExitStack)
stop_event: Event = field(default_factory=Event)
shutdown_grace_period: float = 0.75
shutdown_grace_period: float = 0.1
loop: asyncio.AbstractEventLoop = field(init=False)
executor: ThreadPoolExecutor = field(init=False)
@@ -37,10 +37,10 @@ class AppDaemonRunContext:
logger.debug(f'Closing context from {self.__class__.__name__}')
self._stack.close()
def get_running_tasks(self, exclude_current: bool = True) -> list[asyncio.Task]:
def get_running_tasks(self) -> list[asyncio.Task]:
return [
t for t in asyncio.all_tasks(self.loop)
if exclude_current and t is not asyncio.current_task()
if t is not asyncio.current_task()
]
async def shutdown(self, signal=signal.SIGTERM):
@@ -59,15 +59,17 @@ class AppDaemonRunContext:
asyncio.wait_for(t, timeout=self.shutdown_grace_period)
for t in tasks
)
logger.debug(f'Allowing graceful shutdown from stop event for {
self.shutdown_grace_period}s')
logger.debug(
'Allowing graceful shutdown from stop event for %ss',
self.shutdown_grace_period
)
await asyncio.gather(*graceful, return_exceptions=True)
for task in tasks:
if task.cancelled():
logger.warning(f'Cancelled {task.get_name()}')
logger.info("Stopping asyncio event loop")
logger.info("Calling stop() for asyncio event loop")
self.loop.stop()
else:
logger.warning('Already started shutdown')
@@ -109,3 +111,10 @@ class AppDaemonRunContext:
return await asyncio.wait_for(coro, timeout)
except asyncio.TimeoutError:
print('Timed out')
async def sleep(self, delay: float):
"""Wrapper function for asyncio.sleep that suppresses and logs a task cancellation"""
try:
await self.run_in_executor(self.stop_event.wait, delay)
except asyncio.CancelledError:
logger.debug('Cancelled during sleep')

View File

@@ -43,6 +43,7 @@ class ADMain:
if hasattr(self, 'ad'):
self.ad.logger.info('Running asyncio event loop indefinitely...')
self.run_context.loop.run_forever()
self.ad.logger.debug('Gracefully stopped event loop')
else:
logging.error('Running ADMain without context manager')
@@ -59,7 +60,7 @@ if __name__ == '__main__':
{
'version': 1,
'disable_existing_loggers': False,
'formatters': {'basic': {'style': '{', 'format': '[yellow]{name}[/] {message}'}},
'formatters': {'basic': {'style': '{', 'format': '[yellow]{name:20}[/] {message}'}},
'handlers': {
'rich': {
'()': 'rich.logging.RichHandler',
@@ -69,9 +70,10 @@ if __name__ == '__main__':
'markup': True
}
},
'root': {'level': 'INFO', 'handlers': ['rich']},
'root': {'level': 'DEBUG', 'handlers': ['rich']},
}
)
with ADMain('/conf/ad-test/conf/appdaemon.yaml') as main:
main.run()
main.ad.logger.debug('Exiting main context from with statement')

View File

@@ -29,45 +29,42 @@ class ADSubsystem:
name = f'_{self.__class__.__name__.lower()}'
self.logger = getLogger(f'AppDaemon.{name}')
self.create_task = self.AD.create_task
self.sleep = self.AD.context.sleep
def __enter__(self):
self.logger.debug(f'Starting {self.__class__.__name__}')
def __exit__(self, exc_type, exc_value, traceback):
self.logger.debug(f'Exiting {self.__class__.__name__}')
pass
# self.logger.debug(f'Exiting {self.__class__.__name__}')
@property
def stopping(self) -> bool:
return self.stop.is_set()
async def sleep(self, delay: float):
try:
if not self.stopping:
await asyncio.sleep(delay)
except asyncio.CancelledError:
self.logger.debug('Cancelled during sleep')
@dataclass
class Utility(ADSubsystem):
loop_rate: float = 0.25
loop_rate: float = 1.0
def __enter__(self):
super().__enter__()
self.create_task(self.loop(), 'Utility loop', critical=True)
self.create_task(self.loop(), 'utility loop', critical=True)
return self
async def loop(self):
while not self.stopping:
self.logger.debug('Looping...')
await self.sleep(self.loop_rate)
self.logger.debug('Stopped utility loop gracefully')
task_name = asyncio.current_task().get_name()
self.logger.debug(f'Gracefully stopped {task_name} task')
@dataclass
class Plugin(ADSubsystem):
state: dict[str, int] = field(default_factory=dict)
update_rate: float = 2.0
update_rate: float = 5.0
def __post_init__(self) -> None:
super().__post_init__()
@@ -96,11 +93,13 @@ class Plugin(ADSubsystem):
)
loop_time = perf_counter()
# if self.state['update_count'] == 2:
# raise ValueError('fake error')
if self.state['update_count'] == 2:
raise ValueError('fake error')
await self.sleep(self.update_rate)
self.logger.debug('Stopped plugin updates gracefully')
task_name = asyncio.current_task().get_name()
self.logger.debug(f'Gracefully stopped {task_name} task')
@dataclass