from copy import deepcopy from logging import Logger from typing import Any from appdaemon import AppDaemon class States: AD: AppDaemon logger: Logger async def process_state_callbacks(self, namespace: str, state: dict[str, Any]): data: dict[str, Any] = state["data"] self.logger.debug(data) entity_id: str = data["entity_id"] device, entity = entity_id.split(".") async def _generate_dispatch_kwargs(): state_callback_dict = await self.AD.callbacks.get_callbacks(namespace, type="state") for uuid_, callback in state_callback_dict.items(): dispatch_kwargs = { 'name': callback["name"], 'funcref': callback["function"], 'entity': entity_id, 'attribute': callback['kwargs'].get('attribute', 'state'), 'new_state': data["new_state"], 'old_state': data["old_state"], 'cold': callback["kwargs"].get("old"), 'cnew': callback["kwargs"].get("new"), 'kwargs': callback["kwargs"], 'uuid_': uuid_, 'pin_app': callback["pin_app"], 'pin_thread': callback["pin_thread"], } is_oneshot = callback["kwargs"].get("oneshot", False) remove = {'name': callback["name"], 'handle': uuid_} if (centity := callback.get('entity')) and "." in centity: cdevice, centity = centity.split(".") else: cdevice = None if ( (cdevice is None) or (centity is None and device == cdevice) or (centity == entity and device == cdevice) ): yield dispatch_kwargs, is_oneshot, remove async def _send_dispatches(): async for dispatch_kwargs, is_oneshot, remove in _generate_dispatch_kwargs(): executed = await self.AD.threading.check_and_dispatch_state(**dispatch_kwargs) if executed and is_oneshot: yield remove async for remove in _send_dispatches(): await self.cancel_state_callback(**remove) async def get_state( self, name: str, namespace: str, entity_id: str | None = None, attribute: str | None = None, default: Any = None, copy: bool = True, ): self.logger.debug("get_state: %s.%s %s %s", entity_id, attribute, default, copy) result = default if ns := self.state.get(namespace): # Process entity_id input if entity_id is None: result = ns # TODO: filter by attribute? elif "." not in entity_id: domain = entity_id result = { eid: state for eid, state in ns.items() if eid.startswith(domain) } elif full_state := ns.get(entity_id): result = full_state else: self.logger.warning(f"Entity {entity_id} does not exist in namespace {namespace}") return # Process attribute input if attribute == "all": result = result elif attr := full_state.get(attribute): result = attr elif attr := full_state.get('attributes', {}).get(attribute): result = attr elif state := full_state.get("state"): result = state return deepcopy(result) if copy else result else: self.logger.warning(f"Namespace does not exist: {namespace}") async def cancel_state_callback(self, handle: str, name: str, silent: bool = False) -> bool: return await self.AD.callbacks.cancel_callback(handle, name, silent)