from collections.abc import Generator from datetime import datetime from enum import Enum from itertools import count from itertools import cycle from itertools import pairwise from typing import Any from appdaemon.plugins.hass import Hass from pydantic import TypeAdapter from stages import Stage class StagedControlEvent(str, Enum): """Enum to define the different types of valid stage control events""" ACTIVATE = 'activate' DEACTIVATE = 'deactivate' class StagedLight(Hass): def initialize(self): self.set_log_level('DEBUG') self._type_adapter = TypeAdapter(list[Stage]) self._stages = self._type_adapter.validate_python(self.args['stages']) self.log(f'Initialized Motion Sensor with {len(self._stages)} stages') self.listen_event(self.handle_event, 'stage_control', app=self.name) if self.args.get('activate-at-start', False): self.activate() ### Stages def _stage_starts(self) -> Generator[datetime]: for offset in count(start=-1): for stage in self._stages: dt = self.parse_datetime(stage.start, days_offset=offset, aware=True, today=True) dt = dt.replace(microsecond=0) yield dt def start_pairs(self): """Yield from an infinite progression of start and end times for the stages.""" yield from pairwise(self._stage_starts()) def current_stage(self) -> Stage: for stage, (t1, t2) in zip(cycle(self._stages), self.start_pairs()): start, end = sorted([t1, t2]) if self.now_is_between(start, end): self.log(f'Current stage start time: {stage.start}', level='DEBUG') stage.assign_start(start) return stage else: raise ValueError def current_scene(self): return self.current_stage().scene_json() ### Events def handle_event(self, event_type: str, data: dict[str, Any], **kwargs: Any) -> None: self.log(f'Event handler: {event_type}', level='DEBUG') stage = self.current_stage() start_time_str = stage.formatted_start('%I:%M %p') match data: case {'action': StagedControlEvent.ACTIVATE}: self.log(f'Activating current stage: {start_time_str}') self.activate(scene=stage.scene_json()) case {'action': StagedControlEvent.DEACTIVATE}: self.log(f'Deactivating current stage: {start_time_str}') self.deactivate() case _: self.log(str(data), level='DEBUG') self.log(str(kwargs), level='DEBUG') ### Actions def activate(self, scene: dict | None = None, **kwargs: Any): if scene is None: stage = self.current_stage() kwargs['entities'] = stage.scene_json() else: kwargs['entities'] = scene if t := self.args.get('transition'): kwargs['transition'] = t return self.call_service('scene/apply', **kwargs) def deactivate(self, stage: Stage | None = None, **kwargs): stage = stage if stage is not None else self.current_stage() for entity in stage.scene: self.turn_off(entity)