import asyncio
import inspect
import logging
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from typing import AsyncIterable, Awaitable, Callable, Dict, Iterable, List

import anyio

logger = logging.getLogger(__name__)


@dataclass
class DirectoryMonitor:
    """Track new, changed and deleted files under a directory by polling.

    Each call to :meth:`update` or :meth:`update_async` recursively globs
    ``dir`` with ``glob``, compares every file's modification time with the
    value recorded in ``times``, and fills ``new``, ``changed`` and
    ``deleted`` with the differences found during that call.
    """

    # Root directory that is searched recursively.
    dir: Path
    # Glob pattern passed to ``rglob``.
    glob: str = '*'
    # Last modification time recorded for each tracked file.
    times: Dict[Path | anyio.Path, float] = field(default_factory=dict, repr=False)
    # Wall-clock time at which the most recent update started.
    last_checked: datetime = field(default_factory=datetime.now)
    # Results of the most recent update. They default to empty lists so they
    # can be read (and instances compared) before the first update has run.
    new: List[Path | anyio.Path] = field(
        init=False, repr=False, default_factory=list
    )
    changed: List[Path | anyio.Path] = field(
        init=False, repr=False, default_factory=list
    )
    deleted: List[Path | anyio.Path] = field(
        init=False, repr=False, default_factory=list
    )

    def _reset_counts(self):
        """Stamp ``last_checked`` and clear the per-update result lists."""
        self.last_checked = datetime.now()
        self.new, self.changed, self.deleted = [], [], []

    def _update_file(self, file: Path | anyio.Path, modified_time: float):
        """Record ``modified_time`` for ``file`` and classify it.

        A file that is already tracked is appended to ``changed`` only when
        its modification time is newer than the recorded one; an untracked
        file is appended to ``new``.
        """
        if file in self.times:
            if modified_time > self.times[file]:
                logger.info(f'Updated file: {file}')
                self.changed.append(file)
        else:
            logger.debug(f'New file: {file}')
            self.new.append(file)
        self.times[file] = modified_time

    def _delete_file(self, file: Path | anyio.Path):
        """Stop tracking ``file`` and append it to ``deleted``."""
        logger.warning(f'Detected deleted file: {file}')
        del self.times[file]
        self.deleted.append(file)

    def rglob(self) -> Iterable[Path]:
        """Yield every non-directory path under ``dir`` matching ``glob``."""
        for file in self.dir.rglob(self.glob):
            if file.is_dir():
                logger.debug(f'Skipping directory: {file}')
                continue
            # optional filter logic here
            yield file

    async def async_rglob(self) -> AsyncIterable[anyio.Path]:
        """Asynchronous counterpart of :meth:`rglob` using ``anyio.Path``."""
        async for file in anyio.Path(self.dir).rglob(self.glob):
            if await file.is_dir():
                logger.debug(f'Skipping directory: {file}')
                continue
            # optional filter logic here
            yield file

    def update(self):
        """Synchronously rescan the directory and refresh the result lists."""
        logger.debug('Synchronous update')
        self._reset_counts()
        for file in self.rglob():
            try:
                modified_time = file.stat().st_mtime
            except FileNotFoundError:
                # The file vanished between the glob and the stat call. If it
                # was tracked, the deletion pass below reports it.
                continue
            self._update_file(file, modified_time)
        for file in list(self.times):
            # Keys may be anyio.Path objects left by update_async(), whose
            # exists() returns a coroutine; check through pathlib instead.
            if not Path(file).exists():
                self._delete_file(file)

    async def update_async(self):
        """Asynchronously rescan the directory and refresh the result lists."""
        self._reset_counts()
        async for file in self.async_rglob():
            try:
                modified_time: float = (await file.stat()).st_mtime
            except FileNotFoundError:
                # The file vanished between the glob and the stat call. If it
                # was tracked, the deletion pass below reports it.
                continue
            self._update_file(file, modified_time)
        for file in list(self.times):
            # Keys may be pathlib.Path objects left by update(), whose
            # exists() is not awaitable; check through anyio.Path instead.
            if not (await anyio.Path(file).exists()):
                self._delete_file(file)


@dataclass
class AppConfigMonitor:
    """Poll one directory for changes to ``*.yaml`` and ``*.py`` files.

    Two :class:`DirectoryMonitor` instances are created over ``dir``:
    ``config_monitor`` for ``*.yaml`` and ``module_monitor`` for ``*.py``.
    """

    # Directory to watch; converted to ``Path`` in ``__post_init__``.
    dir: str | Path
    # Seconds to sleep between polling passes.
    poll_sleep_time: float = 0.5
    config_monitor: DirectoryMonitor = field(init=False)
    module_monitor: DirectoryMonitor = field(init=False)

    def __post_init__(self):
        """Normalize ``dir`` and build the two directory monitors."""
        self.dir = Path(self.dir)
        assert self.dir.exists(), f'Directory does not exist: {self.dir}'
        self.config_monitor = DirectoryMonitor(self.dir, '*.yaml')
        self.module_monitor = DirectoryMonitor(self.dir, '*.py')

    @staticmethod
    async def _run_callback(callback, monitor: DirectoryMonitor):
        """Call ``callback(monitor)`` and await the result if it is awaitable.

        Anything that is not callable (including ``None``) is ignored.
        """
        if not callable(callback):
            return
        result = callback(monitor)
        if inspect.isawaitable(result):
            await result

    async def monitor(
        self,
        config_update_callback: Callable[
            [DirectoryMonitor], Awaitable[None] | None
        ]
        | None = None,
        module_update_callback: Callable[
            [DirectoryMonitor], Awaitable[None] | None
        ]
        | None = None,
    ):
        """Poll both monitors forever, invoking the callbacks after each pass.

        Args:
            config_update_callback: Optional sync or async callable that
                receives ``config_monitor`` after every successful pass.
            module_update_callback: Optional sync or async callable that
                receives ``module_monitor`` after every successful pass.

        The loop ends when an update raises an ``Exception``, which is logged.
        ``KeyboardInterrupt`` and task cancellation are not ``Exception``
        subclasses and therefore propagate to the caller.
        """
        while True:
            try:
                await self.config_monitor.update_async()
                await self.module_monitor.update_async()
            except Exception as e:
                # if anything unexpected happens, log it and break the loop cleanly
                logger.exception(e)
                break

            await self._run_callback(config_update_callback, self.config_monitor)
            await self._run_callback(module_update_callback, self.module_monitor)
            await asyncio.sleep(self.poll_sleep_time)


if __name__ == '__main__':
    import logging.config

    from rich.console import Console

    console = Console()
    logging.config.dictConfig(
        {
            'version': 1,
            'disable_existing_loggers': False,
            'formatters': {'basic': {'style': '{', 'format': '{message}'}},
            'handlers': {
                'rich': {
                    '()': 'rich.logging.RichHandler',
                    'omit_repeated_times': False,
                    'highlighter': None,
                    'console': console,
                }
            },
            'loggers': {__name__: {'level': 'DEBUG', 'handlers': ['rich']}},
        }
    )

    acm = AppConfigMonitor('../conf/apps')
    try:
        with console.status(f'Monitoring {acm.dir}'):
            asyncio.run(acm.monitor())
    except KeyboardInterrupt:
        logger.error('KeyboardInterrupt')