Compare commits

...

15 Commits

Author SHA1 Message Date
John Lancaster
7ea362c5fd added ad_threads 2025-01-04 14:14:14 -06:00
John Lancaster
4b748173c1 moved sleep function 2024-10-21 03:25:57 +00:00
John Lancaster
de496509a9 WIP 2024-10-21 03:15:59 +00:00
John Lancaster
bc2224a919 formatting 2024-10-21 02:47:38 +00:00
John Lancaster
a67b2998b7 made a graceful sleep method 2024-10-21 02:47:22 +00:00
John Lancaster
0abbb9e546 context manager work for startup/shutdown 2024-10-21 02:41:19 +00:00
John Lancaster
502c218c35 more work 2024-10-20 21:48:19 +00:00
John Lancaster
050fe75e71 more context manager work 2024-10-20 21:27:51 +00:00
John Lancaster
8064a73e9f context manager and subsystem work 2024-10-20 20:56:18 +00:00
John Lancaster
9121d1ef04 added entity callback cancellation 2024-10-16 03:57:40 +00:00
John Lancaster
5e458aca41 reorg 2024-10-16 03:56:07 +00:00
John Lancaster
5af940f077 added some threading stuff to appdaemon object 2024-10-16 03:55:32 +00:00
John Lancaster
4dddc50c82 reformat 2024-10-16 03:51:16 +00:00
John Lancaster
9ccbad58cf added get_state to state 2024-10-16 03:50:57 +00:00
John Lancaster
45e435554a started context manager 2024-10-16 03:50:25 +00:00
8 changed files with 569 additions and 17 deletions

54
ad_threads.py Normal file
View File

@@ -0,0 +1,54 @@
import threading
from collections.abc import Iterable
from concurrent.futures import ThreadPoolExecutor
from functools import partial
from queue import Queue
from time import perf_counter
from typing import Callable
def worker(q: Queue):
thread = threading.current_thread()
print(thread)
while True:
try:
item = q.get()
get_time = perf_counter()
if item is None:
break
elif callable(item):
print(f'Calling {item}')
item()
else:
print(f'{thread.name}: {item}')
except Exception as exc:
print(f'Error: {exc}')
finally:
proc_time = perf_counter() - get_time
q.task_done()
print(f'{proc_time*10**3:.3f}ms')
# print('Broke worker loop')
return f'Ended {thread.name}'
def run_executor(queues: Iterable[Queue]):
with ThreadPoolExecutor(thread_name_prefix='AD-App-Thread') as executor:
for fut in executor.map(worker, queues):
print(fut)
def main(n: int, start: bool = True):
queues = [Queue() for _ in range(n)]
thread = threading.Thread(
target=run_executor,
args=(queues,),
name='AD-ThreadPoolExecutor'
)
if start:
thread.start()
return queues
def dispatch_worker(queue: Queue, func: Callable, *args, **kwargs):
return queue.put_nowait(partial(func, *args, **kwargs))

View File

@@ -0,0 +1,55 @@
import asyncio
import concurrent
import concurrent.futures
import functools
import inspect
from typing import Any, Callable, Coroutine
class AppDaemon:
async def run_async_sync_func(self, method, *args, timeout: float | None = None, **kwargs):
if inspect.iscoroutinefunction(method):
result = await method(*args, timeout=timeout, **kwargs)
else:
result = await self.run_in_executor(self, method, *args, timeout=timeout, **kwargs)
return result
async def run_in_executor(
self,
func: Callable,
*args,
timeout: float | None = None,
**kwargs
) -> Any:
"""Run the sync function using the ThreadPoolExecutor and await the result"""
timeout = timeout or self.AD.internal_function_timeout
coro = self.AD.loop.run_in_executor(
self.AD.executor,
functools.partial(func, **kwargs),
*args,
)
try:
return await asyncio.wait_for(coro, timeout)
except asyncio.TimeoutError:
self.logger.warning(
"Function (%s) took too long (%s seconds), cancelling the task...",
func.__name__, timeout,
)
def run_coroutine_threadsafe(self, coro: Coroutine, timeout: float | None = None) -> Any:
timeout = timeout or self.AD.internal_function_timeout
if self.AD.loop.is_running():
try:
future = asyncio.run_coroutine_threadsafe(coro, self.AD.loop)
return future.result(timeout)
except (asyncio.TimeoutError, concurrent.futures.TimeoutError):
self.logger.warning(
"Coroutine (%s) took too long (%s seconds), cancelling the task...",
coro, timeout,
)
future.cancel()
else:
self.logger.warning("LOOP NOT RUNNING. Returning NONE.")

