from appdaemon import Hass from appdaemon.entity import Entity from appdaemon.plugins.hass.notifications import AndroidNotification class CriticalAlert(Hass): msg: str alert_active: bool = False alert_handle: str = None context: str = None def initialize(self): self.set_log_level('DEBUG') self.critical_sensor.listen_state(self.alert) self.listen_notification_action(self.clear_alert, 'clear') @property def msg(self) -> str: return self.args['msg'] @property def device(self) -> str: return self.args['device'] @property def critical_sensor(self) -> Entity: return self.get_entity(self.args['sensor']) @property def critical_notification(self) -> AndroidNotification: n = AndroidNotification( device=self.args['device'], message=self.msg, tag=self.name, ) n.color = 'red' n.icon = 'fire-alert' n.add_action('clear', 'Clear Alert') return n def alert(self, entity: str, attribute: str, old: str, new: str, **kwargs): self.alert_active = new == 'on' if self.alert_active: self.alert_handle = self.run_every( self.repeat_alert, start='now', interval=2.0 ) self.log(f'Alert Handle: {self.alert_handle}', level='DEBUG') else: if self.alert_handle: self.clear_alert() def repeat_alert(self, **kwargs): if self.alert_active: self.android_tts( device=self.device, tts_text=self.msg, critical=True ) self.call_service(**self.critical_notification.to_service_call()) def clear_alert(self, event_name: str = None, data: dict = None, **cb_args: dict): self.log(event_name, level='DEBUG') if self.alert_active: self.alert_active = False self.cancel_timer(self.alert_handle) self.alert_handle = None