"""Poll a directory for changed app modules (``*.py``) and app configs (``*.yaml``).

``DirectoryMonitor`` tracks modification times of files under a directory,
``AppConfigMonitor`` polls two such monitors in a loop and hands them to
callbacks, and ``AppLoader`` supplies callbacks that read, parse and validate
the YAML app configs.
"""

import asyncio
import json
import logging
import os
from dataclasses import dataclass, field
from datetime import datetime
from io import StringIO
from pathlib import Path
from typing import AsyncIterable, Awaitable, Callable, Dict, Iterable, List

import anyio
import yaml
from pydantic import BaseModel, ConfigDict, Field, TypeAdapter, ValidationError

logger = logging.getLogger(__name__)


class AppConfig(BaseModel):
    """Validated configuration entry for a single app."""

    module: str
    # ``class`` is a Python keyword, so the field is named ``class_`` and
    # aliased to ``class`` for both validation and serialization.
    class_: str = Field(validation_alias='class', serialization_alias='class')

    model_config = ConfigDict(validate_assignment=True)


# Adapter for a mapping of app name -> AppConfig.
AppConfigs = TypeAdapter(Dict[str, AppConfig])


@dataclass
class DirectoryMonitor:
    """Track new, changed and deleted files under ``dir`` that match ``glob``.

    Each call to :meth:`update` or :meth:`update_async` rescans the directory
    recursively and repopulates ``new``, ``changed`` and ``deleted`` with the
    differences found since the previous scan.

    NOTE: ``update`` stores ``pathlib.Path`` keys in ``times`` and calls
    ``exists()`` synchronously, while ``update_async`` stores ``anyio.Path``
    keys and awaits ``exists()``. Use only one of the two on a given instance.
    """

    dir: Path
    glob: str = '*'
    # Last seen modification time (st_mtime) per tracked file.
    times: Dict[Path | anyio.Path, float] = field(default_factory=dict, repr=False)
    last_checked: datetime = field(default_factory=datetime.now)
    # Results of the most recent scan. They start out empty so they can be
    # read safely before the first update.
    new: List[Path | anyio.Path] = field(
        init=False, repr=False, default_factory=list
    )
    changed: List[Path | anyio.Path] = field(
        init=False, repr=False, default_factory=list
    )
    deleted: List[Path | anyio.Path] = field(
        init=False, repr=False, default_factory=list
    )

    def _reset_counts(self):
        """Stamp ``last_checked`` and clear the per-scan result lists."""
        self.last_checked = datetime.now()
        self.new, self.changed, self.deleted = [], [], []

    def _update_file(self, file: Path | anyio.Path, modified_time: float):
        """Record ``file`` as new or changed and remember its modification time."""
        if file in self.times:
            if modified_time > self.times[file]:
                logger.info(f'Updated file: {file}')
                self.changed.append(file)
        else:
            logger.debug(f'New file: {file}')
            self.new.append(file)
        self.times[file] = modified_time

    def _delete_file(self, file: Path | anyio.Path):
        """Stop tracking ``file`` and record it as deleted."""
        logger.warning(f'Detected deleted file: {file}')
        del self.times[file]
        self.deleted.append(file)

    def rglob(self) -> Iterable[Path]:
        """Yield every non-directory path under ``dir`` matching ``glob``."""
        for file in self.dir.rglob(self.glob):
            if file.is_dir():
                logger.debug(f'Skipping directory: {file}')
                continue
            # optional filter logic here
            yield file

    async def async_rglob(self) -> AsyncIterable[anyio.Path]:
        """Async counterpart of :meth:`rglob`, yielding ``anyio.Path`` objects."""
        async for file in anyio.Path(self.dir).rglob(self.glob):
            if await file.is_dir():
                logger.debug(f'Skipping directory: {file}')
                continue
            # optional filter logic here
            yield file

    def update(self):
        """Rescan the directory synchronously and refresh the result lists."""
        logger.debug('Synchronous update')
        self._reset_counts()
        for file in self.rglob():
            try:
                modified_time = file.stat().st_mtime
            except FileNotFoundError:
                # The file vanished between the glob and the stat. If it was
                # tracked, the deletion pass below reports it.
                logger.debug(f'File disappeared during scan: {file}')
                continue
            self._update_file(file, modified_time)

        # Iterate over a copy because _delete_file mutates self.times.
        for file in list(self.times):
            if not file.exists():
                self._delete_file(file)

    async def update_async(self):
        """Rescan the directory using anyio's async file API."""
        self._reset_counts()
        async for file in self.async_rglob():
            try:
                modified_time: float = (await file.stat()).st_mtime
            except FileNotFoundError:
                # Same race as in update(): skip the file for this pass.
                logger.debug(f'File disappeared during scan: {file}')
                continue
            self._update_file(file, modified_time)

        # Iterate over a copy because _delete_file mutates self.times.
        for file in list(self.times):
            if not (await file.exists()):
                self._delete_file(file)