View File

@@ -1,11 +1,8 @@
import asyncio import asyncio
from cgitb import handler
from copy import deepcopy from copy import deepcopy
from logging import Logger from logging import Logger
from typing import TYPE_CHECKING, Any, Literal from typing import TYPE_CHECKING, Any, Literal
import appdaemon.utils as utils
if TYPE_CHECKING: if TYPE_CHECKING:
from appdaemon.appdaemon import AppDaemon from appdaemon.appdaemon import AppDaemon
@@ -35,27 +32,29 @@ class Callbacks:
self.diag = ad.logging.get_diag() self.diag = ad.logging.get_diag()
self.callbacks = {} self.callbacks = {}
self.callbacks_lock = asyncio.Lock() self.callbacks_lock = asyncio.Lock()
# #
# Diagnostic # Diagnostic
# #
async def add_callback(self): async def add_callback(self):
return return
async def cancel_callback(self, handle: str, name: str, silent: bool = False): async def cancel_callback(self, handle: str, name: str, silent: bool = False):
async with self.callbacks_lock: async with self.callbacks_lock:
if (callbacks := self.callbacks.get(name)) \ if (callbacks := self.callbacks.get(name)) \
and (callback := callbacks.pop(handle, False)): and (callback := callbacks.pop(handle, False)):
await self.AD.state.remove_entity("admin", f"{callback['type']}_callback.{handle}") await self.AD.state.remove_entity("admin", f"{callback['type']}_callback.{handle}")
return True return True
elif not silent: elif not silent:
self.logger.warning(f"Invalid callback handle '{handle}' in cancel_callback()") self.logger.warning(f"Invalid callback handle '{
handle}' in cancel_callback()")
async def cancel_all_callbacks(self, name: str, silent: bool = False): async def cancel_all_callbacks(self, name: str, silent: bool = False):
async with self.callbacks_lock: async with self.callbacks_lock:
if callbacks := self.callbacks.pop(name, False): if callbacks := self.callbacks.pop(name, False):
self.logger.debug("Clearing %s callbacks for %s", len(callbacks), name) self.logger.debug(
"Clearing %s callbacks for %s", len(callbacks), name)
for handle, cb_info in callbacks.items(): for handle, cb_info in callbacks.items():
cb_type: Literal["event", "state", "log"] = cb_info['type'] cb_type: Literal["event", "state", "log"] = cb_info['type']
await self.AD.state.remove_entity("admin", f"{cb_type}_callback.{handle}") await self.AD.state.remove_entity("admin", f"{cb_type}_callback.{handle}")
@@ -81,17 +80,37 @@ class Callbacks:
} }
for app_name, app_callbacks in self.callbacks.items() for app_name, app_callbacks in self.callbacks.items()
} }
async def get_callback_handles(self, app: str = 'all', type: str = 'all', entity_id: str = 'all'): async def get_callbacks(
self,
namespace: str = 'all',
app: str = 'all',
type: str = 'all',
entity_id: str = 'all',
copy: bool = True,
) -> dict[str, dict[str, Any]]:
async with self.callbacks_lock: async with self.callbacks_lock:
handles = set( return {
handle handle: deepcopy(cb_info) if copy else cb_info
for app_name, app_callbacks in self.callbacks.items() for app_name, app_callbacks in self.callbacks.items()
if app == 'all' or app == app_name if app == 'all' or app == app_name
for handle, cb_info in app_callbacks.items() for handle, cb_info in app_callbacks.items()
if (type == 'all' or type == cb_info["type"]) if (type == 'all' or type == cb_info["type"])
and (entity_id == 'all' or entity_id == cb_info["entity"]) and (entity_id == 'all' or entity_id == cb_info["entity"])
) and (
self.logger.debug(f"Got {len(handles)} callbacks for app={app}, type={type}, entity_id={entity_id}") namespace == 'all'
return handles or namespace == 'global'
or cb_info["namespace"] == 'global'
or namespace == cb_info["namespace"]
)
}
async def get_callback_handles(
self,
namespace: str = 'all',
app: str = 'all',
type: str = 'all',
entity_id: str = 'all'
) -> set[str]:
callbacks = await self.get_callbacks(namespace, app, type, entity_id, copy=False)
return set(callbacks.keys())

