import json from copy import deepcopy from appdaemon.plugins.mqtt.mqttapi import Mqtt class AqaraCube(Mqtt): def initialize(self): self.set_namespace('mqtt') topic = f'zigbee2mqtt/{self.args["cube"]}' self.mqtt_subscribe(topic) self.listen_event(self.handle_event, "MQTT_MESSAGE", topic=topic) self.log(f'Listening for cube events on: {topic}') def handle_event(self, event_name, data, kwargs): topic = data['topic'] payload = json.loads(data['payload']) action = payload['action'] if action == '': return if (arg := self.args.get(action, None)) is not None: self.action_handler(action=action, description=arg) elif (handler := getattr(self, f'handle_{action}', None)): handler(payload) else: self.log(f'Unhandled cube action: {action}') def action_handler(self, action: str , description: str): self.log(f'{self.args["cube"]}: {action}: {description}') def handle_rotate_right(self, payload): self.log(f'{self.args["cube"]}: Rotate right') def handle_rotate_left(self, payload): self.log(f'{self.args["cube"]}: Rotate left') def handle_slide(self, payload): self.log(f'{self.args["cube"]}: Slide') def handle_flip180(self, payload): self.log(f'{self.args["cube"]}: Flipped 180') def handle_flip90(self, payload): self.log(f'{self.args["cube"]}: Flipped 90') def handle_shake(self, payload): self.log(f'{self.args["cube"]}: Shake')