@dataclass
class AppConfigMonitor:
    """Poll one directory for changes to ``*.py`` modules and ``*.yaml`` configs."""

    dir: str | Path
    poll_sleep_time: float = 0.5
    config_monitor: DirectoryMonitor = field(init=False)
    module_monitor: DirectoryMonitor = field(init=False)

    def __post_init__(self):
        self.dir = Path(self.dir)
        assert self.dir.exists(), f'Directory does not exist: {self.dir}'
        self.module_monitor = DirectoryMonitor(self.dir, '*.py')
        self.config_monitor = DirectoryMonitor(self.dir, '*.yaml')

    async def monitor(
        self,
        module_update_callback: Callable[[DirectoryMonitor], object] | None = None,
        config_update_callback: Callable[[DirectoryMonitor], object] | None = None,
    ):
        """Poll both directory monitors forever, invoking callbacks after each scan.

        Args:
            module_update_callback: Called with the module monitor after every
                successful scan. It may be sync or async; an awaitable result
                is awaited. ``None`` disables it.
            config_update_callback: Same, for the config monitor.

        The loop ends when a scan raises an ``Exception``, which is logged.
        ``KeyboardInterrupt`` is not an ``Exception`` subclass, so it
        propagates to the caller. Exceptions raised by the callbacks also
        propagate.
        """
        while True:
            try:
                await self.module_monitor.update_async()
                await self.config_monitor.update_async()
            except Exception as e:
                # if anything unexpected happens, log it and break the loop cleanly
                logger.exception(e)
                break
            else:
                await self._run_callback(
                    module_update_callback, self.module_monitor, 'module'
                )
                await self._run_callback(
                    config_update_callback, self.config_monitor, 'config'
                )
            await asyncio.sleep(self.poll_sleep_time)

    @staticmethod
    async def _run_callback(callback, directory_monitor: DirectoryMonitor, label: str):
        """Call ``callback`` (if given) and await its result when it is awaitable."""
        if callback is None:
            return
        result = callback(directory_monitor)
        if isinstance(result, Awaitable):
            logger.debug('Awaiting %s result', label)
            await result


def load_yaml(file: Path):
    """Parse YAML from ``file`` with the safe loader and return the result.

    ``yaml.safe_load`` accepts only text, bytes or a readable stream, so a
    path-like argument is opened first. It is opened in binary mode so PyYAML
    performs its own encoding detection. Any other argument is passed through
    unchanged.
    """
    if isinstance(file, os.PathLike):
        with open(file, 'rb') as stream:
            return yaml.safe_load(stream)
    return yaml.safe_load(file)


