This commit is contained in:
John Lancaster
2024-10-16 03:56:07 +00:00
parent 5af940f077
commit 5e458aca41
3 changed files with 9 additions and 10 deletions

55
appdaemon/appdaemon.py Normal file
View File

@@ -0,0 +1,55 @@
import asyncio
import concurrent
import concurrent.futures
import functools
import inspect
from typing import Any, Callable, Coroutine
class AppDaemon:
async def run_async_sync_func(self, method, *args, timeout: float | None = None, **kwargs):
if inspect.iscoroutinefunction(method):
result = await method(*args, timeout=timeout, **kwargs)
else:
result = await self.run_in_executor(self, method, *args, timeout=timeout, **kwargs)
return result
async def run_in_executor(
self,
func: Callable,
*args,
timeout: float | None = None,
**kwargs
) -> Any:
"""Run the sync function using the ThreadPoolExecutor and await the result"""
timeout = timeout or self.AD.internal_function_timeout
coro = self.AD.loop.run_in_executor(
self.AD.executor,
functools.partial(func, **kwargs),
*args,
)
try:
return await asyncio.wait_for(coro, timeout)
except asyncio.TimeoutError:
self.logger.warning(
"Function (%s) took too long (%s seconds), cancelling the task...",
func.__name__, timeout,
)
def run_coroutine_threadsafe(self, coro: Coroutine, timeout: float | None = None) -> Any:
timeout = timeout or self.AD.internal_function_timeout
if self.AD.loop.is_running():
try:
future = asyncio.run_coroutine_threadsafe(coro, self.AD.loop)
return future.result(timeout)
except (asyncio.TimeoutError, concurrent.futures.TimeoutError):
self.logger.warning(
"Coroutine (%s) took too long (%s seconds), cancelling the task...",
coro, timeout,
)
future.cancel()
else:
self.logger.warning("LOOP NOT RUNNING. Returning NONE.")

116
appdaemon/callbacks.py Normal file
View File

@@ -0,0 +1,116 @@
import asyncio
from copy import deepcopy
from logging import Logger
from typing import TYPE_CHECKING, Any, Literal
if TYPE_CHECKING:
from appdaemon.appdaemon import AppDaemon
class Callbacks:
"""Container for storing callbacks. Modified by :class:`~.events.Events` and :class:`~.state.State`"""
AD: "AppDaemon"
"""Reference to the AppDaemon container object
"""
logger: Logger
"""Standard python logger named ``AppDaemon._callbacks``
"""
diag: Logger
"""Standard python logger named ``Diag``
"""
callbacks: dict[str, dict[str, dict[str, Any]]]
"""Nested dictionary to internally track the callbacks.
Arranged by app_name.handle.properties
"""
callbacks_lock: asyncio.Lock
def __init__(self, ad: "AppDaemon"):
self.AD = ad
self.logger = ad.logging.get_child("_callbacks")
self.diag = ad.logging.get_diag()
self.callbacks = {}
self.callbacks_lock = asyncio.Lock()
#
# Diagnostic
#
async def add_callback(self):
return
async def cancel_callback(self, handle: str, name: str, silent: bool = False):
async with self.callbacks_lock:
if (callbacks := self.callbacks.get(name)) \
and (callback := callbacks.pop(handle, False)):
await self.AD.state.remove_entity("admin", f"{callback['type']}_callback.{handle}")
return True
elif not silent:
self.logger.warning(f"Invalid callback handle '{
handle}' in cancel_callback()")
async def cancel_all_callbacks(self, name: str, silent: bool = False):
async with self.callbacks_lock:
if callbacks := self.callbacks.pop(name, False):
self.logger.debug(
"Clearing %s callbacks for %s", len(callbacks), name)
for handle, cb_info in callbacks.items():
cb_type: Literal["event", "state", "log"] = cb_info['type']
await self.AD.state.remove_entity("admin", f"{cb_type}_callback.{handle}")
elif not silent:
self.logger.debug("cancel_all_callbacks() called for %s", name)
async def get_callback_entries(self, type: str = "all"):
async with self.callbacks_lock:
return {
app_name: {
handle: {
"entity": cb_info.get("entity"),
"event": cb_info.get("event"),
"type": cb_info["type"],
"kwargs": deepcopy(cb_info["kwargs"]),
"function": cb_info["function"].__name__,
"name": cb_info["name"],
"pin_app": cb_info["pin_app"],
"pin_thread": cb_info["pin_thread"] if cb_info["pin_thread"] != -1 else None,
}
for handle, cb_info in app_callbacks.items()
if type == 'all' or type == cb_info["type"]
}
for app_name, app_callbacks in self.callbacks.items()
}
async def get_callbacks(
self,
namespace: str = 'all',
app: str = 'all',
type: str = 'all',
entity_id: str = 'all',
copy: bool = True,
) -> dict[str, dict[str, Any]]:
async with self.callbacks_lock:
return {
handle: deepcopy(cb_info) if copy else cb_info
for app_name, app_callbacks in self.callbacks.items()
if app == 'all' or app == app_name
for handle, cb_info in app_callbacks.items()
if (type == 'all' or type == cb_info["type"])
and (entity_id == 'all' or entity_id == cb_info["entity"])
and (
namespace == 'all'
or namespace == 'global'
or cb_info["namespace"] == 'global'
or namespace == cb_info["namespace"]
)
}
async def get_callback_handles(
self,
namespace: str = 'all',
app: str = 'all',
type: str = 'all',
entity_id: str = 'all'
) -> set[str]:
callbacks = await self.get_callbacks(namespace, app, type, entity_id, copy=False)
return set(callbacks.keys())

