Compare commits
15 Commits
6076329aea
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
7ea362c5fd | ||
|
|
4b748173c1 | ||
|
|
de496509a9 | ||
|
|
bc2224a919 | ||
|
|
a67b2998b7 | ||
|
|
0abbb9e546 | ||
|
|
502c218c35 | ||
|
|
050fe75e71 | ||
|
|
8064a73e9f | ||
|
|
9121d1ef04 | ||
|
|
5e458aca41 | ||
|
|
5af940f077 | ||
|
|
4dddc50c82 | ||
|
|
9ccbad58cf | ||
|
|
45e435554a |
54
ad_threads.py
Normal file
54
ad_threads.py
Normal file
@@ -0,0 +1,54 @@
|
|||||||
|
import threading
|
||||||
|
from collections.abc import Iterable
|
||||||
|
from concurrent.futures import ThreadPoolExecutor
|
||||||
|
from functools import partial
|
||||||
|
from queue import Queue
|
||||||
|
from time import perf_counter
|
||||||
|
from typing import Callable
|
||||||
|
|
||||||
|
|
||||||
|
def worker(q: Queue):
|
||||||
|
thread = threading.current_thread()
|
||||||
|
print(thread)
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
item = q.get()
|
||||||
|
get_time = perf_counter()
|
||||||
|
if item is None:
|
||||||
|
break
|
||||||
|
elif callable(item):
|
||||||
|
print(f'Calling {item}')
|
||||||
|
item()
|
||||||
|
else:
|
||||||
|
print(f'{thread.name}: {item}')
|
||||||
|
except Exception as exc:
|
||||||
|
print(f'Error: {exc}')
|
||||||
|
finally:
|
||||||
|
proc_time = perf_counter() - get_time
|
||||||
|
q.task_done()
|
||||||
|
print(f'{proc_time*10**3:.3f}ms')
|
||||||
|
|
||||||
|
# print('Broke worker loop')
|
||||||
|
return f'Ended {thread.name}'
|
||||||
|
|
||||||
|
|
||||||
|
def run_executor(queues: Iterable[Queue]):
|
||||||
|
with ThreadPoolExecutor(thread_name_prefix='AD-App-Thread') as executor:
|
||||||
|
for fut in executor.map(worker, queues):
|
||||||
|
print(fut)
|
||||||
|
|
||||||
|
|
||||||
|
def main(n: int, start: bool = True):
|
||||||
|
queues = [Queue() for _ in range(n)]
|
||||||
|
thread = threading.Thread(
|
||||||
|
target=run_executor,
|
||||||
|
args=(queues,),
|
||||||
|
name='AD-ThreadPoolExecutor'
|
||||||
|
)
|
||||||
|
if start:
|
||||||
|
thread.start()
|
||||||
|
return queues
|
||||||
|
|
||||||
|
|
||||||
|
def dispatch_worker(queue: Queue, func: Callable, *args, **kwargs):
|
||||||
|
return queue.put_nowait(partial(func, *args, **kwargs))
|
||||||
55
appdaemon/appdaemon_dev.py
Normal file
55
appdaemon/appdaemon_dev.py
Normal file
@@ -0,0 +1,55 @@
|
|||||||
|
import asyncio
|
||||||
|
import concurrent
|
||||||
|
import concurrent.futures
|
||||||
|
import functools
|
||||||
|
import inspect
|
||||||
|
from typing import Any, Callable, Coroutine
|
||||||
|
|
||||||
|
|
||||||
|
class AppDaemon:
|
||||||
|
async def run_async_sync_func(self, method, *args, timeout: float | None = None, **kwargs):
|
||||||
|
if inspect.iscoroutinefunction(method):
|
||||||
|
result = await method(*args, timeout=timeout, **kwargs)
|
||||||
|
else:
|
||||||
|
result = await self.run_in_executor(self, method, *args, timeout=timeout, **kwargs)
|
||||||
|
return result
|
||||||
|
|
||||||
|
async def run_in_executor(
|
||||||
|
self,
|
||||||
|
func: Callable,
|
||||||
|
*args,
|
||||||
|
timeout: float | None = None,
|
||||||
|
**kwargs
|
||||||
|
) -> Any:
|
||||||
|
"""Run the sync function using the ThreadPoolExecutor and await the result"""
|
||||||
|
timeout = timeout or self.AD.internal_function_timeout
|
||||||
|
|
||||||
|
coro = self.AD.loop.run_in_executor(
|
||||||
|
self.AD.executor,
|
||||||
|
functools.partial(func, **kwargs),
|
||||||
|
*args,
|
||||||
|
)
|
||||||
|
|
||||||
|
try:
|
||||||
|
return await asyncio.wait_for(coro, timeout)
|
||||||
|
except asyncio.TimeoutError:
|
||||||
|
self.logger.warning(
|
||||||
|
"Function (%s) took too long (%s seconds), cancelling the task...",
|
||||||
|
func.__name__, timeout,
|
||||||
|
)
|
||||||
|
|
||||||
|
def run_coroutine_threadsafe(self, coro: Coroutine, timeout: float | None = None) -> Any:
|
||||||
|
timeout = timeout or self.AD.internal_function_timeout
|
||||||
|
|
||||||
|
if self.AD.loop.is_running():
|
||||||
|
try:
|
||||||
|
future = asyncio.run_coroutine_threadsafe(coro, self.AD.loop)
|
||||||
|
return future.result(timeout)
|
||||||
|
except (asyncio.TimeoutError, concurrent.futures.TimeoutError):
|
||||||
|
self.logger.warning(
|
||||||
|
"Coroutine (%s) took too long (%s seconds), cancelling the task...",
|
||||||
|
coro, timeout,
|
||||||
|
)
|
||||||
|
future.cancel()
|
||||||
|
else:
|
||||||
|
self.logger.warning("LOOP NOT RUNNING. Returning NONE.")
|
||||||
@@ -1,11 +1,8 @@
|
|||||||
import asyncio
|
import asyncio
|
||||||
from cgitb import handler
|
|
||||||
from copy import deepcopy
|
from copy import deepcopy
|
||||||
from logging import Logger
|
from logging import Logger
|
||||||
from typing import TYPE_CHECKING, Any, Literal
|
from typing import TYPE_CHECKING, Any, Literal
|
||||||
|
|
||||||
import appdaemon.utils as utils
|
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
from appdaemon.appdaemon import AppDaemon
|
from appdaemon.appdaemon import AppDaemon
|
||||||
|
|
||||||
@@ -35,27 +32,29 @@ class Callbacks:
|
|||||||
self.diag = ad.logging.get_diag()
|
self.diag = ad.logging.get_diag()
|
||||||
self.callbacks = {}
|
self.callbacks = {}
|
||||||
self.callbacks_lock = asyncio.Lock()
|
self.callbacks_lock = asyncio.Lock()
|
||||||
|
|
||||||
#
|
#
|
||||||
# Diagnostic
|
# Diagnostic
|
||||||
#
|
#
|
||||||
|
|
||||||
async def add_callback(self):
|
async def add_callback(self):
|
||||||
return
|
return
|
||||||
|
|
||||||
async def cancel_callback(self, handle: str, name: str, silent: bool = False):
|
async def cancel_callback(self, handle: str, name: str, silent: bool = False):
|
||||||
async with self.callbacks_lock:
|
async with self.callbacks_lock:
|
||||||
if (callbacks := self.callbacks.get(name)) \
|
if (callbacks := self.callbacks.get(name)) \
|
||||||
and (callback := callbacks.pop(handle, False)):
|
and (callback := callbacks.pop(handle, False)):
|
||||||
await self.AD.state.remove_entity("admin", f"{callback['type']}_callback.{handle}")
|
await self.AD.state.remove_entity("admin", f"{callback['type']}_callback.{handle}")
|
||||||
return True
|
return True
|
||||||
elif not silent:
|
elif not silent:
|
||||||
self.logger.warning(f"Invalid callback handle '{handle}' in cancel_callback()")
|
self.logger.warning(f"Invalid callback handle '{
|
||||||
|
handle}' in cancel_callback()")
|
||||||
|
|
||||||
async def cancel_all_callbacks(self, name: str, silent: bool = False):
|
async def cancel_all_callbacks(self, name: str, silent: bool = False):
|
||||||
async with self.callbacks_lock:
|
async with self.callbacks_lock:
|
||||||
if callbacks := self.callbacks.pop(name, False):
|
if callbacks := self.callbacks.pop(name, False):
|
||||||
self.logger.debug("Clearing %s callbacks for %s", len(callbacks), name)
|
self.logger.debug(
|
||||||
|
"Clearing %s callbacks for %s", len(callbacks), name)
|
||||||
for handle, cb_info in callbacks.items():
|
for handle, cb_info in callbacks.items():
|
||||||
cb_type: Literal["event", "state", "log"] = cb_info['type']
|
cb_type: Literal["event", "state", "log"] = cb_info['type']
|
||||||
await self.AD.state.remove_entity("admin", f"{cb_type}_callback.{handle}")
|
await self.AD.state.remove_entity("admin", f"{cb_type}_callback.{handle}")
|
||||||
@@ -81,17 +80,37 @@ class Callbacks:
|
|||||||
}
|
}
|
||||||
for app_name, app_callbacks in self.callbacks.items()
|
for app_name, app_callbacks in self.callbacks.items()
|
||||||
}
|
}
|
||||||
|
|
||||||
async def get_callback_handles(self, app: str = 'all', type: str = 'all', entity_id: str = 'all'):
|
async def get_callbacks(
|
||||||
|
self,
|
||||||
|
namespace: str = 'all',
|
||||||
|
app: str = 'all',
|
||||||
|
type: str = 'all',
|
||||||
|
entity_id: str = 'all',
|
||||||
|
copy: bool = True,
|
||||||
|
) -> dict[str, dict[str, Any]]:
|
||||||
async with self.callbacks_lock:
|
async with self.callbacks_lock:
|
||||||
handles = set(
|
return {
|
||||||
handle
|
handle: deepcopy(cb_info) if copy else cb_info
|
||||||
for app_name, app_callbacks in self.callbacks.items()
|
for app_name, app_callbacks in self.callbacks.items()
|
||||||
if app == 'all' or app == app_name
|
if app == 'all' or app == app_name
|
||||||
for handle, cb_info in app_callbacks.items()
|
for handle, cb_info in app_callbacks.items()
|
||||||
if (type == 'all' or type == cb_info["type"])
|
if (type == 'all' or type == cb_info["type"])
|
||||||
and (entity_id == 'all' or entity_id == cb_info["entity"])
|
and (entity_id == 'all' or entity_id == cb_info["entity"])
|
||||||
)
|
and (
|
||||||
self.logger.debug(f"Got {len(handles)} callbacks for app={app}, type={type}, entity_id={entity_id}")
|
namespace == 'all'
|
||||||
return handles
|
or namespace == 'global'
|
||||||
|
or cb_info["namespace"] == 'global'
|
||||||
|
or namespace == cb_info["namespace"]
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
async def get_callback_handles(
|
||||||
|
self,
|
||||||
|
namespace: str = 'all',
|
||||||
|
app: str = 'all',
|
||||||
|
type: str = 'all',
|
||||||
|
entity_id: str = 'all'
|
||||||
|
) -> set[str]:
|
||||||
|
callbacks = await self.get_callbacks(namespace, app, type, entity_id, copy=False)
|
||||||
|
return set(callbacks.keys())
|
||||||
120
appdaemon/context_manager.py
Normal file
120
appdaemon/context_manager.py
Normal file
@@ -0,0 +1,120 @@
|
|||||||
|
import asyncio
|
||||||
|
import functools
|
||||||
|
import logging
|
||||||
|
import signal
|
||||||
|
from concurrent.futures import ThreadPoolExecutor
|
||||||
|
from contextlib import ExitStack, contextmanager
|
||||||
|
from dataclasses import dataclass, field
|
||||||
|
from threading import Event
|
||||||
|
from typing import Any, Callable
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class AppDaemonRunContext:
|
||||||
|
_stack: ExitStack = field(default_factory=ExitStack)
|
||||||
|
stop_event: Event = field(default_factory=Event)
|
||||||
|
shutdown_grace_period: float = 0.1
|
||||||
|
|
||||||
|
loop: asyncio.AbstractEventLoop = field(init=False)
|
||||||
|
executor: ThreadPoolExecutor = field(init=False)
|
||||||
|
|
||||||
|
def __enter__(self):
|
||||||
|
self.loop = self._stack.enter_context(self.asyncio_context())
|
||||||
|
logger.debug("Created event loop")
|
||||||
|
|
||||||
|
signals = (signal.SIGHUP, signal.SIGTERM, signal.SIGINT)
|
||||||
|
for s in signals:
|
||||||
|
self.loop.add_signal_handler(
|
||||||
|
s, lambda s=s: self.loop.create_task(self.shutdown(s)))
|
||||||
|
|
||||||
|
self.executor = self._stack.enter_context(self.thread_context())
|
||||||
|
logger.debug("Started thread pool")
|
||||||
|
return self
|
||||||
|
|
||||||
|
def __exit__(self, exc_type, exc_value, traceback):
|
||||||
|
logger.debug(f'Closing context from {self.__class__.__name__}')
|
||||||
|
self._stack.close()
|
||||||
|
|
||||||
|
def get_running_tasks(self) -> list[asyncio.Task]:
|
||||||
|
return [
|
||||||
|
t for t in asyncio.all_tasks(self.loop)
|
||||||
|
if t is not asyncio.current_task()
|
||||||
|
]
|
||||||
|
|
||||||
|
async def shutdown(self, signal=signal.SIGTERM):
|
||||||
|
"""Cleanup tasks tied to the service's shutdown.
|
||||||
|
|
||||||
|
https://www.roguelynn.com/words/asyncio-graceful-shutdowns/
|
||||||
|
"""
|
||||||
|
logger.info(f"Received exit signal {signal.name}...")
|
||||||
|
if not self.stop_event.is_set():
|
||||||
|
logger.debug('Setting stop event')
|
||||||
|
self.stop_event.set()
|
||||||
|
|
||||||
|
tasks = self.get_running_tasks()
|
||||||
|
|
||||||
|
graceful = (
|
||||||
|
asyncio.wait_for(t, timeout=self.shutdown_grace_period)
|
||||||
|
for t in tasks
|
||||||
|
)
|
||||||
|
logger.debug(
|
||||||
|
'Allowing graceful shutdown from stop event for %ss',
|
||||||
|
self.shutdown_grace_period
|
||||||
|
)
|
||||||
|
await asyncio.gather(*graceful, return_exceptions=True)
|
||||||
|
|
||||||
|
for task in tasks:
|
||||||
|
if task.cancelled():
|
||||||
|
logger.warning(f'Cancelled {task.get_name()}')
|
||||||
|
|
||||||
|
logger.info("Calling stop() for asyncio event loop")
|
||||||
|
self.loop.stop()
|
||||||
|
else:
|
||||||
|
logger.warning('Already started shutdown')
|
||||||
|
|
||||||
|
@contextmanager
|
||||||
|
def asyncio_context(self):
|
||||||
|
try:
|
||||||
|
loop = asyncio.get_event_loop()
|
||||||
|
yield loop
|
||||||
|
finally:
|
||||||
|
loop.close()
|
||||||
|
if loop.is_closed():
|
||||||
|
logger.debug("Gracefully closed event loop.")
|
||||||
|
|
||||||
|
@contextmanager
|
||||||
|
def thread_context(self):
|
||||||
|
with ThreadPoolExecutor(max_workers=5) as executor:
|
||||||
|
yield executor
|
||||||
|
if executor._shutdown:
|
||||||
|
logger.debug('Gracefully shut down ThreadPoolExecutor')
|
||||||
|
|
||||||
|
async def run_in_executor(
|
||||||
|
self,
|
||||||
|
func: Callable,
|
||||||
|
*args,
|
||||||
|
timeout: float | None = None,
|
||||||
|
**kwargs
|
||||||
|
) -> Any:
|
||||||
|
"""Run the sync function using the ThreadPoolExecutor and await the result"""
|
||||||
|
timeout = timeout or 10.0
|
||||||
|
|
||||||
|
coro = self.loop.run_in_executor(
|
||||||
|
self.executor,
|
||||||
|
functools.partial(func, **kwargs),
|
||||||
|
*args,
|
||||||
|
)
|
||||||
|
|
||||||
|
try:
|
||||||
|
return await asyncio.wait_for(coro, timeout)
|
||||||
|
except asyncio.TimeoutError:
|
||||||
|
print('Timed out')
|
||||||
|
|
||||||
|
async def sleep(self, delay: float):
|
||||||
|
"""Wrapper function for asyncio.sleep that suppresses and logs a task cancellation"""
|
||||||
|
try:
|
||||||
|
await self.run_in_executor(self.stop_event.wait, delay)
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
logger.debug('Cancelled during sleep')
|
||||||
26
appdaemon/entity.py
Normal file
26
appdaemon/entity.py
Normal file
@@ -0,0 +1,26 @@
|
|||||||
|
from appdaemon import utils
|
||||||
|
|
||||||
|
|
||||||
|
class Entity:
|
||||||
|
@utils.sync_decorator
|
||||||
|
async def get_callbacks(self, limit_to_app: str = None) -> dict[str, dict[str, Any]]:
|
||||||
|
async with self.AD.callbacks.callbacks_lock:
|
||||||
|
return {
|
||||||
|
handle: {
|
||||||
|
'app_name': cb_info['name'],
|
||||||
|
'function': cb_info['function'].__name__,
|
||||||
|
**cb_info["kwargs"]
|
||||||
|
}
|
||||||
|
for app_name, app_callbacks in self.AD.callbacks.callbacks.items()
|
||||||
|
if limit_to_app is None or app_name == limit_to_app
|
||||||
|
for handle, cb_info in app_callbacks.items()
|
||||||
|
if self.namespace == cb_info['namespace']
|
||||||
|
and cb_info["type"] == "state"
|
||||||
|
and cb_info.get('entity') == self.entity_id
|
||||||
|
}
|
||||||
|
|
||||||
|
@utils.sync_decorator
|
||||||
|
async def cancel_callbacks(self, all: bool = False):
|
||||||
|
for handle, info in (await self.get_callbacks()).items():
|
||||||
|
if all or self.name == info['app_name']:
|
||||||
|
await self.AD.state.cancel_state_callback(handle, info['app_name'])
|
||||||
79
appdaemon/main.py
Normal file
79
appdaemon/main.py
Normal file
@@ -0,0 +1,79 @@
|
|||||||
|
from contextlib import ExitStack
|
||||||
|
from dataclasses import dataclass, field
|
||||||
|
|
||||||
|
from appdaemon.models.ad_config import AppDaemonConfig
|
||||||
|
from context_manager import AppDaemonRunContext
|
||||||
|
from subsystem import AppDaemon
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class ADMain:
|
||||||
|
"""Class to contain the mechanics to run AppDaemon as module
|
||||||
|
"""
|
||||||
|
config_file: str
|
||||||
|
cfg: AppDaemonConfig = field(init=False)
|
||||||
|
_stack: ExitStack = field(default_factory=ExitStack)
|
||||||
|
run_context: AppDaemonRunContext = field(init=False)
|
||||||
|
ad: AppDaemon = field(init=False)
|
||||||
|
|
||||||
|
def __post_init__(self) -> None:
|
||||||
|
raw_cfg = read_config_file(self.config_file)
|
||||||
|
self.cfg = AppDaemonConfig(
|
||||||
|
config_file=self.config_file,
|
||||||
|
**raw_cfg['appdaemon']
|
||||||
|
)
|
||||||
|
|
||||||
|
def __enter__(self):
|
||||||
|
"""Used to start the asyncio loop, thread pool, and AppDaemon"""
|
||||||
|
# Use the ExitStack from ADMain instead of creating a new one
|
||||||
|
self.run_context = AppDaemonRunContext(_stack=self._stack)
|
||||||
|
self._stack.enter_context(self.run_context)
|
||||||
|
|
||||||
|
# Start AppDaemon by entering it's context
|
||||||
|
self.ad = AppDaemon(self.cfg, self.run_context)
|
||||||
|
self._stack.enter_context(self.ad)
|
||||||
|
return self
|
||||||
|
|
||||||
|
def __exit__(self, exc_type, exc_value, traceback):
|
||||||
|
self.ad.logger.debug(f'Closing context from {self.__class__.__name__}')
|
||||||
|
self._stack.close()
|
||||||
|
self.ad.logger.info('Stopped main()')
|
||||||
|
|
||||||
|
def run(self):
|
||||||
|
if hasattr(self, 'ad'):
|
||||||
|
self.ad.logger.info('Running asyncio event loop indefinitely...')
|
||||||
|
self.run_context.loop.run_forever()
|
||||||
|
self.ad.logger.debug('Gracefully stopped event loop')
|
||||||
|
else:
|
||||||
|
logging.error('Running ADMain without context manager')
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
import logging.config
|
||||||
|
|
||||||
|
from appdaemon.utils import read_config_file
|
||||||
|
from rich.console import Console
|
||||||
|
from rich.highlighter import NullHighlighter
|
||||||
|
|
||||||
|
console = Console()
|
||||||
|
logging.config.dictConfig(
|
||||||
|
{
|
||||||
|
'version': 1,
|
||||||
|
'disable_existing_loggers': False,
|
||||||
|
'formatters': {'basic': {'style': '{', 'format': '[yellow]{name:20}[/] {message}'}},
|
||||||
|
'handlers': {
|
||||||
|
'rich': {
|
||||||
|
'()': 'rich.logging.RichHandler',
|
||||||
|
'formatter': 'basic',
|
||||||
|
'console': console,
|
||||||
|
'highlighter': NullHighlighter(),
|
||||||
|
'markup': True
|
||||||
|
}
|
||||||
|
},
|
||||||
|
'root': {'level': 'DEBUG', 'handlers': ['rich']},
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
with ADMain('/conf/ad-test/conf/appdaemon.yaml') as main:
|
||||||
|
main.run()
|
||||||
|
main.ad.logger.debug('Exiting main context from with statement')
|
||||||
@@ -1,4 +1,4 @@
|
|||||||
|
from copy import deepcopy
|
||||||
from logging import Logger
|
from logging import Logger
|
||||||
from typing import Any
|
from typing import Any
|
||||||
|
|
||||||
@@ -56,3 +56,52 @@ class States:
|
|||||||
|
|
||||||
async for remove in _send_dispatches():
|
async for remove in _send_dispatches():
|
||||||
await self.cancel_state_callback(**remove)
|
await self.cancel_state_callback(**remove)
|
||||||
|
|
||||||
|
|
||||||
|
async def get_state(
|
||||||
|
self,
|
||||||
|
name: str,
|
||||||
|
namespace: str,
|
||||||
|
entity_id: str | None = None,
|
||||||
|
attribute: str | None = None,
|
||||||
|
default: Any = None,
|
||||||
|
copy: bool = True,
|
||||||
|
):
|
||||||
|
self.logger.debug("get_state: %s.%s %s %s", entity_id, attribute, default, copy)
|
||||||
|
|
||||||
|
result = default
|
||||||
|
|
||||||
|
if ns := self.state.get(namespace):
|
||||||
|
# Process entity_id input
|
||||||
|
if entity_id is None:
|
||||||
|
result = ns
|
||||||
|
# TODO: filter by attribute?
|
||||||
|
elif "." not in entity_id:
|
||||||
|
domain = entity_id
|
||||||
|
result = {
|
||||||
|
eid: state
|
||||||
|
for eid, state in ns.items()
|
||||||
|
if eid.startswith(domain)
|
||||||
|
}
|
||||||
|
elif full_state := ns.get(entity_id):
|
||||||
|
result = full_state
|
||||||
|
else:
|
||||||
|
self.logger.warning(f"Entity {entity_id} does not exist in namespace {namespace}")
|
||||||
|
return
|
||||||
|
|
||||||
|
# Process attribute input
|
||||||
|
if attribute == "all":
|
||||||
|
result = result
|
||||||
|
elif attr := full_state.get(attribute):
|
||||||
|
result = attr
|
||||||
|
elif attr := full_state.get('attributes', {}).get(attribute):
|
||||||
|
result = attr
|
||||||
|
elif state := full_state.get("state"):
|
||||||
|
result = state
|
||||||
|
|
||||||
|
return deepcopy(result) if copy else result
|
||||||
|
else:
|
||||||
|
self.logger.warning(f"Namespace does not exist: {namespace}")
|
||||||
|
|
||||||
|
async def cancel_state_callback(self, handle: str, name: str, silent: bool = False) -> bool:
|
||||||
|
return await self.AD.callbacks.cancel_callback(handle, name, silent)
|
||||||
150
appdaemon/subsystem.py
Normal file
150
appdaemon/subsystem.py
Normal file
@@ -0,0 +1,150 @@
|
|||||||
|
import asyncio
|
||||||
|
import logging
|
||||||
|
import os
|
||||||
|
import signal
|
||||||
|
import traceback
|
||||||
|
from contextlib import ExitStack
|
||||||
|
from dataclasses import dataclass, field
|
||||||
|
from logging import Logger, getLogger
|
||||||
|
from random import random
|
||||||
|
from threading import Event, RLock
|
||||||
|
from time import perf_counter
|
||||||
|
from typing import Coroutine
|
||||||
|
|
||||||
|
from appdaemon.models import AppDaemonConfig
|
||||||
|
from context_manager import AppDaemonRunContext
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class ADSubsystem:
|
||||||
|
AD: "AppDaemon"
|
||||||
|
stop: Event
|
||||||
|
"""An thread event for the subsystem to use to shutdown gracefully"""
|
||||||
|
lock: RLock = field(default_factory=RLock)
|
||||||
|
"""A threadsafe re-entrant lock to protect any internal data while it's being modified"""
|
||||||
|
logger: Logger = field(init=False)
|
||||||
|
tasks: list[asyncio.Task] = field(default_factory=list)
|
||||||
|
|
||||||
|
def __post_init__(self) -> None:
|
||||||
|
name = f'_{self.__class__.__name__.lower()}'
|
||||||
|
self.logger = getLogger(f'AppDaemon.{name}')
|
||||||
|
self.create_task = self.AD.create_task
|
||||||
|
self.sleep = self.AD.context.sleep
|
||||||
|
|
||||||
|
def __enter__(self):
|
||||||
|
self.logger.debug(f'Starting {self.__class__.__name__}')
|
||||||
|
|
||||||
|
def __exit__(self, exc_type, exc_value, traceback):
|
||||||
|
pass
|
||||||
|
# self.logger.debug(f'Exiting {self.__class__.__name__}')
|
||||||
|
|
||||||
|
@property
|
||||||
|
def stopping(self) -> bool:
|
||||||
|
return self.stop.is_set()
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class Utility(ADSubsystem):
|
||||||
|
loop_rate: float = 1.0
|
||||||
|
|
||||||
|
def __enter__(self):
|
||||||
|
super().__enter__()
|
||||||
|
self.create_task(self.loop(), 'utility loop', critical=True)
|
||||||
|
return self
|
||||||
|
|
||||||
|
async def loop(self):
|
||||||
|
while not self.stopping:
|
||||||
|
self.logger.debug('Looping...')
|
||||||
|
await self.sleep(self.loop_rate)
|
||||||
|
|
||||||
|
task_name = asyncio.current_task().get_name()
|
||||||
|
self.logger.debug(f'Gracefully stopped {task_name} task')
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class Plugin(ADSubsystem):
|
||||||
|
state: dict[str, int] = field(default_factory=dict)
|
||||||
|
update_rate: float = 5.0
|
||||||
|
|
||||||
|
def __post_init__(self) -> None:
|
||||||
|
super().__post_init__()
|
||||||
|
self.state['update_count'] = 0
|
||||||
|
|
||||||
|
def __enter__(self):
|
||||||
|
super().__enter__()
|
||||||
|
self.create_task(
|
||||||
|
self.periodic_self_udpate(),
|
||||||
|
name='plugin periodic update',
|
||||||
|
critical=True
|
||||||
|
)
|
||||||
|
return self
|
||||||
|
|
||||||
|
async def periodic_self_udpate(self):
|
||||||
|
loop_time = perf_counter()
|
||||||
|
while not self.stopping:
|
||||||
|
with self.lock:
|
||||||
|
self.state['update_count'] += 1
|
||||||
|
# self.logger.debug('Long plugin update...')
|
||||||
|
# await asyncio.sleep(random())
|
||||||
|
self.logger.debug(
|
||||||
|
'Plugin self update: %s %s',
|
||||||
|
self.state["update_count"],
|
||||||
|
f'{perf_counter()-loop_time:.3f}s'
|
||||||
|
)
|
||||||
|
loop_time = perf_counter()
|
||||||
|
|
||||||
|
if self.state['update_count'] == 2:
|
||||||
|
raise ValueError('fake error')
|
||||||
|
|
||||||
|
await self.sleep(self.update_rate)
|
||||||
|
|
||||||
|
task_name = asyncio.current_task().get_name()
|
||||||
|
self.logger.debug(f'Gracefully stopped {task_name} task')
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class AppDaemon:
|
||||||
|
cfg: AppDaemonConfig
|
||||||
|
context: AppDaemonRunContext
|
||||||
|
_stack: ExitStack = field(default_factory=ExitStack)
|
||||||
|
utility: Utility = field(init=False)
|
||||||
|
plugins: dict[str, Plugin] = field(default_factory=dict)
|
||||||
|
|
||||||
|
def __post_init__(self) -> None:
|
||||||
|
self.logger = logging.getLogger('AppDaemon')
|
||||||
|
self.utility = Utility(self, self.context.stop_event)
|
||||||
|
self.plugins['dummy'] = Plugin(self, self.context.stop_event)
|
||||||
|
|
||||||
|
def __enter__(self):
|
||||||
|
self.logger.info('Starting AppDaemon')
|
||||||
|
self._stack.enter_context(self.utility)
|
||||||
|
for plugin in self.plugins.values():
|
||||||
|
self._stack.enter_context(plugin)
|
||||||
|
return self
|
||||||
|
|
||||||
|
def __exit__(self, exc_type, exc_value, traceback):
|
||||||
|
self._stack.__exit__(exc_type, exc_value, traceback)
|
||||||
|
|
||||||
|
def create_task(self, coro: Coroutine, name: str | None = None, critical: bool = False):
|
||||||
|
"""Creates an async task and adds exception callbacks"""
|
||||||
|
task = self.context.loop.create_task(coro, name=name)
|
||||||
|
task.add_done_callback(self.check_task_exception)
|
||||||
|
if critical:
|
||||||
|
task.add_done_callback(self.critical_exception)
|
||||||
|
return task
|
||||||
|
|
||||||
|
def check_task_exception(self, task: asyncio.Task):
|
||||||
|
if (exc := task.exception()) and not isinstance(exc, asyncio.CancelledError):
|
||||||
|
self.logger.error('\n'.join(traceback.format_exception(exc)))
|
||||||
|
|
||||||
|
def critical_exception(self, task: asyncio.Task):
|
||||||
|
if task.exception():
|
||||||
|
self.logger.critical(
|
||||||
|
'Critical error in %s, forcing shutdown',
|
||||||
|
task.get_name()
|
||||||
|
)
|
||||||
|
self.shutdown()
|
||||||
|
|
||||||
|
def shutdown(self):
|
||||||
|
self.logger.debug('Shutting down by sending SIGTERM')
|
||||||
|
os.kill(os.getpid(), signal.SIGTERM)
|
||||||
Reference in New Issue
Block a user