from collections.abc import Generator from datetime import datetime from enum import Enum from itertools import count from itertools import cycle from itertools import pairwise from typing import Any from appdaemon.plugins.hass import Hass from pydantic import TypeAdapter from stages import Stage class StagedControlEvent(str, Enum): """Enum to define the different types of valid stage control events""" ACTIVATE = 'activate' DEACTIVATE = 'deactivate' class StageCondition(str, Enum): """Enum to define the different types of conditions for an event""" ANY_ON = 'any_on' ALL_OFF = 'all_off' class StagedLight(Hass): def initialize(self): self.set_log_level('DEBUG') self._type_adapter = TypeAdapter(list[Stage]) self._stages = self._type_adapter.validate_python(self.args['stages']) self.log(f'Initialized Motion Sensor with {len(self._stages)} stages') self.listen_event(self.handle_event, 'stage_control', app=self.name) if self.args.get('activate-at-start', False): self.activate() # self._check_transition() self.schedule_transition_checks() self.run_daily(self.schedule_transition_checks, start='00:00:00') ### Stages def _stage_starts(self) -> Generator[datetime]: for offset in count(start=-1): for stage in self._stages: dt = self.parse_datetime(stage.start, days_offset=offset, aware=True, today=True) dt = dt.replace(microsecond=0) yield dt def start_pairs(self): """Yield from an infinite progression of start and end times for the stages.""" yield from pairwise(self._stage_starts()) def current_stage(self) -> Stage: for stage, (t1, t2) in zip(cycle(self._stages), self.start_pairs()): start, end = sorted([t1, t2]) if self.now_is_between(start, end): self.log(f'Current stage start time: {stage.start}', level='DEBUG') stage.assign_start(start) return stage else: raise ValueError def current_scene(self): return self.current_stage().scene_json() ### Transitions def schedule_transition_checks(self, **_): now = self.get_now() for stage in self._stages: dt = self.parse_datetime(stage.start, aware=True, today=True) if dt > now: self.log(f'Scehduling transition at: {dt.strftime("%I:%M %p")}', level='DEBUG') self.run_at(self._check_transition, start=dt) def _check_transition(self, **_): self.log('Firing transition event', level='DEBUG') self.fire_event( 'stage_control', app=self.name, condition=StageCondition.ANY_ON, action=StagedControlEvent.ACTIVATE, ) ### Events def handle_event(self, event_type: str, data: dict[str, Any], **kwargs: Any) -> None: self.log(f'Event handler: {event_type}', level='DEBUG') stage = self.current_stage() scene = stage.scene_json() match data: case {'condition': StageCondition.ANY_ON}: any_on = any(self.get_state(e) == 'on' for e in scene) if not any_on: self.log('Nothing is on, skipping', level='DEBUG') return case {'condition': StageCondition.ALL_OFF}: all_off = all(self.get_state(e) == 'off' for e in scene) if not all_off: self.log('Everything is not off, skipping', level='DEBUG') return match data: case {'action': StagedControlEvent.ACTIVATE}: self.activate(stage) case {'action': StagedControlEvent.DEACTIVATE}: self.deactivate(stage) case _: self.log(str(data), level='DEBUG') self.log(str(kwargs), level='DEBUG') ### Actions def activate(self, stage: Stage | None = None, **kwargs: Any): if stage is None: stage = self.current_stage() kwargs['entities'] = stage.scene_json() else: kwargs['entities'] = stage.scene_json() if t := self.args.get('transition'): kwargs['transition'] = t start_time_str = stage.formatted_start('%I:%M %p') self.log(f'Activating current stage: {start_time_str}') return self.call_service('scene/apply', **kwargs) def deactivate(self, stage: Stage | None = None, **kwargs): stage = stage if stage is not None else self.current_stage() start_time_str = stage.formatted_start('%I:%M %p') self.log(f'Deactivating current stage: {start_time_str}') for entity in stage.scene: self.turn_off(entity)