Files
appdaemon_snippets/state.py
2024-10-16 03:50:57 +00:00

108 lines
3.9 KiB
Python

from copy import deepcopy
from logging import Logger
from typing import Any
from appdaemon import AppDaemon
class States:
AD: AppDaemon
logger: Logger
async def process_state_callbacks(self, namespace: str, state: dict[str, Any]):
data: dict[str, Any] = state["data"]
self.logger.debug(data)
entity_id: str = data["entity_id"]
device, entity = entity_id.split(".")
async def _generate_dispatch_kwargs():
state_callback_dict = await self.AD.callbacks.get_callbacks(namespace, type="state")
for uuid_, callback in state_callback_dict.items():
dispatch_kwargs = {
'name': callback["name"],
'funcref': callback["function"],
'entity': entity_id,
'attribute': callback['kwargs'].get('attribute', 'state'),
'new_state': data["new_state"],
'old_state': data["old_state"],
'cold': callback["kwargs"].get("old"),
'cnew': callback["kwargs"].get("new"),
'kwargs': callback["kwargs"],
'uuid_': uuid_,
'pin_app': callback["pin_app"],
'pin_thread': callback["pin_thread"],
}
is_oneshot = callback["kwargs"].get("oneshot", False)
remove = {'name': callback["name"], 'handle': uuid_}
if (centity := callback.get('entity')) and "." in centity:
cdevice, centity = centity.split(".")
else:
cdevice = None
if (
(cdevice is None)
or (centity is None and device == cdevice)
or (centity == entity and device == cdevice)
):
yield dispatch_kwargs, is_oneshot, remove
async def _send_dispatches():
async for dispatch_kwargs, is_oneshot, remove in _generate_dispatch_kwargs():
executed = await self.AD.threading.check_and_dispatch_state(**dispatch_kwargs)
if executed and is_oneshot:
yield remove
async for remove in _send_dispatches():
await self.cancel_state_callback(**remove)
async def get_state(
self,
name: str,
namespace: str,
entity_id: str | None = None,
attribute: str | None = None,
default: Any = None,
copy: bool = True,
):
self.logger.debug("get_state: %s.%s %s %s", entity_id, attribute, default, copy)
result = default
if ns := self.state.get(namespace):
# Process entity_id input
if entity_id is None:
result = ns
# TODO: filter by attribute?
elif "." not in entity_id:
domain = entity_id
result = {
eid: state
for eid, state in ns.items()
if eid.startswith(domain)
}
elif full_state := ns.get(entity_id):
result = full_state
else:
self.logger.warning(f"Entity {entity_id} does not exist in namespace {namespace}")
return
# Process attribute input
if attribute == "all":
result = result
elif attr := full_state.get(attribute):
result = attr
elif attr := full_state.get('attributes', {}).get(attribute):
result = attr
elif state := full_state.get("state"):
result = state
return deepcopy(result) if copy else result
else:
self.logger.warning(f"Namespace does not exist: {namespace}")
async def cancel_state_callback(self, handle: str, name: str, silent: bool = False) -> bool:
return await self.AD.callbacks.cancel_callback(handle, name, silent)