Compare commits
3 Commits
5e458aca41
...
050fe75e71
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
050fe75e71 | ||
|
|
8064a73e9f | ||
|
|
9121d1ef04 |
@@ -4,26 +4,38 @@ import logging
|
|||||||
import signal
|
import signal
|
||||||
from concurrent.futures import ThreadPoolExecutor
|
from concurrent.futures import ThreadPoolExecutor
|
||||||
from contextlib import ExitStack, contextmanager
|
from contextlib import ExitStack, contextmanager
|
||||||
|
from threading import Event, Lock
|
||||||
from time import sleep
|
from time import sleep
|
||||||
from typing import Any, Callable
|
from typing import Any, Callable
|
||||||
|
|
||||||
logger = logging.getLogger()
|
logger = logging.getLogger()
|
||||||
|
|
||||||
|
|
||||||
class CustomContextManager:
|
def handler(signum, frame):
|
||||||
|
print('Signal handler called with signal', signum)
|
||||||
|
raise OSError("Couldn't open device!")
|
||||||
|
|
||||||
|
|
||||||
|
class AppDaemonRunContext:
|
||||||
_stack: ExitStack
|
_stack: ExitStack
|
||||||
loop: asyncio.AbstractEventLoop
|
loop: asyncio.AbstractEventLoop
|
||||||
executor: ThreadPoolExecutor
|
executor: ThreadPoolExecutor
|
||||||
|
stop_event: Event
|
||||||
|
shutdown_lock: Lock
|
||||||
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self._stack = ExitStack()
|
self._stack = ExitStack()
|
||||||
|
self.stop_event = Event()
|
||||||
|
self.shutdown_lock = Lock()
|
||||||
|
|
||||||
def __enter__(self):
|
def __enter__(self):
|
||||||
self.loop = self._stack.enter_context(self.asyncio_context())
|
self.loop = self._stack.enter_context(self.asyncio_context())
|
||||||
logger.debug("Entered asyncio context")
|
logger.debug("Entered asyncio context")
|
||||||
self.loop.add_signal_handler(signal.SIGINT, self.handle_signal)
|
|
||||||
self.loop.add_signal_handler(signal.SIGTERM, self.handle_signal)
|
signals = (signal.SIGHUP, signal.SIGTERM, signal.SIGINT)
|
||||||
# self.executor = self._stack.enter_context(ThreadPoolExecutor(max_workers=5))
|
for s in signals:
|
||||||
|
self.loop.add_signal_handler(s, lambda s=s: self.loop.create_task(self.shutdown(s)))
|
||||||
|
|
||||||
self.executor = self._stack.enter_context(self.thread_context())
|
self.executor = self._stack.enter_context(self.thread_context())
|
||||||
logger.debug("Entered threadpool context")
|
logger.debug("Entered threadpool context")
|
||||||
return self
|
return self
|
||||||
@@ -32,29 +44,45 @@ class CustomContextManager:
|
|||||||
self._stack.__exit__(exc_type, exc_value, traceback)
|
self._stack.__exit__(exc_type, exc_value, traceback)
|
||||||
logger.debug("Exited context")
|
logger.debug("Exited context")
|
||||||
|
|
||||||
|
async def shutdown(self, signal):
|
||||||
|
"""Cleanup tasks tied to the service's shutdown.
|
||||||
|
|
||||||
|
https://www.roguelynn.com/words/asyncio-graceful-shutdowns/
|
||||||
|
"""
|
||||||
|
logger.info(f"Received exit signal {signal.name}...")
|
||||||
|
if not self.stop_event.is_set():
|
||||||
|
logger.debug('Setting stop event')
|
||||||
|
self.stop_event.set()
|
||||||
|
|
||||||
|
graceful = (
|
||||||
|
asyncio.wait_for(t, timeout=2.0)
|
||||||
|
for t in asyncio.all_tasks()
|
||||||
|
if t is not asyncio.current_task()
|
||||||
|
)
|
||||||
|
logger.debug('Allowing graceful shutdown from stop event')
|
||||||
|
await asyncio.gather(*graceful, return_exceptions=True)
|
||||||
|
|
||||||
|
logger.debug("Stopping event loop in context shutdown")
|
||||||
|
self.loop.stop()
|
||||||
|
else:
|
||||||
|
logger.warning('Already started shutdown')
|
||||||
|
|
||||||
@contextmanager
|
@contextmanager
|
||||||
def asyncio_context(self):
|
def asyncio_context(self):
|
||||||
try:
|
try:
|
||||||
loop = asyncio.new_event_loop()
|
loop = asyncio.get_event_loop()
|
||||||
asyncio.set_event_loop(loop)
|
|
||||||
yield loop
|
yield loop
|
||||||
finally:
|
finally:
|
||||||
loop.close()
|
loop.close()
|
||||||
logger.info("Closed the event loop.")
|
if loop.is_closed():
|
||||||
|
logger.debug("Closed the event loop.")
|
||||||
|
|
||||||
@contextmanager
|
@contextmanager
|
||||||
def thread_context(self):
|
def thread_context(self):
|
||||||
with ThreadPoolExecutor(max_workers=5) as executor:
|
with ThreadPoolExecutor(max_workers=5) as executor:
|
||||||
yield executor
|
yield executor
|
||||||
logger.debug('Shut down the ThreadPoolExecutor')
|
if executor._shutdown:
|
||||||
|
logger.debug('Shut down the ThreadPoolExecutor')
|
||||||
def handle_signal(self, signum=None, frame=None):
|
|
||||||
logger.info(f'Handle signal: {signum}, {frame}')
|
|
||||||
# match signum:
|
|
||||||
# case signal.SIGINT:
|
|
||||||
# pass
|
|
||||||
# case signal.SIGTERM:
|
|
||||||
# pass
|
|
||||||
|
|
||||||
async def run_in_executor(
|
async def run_in_executor(
|
||||||
self,
|
self,
|
||||||
@@ -97,14 +125,17 @@ if __name__ == "__main__":
|
|||||||
await asyncio.sleep(delay)
|
await asyncio.sleep(delay)
|
||||||
logger.info(f'{id_} Done async')
|
logger.info(f'{id_} Done async')
|
||||||
|
|
||||||
with CustomContextManager() as cm:
|
with AppDaemonRunContext() as cm:
|
||||||
for _ in range(3):
|
for _ in range(3):
|
||||||
logger.info('Submitting random dummy_functions')
|
logger.info('Submitting random dummy_functions')
|
||||||
cm.executor.submit(dummy_function, random.random() * 3.0)
|
cm.executor.submit(dummy_function, random.random() * 10.0)
|
||||||
cm.loop.create_task(async_dummy_function(random.random() * 5.0))
|
cm.loop.create_task(async_dummy_function(random.random() * 5.0))
|
||||||
|
|
||||||
logger.info('Running until complete')
|
try:
|
||||||
cm.loop.run_until_complete(asyncio.gather(*asyncio.all_tasks(cm.loop)))
|
logger.info('Running until complete')
|
||||||
|
cm.loop.run_until_complete(asyncio.gather(*asyncio.all_tasks(cm.loop)))
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
logger.error('Cancelled')
|
||||||
|
|
||||||
if cm.loop.is_closed():
|
if cm.loop.is_closed():
|
||||||
logger.debug('Loop is closed')
|
logger.debug('Loop is closed')
|
||||||
26
appdaemon/entity.py
Normal file
26
appdaemon/entity.py
Normal file
@@ -0,0 +1,26 @@
|
|||||||
|
from appdaemon import utils
|
||||||
|
|
||||||
|
|
||||||
|
class Entity:
|
||||||
|
@utils.sync_decorator
|
||||||
|
async def get_callbacks(self, limit_to_app: str = None) -> dict[str, dict[str, Any]]:
|
||||||
|
async with self.AD.callbacks.callbacks_lock:
|
||||||
|
return {
|
||||||
|
handle: {
|
||||||
|
'app_name': cb_info['name'],
|
||||||
|
'function': cb_info['function'].__name__,
|
||||||
|
**cb_info["kwargs"]
|
||||||
|
}
|
||||||
|
for app_name, app_callbacks in self.AD.callbacks.callbacks.items()
|
||||||
|
if limit_to_app is None or app_name == limit_to_app
|
||||||
|
for handle, cb_info in app_callbacks.items()
|
||||||
|
if self.namespace == cb_info['namespace']
|
||||||
|
and cb_info["type"] == "state"
|
||||||
|
and cb_info.get('entity') == self.entity_id
|
||||||
|
}
|
||||||
|
|
||||||
|
@utils.sync_decorator
|
||||||
|
async def cancel_callbacks(self, all: bool = False):
|
||||||
|
for handle, info in (await self.get_callbacks()).items():
|
||||||
|
if all or self.name == info['app_name']:
|
||||||
|
await self.AD.state.cancel_state_callback(handle, info['app_name'])
|
||||||
146
appdaemon/subsystem.py
Normal file
146
appdaemon/subsystem.py
Normal file
@@ -0,0 +1,146 @@
|
|||||||
|
import asyncio
|
||||||
|
import logging
|
||||||
|
from dataclasses import dataclass, field
|
||||||
|
from logging import Logger, getLogger
|
||||||
|
from threading import Event, RLock
|
||||||
|
from typing import Callable, Coroutine
|
||||||
|
|
||||||
|
from appdaemon.models import AppDaemonConfig
|
||||||
|
from context_manager import AppDaemonRunContext
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class ADSubsystem:
|
||||||
|
AD: "AppDaemon"
|
||||||
|
stop: Event
|
||||||
|
lock: RLock = field(default_factory=RLock)
|
||||||
|
logger: Logger = field(init=False)
|
||||||
|
|
||||||
|
def __post_init__(self) -> None:
|
||||||
|
name = f'_{self.__class__.__name__.lower()}'
|
||||||
|
self.logger = getLogger(f'AppDaemon.{name}')
|
||||||
|
self.AD.starts.append(self.start)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def stopping(self) -> bool:
|
||||||
|
return self.stop.is_set()
|
||||||
|
|
||||||
|
def start(self):
|
||||||
|
raise NotImplementedError('Need to implement start for subsystem')
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class Utility(ADSubsystem):
|
||||||
|
loop_rate: float = 1.0
|
||||||
|
|
||||||
|
def start(self):
|
||||||
|
self.AD.create_task(self.loop(), 'Utility loop')
|
||||||
|
|
||||||
|
async def loop(self):
|
||||||
|
while not self.stopping:
|
||||||
|
self.logger.info('Looping...')
|
||||||
|
await asyncio.sleep(self.loop_rate)
|
||||||
|
self.logger.debug('Stopped utility loop')
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class Plugin(ADSubsystem):
|
||||||
|
state: dict[str, int] = field(default_factory=dict)
|
||||||
|
update_rate: float = 30.0
|
||||||
|
|
||||||
|
def __post_init__(self) -> None:
|
||||||
|
super().__post_init__()
|
||||||
|
self.state['update_count'] = 0
|
||||||
|
|
||||||
|
def start(self):
|
||||||
|
self.AD.create_task(self.periodic_self_udpate(), 'Periodic plugin update')
|
||||||
|
|
||||||
|
async def periodic_self_udpate(self):
|
||||||
|
while not self.stopping:
|
||||||
|
with self.lock:
|
||||||
|
self.state['update_count'] += 1
|
||||||
|
self.logger.info(f'Updated self: {self.state["update_count"]}')
|
||||||
|
await asyncio.sleep(self.update_rate)
|
||||||
|
self.logger.debug('Stopped plugin updates')
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class AppDaemon:
|
||||||
|
cfg: AppDaemonConfig
|
||||||
|
context: AppDaemonRunContext
|
||||||
|
utility: Utility = field(init=False)
|
||||||
|
plugins: dict[str, Plugin] = field(default_factory=dict)
|
||||||
|
starts: list[Callable] = field(default_factory=list)
|
||||||
|
|
||||||
|
def __post_init__(self) -> None:
|
||||||
|
self.utility = Utility(self, self.context.stop_event)
|
||||||
|
self.plugins['dummy'] = Plugin(self, self.context.stop_event)
|
||||||
|
|
||||||
|
def create_task(self, coro: Coroutine, name: str | None = None):
|
||||||
|
return self.context.loop.create_task(coro, name=name)
|
||||||
|
|
||||||
|
def start(self):
|
||||||
|
for start in self.starts:
|
||||||
|
start()
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class ADMain:
|
||||||
|
config_file: str
|
||||||
|
cfg: AppDaemonConfig = field(init=False)
|
||||||
|
|
||||||
|
def __post_init__(self) -> None:
|
||||||
|
raw_cfg = read_config_file(self.config_file)
|
||||||
|
self.cfg = AppDaemonConfig(
|
||||||
|
config_file=self.config_file,
|
||||||
|
**raw_cfg['appdaemon']
|
||||||
|
)
|
||||||
|
|
||||||
|
def run(self):
|
||||||
|
with AppDaemonRunContext() as cm:
|
||||||
|
ad = AppDaemon(self.cfg, cm)
|
||||||
|
ad.start()
|
||||||
|
cm.loop.run_forever()
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
import logging.config
|
||||||
|
|
||||||
|
from appdaemon.utils import read_config_file
|
||||||
|
from rich.console import Console
|
||||||
|
from rich.highlighter import NullHighlighter
|
||||||
|
|
||||||
|
console = Console()
|
||||||
|
logging.config.dictConfig(
|
||||||
|
{
|
||||||
|
'version': 1,
|
||||||
|
'disable_existing_loggers': False,
|
||||||
|
'formatters': {'basic': {'style': '{', 'format': '[yellow]{name}[/] {message}'}},
|
||||||
|
'handlers': {
|
||||||
|
'rich': {
|
||||||
|
'()': 'rich.logging.RichHandler',
|
||||||
|
'formatter': 'basic',
|
||||||
|
'console': console,
|
||||||
|
'highlighter': NullHighlighter(),
|
||||||
|
'markup': True
|
||||||
|
}
|
||||||
|
},
|
||||||
|
'root': {'level': 'DEBUG', 'handlers': ['rich']},
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
main = ADMain('/conf/ad-test/conf/appdaemon.yaml')
|
||||||
|
main.run()
|
||||||
|
|
||||||
|
# config_file = '/conf/ad-test/conf/appdaemon.yaml'
|
||||||
|
# raw_cfg = read_config_file(config_file)
|
||||||
|
|
||||||
|
# cfg = AppDaemonConfig(
|
||||||
|
# config_file=config_file,
|
||||||
|
# **raw_cfg['appdaemon']
|
||||||
|
# )
|
||||||
|
|
||||||
|
# with AppDaemonRunContext() as cm:
|
||||||
|
# ad = AppDaemon(cfg, cm)
|
||||||
|
# # ad.start()
|
||||||
|
# # cm.loop.run_forever()
|
||||||
Reference in New Issue
Block a user