View File

@@ -0,0 +1,120 @@
import asyncio
import functools
import logging
import signal
from concurrent.futures import ThreadPoolExecutor
from contextlib import ExitStack, contextmanager
from dataclasses import dataclass, field
from threading import Event
from typing import Any, Callable
logger = logging.getLogger(__name__)
@dataclass
class AppDaemonRunContext:
_stack: ExitStack = field(default_factory=ExitStack)
stop_event: Event = field(default_factory=Event)
shutdown_grace_period: float = 0.1
loop: asyncio.AbstractEventLoop = field(init=False)
executor: ThreadPoolExecutor = field(init=False)
def __enter__(self):
self.loop = self._stack.enter_context(self.asyncio_context())
logger.debug("Created event loop")
signals = (signal.SIGHUP, signal.SIGTERM, signal.SIGINT)
for s in signals:
self.loop.add_signal_handler(
s, lambda s=s: self.loop.create_task(self.shutdown(s)))
self.executor = self._stack.enter_context(self.thread_context())
logger.debug("Started thread pool")
return self
def __exit__(self, exc_type, exc_value, traceback):
logger.debug(f'Closing context from {self.__class__.__name__}')
self._stack.close()
def get_running_tasks(self) -> list[asyncio.Task]:
return [
t for t in asyncio.all_tasks(self.loop)
if t is not asyncio.current_task()
]
async def shutdown(self, signal=signal.SIGTERM):
"""Cleanup tasks tied to the service's shutdown.
https://www.roguelynn.com/words/asyncio-graceful-shutdowns/
"""
logger.info(f"Received exit signal {signal.name}...")
if not self.stop_event.is_set():
logger.debug('Setting stop event')
self.stop_event.set()
tasks = self.get_running_tasks()
graceful = (
asyncio.wait_for(t, timeout=self.shutdown_grace_period)
for t in tasks
)
logger.debug(
'Allowing graceful shutdown from stop event for %ss',
self.shutdown_grace_period
)
await asyncio.gather(*graceful, return_exceptions=True)
for task in tasks:
if task.cancelled():
logger.warning(f'Cancelled {task.get_name()}')
logger.info("Calling stop() for asyncio event loop")
self.loop.stop()
else:
logger.warning('Already started shutdown')
@contextmanager
def asyncio_context(self):
try:
loop = asyncio.get_event_loop()
yield loop
finally:
loop.close()
if loop.is_closed():
logger.debug("Gracefully closed event loop.")
@contextmanager
def thread_context(self):
with ThreadPoolExecutor(max_workers=5) as executor:
yield executor
if executor._shutdown:
logger.debug('Gracefully shut down ThreadPoolExecutor')
async def run_in_executor(
self,
func: Callable,
*args,
timeout: float | None = None,
**kwargs
) -> Any:
"""Run the sync function using the ThreadPoolExecutor and await the result"""
timeout = timeout or 10.0
coro = self.loop.run_in_executor(
self.executor,
functools.partial(func, **kwargs),
*args,
)
try:
return await asyncio.wait_for(coro, timeout)
except asyncio.TimeoutError:
print('Timed out')
async def sleep(self, delay: float):
"""Wrapper function for asyncio.sleep that suppresses and logs a task cancellation"""
try:
await self.run_in_executor(self.stop_event.wait, delay)
except asyncio.CancelledError:
logger.debug('Cancelled during sleep')

26
appdaemon/entity.py Normal file
View File

@@ -0,0 +1,26 @@
from appdaemon import utils
class Entity:
@utils.sync_decorator
async def get_callbacks(self, limit_to_app: str = None) -> dict[str, dict[str, Any]]:
async with self.AD.callbacks.callbacks_lock:
return {
handle: {
'app_name': cb_info['name'],
'function': cb_info['function'].__name__,
**cb_info["kwargs"]
}
for app_name, app_callbacks in self.AD.callbacks.callbacks.items()
if limit_to_app is None or app_name == limit_to_app
for handle, cb_info in app_callbacks.items()
if self.namespace == cb_info['namespace']
and cb_info["type"] == "state"
and cb_info.get('entity') == self.entity_id
}
@utils.sync_decorator
async def cancel_callbacks(self, all: bool = False):
for handle, info in (await self.get_callbacks()).items():
if all or self.name == info['app_name']:
await self.AD.state.cancel_state_callback(handle, info['app_name'])

