from enum import Enum from logging import Logger from typing import TYPE_CHECKING, Literal, Optional from appdaemon.plugins.hass.hassapi import Hass from appdaemon.plugins.mqtt.mqttapi import Mqtt from pydantic import BaseModel, Field, TypeAdapter, field_validator from room_control import console if TYPE_CHECKING: from room_control import RoomController class Side(int, Enum): back = 0 right = 1 top = 2 front = 3 left = 4 bottom = 5 Actions = Literal[ '', 'wakeup', 'slide', 'shake', 'rotate_left', 'rotate_right', 'flip180', 'flip90' ] class CubeEvent(BaseModel): action: Optional[Actions] = None side: Optional[Side] = Field(default=None, ge=0) action_angle: Optional[float] = None action_side: Optional[Side] = None action_from_side: Optional[Side] = None action_to_side: Optional[Side] = None battery: Optional[int] = Field(default=None, ge=0) current: Optional[int] = Field(default=None, ge=0) device_temperature: Optional[int] = Field(default=None, ge=0) linkquality: Optional[int] = Field(default=None, ge=0) power: Optional[int] = Field(default=None, ge=0) power_outage_count: Optional[int] = Field(default=None, ge=0) voltage: Optional[int] = Field(default=None, ge=0) class MQTTResponse(BaseModel): topic: str payload: CubeEvent @field_validator('payload', mode='before') def payload_str(cls, v: str): return CubeEvent.model_validate_json(v) class CallbackEntry(BaseModel): entity: str event: Optional[str] = None type: Literal['state', 'event'] kwargs: str function: str name: str pin_app: bool pin_thread: int Callbacks = dict[str, dict[str, CallbackEntry]] class AqaraCube(Hass, Mqtt): app: 'RoomController' logger: Logger def initialize(self): self.app: 'RoomController' = self.get_app(self.args['app']) self.logger = console.load_rich_config(self.app.name, type(self).__name__, level='DEBUG') topic = f'zigbee2mqtt/{self.args["cube"]}' self.mqtt_subscribe(topic, namespace='mqtt') self.listen_event(self.handle_event, 'MQTT_MESSAGE', topic=topic, namespace='mqtt') self.log(f'Listening for cube events on: [topic]{topic}[/]') # self.log(f'Number of callbacks: {len(self.callbacks())}') # for handle, cb in self.callbacks().items(): # self.log(repr(cb)) def terminate(self): self.log('[bold red]Terminating[/]', level='DEBUG') def callbacks(self) -> dict[str, CallbackEntry]: data = TypeAdapter(Callbacks).validate_python(self.get_callback_entries()) name: str = self.name try: return data[name] except KeyError: return def handle_event(self, event_name, data, cb_args): data = MQTTResponse.model_validate(data) action = data.payload.action if action == '' or action == 'wakeup': return else: self.log( f'{event_name} on [topic]{data.topic}[/], Action: "[yellow]{str(action)}[/]"', level='DEBUG', ) if arg := self.args.get(action, False): self.action_handler(action=action, description=arg) elif handler := getattr(self, f'handle_{action}', None): handler(data.payload) def action_handler(self, action: str, description: str): self.log(f'{self.args["cube"]}: {action}: {description}') if description == 'activate': self.app.activate(cause=f'{self.args["cube"]}: {action}') elif description.startswith('scene.'): self.call_service('scene/turn_on', entity_id=description, namespace='default') self.log(f'Turned on {description}') elif description.startswith('toggle'): cause = f'{self.args["cube"]} {action}' self.app.toggle_activate(kwargs={'cause': cause}) else: self.log(f'Unhandled action: {action}', level='WARNING')