82 lines
2.9 KiB
Python
82 lines
2.9 KiB
Python
from collections.abc import Generator
|
|
from datetime import datetime
|
|
from enum import Enum
|
|
from itertools import count
|
|
from itertools import cycle
|
|
from itertools import pairwise
|
|
from typing import Any
|
|
|
|
from appdaemon.plugins.hass import Hass
|
|
from pydantic import TypeAdapter
|
|
from stages import Stage
|
|
|
|
|
|
class StagedControlEvent(str, Enum):
    """Actions that can be carried in the ``action`` field of a control event.

    The members are also ``str`` instances, so each one compares equal to its
    plain-string value (e.g. ``StagedControlEvent.ACTIVATE == 'activate'``).
    """

    # Apply the scene belonging to the current stage.
    ACTIVATE = 'activate'

    # Turn off the entities belonging to the current stage.
    DEACTIVATE = 'deactivate'
|
|
|
|
|
|
class StagedLight(Hass):
|
|
def initialize(self):
|
|
self.set_log_level('DEBUG')
|
|
self._type_adapter = TypeAdapter(list[Stage])
|
|
self._stages = self._type_adapter.validate_python(self.args['stages'])
|
|
|
|
self.log(f'Initialized Motion Sensor with {len(self._stages)} stages')
|
|
self.listen_event(self.handle_event, 'stage_control', app=self.name)
|
|
self.activate()
|
|
|
|
### Stages
|
|
|
|
def _stage_starts(self) -> Generator[datetime]:
|
|
for offset in count(start=-1):
|
|
for stage in self._stages:
|
|
dt = self.parse_datetime(stage.start, days_offset=offset, aware=True, today=True)
|
|
dt = dt.replace(microsecond=0)
|
|
yield dt
|
|
|
|
def start_pairs(self):
|
|
"""Yield from an infinite progression of start and end times for the stages."""
|
|
yield from pairwise(self._stage_starts())
|
|
|
|
def current_stage(self) -> Stage:
|
|
for stage, (t1, t2) in zip(cycle(self._stages), self.start_pairs()):
|
|
start, end = sorted([t1, t2])
|
|
if self.now_is_between(start, end):
|
|
self.log(f'Current stage start time: {stage.start}', level='DEBUG')
|
|
stage.assign_start(start)
|
|
return stage
|
|
else:
|
|
raise ValueError
|
|
|
|
def current_scene(self):
|
|
return self.current_stage().scene_json()
|
|
|
|
### Events
|
|
|
|
def handle_event(self, event_type: str, data: dict[str, Any], **kwargs: Any) -> None:
|
|
self.log(f'Event handler: {event_type}', level='DEBUG')
|
|
stage = self.current_stage()
|
|
start_time_str = stage.formatted_start('%I:%M %p')
|
|
match data:
|
|
case {'action': StagedControlEvent.ACTIVATE}:
|
|
self.log(f'Activating current stage: {start_time_str}')
|
|
self.activate(scene=stage.scene_json())
|
|
case {'action': StagedControlEvent.DEACTIVATE}:
|
|
self.log(f'Deactivating current stage: {start_time_str}')
|
|
self.deactivate()
|
|
case _:
|
|
self.log(str(data), level='DEBUG')
|
|
self.log(str(kwargs), level='DEBUG')
|
|
|
|
### Actions
|
|
|
|
def activate(self, scene: dict | None = None, **kwargs):
|
|
scene = scene if scene is not None else self.current_scene()
|
|
return self.call_service('scene/apply', entities=scene, transition=5)
|
|
|
|
def deactivate(self, stage: Stage | None = None, **kwargs):
|
|
stage = stage if stage is not None else self.current_stage()
|
|
for entity in stage.scene:
|
|
self.turn_off(entity)
|