diff --git a/callbacks.py b/callbacks.py new file mode 100644 index 0000000..f0771a4 --- /dev/null +++ b/callbacks.py @@ -0,0 +1,97 @@ +import asyncio +from cgitb import handler +from copy import deepcopy +from logging import Logger +from typing import TYPE_CHECKING, Any, Literal + +import appdaemon.utils as utils + +if TYPE_CHECKING: + from appdaemon.appdaemon import AppDaemon + + +class Callbacks: + """Container for storing callbacks. Modified by :class:`~.events.Events` and :class:`~.state.State`""" + + AD: "AppDaemon" + """Reference to the AppDaemon container object + """ + logger: Logger + """Standard python logger named ``AppDaemon._callbacks`` + """ + diag: Logger + """Standard python logger named ``Diag`` + """ + callbacks: dict[str, dict[str, dict[str, Any]]] + """Nested dictionary to internally track the callbacks. + + Arranged by app_name.handle.properties + """ + callbacks_lock: asyncio.Lock + + def __init__(self, ad: "AppDaemon"): + self.AD = ad + self.logger = ad.logging.get_child("_callbacks") + self.diag = ad.logging.get_diag() + self.callbacks = {} + self.callbacks_lock = asyncio.Lock() + + # + # Diagnostic + # + + async def add_callback(self): + return + + async def cancel_callback(self, handle: str, name: str, silent: bool = False): + async with self.callbacks_lock: + if (callbacks := self.callbacks.get(name)) \ + and (callback := callbacks.pop(handle, False)): + await self.AD.state.remove_entity("admin", f"{callback['type']}_callback.{handle}") + return True + elif not silent: + self.logger.warning(f"Invalid callback handle '{handle}' in cancel_callback()") + + async def cancel_all_callbacks(self, name: str, silent: bool = False): + async with self.callbacks_lock: + if callbacks := self.callbacks.pop(name, False): + self.logger.debug("Clearing %s callbacks for %s", len(callbacks), name) + for handle, cb_info in callbacks.items(): + cb_type: Literal["event", "state", "log"] = cb_info['type'] + await self.AD.state.remove_entity("admin", f"{cb_type}_callback.{handle}") + elif not silent: + self.logger.debug("cancel_all_callbacks() called for %s", name) + + async def get_callback_entries(self, type: str = "all"): + async with self.callbacks_lock: + return { + app_name: { + handle: { + "entity": cb_info.get("entity"), + "event": cb_info.get("event"), + "type": cb_info["type"], + "kwargs": deepcopy(cb_info["kwargs"]), + "function": cb_info["function"].__name__, + "name": cb_info["name"], + "pin_app": cb_info["pin_app"], + "pin_thread": cb_info["pin_thread"] if cb_info["pin_thread"] != -1 else None, + } + for handle, cb_info in app_callbacks.items() + if type == 'all' or type == cb_info["type"] + } + for app_name, app_callbacks in self.callbacks.items() + } + + async def get_callback_handles(self, app: str = 'all', type: str = 'all', entity_id: str = 'all'): + async with self.callbacks_lock: + handles = set( + handle + for app_name, app_callbacks in self.callbacks.items() + if app == 'all' or app == app_name + for handle, cb_info in app_callbacks.items() + if (type == 'all' or type == cb_info["type"]) + and (entity_id == 'all' or entity_id == cb_info["entity"]) + ) + self.logger.debug(f"Got {len(handles)} callbacks for app={app}, type={type}, entity_id={entity_id}") + return handles + diff --git a/state.py b/state.py new file mode 100644 index 0000000..eadef00 --- /dev/null +++ b/state.py @@ -0,0 +1,58 @@ + +from logging import Logger +from typing import Any + +from appdaemon import AppDaemon + + +class States: + AD: AppDaemon + logger: Logger + + async def process_state_callbacks(self, namespace: str, state: dict[str, Any]): + data: dict[str, Any] = state["data"] + self.logger.debug(data) + + entity_id: str = data["entity_id"] + device, entity = entity_id.split(".") + + async def _generate_dispatch_kwargs(): + state_callback_dict = await self.AD.callbacks.get_callbacks(namespace, type="state") + for uuid_, callback in state_callback_dict.items(): + dispatch_kwargs = { + 'name': callback["name"], + 'funcref': callback["function"], + 'entity': entity_id, + 'attribute': callback['kwargs'].get('attribute', 'state'), + 'new_state': data["new_state"], + 'old_state': data["old_state"], + 'cold': callback["kwargs"].get("old"), + 'cnew': callback["kwargs"].get("new"), + 'kwargs': callback["kwargs"], + 'uuid_': uuid_, + 'pin_app': callback["pin_app"], + 'pin_thread': callback["pin_thread"], + } + is_oneshot = callback["kwargs"].get("oneshot", False) + remove = {'name': callback["name"], 'handle': uuid_} + + if (centity := callback.get('entity')) and "." in centity: + cdevice, centity = centity.split(".") + else: + cdevice = None + + if ( + (cdevice is None) + or (centity is None and device == cdevice) + or (centity == entity and device == cdevice) + ): + yield dispatch_kwargs, is_oneshot, remove + + async def _send_dispatches(): + async for dispatch_kwargs, is_oneshot, remove in _generate_dispatch_kwargs(): + executed = await self.AD.threading.check_and_dispatch_state(**dispatch_kwargs) + if executed and is_oneshot: + yield remove + + async for remove in _send_dispatches(): + await self.cancel_state_callback(**remove)