Files
ad-nix/apps/cubes/cube.py
2023-11-24 16:38:40 -06:00

66 lines
2.2 KiB
Python

import json
from copy import deepcopy
from appdaemon.plugins.mqtt.mqttapi import Mqtt
class AqaraCube(Mqtt):
def initialize(self):
self.set_namespace('mqtt')
topic = f'zigbee2mqtt/{self.args["cube"]}'
self.mqtt_subscribe(topic)
self.listen_event(self.handle_event, "MQTT_MESSAGE", topic=topic)
self.log(f'Listening for cube events on: {topic}')
self.app = self.get_app(self.args['app'])
self.log(f'Connected to other app: {self.app.name}')
def handle_event(self, event_name, data, kwargs):
topic = data['topic']
payload = json.loads(data['payload'])
action = payload['action']
if action == '':
return
if (arg := self.args.get(action, None)) is not None:
self.action_handler(action=action, description=arg)
elif (handler := getattr(self, f'handle_{action}', None)):
handler(payload)
else:
self.log(f'Unhandled cube action: {action}')
def action_handler(self, action: str , description: str):
self.log(f'{self.args["cube"]}: {action}: {description}')
if description == 'activate':
self.app.activate(cause=f'{self.args["cube"]}: {action}')
elif description.startswith('scene.'):
self.call_service('scene/turn_on', entity_id=description, namespace='default')
self.log(f'Turned on {description}')
elif description == 'toggle':
cause = f'{action} from {self.args["cube"]}'
if self.app.entity_state:
self.app.deactivate(cause=cause)
else:
self.app.activate(cause=cause)
# def handle_rotate_right(self, payload):
# self.log(f'{self.args["cube"]}: Rotate right')
# def handle_rotate_left(self, payload):
# self.log(f'{self.args["cube"]}: Rotate left')
# def handle_slide(self, payload):
# self.log(f'{self.args["cube"]}: Slide')
# def handle_flip180(self, payload):
# self.log(f'{self.args["cube"]}: Flipped 180')
# def handle_flip90(self, payload):
# self.log(f'{self.args["cube"]}: Flipped 90')
# def handle_shake(self, payload):
# self.log(f'{self.args["cube"]}: Shake')