141 lines
4.8 KiB
Python
141 lines
4.8 KiB
Python
from collections.abc import Generator
|
|
from datetime import datetime
|
|
from enum import Enum
|
|
from itertools import count
|
|
from itertools import cycle
|
|
from itertools import pairwise
|
|
from typing import Any
|
|
|
|
from appdaemon.plugins.hass import Hass
|
|
from pydantic import TypeAdapter
|
|
from stages import Stage
|
|
|
|
|
|
class StagedControlEvent(str, Enum):
    """Actions that a ``stage_control`` event can request.

    Members are also plain strings, so they compare equal to the raw values
    carried in event payloads.
    """

    # Handled by calling ``activate`` with the current stage.
    ACTIVATE = 'activate'
    # Handled by calling ``deactivate`` with the current stage.
    DEACTIVATE = 'deactivate'
|
|
|
|
|
|
class StageCondition(str, Enum):
    """Preconditions that a ``stage_control`` event can attach to its action.

    Members are also plain strings, so they compare equal to the raw values
    carried in event payloads.
    """

    # The event is acted on only if at least one scene entity reports 'on'.
    ANY_ON = 'any_on'
    # The event is acted on only if every scene entity reports 'off'.
    ALL_OFF = 'all_off'
|
|
|
|
|
|
class StagedLight(Hass):
|
|
def initialize(self):
|
|
self.set_log_level('DEBUG')
|
|
self._type_adapter = TypeAdapter(list[Stage])
|
|
self._stages = self._type_adapter.validate_python(self.args['stages'])
|
|
|
|
self.log(f'Initialized Motion Sensor with {len(self._stages)} stages')
|
|
self.listen_event(self.handle_event, 'stage_control', app=self.name)
|
|
|
|
if self.args.get('activate-at-start', False):
|
|
self.activate()
|
|
|
|
# self._check_transition()
|
|
self.schedule_transition_checks()
|
|
self.run_daily(self.schedule_transition_checks, start='00:00:00')
|
|
|
|
### Stages
|
|
|
|
def _stage_starts(self) -> Generator[datetime]:
|
|
for offset in count(start=-1):
|
|
for stage in self._stages:
|
|
dt = self.parse_datetime(stage.start, days_offset=offset, aware=True, today=True)
|
|
dt = dt.replace(microsecond=0)
|
|
yield dt
|
|
|
|
def start_pairs(self):
|
|
"""Yield from an infinite progression of start and end times for the stages."""
|
|
yield from pairwise(self._stage_starts())
|
|
|
|
def current_stage(self) -> Stage:
|
|
for stage, (t1, t2) in zip(cycle(self._stages), self.start_pairs()):
|
|
start, end = sorted([t1, t2])
|
|
if self.now_is_between(start, end):
|
|
self.log(f'Current stage start time: {stage.start}', level='DEBUG')
|
|
stage.assign_start(start)
|
|
return stage
|
|
else:
|
|
raise ValueError
|
|
|
|
def current_scene(self):
|
|
return self.current_stage().scene_json()
|
|
|
|
### Transitions
|
|
|
|
def schedule_transition_checks(self, **_):
|
|
now = self.get_now()
|
|
for stage in self._stages:
|
|
dt = self.parse_datetime(stage.start, aware=True, today=True)
|
|
if dt > now:
|
|
self.log(f'Scehduling transition at: {dt.strftime("%I:%M %p")}', level='DEBUG')
|
|
self.run_at(self._check_transition, start=dt)
|
|
|
|
def _check_transition(self, **_):
|
|
self.log('Firing transition event', level='DEBUG')
|
|
self.fire_event(
|
|
'stage_control',
|
|
app=self.name,
|
|
condition=StageCondition.ANY_ON,
|
|
action=StagedControlEvent.ACTIVATE,
|
|
)
|
|
|
|
### Events
|
|
|
|
def handle_event(self, event_type: str, data: dict[str, Any], **kwargs: Any) -> None:
|
|
self.log(f'Event handler: {event_type}', level='DEBUG')
|
|
stage = self.current_stage()
|
|
scene = stage.scene_json()
|
|
|
|
match data:
|
|
case {'condition': StageCondition.ANY_ON}:
|
|
any_on = any(self.get_state(e) == 'on' for e in scene)
|
|
if not any_on:
|
|
self.log('Nothing is on, skipping', level='DEBUG')
|
|
return
|
|
case {'condition': StageCondition.ALL_OFF}:
|
|
all_off = all(self.get_state(e) == 'off' for e in scene)
|
|
if not all_off:
|
|
self.log('Everything is not off, skipping', level='DEBUG')
|
|
return
|
|
|
|
match data:
|
|
case {'action': StagedControlEvent.ACTIVATE}:
|
|
self.activate(stage)
|
|
case {'action': StagedControlEvent.DEACTIVATE}:
|
|
self.deactivate(stage)
|
|
case _:
|
|
self.log(str(data), level='DEBUG')
|
|
self.log(str(kwargs), level='DEBUG')
|
|
|
|
### Actions
|
|
|
|
def activate(self, stage: Stage | None = None, **kwargs: Any):
|
|
if stage is None:
|
|
stage = self.current_stage()
|
|
kwargs['entities'] = stage.scene_json()
|
|
else:
|
|
kwargs['entities'] = stage.scene_json()
|
|
|
|
if t := self.args.get('transition'):
|
|
kwargs['transition'] = t
|
|
|
|
start_time_str = stage.formatted_start('%I:%M %p')
|
|
self.log(f'Activating current stage: {start_time_str}')
|
|
return self.call_service('scene/apply', **kwargs)
|
|
|
|
def deactivate(self, stage: Stage | None = None, **kwargs):
|
|
stage = stage if stage is not None else self.current_stage()
|
|
|
|
start_time_str = stage.formatted_start('%I:%M %p')
|
|
self.log(f'Deactivating current stage: {start_time_str}')
|
|
|
|
for entity in stage.scene:
|
|
self.turn_off(entity)
|