callbacks and state work
This commit is contained in:
97
callbacks.py
Normal file
97
callbacks.py
Normal file
@@ -0,0 +1,97 @@
|
|||||||
|
import asyncio
|
||||||
|
from cgitb import handler
|
||||||
|
from copy import deepcopy
|
||||||
|
from logging import Logger
|
||||||
|
from typing import TYPE_CHECKING, Any, Literal
|
||||||
|
|
||||||
|
import appdaemon.utils as utils
|
||||||
|
|
||||||
|
if TYPE_CHECKING:
|
||||||
|
from appdaemon.appdaemon import AppDaemon
|
||||||
|
|
||||||
|
|
||||||
|
class Callbacks:
|
||||||
|
"""Container for storing callbacks. Modified by :class:`~.events.Events` and :class:`~.state.State`"""
|
||||||
|
|
||||||
|
AD: "AppDaemon"
|
||||||
|
"""Reference to the AppDaemon container object
|
||||||
|
"""
|
||||||
|
logger: Logger
|
||||||
|
"""Standard python logger named ``AppDaemon._callbacks``
|
||||||
|
"""
|
||||||
|
diag: Logger
|
||||||
|
"""Standard python logger named ``Diag``
|
||||||
|
"""
|
||||||
|
callbacks: dict[str, dict[str, dict[str, Any]]]
|
||||||
|
"""Nested dictionary to internally track the callbacks.
|
||||||
|
|
||||||
|
Arranged by app_name.handle.properties
|
||||||
|
"""
|
||||||
|
callbacks_lock: asyncio.Lock
|
||||||
|
|
||||||
|
def __init__(self, ad: "AppDaemon"):
|
||||||
|
self.AD = ad
|
||||||
|
self.logger = ad.logging.get_child("_callbacks")
|
||||||
|
self.diag = ad.logging.get_diag()
|
||||||
|
self.callbacks = {}
|
||||||
|
self.callbacks_lock = asyncio.Lock()
|
||||||
|
|
||||||
|
#
|
||||||
|
# Diagnostic
|
||||||
|
#
|
||||||
|
|
||||||
|
async def add_callback(self):
|
||||||
|
return
|
||||||
|
|
||||||
|
async def cancel_callback(self, handle: str, name: str, silent: bool = False):
|
||||||
|
async with self.callbacks_lock:
|
||||||
|
if (callbacks := self.callbacks.get(name)) \
|
||||||
|
and (callback := callbacks.pop(handle, False)):
|
||||||
|
await self.AD.state.remove_entity("admin", f"{callback['type']}_callback.{handle}")
|
||||||
|
return True
|
||||||
|
elif not silent:
|
||||||
|
self.logger.warning(f"Invalid callback handle '{handle}' in cancel_callback()")
|
||||||
|
|
||||||
|
async def cancel_all_callbacks(self, name: str, silent: bool = False):
|
||||||
|
async with self.callbacks_lock:
|
||||||
|
if callbacks := self.callbacks.pop(name, False):
|
||||||
|
self.logger.debug("Clearing %s callbacks for %s", len(callbacks), name)
|
||||||
|
for handle, cb_info in callbacks.items():
|
||||||
|
cb_type: Literal["event", "state", "log"] = cb_info['type']
|
||||||
|
await self.AD.state.remove_entity("admin", f"{cb_type}_callback.{handle}")
|
||||||
|
elif not silent:
|
||||||
|
self.logger.debug("cancel_all_callbacks() called for %s", name)
|
||||||
|
|
||||||
|
async def get_callback_entries(self, type: str = "all"):
|
||||||
|
async with self.callbacks_lock:
|
||||||
|
return {
|
||||||
|
app_name: {
|
||||||
|
handle: {
|
||||||
|
"entity": cb_info.get("entity"),
|
||||||
|
"event": cb_info.get("event"),
|
||||||
|
"type": cb_info["type"],
|
||||||
|
"kwargs": deepcopy(cb_info["kwargs"]),
|
||||||
|
"function": cb_info["function"].__name__,
|
||||||
|
"name": cb_info["name"],
|
||||||
|
"pin_app": cb_info["pin_app"],
|
||||||
|
"pin_thread": cb_info["pin_thread"] if cb_info["pin_thread"] != -1 else None,
|
||||||
|
}
|
||||||
|
for handle, cb_info in app_callbacks.items()
|
||||||
|
if type == 'all' or type == cb_info["type"]
|
||||||
|
}
|
||||||
|
for app_name, app_callbacks in self.callbacks.items()
|
||||||
|
}
|
||||||
|
|
||||||
|
async def get_callback_handles(self, app: str = 'all', type: str = 'all', entity_id: str = 'all'):
|
||||||
|
async with self.callbacks_lock:
|
||||||
|
handles = set(
|
||||||
|
handle
|
||||||
|
for app_name, app_callbacks in self.callbacks.items()
|
||||||
|
if app == 'all' or app == app_name
|
||||||
|
for handle, cb_info in app_callbacks.items()
|
||||||
|
if (type == 'all' or type == cb_info["type"])
|
||||||
|
and (entity_id == 'all' or entity_id == cb_info["entity"])
|
||||||
|
)
|
||||||
|
self.logger.debug(f"Got {len(handles)} callbacks for app={app}, type={type}, entity_id={entity_id}")
|
||||||
|
return handles
|
||||||
|
|
||||||
58
state.py
Normal file
58
state.py
Normal file
@@ -0,0 +1,58 @@
|
|||||||
|
|
||||||
|
from logging import Logger
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
from appdaemon import AppDaemon
|
||||||
|
|
||||||
|
|
||||||
|
class States:
|
||||||
|
AD: AppDaemon
|
||||||
|
logger: Logger
|
||||||
|
|
||||||
|
async def process_state_callbacks(self, namespace: str, state: dict[str, Any]):
|
||||||
|
data: dict[str, Any] = state["data"]
|
||||||
|
self.logger.debug(data)
|
||||||
|
|
||||||
|
entity_id: str = data["entity_id"]
|
||||||
|
device, entity = entity_id.split(".")
|
||||||
|
|
||||||
|
async def _generate_dispatch_kwargs():
|
||||||
|
state_callback_dict = await self.AD.callbacks.get_callbacks(namespace, type="state")
|
||||||
|
for uuid_, callback in state_callback_dict.items():
|
||||||
|
dispatch_kwargs = {
|
||||||
|
'name': callback["name"],
|
||||||
|
'funcref': callback["function"],
|
||||||
|
'entity': entity_id,
|
||||||
|
'attribute': callback['kwargs'].get('attribute', 'state'),
|
||||||
|
'new_state': data["new_state"],
|
||||||
|
'old_state': data["old_state"],
|
||||||
|
'cold': callback["kwargs"].get("old"),
|
||||||
|
'cnew': callback["kwargs"].get("new"),
|
||||||
|
'kwargs': callback["kwargs"],
|
||||||
|
'uuid_': uuid_,
|
||||||
|
'pin_app': callback["pin_app"],
|
||||||
|
'pin_thread': callback["pin_thread"],
|
||||||
|
}
|
||||||
|
is_oneshot = callback["kwargs"].get("oneshot", False)
|
||||||
|
remove = {'name': callback["name"], 'handle': uuid_}
|
||||||
|
|
||||||
|
if (centity := callback.get('entity')) and "." in centity:
|
||||||
|
cdevice, centity = centity.split(".")
|
||||||
|
else:
|
||||||
|
cdevice = None
|
||||||
|
|
||||||
|
if (
|
||||||
|
(cdevice is None)
|
||||||
|
or (centity is None and device == cdevice)
|
||||||
|
or (centity == entity and device == cdevice)
|
||||||
|
):
|
||||||
|
yield dispatch_kwargs, is_oneshot, remove
|
||||||
|
|
||||||
|
async def _send_dispatches():
|
||||||
|
async for dispatch_kwargs, is_oneshot, remove in _generate_dispatch_kwargs():
|
||||||
|
executed = await self.AD.threading.check_and_dispatch_state(**dispatch_kwargs)
|
||||||
|
if executed and is_oneshot:
|
||||||
|
yield remove
|
||||||
|
|
||||||
|
async for remove in _send_dispatches():
|
||||||
|
await self.cancel_state_callback(**remove)
|
||||||
Reference in New Issue
Block a user