added some threading stuff to appdaemon object
This commit is contained in:
55
appdaemon.py
Normal file
55
appdaemon.py
Normal file
@@ -0,0 +1,55 @@
|
|||||||
|
import asyncio
|
||||||
|
import concurrent
|
||||||
|
import concurrent.futures
|
||||||
|
import functools
|
||||||
|
import inspect
|
||||||
|
from typing import Any, Callable, Coroutine
|
||||||
|
|
||||||
|
|
||||||
|
class AppDaemon:
|
||||||
|
async def run_async_sync_func(self, method, *args, timeout: float | None = None, **kwargs):
|
||||||
|
if inspect.iscoroutinefunction(method):
|
||||||
|
result = await method(*args, timeout=timeout, **kwargs)
|
||||||
|
else:
|
||||||
|
result = await self.run_in_executor(self, method, *args, timeout=timeout, **kwargs)
|
||||||
|
return result
|
||||||
|
|
||||||
|
async def run_in_executor(
|
||||||
|
self,
|
||||||
|
func: Callable,
|
||||||
|
*args,
|
||||||
|
timeout: float | None = None,
|
||||||
|
**kwargs
|
||||||
|
) -> Any:
|
||||||
|
"""Run the sync function using the ThreadPoolExecutor and await the result"""
|
||||||
|
timeout = timeout or self.AD.internal_function_timeout
|
||||||
|
|
||||||
|
coro = self.AD.loop.run_in_executor(
|
||||||
|
self.AD.executor,
|
||||||
|
functools.partial(func, **kwargs),
|
||||||
|
*args,
|
||||||
|
)
|
||||||
|
|
||||||
|
try:
|
||||||
|
return await asyncio.wait_for(coro, timeout)
|
||||||
|
except asyncio.TimeoutError:
|
||||||
|
self.logger.warning(
|
||||||
|
"Function (%s) took too long (%s seconds), cancelling the task...",
|
||||||
|
func.__name__, timeout,
|
||||||
|
)
|
||||||
|
|
||||||
|
def run_coroutine_threadsafe(self, coro: Coroutine, timeout: float | None = None) -> Any:
|
||||||
|
timeout = timeout or self.AD.internal_function_timeout
|
||||||
|
|
||||||
|
if self.AD.loop.is_running():
|
||||||
|
try:
|
||||||
|
future = asyncio.run_coroutine_threadsafe(coro, self.AD.loop)
|
||||||
|
return future.result(timeout)
|
||||||
|
except (asyncio.TimeoutError, concurrent.futures.TimeoutError):
|
||||||
|
self.logger.warning(
|
||||||
|
"Coroutine (%s) took too long (%s seconds), cancelling the task...",
|
||||||
|
coro, timeout,
|
||||||
|
)
|
||||||
|
future.cancel()
|
||||||
|
else:
|
||||||
|
self.logger.warning("LOOP NOT RUNNING. Returning NONE.")
|
||||||
Reference in New Issue
Block a user