import asyncio from copy import deepcopy from logging import Logger from typing import TYPE_CHECKING, Any, Literal if TYPE_CHECKING: from appdaemon.appdaemon import AppDaemon class Callbacks: """Container for storing callbacks. Modified by :class:`~.events.Events` and :class:`~.state.State`""" AD: "AppDaemon" """Reference to the AppDaemon container object """ logger: Logger """Standard python logger named ``AppDaemon._callbacks`` """ diag: Logger """Standard python logger named ``Diag`` """ callbacks: dict[str, dict[str, dict[str, Any]]] """Nested dictionary to internally track the callbacks. Arranged by app_name.handle.properties """ callbacks_lock: asyncio.Lock def __init__(self, ad: "AppDaemon"): self.AD = ad self.logger = ad.logging.get_child("_callbacks") self.diag = ad.logging.get_diag() self.callbacks = {} self.callbacks_lock = asyncio.Lock() # # Diagnostic # async def add_callback(self): return async def cancel_callback(self, handle: str, name: str, silent: bool = False): async with self.callbacks_lock: if (callbacks := self.callbacks.get(name)) \ and (callback := callbacks.pop(handle, False)): await self.AD.state.remove_entity("admin", f"{callback['type']}_callback.{handle}") return True elif not silent: self.logger.warning(f"Invalid callback handle '{ handle}' in cancel_callback()") async def cancel_all_callbacks(self, name: str, silent: bool = False): async with self.callbacks_lock: if callbacks := self.callbacks.pop(name, False): self.logger.debug( "Clearing %s callbacks for %s", len(callbacks), name) for handle, cb_info in callbacks.items(): cb_type: Literal["event", "state", "log"] = cb_info['type'] await self.AD.state.remove_entity("admin", f"{cb_type}_callback.{handle}") elif not silent: self.logger.debug("cancel_all_callbacks() called for %s", name) async def get_callback_entries(self, type: str = "all"): async with self.callbacks_lock: return { app_name: { handle: { "entity": cb_info.get("entity"), "event": cb_info.get("event"), "type": cb_info["type"], "kwargs": deepcopy(cb_info["kwargs"]), "function": cb_info["function"].__name__, "name": cb_info["name"], "pin_app": cb_info["pin_app"], "pin_thread": cb_info["pin_thread"] if cb_info["pin_thread"] != -1 else None, } for handle, cb_info in app_callbacks.items() if type == 'all' or type == cb_info["type"] } for app_name, app_callbacks in self.callbacks.items() } async def get_callbacks( self, namespace: str = 'all', app: str = 'all', type: str = 'all', entity_id: str = 'all', copy: bool = True, ) -> dict[str, dict[str, Any]]: async with self.callbacks_lock: return { handle: deepcopy(cb_info) if copy else cb_info for app_name, app_callbacks in self.callbacks.items() if app == 'all' or app == app_name for handle, cb_info in app_callbacks.items() if (type == 'all' or type == cb_info["type"]) and (entity_id == 'all' or entity_id == cb_info["entity"]) and ( namespace == 'all' or namespace == 'global' or cb_info["namespace"] == 'global' or namespace == cb_info["namespace"] ) } async def get_callback_handles( self, namespace: str = 'all', app: str = 'all', type: str = 'all', entity_id: str = 'all' ) -> set[str]: callbacks = await self.get_callbacks(namespace, app, type, entity_id, copy=False) return set(callbacks.keys())