From 5af940f077f7f1095532f063ad3c599231c92aaf Mon Sep 17 00:00:00 2001 From: John Lancaster <32917998+jsl12@users.noreply.github.com> Date: Wed, 16 Oct 2024 03:55:32 +0000 Subject: [PATCH] added some threading stuff to appdaemon object --- appdaemon.py | 55 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 55 insertions(+) create mode 100644 appdaemon.py diff --git a/appdaemon.py b/appdaemon.py new file mode 100644 index 0000000..a1c5563 --- /dev/null +++ b/appdaemon.py @@ -0,0 +1,55 @@ +import asyncio +import concurrent +import concurrent.futures +import functools +import inspect +from typing import Any, Callable, Coroutine + + +class AppDaemon: + async def run_async_sync_func(self, method, *args, timeout: float | None = None, **kwargs): + if inspect.iscoroutinefunction(method): + result = await method(*args, timeout=timeout, **kwargs) + else: + result = await self.run_in_executor(self, method, *args, timeout=timeout, **kwargs) + return result + + async def run_in_executor( + self, + func: Callable, + *args, + timeout: float | None = None, + **kwargs + ) -> Any: + """Run the sync function using the ThreadPoolExecutor and await the result""" + timeout = timeout or self.AD.internal_function_timeout + + coro = self.AD.loop.run_in_executor( + self.AD.executor, + functools.partial(func, **kwargs), + *args, + ) + + try: + return await asyncio.wait_for(coro, timeout) + except asyncio.TimeoutError: + self.logger.warning( + "Function (%s) took too long (%s seconds), cancelling the task...", + func.__name__, timeout, + ) + + def run_coroutine_threadsafe(self, coro: Coroutine, timeout: float | None = None) -> Any: + timeout = timeout or self.AD.internal_function_timeout + + if self.AD.loop.is_running(): + try: + future = asyncio.run_coroutine_threadsafe(coro, self.AD.loop) + return future.result(timeout) + except (asyncio.TimeoutError, concurrent.futures.TimeoutError): + self.logger.warning( + "Coroutine (%s) took too long (%s seconds), cancelling the task...", + coro, timeout, + ) + future.cancel() + else: + self.logger.warning("LOOP NOT RUNNING. Returning NONE.")