Files
ad-prod/apps/motion.py
2025-11-21 07:54:49 -06:00

56 lines
1.7 KiB
Python

import json
from collections.abc import Generator
from datetime import datetime
from itertools import count, cycle, pairwise
from pathlib import Path
from appdaemon.adapi import ADAPI
from pydantic import BaseModel, TypeAdapter
from stages import Stage
adapter = TypeAdapter(list[Stage])
class StagedMotionLight(ADAPI):
def initialize(self):
self.set_log_level("DEBUG")
self._stages = adapter.validate_python(self.args["stages"])
self.log(f"Initialized Motion Sensor with {len(self._stages)} stages")
self.activate()
def _stage_starts(self) -> Generator[datetime]:
for offset in count(start=-1):
for stage in self._stages:
dt = self.parse_datetime(
stage.start,
days_offset=offset,
aware=True,
today=True,
)
dt = dt.replace(microsecond=0)
yield dt
def start_pairs(self):
yield from pairwise(self._stage_starts())
def current_stage(self) -> Stage:
for stage, (t1, t2) in zip(cycle(self._stages), self.start_pairs()):
start, end = sorted([t1, t2])
if self.now_is_between(start, end):
self.log(f"Current stage start time: {stage.start}")
return stage
else:
raise ValueError
def current_scene(self):
return self.current_stage().model_dump(mode="json")["scene"]
def activate(self, **kwargs):
return self.call_service("scene/apply", entities=self.current_scene())
# @property
# def stages(self) -> GeneratorExit:
# yield from pairwise(self._stage_starts)