"""AppDaemon app that listens for zigbee2mqtt cube messages and dispatches actions.

The app subscribes to ``zigbee2mqtt/<cube>`` on the ``mqtt`` namespace, parses
each message into a :class:`CubeEvent`, and either runs the behaviour
configured for that action in the app's args or calls a matching
``handle_<action>`` method when one is defined on the app.
"""

import json
import logging
import sys
from enum import Enum
from pathlib import Path
from typing import Annotated, Literal, Optional, TypedDict

from appdaemon.plugins.hass.hassapi import Hass
from appdaemon.plugins.mqtt.mqttapi import Mqtt
from pydantic import BaseModel, Field, TypeAdapter, field_validator

# Make the sibling ``room_control`` directory importable so ``console`` resolves.
rc_path = (Path(__file__).resolve().parents[1] / 'room_control').as_posix()
sys.path.insert(0, rc_path)
from console import console, setup_component_logging


class Side(int, Enum):
    """Integer identifiers for the six cube faces."""

    back = 0
    right = 1
    top = 2
    front = 3
    left = 4
    bottom = 5


# Action strings accepted by CubeEvent.action; any other value fails validation.
Actions = Literal[
    '', 'wakeup', 'slide', 'shake',
    'rotate_left', 'rotate_right',
    'flip180', 'flip90'
]


class CubeEvent(BaseModel):
    """Parsed JSON payload of a single cube message; every field is optional."""

    action: Optional[Actions] = None
    # NOTE(review): ``ge=0`` is applied to an Enum-typed field here; confirm
    # the installed pydantic version accepts this constraint on enums.
    side: Optional[Side] = Field(default=None, ge=0)
    action_angle: Optional[float] = None
    action_side: Optional[Side] = None
    action_from_side: Optional[Side] = None
    action_to_side: Optional[Side] = None
    battery: Optional[int] = Field(default=None, ge=0)
    current: Optional[int] = Field(default=None, ge=0)
    device_temperature: Optional[int] = Field(default=None, ge=0)
    linkquality: Optional[int] = Field(default=None, ge=0)
    power: Optional[int] = Field(default=None, ge=0)
    power_outage_count: Optional[int] = Field(default=None, ge=0)
    voltage: Optional[int] = Field(default=None, ge=0)


class MQTTResponse(BaseModel):
    """Event data of an ``MQTT_MESSAGE`` event: the topic and its cube payload."""

    topic: str
    payload: CubeEvent

    @field_validator('payload', mode='before')
    @classmethod
    def payload_str(cls, v):
        """Decode a JSON-encoded payload into a :class:`CubeEvent`.

        Strings and bytes are parsed as JSON. Anything else (for example an
        already-decoded dict or a ``CubeEvent`` instance) is returned unchanged
        so that pydantic validates it as a regular ``CubeEvent`` value instead
        of failing inside the JSON parser.
        """
        if isinstance(v, (str, bytes, bytearray)):
            return CubeEvent.model_validate_json(v)
        return v


class CallbackEntry(BaseModel):
    """One callback record as returned by ``get_callback_entries()``."""

    entity: str
    event: Optional[str] = None
    type: str
    kwargs: str
    function: str
    name: str
    pin_app: bool
    pin_thread: int


# Two-level mapping validated in AqaraCube.callbacks(); the outer key is looked
# up with the app name, the inner dict holds that app's callback entries.
Callbacks = dict[str, dict[str, CallbackEntry]]


class AqaraCube(Hass, Mqtt):
    """Route cube actions received over MQTT to another AppDaemon app or a scene.

    Args read from ``self.args``:
        cube: device name; the subscribed topic is ``zigbee2mqtt/<cube>``.
        app: name of the AppDaemon app fetched with ``get_app``.
        rich: optional log level passed to ``logger.setLevel`` (default INFO).
        <action>: optional description of what to do for that action, see
            :meth:`action_handler`.
    """

    def initialize(self):
        """Set up logging, subscribe to the cube topic and look up the target app."""
        setup_component_logging(self)
        self.logger.setLevel(self.args.get('rich', logging.INFO))

        topic = f'zigbee2mqtt/{self.args["cube"]}'
        self.mqtt_subscribe(topic, namespace='mqtt')
        self.listen_event(
            self.handle_event,
            'MQTT_MESSAGE',
            topic=topic,
            namespace='mqtt',
        )
        self.log(f'Listening for cube events on: [topic]{topic}[/]')

        self.app = self.get_app(self.args['app'])
        self.log(f'Connected to AD app: [room]{self.app.name}[/]')

        self.log(self.callbacks())

    def terminate(self):
        """Log that the app is terminating."""
        self.log('[bold red]Terminating[/]', level='DEBUG')

    def callbacks(self) -> dict[str, CallbackEntry]:
        """Return this app's validated callback entries.

        Returns:
            The entries stored under ``self.name``, or an empty dict when there
            is no entry for this app (matching the annotated return type).
        """
        entries = TypeAdapter(Callbacks).validate_python(self.get_callback_entries())
        return entries.get(self.name, {})

    def handle_event(self, event_name, data, cb_args):
        """Validate an ``MQTT_MESSAGE`` event and dispatch its action.

        Messages with no action, an empty action, or ``'wakeup'`` are ignored.
        Otherwise a truthy value configured under the action name in
        ``self.args`` is passed to :meth:`action_handler`; failing that, a
        ``handle_<action>`` method is called with the payload if it exists.

        Args:
            event_name: name of the event, used only in the debug log line.
            data: event data validated into :class:`MQTTResponse`.
            cb_args: extra callback arguments; unused.
        """
        response = MQTTResponse.model_validate(data)
        action = response.payload.action

        # Messages without an action carry nothing to dispatch.
        if action in (None, '', 'wakeup'):
            return

        self.log(
            f'{event_name} on [topic]{response.topic}[/], Action: "[yellow]{str(action)}[/]"',
            level='DEBUG'
        )
        if (arg := self.args.get(action, False)):
            self.action_handler(action=action, description=arg)
        elif handler := getattr(self, f'handle_{action}', None):
            handler(response.payload)

    def action_handler(self, action: str, description: str):
        """Carry out the configured ``description`` for ``action``.

        Args:
            action: the cube action that triggered the call.
            description: ``'activate'`` calls ``app.activate``; a value
                starting with ``'scene.'`` turns that scene on; a value
                starting with ``'toggle'`` calls ``app.toggle_activate``.
                Anything else is logged as a warning.
        """
        self.log(f'{self.args["cube"]}: {action}: {description}')

        if description == 'activate':
            self.app.activate(cause=f'{self.args["cube"]}: {action}')
        elif description.startswith('scene.'):
            self.call_service('scene/turn_on', entity_id=description, namespace='default')
            self.log(f'Turned on {description}')
        elif description.startswith('toggle'):
            cause = f'{self.args["cube"]} {action}'
            self.app.toggle_activate(kwargs={'cause': cause})
        else:
            self.log(f'Unhandled action: {action}', level='WARNING')