Compare commits
4 Commits
bc2224a919
...
threads
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
f4aabf977e | ||
|
|
7ea362c5fd | ||
|
|
4b748173c1 | ||
|
|
de496509a9 |
78
ad_threads.py
Normal file
78
ad_threads.py
Normal file
@@ -0,0 +1,78 @@
|
|||||||
|
import logging
|
||||||
|
import threading
|
||||||
|
from collections.abc import Iterable
|
||||||
|
from concurrent.futures import ThreadPoolExecutor
|
||||||
|
from functools import partial
|
||||||
|
from queue import Queue
|
||||||
|
from time import perf_counter
|
||||||
|
from typing import Callable
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
def worker(q: Queue):
|
||||||
|
thread = threading.current_thread()
|
||||||
|
logger.info(thread)
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
item = q.get()
|
||||||
|
get_time = perf_counter()
|
||||||
|
if item is None:
|
||||||
|
break
|
||||||
|
elif callable(item):
|
||||||
|
logger.info(f'Calling {item}')
|
||||||
|
item()
|
||||||
|
except Exception as exc:
|
||||||
|
logger.info(f'Error: {exc}')
|
||||||
|
finally:
|
||||||
|
q.task_done()
|
||||||
|
proc_time = perf_counter() - get_time
|
||||||
|
logger.info(f'{thread.name}: {proc_time*10**3:.3f}ms {item}')
|
||||||
|
|
||||||
|
# logger.info('Broke worker loop')
|
||||||
|
return f'Ended {thread.name}'
|
||||||
|
|
||||||
|
|
||||||
|
def run_executor(queues: Iterable[Queue]):
|
||||||
|
with ThreadPoolExecutor(thread_name_prefix='AD-App-Thread') as executor:
|
||||||
|
for q in queues:
|
||||||
|
executor.submit(worker, q=q)
|
||||||
|
# for fut in executor.map(worker, queues):
|
||||||
|
# logger.info(fut)
|
||||||
|
|
||||||
|
|
||||||
|
def main(n: int, start: bool = True):
|
||||||
|
queues = [Queue() for _ in range(n)]
|
||||||
|
thread = threading.Thread(
|
||||||
|
target=run_executor,
|
||||||
|
args=(queues,),
|
||||||
|
name='AD-ThreadPoolExecutor'
|
||||||
|
)
|
||||||
|
if start:
|
||||||
|
thread.start()
|
||||||
|
return queues
|
||||||
|
|
||||||
|
|
||||||
|
def dispatch_worker(queue: Queue, func: Callable, *args, **kwargs):
|
||||||
|
return queue.put_nowait(partial(func, *args, **kwargs))
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
from time import sleep
|
||||||
|
|
||||||
|
logging.basicConfig(
|
||||||
|
level='INFO',
|
||||||
|
format='{asctime} [{levelname}] {message}',
|
||||||
|
style='{'
|
||||||
|
)
|
||||||
|
|
||||||
|
queues = main(5)
|
||||||
|
|
||||||
|
logger.info('Starting')
|
||||||
|
|
||||||
|
for i in range(5):
|
||||||
|
for q in queues:
|
||||||
|
q.put_nowait(i)
|
||||||
|
logger.info('Sleeping')
|
||||||
|
sleep(1.0)
|
||||||
|
logger.info('Done')
|
||||||
@@ -15,7 +15,7 @@ logger = logging.getLogger(__name__)
|
|||||||
class AppDaemonRunContext:
|
class AppDaemonRunContext:
|
||||||
_stack: ExitStack = field(default_factory=ExitStack)
|
_stack: ExitStack = field(default_factory=ExitStack)
|
||||||
stop_event: Event = field(default_factory=Event)
|
stop_event: Event = field(default_factory=Event)
|
||||||
shutdown_grace_period: float = 0.75
|
shutdown_grace_period: float = 0.1
|
||||||
|
|
||||||
loop: asyncio.AbstractEventLoop = field(init=False)
|
loop: asyncio.AbstractEventLoop = field(init=False)
|
||||||
executor: ThreadPoolExecutor = field(init=False)
|
executor: ThreadPoolExecutor = field(init=False)
|
||||||
@@ -37,10 +37,10 @@ class AppDaemonRunContext:
|
|||||||
logger.debug(f'Closing context from {self.__class__.__name__}')
|
logger.debug(f'Closing context from {self.__class__.__name__}')
|
||||||
self._stack.close()
|
self._stack.close()
|
||||||
|
|
||||||
def get_running_tasks(self, exclude_current: bool = True) -> list[asyncio.Task]:
|
def get_running_tasks(self) -> list[asyncio.Task]:
|
||||||
return [
|
return [
|
||||||
t for t in asyncio.all_tasks(self.loop)
|
t for t in asyncio.all_tasks(self.loop)
|
||||||
if exclude_current and t is not asyncio.current_task()
|
if t is not asyncio.current_task()
|
||||||
]
|
]
|
||||||
|
|
||||||
async def shutdown(self, signal=signal.SIGTERM):
|
async def shutdown(self, signal=signal.SIGTERM):
|
||||||
@@ -59,15 +59,17 @@ class AppDaemonRunContext:
|
|||||||
asyncio.wait_for(t, timeout=self.shutdown_grace_period)
|
asyncio.wait_for(t, timeout=self.shutdown_grace_period)
|
||||||
for t in tasks
|
for t in tasks
|
||||||
)
|
)
|
||||||
logger.debug(f'Allowing graceful shutdown from stop event for {
|
logger.debug(
|
||||||
self.shutdown_grace_period}s')
|
'Allowing graceful shutdown from stop event for %ss',
|
||||||
|
self.shutdown_grace_period
|
||||||
|
)
|
||||||
await asyncio.gather(*graceful, return_exceptions=True)
|
await asyncio.gather(*graceful, return_exceptions=True)
|
||||||
|
|
||||||
for task in tasks:
|
for task in tasks:
|
||||||
if task.cancelled():
|
if task.cancelled():
|
||||||
logger.warning(f'Cancelled {task.get_name()}')
|
logger.warning(f'Cancelled {task.get_name()}')
|
||||||
|
|
||||||
logger.info("Stopping asyncio event loop")
|
logger.info("Calling stop() for asyncio event loop")
|
||||||
self.loop.stop()
|
self.loop.stop()
|
||||||
else:
|
else:
|
||||||
logger.warning('Already started shutdown')
|
logger.warning('Already started shutdown')
|
||||||
@@ -109,3 +111,10 @@ class AppDaemonRunContext:
|
|||||||
return await asyncio.wait_for(coro, timeout)
|
return await asyncio.wait_for(coro, timeout)
|
||||||
except asyncio.TimeoutError:
|
except asyncio.TimeoutError:
|
||||||
print('Timed out')
|
print('Timed out')
|
||||||
|
|
||||||
|
async def sleep(self, delay: float):
|
||||||
|
"""Wrapper function for asyncio.sleep that suppresses and logs a task cancellation"""
|
||||||
|
try:
|
||||||
|
await self.run_in_executor(self.stop_event.wait, delay)
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
logger.debug('Cancelled during sleep')
|
||||||
|
|||||||
@@ -43,6 +43,7 @@ class ADMain:
|
|||||||
if hasattr(self, 'ad'):
|
if hasattr(self, 'ad'):
|
||||||
self.ad.logger.info('Running asyncio event loop indefinitely...')
|
self.ad.logger.info('Running asyncio event loop indefinitely...')
|
||||||
self.run_context.loop.run_forever()
|
self.run_context.loop.run_forever()
|
||||||
|
self.ad.logger.debug('Gracefully stopped event loop')
|
||||||
else:
|
else:
|
||||||
logging.error('Running ADMain without context manager')
|
logging.error('Running ADMain without context manager')
|
||||||
|
|
||||||
@@ -59,7 +60,7 @@ if __name__ == '__main__':
|
|||||||
{
|
{
|
||||||
'version': 1,
|
'version': 1,
|
||||||
'disable_existing_loggers': False,
|
'disable_existing_loggers': False,
|
||||||
'formatters': {'basic': {'style': '{', 'format': '[yellow]{name}[/] {message}'}},
|
'formatters': {'basic': {'style': '{', 'format': '[yellow]{name:20}[/] {message}'}},
|
||||||
'handlers': {
|
'handlers': {
|
||||||
'rich': {
|
'rich': {
|
||||||
'()': 'rich.logging.RichHandler',
|
'()': 'rich.logging.RichHandler',
|
||||||
@@ -69,9 +70,10 @@ if __name__ == '__main__':
|
|||||||
'markup': True
|
'markup': True
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
'root': {'level': 'INFO', 'handlers': ['rich']},
|
'root': {'level': 'DEBUG', 'handlers': ['rich']},
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
with ADMain('/conf/ad-test/conf/appdaemon.yaml') as main:
|
with ADMain('/conf/ad-test/conf/appdaemon.yaml') as main:
|
||||||
main.run()
|
main.run()
|
||||||
|
main.ad.logger.debug('Exiting main context from with statement')
|
||||||
|
|||||||
@@ -29,45 +29,42 @@ class ADSubsystem:
|
|||||||
name = f'_{self.__class__.__name__.lower()}'
|
name = f'_{self.__class__.__name__.lower()}'
|
||||||
self.logger = getLogger(f'AppDaemon.{name}')
|
self.logger = getLogger(f'AppDaemon.{name}')
|
||||||
self.create_task = self.AD.create_task
|
self.create_task = self.AD.create_task
|
||||||
|
self.sleep = self.AD.context.sleep
|
||||||
|
|
||||||
def __enter__(self):
|
def __enter__(self):
|
||||||
self.logger.debug(f'Starting {self.__class__.__name__}')
|
self.logger.debug(f'Starting {self.__class__.__name__}')
|
||||||
|
|
||||||
def __exit__(self, exc_type, exc_value, traceback):
|
def __exit__(self, exc_type, exc_value, traceback):
|
||||||
self.logger.debug(f'Exiting {self.__class__.__name__}')
|
pass
|
||||||
|
# self.logger.debug(f'Exiting {self.__class__.__name__}')
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def stopping(self) -> bool:
|
def stopping(self) -> bool:
|
||||||
return self.stop.is_set()
|
return self.stop.is_set()
|
||||||
|
|
||||||
async def sleep(self, delay: float):
|
|
||||||
try:
|
|
||||||
if not self.stopping:
|
|
||||||
await asyncio.sleep(delay)
|
|
||||||
except asyncio.CancelledError:
|
|
||||||
self.logger.debug('Cancelled during sleep')
|
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
class Utility(ADSubsystem):
|
class Utility(ADSubsystem):
|
||||||
loop_rate: float = 0.25
|
loop_rate: float = 1.0
|
||||||
|
|
||||||
def __enter__(self):
|
def __enter__(self):
|
||||||
super().__enter__()
|
super().__enter__()
|
||||||
self.create_task(self.loop(), 'Utility loop', critical=True)
|
self.create_task(self.loop(), 'utility loop', critical=True)
|
||||||
return self
|
return self
|
||||||
|
|
||||||
async def loop(self):
|
async def loop(self):
|
||||||
while not self.stopping:
|
while not self.stopping:
|
||||||
self.logger.debug('Looping...')
|
self.logger.debug('Looping...')
|
||||||
await self.sleep(self.loop_rate)
|
await self.sleep(self.loop_rate)
|
||||||
self.logger.debug('Stopped utility loop gracefully')
|
|
||||||
|
task_name = asyncio.current_task().get_name()
|
||||||
|
self.logger.debug(f'Gracefully stopped {task_name} task')
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
class Plugin(ADSubsystem):
|
class Plugin(ADSubsystem):
|
||||||
state: dict[str, int] = field(default_factory=dict)
|
state: dict[str, int] = field(default_factory=dict)
|
||||||
update_rate: float = 2.0
|
update_rate: float = 5.0
|
||||||
|
|
||||||
def __post_init__(self) -> None:
|
def __post_init__(self) -> None:
|
||||||
super().__post_init__()
|
super().__post_init__()
|
||||||
@@ -96,11 +93,13 @@ class Plugin(ADSubsystem):
|
|||||||
)
|
)
|
||||||
loop_time = perf_counter()
|
loop_time = perf_counter()
|
||||||
|
|
||||||
# if self.state['update_count'] == 2:
|
if self.state['update_count'] == 2:
|
||||||
# raise ValueError('fake error')
|
raise ValueError('fake error')
|
||||||
|
|
||||||
await self.sleep(self.update_rate)
|
await self.sleep(self.update_rate)
|
||||||
self.logger.debug('Stopped plugin updates gracefully')
|
|
||||||
|
task_name = asyncio.current_task().get_name()
|
||||||
|
self.logger.debug(f'Gracefully stopped {task_name} task')
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
|
|||||||
Reference in New Issue
Block a user