diff --git a/directory_monitor.py b/directory_monitor.py index 38e2486..029da2f 100644 --- a/directory_monitor.py +++ b/directory_monitor.py @@ -6,10 +6,21 @@ from pathlib import Path from typing import AsyncIterable, Awaitable, Callable, Dict, Iterable, List import anyio +import yaml +from pydantic import BaseModel, ConfigDict, Field, TypeAdapter, ValidationError logger = logging.getLogger(__name__) +class AppConfig(BaseModel): + module: str + class_: str = Field(validation_alias='class') + model_config = ConfigDict(validate_assignment=True) + + +AppConfigs = TypeAdapter(Dict[str, AppConfig]) + + @dataclass class DirectoryMonitor: dir: Path @@ -90,18 +101,18 @@ class AppConfigMonitor: def __post_init__(self): self.dir = Path(self.dir) assert self.dir.exists() - self.config_monitor = DirectoryMonitor(self.dir, '*.yaml') self.module_monitor = DirectoryMonitor(self.dir, '*.py') + self.config_monitor = DirectoryMonitor(self.dir, '*.yaml') async def monitor( self, - config_update_callback: Callable | Awaitable = None, module_update_callback: Callable | Awaitable = None, + config_update_callback: Callable | Awaitable = None, ): while True: try: - await self.config_monitor.update_async() await self.module_monitor.update_async() + await self.config_monitor.update_async() except KeyboardInterrupt: # if a KeyboardInterrupt happens during the update, pass it through raise @@ -110,19 +121,80 @@ class AppConfigMonitor: logger.exception(e) break else: - if isinstance(config_update_callback, Callable): - config_update_callback(self.config_monitor) - elif isinstance(config_update_callback, Awaitable): - await config_update_callback(self.config_monitor) + module_result = module_update_callback(self.module_monitor) + if isinstance(module_result, Awaitable): + logger.debug('Awaiting module result') + await module_result - if isinstance(module_update_callback, Callable): - module_update_callback(self.module_monitor) - elif isinstance(module_update_callback, Awaitable): - await module_update_callback(self.module_monitor) + config_result = config_update_callback(self.config_monitor) + if isinstance(config_result, Awaitable): + logger.debug('Awaiting config result') + await config_result await asyncio.sleep(self.poll_sleep_time) +def load_yaml(file: Path): + return yaml.safe_load(file) + + +@dataclass +class AppLoader: + dir: str + acm: AppConfigMonitor = field(init=False) + + def __post_init__(self): + self.dir = Path(self.dir) + self.acm = AppConfigMonitor(self.dir) + + async def monitor(self): + await self.acm.monitor( + module_update_callback=self.load_reload_modules, + config_update_callback=self.load_reload_configs, + ) + + async def load_reload_modules(self, dm: DirectoryMonitor): + if dm.new or dm.changed: + modules_to_load = dm.new + dm.changed + dep_str = '\n'.join(f' {m}' for m in modules_to_load) + logger.debug('Resolving module dependencies for \n%s', dep_str) + await asyncio.sleep(0.5) + logger.debug('(Re)loading modules and packages from the inside out') + + async def load_reload_configs(self, dm: DirectoryMonitor): + if dm.new or dm.changed: + configs_to_load = dm.new + dm.changed + app_str = '\n'.join(f' {m}' for m in configs_to_load) + logger.debug('Resolving app config dependencies for \n%s', app_str) + await asyncio.sleep(0.25) + logger.debug('Reloading app configs in dependency order') + + app_configs = { + app_name: cfg async for app_name, cfg in self.gen_valid_app_configs(configs_to_load) + } + logger.debug(app_configs) + + async def gen_raw_app_configs(self, configs_to_load: Iterable[Path]): + for config_file in configs_to_load: + try: + with Path(config_file).open('r') as f: + yield yaml.safe_load(f) + except Exception as e: + logger.exception(e) + logger.error(f'Error reading from {config_file}') + + async def gen_valid_app_configs(self, configs_to_load: Iterable[Path]): + async for raw_full_cfg in self.gen_raw_app_configs(configs_to_load): + for app_name, raw_cfg in raw_full_cfg.items(): + try: + valid_cfg = AppConfig.model_validate(raw_cfg) + except ValidationError as e: + logger.error(f'Error validating "{app_name}"') + logger.error(e) + else: + yield app_name, valid_cfg + + if __name__ == '__main__': import logging.config @@ -155,7 +227,7 @@ if __name__ == '__main__': # except KeyboardInterrupt: # logger.error('KeyboardInterrupt') - acm = AppConfigMonitor('../conf/apps') + acm = AppLoader('../conf/apps') try: with console.status(f'Monitoring {acm.dir}'):