30 lines
880 B
Python
30 lines
880 B
Python
import json
|
|
|
|
from appdaemon.adapi import ADAPI
|
|
|
|
|
|
class Button(ADAPI):
|
|
def initialize(self):
|
|
name = self.args['button']
|
|
self.handle = self.listen_event(
|
|
self.handle_button,
|
|
'MQTT_MESSAGE',
|
|
topic=f'zigbee2mqtt/{name}',
|
|
payload=self.payload_filter,
|
|
)
|
|
self.log(f"Started MQTT app in namespace '{self._namespace}'")
|
|
# raise ValueError
|
|
|
|
@staticmethod
|
|
def payload_filter(payload: str):
|
|
try:
|
|
return json.loads(payload)['action'] != ''
|
|
except Exception:
|
|
return False
|
|
|
|
def handle_button(self, event_name, data, **kwargs):
|
|
data['payload'] = json.loads(data['payload'])
|
|
# if data['payload']['action'] != '':
|
|
json_str = json.dumps(data, indent=4)
|
|
self.logger.info(f'{event_name} callback with\n{json_str}\n{kwargs}')
|