127 lines
3.9 KiB
Python
127 lines
3.9 KiB
Python
from enum import Enum
|
|
from logging import Logger
|
|
from typing import TYPE_CHECKING, Literal, Optional
|
|
|
|
from appdaemon.plugins.hass.hassapi import Hass
|
|
from appdaemon.plugins.mqtt.mqttapi import Mqtt
|
|
from pydantic import BaseModel, Field, TypeAdapter, field_validator
|
|
from room_control import console
|
|
|
|
if TYPE_CHECKING:
|
|
from room_control import RoomController
|
|
|
|
|
|
class Side(int, Enum):
    """Integer identifiers for the six faces of the cube (values 0 through 5).

    The ``int`` mixin lets members compare equal to their raw numeric value,
    so a plain integer can be validated straight into a member.
    """

    back = 0
    right = 1
    top = 2
    front = 3
    left = 4
    bottom = 5
|
|
|
|
|
|
# Closed set of action strings accepted by CubeEvent.action.
# The empty string and 'wakeup' are accepted here but are skipped by
# AqaraCube.handle_event rather than dispatched.
Actions = Literal[
    '',
    'wakeup',
    'slide',
    'shake',
    'rotate_left',
    'rotate_right',
    'flip180',
    'flip90',
]
|
|
|
|
|
|
class CubeEvent(BaseModel):
    """Decoded JSON payload of a single cube message.

    Every field is optional and defaults to ``None``, so a message that only
    carries a subset of the keys still validates.
    """

    # --- what happened -----------------------------------------------------
    action: Optional[Actions] = None
    # No numeric bound needed: the Side enum already restricts values to 0-5.
    side: Optional[Side] = None
    action_angle: Optional[float] = None
    action_side: Optional[Side] = None
    action_from_side: Optional[Side] = None
    action_to_side: Optional[Side] = None

    # --- device telemetry (non-negative counters / readings) ---------------
    battery: Optional[int] = Field(default=None, ge=0)
    current: Optional[int] = Field(default=None, ge=0)
    # Deliberately unbounded: a temperature reading can legitimately be below
    # zero, and rejecting it would fail validation of the whole event.
    device_temperature: Optional[int] = None
    linkquality: Optional[int] = Field(default=None, ge=0)
    power: Optional[int] = Field(default=None, ge=0)
    power_outage_count: Optional[int] = Field(default=None, ge=0)
    voltage: Optional[int] = Field(default=None, ge=0)
|
|
|
|
|
|
class MQTTResponse(BaseModel):
    """Event data carrying an MQTT topic and the cube payload sent on it.

    ``payload`` normally arrives as a JSON string and is decoded into a
    ``CubeEvent`` before regular field validation runs.
    """

    topic: str
    payload: CubeEvent

    @field_validator('payload', mode='before')
    @classmethod
    def payload_str(cls, v: object) -> object:
        """Decode a JSON-encoded payload into a ``CubeEvent``.

        Args:
            v: Raw ``payload`` value as supplied to the model.

        Returns:
            A ``CubeEvent`` when ``v`` is JSON text; otherwise ``v`` unchanged,
            leaving it to the normal ``CubeEvent`` field validation.

        Raises:
            pydantic.ValidationError: if the JSON text is malformed or does
                not match the ``CubeEvent`` schema.
        """
        if isinstance(v, (str, bytes, bytearray)):
            return CubeEvent.model_validate_json(v)
        # Already-parsed input (dict or CubeEvent instance), e.g. when
        # re-validating the output of model_dump(): let pydantic handle it.
        return v
|
|
|
|
|
|
class CallbackEntry(BaseModel):
    """One registered callback record.

    Instances are produced in ``AqaraCube.callbacks`` by validating the
    result of ``get_callback_entries()``. Only ``event`` is optional; every
    other field must be present.
    """

    # What the callback is attached to.
    entity: str
    event: Optional[str] = None
    type: Literal['state', 'event']

    # How the callback was registered (both kept as plain strings).
    kwargs: str
    function: str

    # Owning app name and pinning settings.
    name: str
    pin_app: bool
    pin_thread: int
