Files
ad-nix/apps/cubes/cube.py
John Lancaster a79d441668 cube updates
2024-03-04 19:23:07 -06:00

83 lines
2.9 KiB
Python

import json
import sys
from copy import deepcopy
from pathlib import Path
from appdaemon.plugins.hass.hassapi import Hass
from appdaemon.plugins.mqtt.mqttapi import Mqtt
rc_path = (Path(__file__).resolve().parents[1] / 'room_control').as_posix()
sys.path.insert(0, rc_path)
from console import console, init_logging, deinit_logging
class AqaraCube(Hass, Mqtt):
    """AppDaemon app that reacts to cube actions received over MQTT.

    Configuration keys read from ``self.args``:

    * ``cube``: device name; messages are read from ``zigbee2mqtt/<cube>``.
    * ``app``: name of another app, looked up with ``get_app``. This class
      calls ``activate`` and ``toggle_activate`` on it.
    * ``rich`` (optional): when truthy, passed to ``init_logging`` as the level.
    * ``<action>`` (optional): any key equal to an action name reported by
      the cube; its value is the description given to ``action_handler``.
    """

    def initialize(self):
        """Subscribe to the cube's MQTT topic and look up the companion app."""
        if (level := self.args.get('rich', False)):
            init_logging(self, level)

        topic = f'zigbee2mqtt/{self.args["cube"]}'
        self.mqtt_subscribe(topic, namespace='mqtt')
        self.listen_event(self.handle_event, 'MQTT_MESSAGE', topic=topic, namespace='mqtt')
        self.log(f'Listening for cube events on: {topic}')

        self.app = self.get_app(self.args['app'])
        self.log(f'Connected to other app: {self.app.name}')

    def terminate(self):
        """Undo the logging setup via ``deinit_logging`` when the app stops."""
        self.log('[bold red]Terminating[/]', level='DEBUG')
        deinit_logging(self)
        self.log('Success', level='DEBUG')

    def handle_event(self, event_name, data, cb_args):
        """Decode one MQTT message and dispatch the action it reports.

        Args:
            event_name: Name of the event that fired (logged only).
            data: Event data; ``data['topic']`` and ``data['payload']`` are
                read. The payload is expected to be a JSON object.
            cb_args: Extra callback arguments; unused here.

        Messages without an action, with an empty action, or with the
        ``wakeup`` action are ignored. Otherwise a description configured
        under the action's name in ``self.args`` takes precedence; failing
        that, a method named ``handle_<action>`` is called with the decoded
        payload if one exists on this instance.
        """
        # Split on the first '/' only: a device name that itself contains
        # '/' would otherwise break a strict two-way unpack.
        topic_prefix, _, cube_name = data['topic'].partition('/')
        self.log(
            f'{event_name} on [bright_black]{topic_prefix}/[/][blue]{cube_name}[/]',
            data,
            level='DEBUG',
        )

        try:
            payload = json.loads(data['payload'])
        except (TypeError, ValueError) as exc:
            # json.JSONDecodeError is a ValueError; a malformed message is
            # reported and dropped instead of raising out of the callback.
            self.log(f'Ignoring undecodable payload on {data["topic"]}: {exc}', level='WARNING')
            return

        # Not every message carries an 'action' key, and the payload may not
        # be a JSON object at all; both cases mean "nothing to do".
        action = payload.get('action') if isinstance(payload, dict) else None
        if not action or action == 'wakeup':
            return

        self.log(json.dumps(payload, indent=2), level='DEBUG')
        if (arg := self.args.get(action, False)):
            self.action_handler(action=action, description=arg)
        elif handler := getattr(self, f'handle_{action}', None):
            handler(payload)

    def action_handler(self, action: str, description: str):
        """Carry out the configured ``description`` for a cube ``action``.

        Args:
            action: Action name reported by the cube.
            description: Configured behavior for that action:
                ``'activate'`` calls ``activate`` on the companion app,
                a value starting with ``'scene.'`` turns that scene on,
                a value starting with ``'toggle'`` calls ``toggle_activate``
                on the companion app. Anything else is logged as unhandled.
        """
        self.log(f'{self.args["cube"]}: {action}: {description}')

        if description == 'activate':
            self.app.activate(cause=f'{self.args["cube"]}: {action}')
        elif description.startswith('scene.'):
            self.call_service('scene/turn_on', entity_id=description, namespace='default')
            self.log(f'Turned on {description}')
        elif description.startswith('toggle'):
            cause = f'{self.args["cube"]} {action}'
            # NOTE(review): the dict is passed under a keyword literally
            # named `kwargs`, not unpacked - presumably this matches the
            # signature of toggle_activate; confirm against that app.
            self.app.toggle_activate(kwargs={'cause': cause})
        else:
            self.log(f'Unhandled action: {action}', level='WARNING')
# def handle_rotate_right(self, payload):
# self.log(f'{self.args["cube"]}: Rotate right')
# def handle_rotate_left(self, payload):
# self.log(f'{self.args["cube"]}: Rotate left')
# def handle_slide(self, payload):
# self.log(f'{self.args["cube"]}: Slide')
# def handle_flip180(self, payload):
# self.log(f'{self.args["cube"]}: Flipped 180')
# def handle_flip90(self, payload):
# self.log(f'{self.args["cube"]}: Flipped 90')
# def handle_shake(self, payload):
# self.log(f'{self.args["cube"]}: Shake')