import asyncio import logging from dataclasses import dataclass, field from datetime import datetime from pathlib import Path from typing import AsyncIterable, Dict, Iterable, List import anyio logger = logging.getLogger(__name__) @dataclass class DirectoryMonitor: dir: Path glob: str = '*' times: Dict[Path | anyio.Path, datetime] = field(default_factory=dict, repr=False) changed: List[Path] = field(init=False, repr=False) new: List[Path] = field(init=False, repr=False) deleted: List[Path] = field(init=False, repr=False) last_checked: datetime = field(default_factory=datetime.now) def _reset_counts(self): self.last_checked = datetime.now() self.new, self.changed, self.deleted = [], [], [] def _process_file(self, file: Path | anyio.Path, modified_time: float): if file in self.times: if modified_time > self.times[file]: logger.info(f'Updated file: {file}') self.changed.append(file) else: logger.debug(f'New file: {file}') self.new.append(file) self.times[file] = modified_time def rglob(self) -> Iterable[Path]: for file in self.dir.rglob(self.glob): if file.is_dir(): logger.debug(f'Skipping directory: {file}') continue # optional filter logic here yield file async def async_rglob(self) -> AsyncIterable[anyio.Path]: async for file in anyio.Path(self.dir).rglob(self.glob): if await file.is_dir(): logger.debug(f'Skipping directory: {file}') continue # optional filter logic here yield file def update(self): logger.debug('Synchronous update') self._reset_counts() for file in self.rglob(): self._process_file(file, file.stat().st_mtime) for file in list(self.times.keys()): if not file.exists(): logger.warning(f'Detected deleted file: {file}') del self.times[file] self.deleted.append(file) async def update_async(self): logger.debug('Update async') self._reset_counts() async for file in self.async_rglob(): modified_time: float = (await file.stat()).st_mtime self._process_file(file, modified_time) for file in list(self.times.keys()): exists = await file.exists() if not exists: logger.warning(f'Detected deleted file: {file}') del self.times[file] self.deleted.append(file) async def monitor(self): while True: try: await self.update_async() except KeyboardInterrupt: # if a KeyboardInterrupt happens during the update, pass it through raise except Exception as e: # if anything unexpected happens, log it and break the loop logger.exception(e) break else: await asyncio.sleep(1.0) if __name__ == '__main__': import logging.config logging.config.dictConfig( { 'version': 1, 'disable_existing_loggers': False, 'formatters': {'basic': {'style': '{', 'format': '{message}'}}, 'handlers': { 'rich': { '()': 'rich.logging.RichHandler', 'omit_repeated_times': False, 'highlighter': None, } }, 'loggers': {__name__: {'level': 'DEBUG', 'handlers': ['rich']}}, } ) dm = DirectoryMonitor(Path('../conf/apps'), '*.py') try: asyncio.run(dm.monitor()) except KeyboardInterrupt: logger.error('KeyboardInterrupt')