|
|
|
|
|
|
# Two-level mapping of callback records. AqaraCube.callbacks indexes the
# outer level by app name; the inner keys are presumably callback handles
# (TODO confirm against AppDaemon's get_callback_entries()).
Callbacks = dict[
    str,
    dict[str, CallbackEntry],
]
|
|
|
|
|
|
class AqaraCube(Hass, Mqtt):
    """AppDaemon app that turns cube MQTT actions into room-controller calls.

    Configuration read from ``self.args``:

    * ``app``  -- name of the controller app fetched with ``get_app``.
    * ``cube`` -- device name; events are read from ``zigbee2mqtt/<cube>``.
    * any key named after an action (e.g. ``shake``) -- a description string
      passed to ``action_handler`` when that action occurs.
    """

    app: 'RoomController'
    logger: Logger

    def initialize(self):
        """Resolve the controller app, set up logging and subscribe to the cube topic."""
        self.app = self.get_app(self.args['app'])
        self.logger = console.load_rich_config(self.app.name, type(self).__name__, level='DEBUG')

        topic = f'zigbee2mqtt/{self.args["cube"]}'
        self.mqtt_subscribe(topic, namespace='mqtt')
        self.listen_event(self.handle_event, 'MQTT_MESSAGE', topic=topic, namespace='mqtt')
        self.log(f'Listening for cube events on: [topic]{topic}[/]')

        # Debugging aid, intentionally left disabled:
        # self.log(f'Number of callbacks: {len(self.callbacks())}')
        # for handle, cb in self.callbacks().items():
        #     self.log(repr(cb))

    def terminate(self):
        """Log that the app is shutting down."""
        self.log('[bold red]Terminating[/]', level='DEBUG')

    def callbacks(self) -> dict[str, CallbackEntry]:
        """Return the callbacks registered under this app's name.

        Returns:
            The validated callback records stored under ``self.name``, or an
            empty dict when nothing is registered under that name (so the
            result is always safe to pass to ``len()`` or iterate).
        """
        entries = TypeAdapter(Callbacks).validate_python(self.get_callback_entries())
        return entries.get(self.name, {})

    def handle_event(self, event_name, data, **kwargs):
        """Validate an incoming MQTT event and dispatch its cube action.

        Dispatch order: a description configured in ``self.args`` under the
        action's name wins; otherwise a ``handle_<action>`` method on this
        object is called with the payload, if one exists.

        Args:
            event_name: Name of the event, used only for logging.
            data: Raw event data, validated into an ``MQTTResponse``.
            **kwargs: Extra callback arguments; unused.
        """
        response = MQTTResponse.model_validate(data)
        action = response.payload.action

        # Nothing to dispatch: the payload has no action (missing or empty),
        # or it is only a wakeup notification.
        if not action or action == 'wakeup':
            return

        self.log(
            f'{event_name} on [topic]{response.topic}[/], Action: "[yellow]{str(action)}[/]"',
            level='DEBUG',
        )
        if arg := self.args.get(action, False):
            self.action_handler(action=action, description=arg)
        elif handler := getattr(self, f'handle_{action}', None):
            handler(response.payload)

    def action_handler(self, action: str, description: str):
        """Carry out the configured response to a cube action.

        Args:
            action: Name of the cube action that occurred.
            description: Configured response. ``'activate'`` activates the
                controller app, a value starting with ``'scene.'`` turns on
                that scene, and a value starting with ``'toggle'`` toggles
                the controller app. Anything else is logged as a warning.
        """
        self.log(f'{self.args["cube"]}: {action}: {description}')

        if description == 'activate':
            self.app.activate(cause=f'{self.args["cube"]}: {action}')

        elif description.startswith('scene.'):
            self.call_service('scene/turn_on', entity_id=description, namespace='default')
            self.log(f'Turned on {description}')

        elif description.startswith('toggle'):
            cause = f'{self.args["cube"]} {action}'
            self.app.toggle_activate(kwargs={'cause': cause})

        else:
            self.log(f'Unhandled action: {action}', level='WARNING')
|