79
appdaemon/main.py Normal file
View File

@@ -0,0 +1,79 @@
from contextlib import ExitStack
from dataclasses import dataclass, field
from appdaemon.models.ad_config import AppDaemonConfig
from context_manager import AppDaemonRunContext
from subsystem import AppDaemon
@dataclass
class ADMain:
"""Class to contain the mechanics to run AppDaemon as module
"""
config_file: str
cfg: AppDaemonConfig = field(init=False)
_stack: ExitStack = field(default_factory=ExitStack)
run_context: AppDaemonRunContext = field(init=False)
ad: AppDaemon = field(init=False)
def __post_init__(self) -> None:
raw_cfg = read_config_file(self.config_file)
self.cfg = AppDaemonConfig(
config_file=self.config_file,
**raw_cfg['appdaemon']
)
def __enter__(self):
"""Used to start the asyncio loop, thread pool, and AppDaemon"""
# Use the ExitStack from ADMain instead of creating a new one
self.run_context = AppDaemonRunContext(_stack=self._stack)
self._stack.enter_context(self.run_context)
# Start AppDaemon by entering it's context
self.ad = AppDaemon(self.cfg, self.run_context)
self._stack.enter_context(self.ad)
return self
def __exit__(self, exc_type, exc_value, traceback):
self.ad.logger.debug(f'Closing context from {self.__class__.__name__}')
self._stack.close()
self.ad.logger.info('Stopped main()')
def run(self):
if hasattr(self, 'ad'):
self.ad.logger.info('Running asyncio event loop indefinitely...')
self.run_context.loop.run_forever()
self.ad.logger.debug('Gracefully stopped event loop')
else:
logging.error('Running ADMain without context manager')
if __name__ == '__main__':
import logging.config
from appdaemon.utils import read_config_file
from rich.console import Console
from rich.highlighter import NullHighlighter
console = Console()
logging.config.dictConfig(
{
'version': 1,
'disable_existing_loggers': False,
'formatters': {'basic': {'style': '{', 'format': '[yellow]{name:20}[/] {message}'}},
'handlers': {
'rich': {
'()': 'rich.logging.RichHandler',
'formatter': 'basic',
'console': console,
'highlighter': NullHighlighter(),
'markup': True
}
},
'root': {'level': 'DEBUG', 'handlers': ['rich']},
}
)
with ADMain('/conf/ad-test/conf/appdaemon.yaml') as main:
main.run()
main.ad.logger.debug('Exiting main context from with statement')

View File

@@ -1,4 +1,4 @@
from copy import deepcopy
from logging import Logger from logging import Logger
from typing import Any from typing import Any
@@ -56,3 +56,52 @@ class States:
async for remove in _send_dispatches(): async for remove in _send_dispatches():
await self.cancel_state_callback(**remove) await self.cancel_state_callback(**remove)
async def get_state(
self,
name: str,
namespace: str,
entity_id: str | None = None,
attribute: str | None = None,
default: Any = None,
copy: bool = True,
):
self.logger.debug("get_state: %s.%s %s %s", entity_id, attribute, default, copy)
result = default
if ns := self.state.get(namespace):
# Process entity_id input
if entity_id is None:
result = ns
# TODO: filter by attribute?
elif "." not in entity_id:
domain = entity_id
result = {
eid: state
for eid, state in ns.items()
if eid.startswith(domain)
}
elif full_state := ns.get(entity_id):
result = full_state
else:
self.logger.warning(f"Entity {entity_id} does not exist in namespace {namespace}")
return
# Process attribute input
if attribute == "all":
result = result
elif attr := full_state.get(attribute):
result = attr
elif attr := full_state.get('attributes', {}).get(attribute):
result = attr
elif state := full_state.get("state"):
result = state
return deepcopy(result) if copy else result
else:
self.logger.warning(f"Namespace does not exist: {namespace}")
async def cancel_state_callback(self, handle: str, name: str, silent: bool = False) -> bool:
return await self.AD.callbacks.cancel_callback(handle, name, silent)

150
appdaemon/subsystem.py Normal file
View File