107
appdaemon/state.py Normal file
View File

@@ -0,0 +1,107 @@
from copy import deepcopy
from logging import Logger
from typing import Any
from appdaemon import AppDaemon
class States:
AD: AppDaemon
logger: Logger
async def process_state_callbacks(self, namespace: str, state: dict[str, Any]):
data: dict[str, Any] = state["data"]
self.logger.debug(data)
entity_id: str = data["entity_id"]
device, entity = entity_id.split(".")
async def _generate_dispatch_kwargs():
state_callback_dict = await self.AD.callbacks.get_callbacks(namespace, type="state")
for uuid_, callback in state_callback_dict.items():
dispatch_kwargs = {
'name': callback["name"],
'funcref': callback["function"],
'entity': entity_id,
'attribute': callback['kwargs'].get('attribute', 'state'),
'new_state': data["new_state"],
'old_state': data["old_state"],
'cold': callback["kwargs"].get("old"),
'cnew': callback["kwargs"].get("new"),
'kwargs': callback["kwargs"],
'uuid_': uuid_,
'pin_app': callback["pin_app"],
'pin_thread': callback["pin_thread"],
}
is_oneshot = callback["kwargs"].get("oneshot", False)
remove = {'name': callback["name"], 'handle': uuid_}
if (centity := callback.get('entity')) and "." in centity:
cdevice, centity = centity.split(".")
else:
cdevice = None
if (
(cdevice is None)
or (centity is None and device == cdevice)
or (centity == entity and device == cdevice)
):
yield dispatch_kwargs, is_oneshot, remove
async def _send_dispatches():
async for dispatch_kwargs, is_oneshot, remove in _generate_dispatch_kwargs():
executed = await self.AD.threading.check_and_dispatch_state(**dispatch_kwargs)
if executed and is_oneshot:
yield remove
async for remove in _send_dispatches():
await self.cancel_state_callback(**remove)
async def get_state(
self,
name: str,
namespace: str,
entity_id: str | None = None,
attribute: str | None = None,
default: Any = None,
copy: bool = True,
):
self.logger.debug("get_state: %s.%s %s %s", entity_id, attribute, default, copy)
result = default
if ns := self.state.get(namespace):
# Process entity_id input
if entity_id is None:
result = ns
# TODO: filter by attribute?
elif "." not in entity_id:
domain = entity_id
result = {
eid: state
for eid, state in ns.items()
if eid.startswith(domain)
}
elif full_state := ns.get(entity_id):
result = full_state
else:
self.logger.warning(f"Entity {entity_id} does not exist in namespace {namespace}")
return
# Process attribute input
if attribute == "all":
result = result
elif attr := full_state.get(attribute):
result = attr
elif attr := full_state.get('attributes', {}).get(attribute):
result = attr
elif state := full_state.get("state"):
result = state
return deepcopy(result) if copy else result
else:
self.logger.warning(f"Namespace does not exist: {namespace}")
async def cancel_state_callback(self, handle: str, name: str, silent: bool = False) -> bool:
return await self.AD.callbacks.cancel_callback(handle, name, silent)