import asyncio import concurrent import concurrent.futures import functools import inspect from typing import Any, Callable, Coroutine class AppDaemon: async def run_async_sync_func(self, method, *args, timeout: float | None = None, **kwargs): if inspect.iscoroutinefunction(method): result = await method(*args, timeout=timeout, **kwargs) else: result = await self.run_in_executor(self, method, *args, timeout=timeout, **kwargs) return result async def run_in_executor( self, func: Callable, *args, timeout: float | None = None, **kwargs ) -> Any: """Run the sync function using the ThreadPoolExecutor and await the result""" timeout = timeout or self.AD.internal_function_timeout coro = self.AD.loop.run_in_executor( self.AD.executor, functools.partial(func, **kwargs), *args, ) try: return await asyncio.wait_for(coro, timeout) except asyncio.TimeoutError: self.logger.warning( "Function (%s) took too long (%s seconds), cancelling the task...", func.__name__, timeout, ) def run_coroutine_threadsafe(self, coro: Coroutine, timeout: float | None = None) -> Any: timeout = timeout or self.AD.internal_function_timeout if self.AD.loop.is_running(): try: future = asyncio.run_coroutine_threadsafe(coro, self.AD.loop) return future.result(timeout) except (asyncio.TimeoutError, concurrent.futures.TimeoutError): self.logger.warning( "Coroutine (%s) took too long (%s seconds), cancelling the task...", coro, timeout, ) future.cancel() else: self.logger.warning("LOOP NOT RUNNING. Returning NONE.")