@dataclass
class AppLoader:
    """Watch an app directory and (re)load modules and app configs on change."""

    dir: str | Path
    acm: AppConfigMonitor = field(init=False)

    def __post_init__(self):
        self.dir = Path(self.dir)
        self.acm = AppConfigMonitor(self.dir)

    async def monitor(self):
        """Run the polling loop with this loader's reload callbacks."""
        await self.acm.monitor(
            module_update_callback=self.load_reload_modules,
            config_update_callback=self.load_reload_configs,
        )

    async def load_reload_modules(self, dm: DirectoryMonitor):
        """Handle new or changed module files reported by ``dm``.

        NOTE(review): this only logs and sleeps; it looks like a placeholder
        for the real dependency resolution and reload, to be confirmed.
        """
        if not (dm.new or dm.changed):
            return
        modules_to_load = dm.new + dm.changed
        dep_str = '\n'.join(f' {m}' for m in modules_to_load)
        logger.debug('Resolving module dependencies for \n%s', dep_str)
        await asyncio.sleep(0.5)
        logger.debug('(Re)loading modules and packages from the inside out')

    async def load_reload_configs(self, dm: DirectoryMonitor):
        """Read, parse and validate new or changed config files, then log them as JSON."""
        if not (dm.new or dm.changed):
            return
        configs_to_load = dm.new + dm.changed
        app_str = '\n'.join(f' {m}' for m in configs_to_load)
        logger.debug('Resolving app config dependencies for \n%s', app_str)
        await asyncio.sleep(0.25)
        logger.debug('Reloading app configs in dependency order')
        # If several files define the same app name, the last one read wins.
        app_configs = {
            app_name: cfg
            async for app_name, cfg in self.gen_valid_app_configs(configs_to_load)
        }
        app_configs = AppConfigs.dump_python(app_configs)
        logger.debug(json.dumps(app_configs, indent=2))

    async def gen_raw_config_text(self, config_files: Iterable[Path | anyio.Path]):
        """Yield ``(file, text)`` for every readable file; log and skip failures."""
        for file in config_files:
            try:
                # Wrapping in anyio.Path makes read_text awaitable for plain
                # pathlib paths as well.
                text = await anyio.Path(file).read_text()
            except Exception:
                # Best effort: one unreadable file must not stop the others.
                logger.error(f'Error reading {file}')
                continue
            yield file, text

    async def gen_raw_app_configs(self, config_files: Iterable[Path | anyio.Path]):
        """Yield ``(file, parsed_yaml)`` for every file that parses; log and skip the rest."""
        async for file, text in self.gen_raw_config_text(config_files):
            try:
                parsed = yaml.safe_load(StringIO(text))
            except Exception:
                # Best effort: one malformed file must not stop the others.
                logger.error(f'Error parsing YAML from {file}')
                continue
            yield file, parsed

    async def gen_valid_app_configs(self, config_files: Iterable[Path | anyio.Path]):
        """Yield ``(app_name, AppConfig)`` for every entry that validates.

        Each file is expected to hold a top-level mapping of app name to raw
        config. Files with any other top level (including empty files, which
        parse to ``None``) are logged and skipped, as are entries that fail
        validation.
        """
        async for file, raw_full_cfg in self.gen_raw_app_configs(config_files):
            if not isinstance(raw_full_cfg, dict):
                logger.error(
                    f'Expected a mapping of app names at the top level of {file}'
                )
                continue
            for app_name, raw_cfg in raw_full_cfg.items():
                try:
                    valid_cfg = AppConfig.model_validate(raw_cfg)
                except ValidationError as e:
                    logger.error(f'Error validating "{app_name}" from {file}')
                    logger.error(e)
                else:
                    yield app_name, valid_cfg


if __name__ == '__main__':
    import logging.config

    from rich.console import Console

    console = Console()
    logging.config.dictConfig(
        {
            'version': 1,
            'disable_existing_loggers': False,
            'formatters': {'basic': {'style': '{', 'format': '{message}'}},
            'handlers': {
                'rich': {
                    '()': 'rich.logging.RichHandler',
                    'omit_repeated_times': False,
                    'highlighter': None,
                    'console': console,
                }
            },
            'loggers': {__name__: {'level': 'DEBUG', 'handlers': ['rich']}},
        }
    )

    acm = AppLoader('../conf/apps')
    try:
        with console.status(f'Monitoring {acm.dir}'):
            asyncio.run(acm.monitor())
    except KeyboardInterrupt:
        logger.error('KeyboardInterrupt')