@@ -0,0 +1,150 @@
import asyncio
import logging
import os
import signal
import traceback
from contextlib import ExitStack
from dataclasses import dataclass, field
from logging import Logger, getLogger
from random import random
from threading import Event, RLock
from time import perf_counter
from typing import Coroutine
from appdaemon.models import AppDaemonConfig
from context_manager import AppDaemonRunContext
@dataclass
class ADSubsystem:
AD: "AppDaemon"
stop: Event
"""An thread event for the subsystem to use to shutdown gracefully"""
lock: RLock = field(default_factory=RLock)
"""A threadsafe re-entrant lock to protect any internal data while it's being modified"""
logger: Logger = field(init=False)
tasks: list[asyncio.Task] = field(default_factory=list)
def __post_init__(self) -> None:
name = f'_{self.__class__.__name__.lower()}'
self.logger = getLogger(f'AppDaemon.{name}')
self.create_task = self.AD.create_task
self.sleep = self.AD.context.sleep
def __enter__(self):
self.logger.debug(f'Starting {self.__class__.__name__}')
def __exit__(self, exc_type, exc_value, traceback):
pass
# self.logger.debug(f'Exiting {self.__class__.__name__}')
@property
def stopping(self) -> bool:
return self.stop.is_set()
@dataclass
class Utility(ADSubsystem):
loop_rate: float = 1.0
def __enter__(self):
super().__enter__()
self.create_task(self.loop(), 'utility loop', critical=True)
return self
async def loop(self):
while not self.stopping:
self.logger.debug('Looping...')
await self.sleep(self.loop_rate)
task_name = asyncio.current_task().get_name()
self.logger.debug(f'Gracefully stopped {task_name} task')
@dataclass
class Plugin(ADSubsystem):
state: dict[str, int] = field(default_factory=dict)
update_rate: float = 5.0
def __post_init__(self) -> None:
super().__post_init__()
self.state['update_count'] = 0
def __enter__(self):
super().__enter__()
self.create_task(
self.periodic_self_udpate(),
name='plugin periodic update',
critical=True
)
return self
async def periodic_self_udpate(self):
loop_time = perf_counter()
while not self.stopping:
with self.lock:
self.state['update_count'] += 1
# self.logger.debug('Long plugin update...')
# await asyncio.sleep(random())
self.logger.debug(
'Plugin self update: %s %s',
self.state["update_count"],
f'{perf_counter()-loop_time:.3f}s'
)
loop_time = perf_counter()
if self.state['update_count'] == 2:
raise ValueError('fake error')
await self.sleep(self.update_rate)
task_name = asyncio.current_task().get_name()
self.logger.debug(f'Gracefully stopped {task_name} task')
@dataclass
class AppDaemon:
cfg: AppDaemonConfig
context: AppDaemonRunContext
_stack: ExitStack = field(default_factory=ExitStack)
utility: Utility = field(init=False)
plugins: dict[str, Plugin] = field(default_factory=dict)
def __post_init__(self) -> None:
self.logger = logging.getLogger('AppDaemon')
self.utility = Utility(self, self.context.stop_event)
self.plugins['dummy'] = Plugin(self, self.context.stop_event)
def __enter__(self):
self.logger.info('Starting AppDaemon')
self._stack.enter_context(self.utility)
for plugin in self.plugins.values():
self._stack.enter_context(plugin)
return self
def __exit__(self, exc_type, exc_value, traceback):
self._stack.__exit__(exc_type, exc_value, traceback)
def create_task(self, coro: Coroutine, name: str | None = None, critical: bool = False):
"""Creates an async task and adds exception callbacks"""
task = self.context.loop.create_task(coro, name=name)
task.add_done_callback(self.check_task_exception)
if critical:
task.add_done_callback(self.critical_exception)
return task
def check_task_exception(self, task: asyncio.Task):
if (exc := task.exception()) and not isinstance(exc, asyncio.CancelledError):
self.logger.error('\n'.join(traceback.format_exception(exc)))
def critical_exception(self, task: asyncio.Task):
if task.exception():
self.logger.critical(
'Critical error in %s, forcing shutdown',
task.get_name()
)
self.shutdown()
def shutdown(self):
self.logger.debug('Shutting down by sending SIGTERM')
os.kill(os.getpid(), signal.SIGTERM)