diff --git a/direcory_monitor.py b/direcory_monitor.py deleted file mode 100644 index c864a09..0000000 --- a/direcory_monitor.py +++ /dev/null @@ -1,49 +0,0 @@ -import logging -from dataclasses import dataclass, field -from datetime import datetime -from pathlib import Path -from typing import Dict, List - -logger = logging.getLogger(__name__) - - -@dataclass -class DirectoryMonitor: - dir: Path - glob: str = '*' - times: Dict[Path, datetime] = field(default_factory=dict, repr=False) - changed: List[Path] = field(init=False, repr=False) - new: List[Path] = field(init=False, repr=False) - deleted: List[Path] = field(init=False, repr=False) - last_checked: datetime = field(default_factory=datetime.now) - - def __post_init__(self): - self.update() - - def update(self): - self.last_checked = datetime.now() - self.new, self.changed, self.deleted = [], [], [] - - # iterate through the directory and update the stored times - for file in self.dir.rglob(self.glob): - if file.is_dir(): - logger.debug(f'Skipping directory: {file}') - continue - - modified_time = datetime.fromtimestamp(file.stat().st_mtime) - if file in self.times: - if modified_time > self.times[file]: - logger.info(f'Updated file: {file}') - self.changed.append(file) - else: - logger.debug(f'New file: {file}') - self.new.append(file) - - self.times[file] = modified_time - - # remove any entries for files that no longer exist - for file in list(self.times.keys()): - if not file.exists(): - logger.warning(f'Detected deleted file: {file}') - del self.times[file] - self.deleted.append(file) diff --git a/directory_monitor.py b/directory_monitor.py new file mode 100644 index 0000000..fa6e0c1 --- /dev/null +++ b/directory_monitor.py @@ -0,0 +1,121 @@ +import asyncio +import logging +from dataclasses import dataclass, field +from datetime import datetime +from pathlib import Path +from typing import AsyncIterable, Dict, Iterable, List + +import anyio + +logger = logging.getLogger(__name__) + + +@dataclass +class DirectoryMonitor: + dir: Path + glob: str = '*' + times: Dict[Path | anyio.Path, datetime] = field(default_factory=dict, repr=False) + changed: List[Path] = field(init=False, repr=False) + new: List[Path] = field(init=False, repr=False) + deleted: List[Path] = field(init=False, repr=False) + last_checked: datetime = field(default_factory=datetime.now) + + def _reset_counts(self): + self.last_checked = datetime.now() + self.new, self.changed, self.deleted = [], [], [] + + def _process_file(self, file: Path | anyio.Path, modified_time: float): + if file in self.times: + if modified_time > self.times[file]: + logger.info(f'Updated file: {file}') + self.changed.append(file) + else: + logger.debug(f'New file: {file}') + self.new.append(file) + + self.times[file] = modified_time + + def rglob(self) -> Iterable[Path]: + for file in self.dir.rglob(self.glob): + if file.is_dir(): + logger.debug(f'Skipping directory: {file}') + continue + # optional filter logic here + yield file + + async def async_rglob(self) -> AsyncIterable[anyio.Path]: + async for file in anyio.Path(self.dir).rglob(self.glob): + if await file.is_dir(): + logger.debug(f'Skipping directory: {file}') + continue + # optional filter logic here + yield file + + def update(self): + logger.debug('Synchronous update') + self._reset_counts() + + for file in self.rglob(): + self._process_file(file, file.stat().st_mtime) + + for file in list(self.times.keys()): + if not file.exists(): + logger.warning(f'Detected deleted file: {file}') + del self.times[file] + self.deleted.append(file) + + async def update_async(self): + logger.debug('Update async') + self._reset_counts() + + async for file in self.async_rglob(): + modified_time: float = (await file.stat()).st_mtime + self._process_file(file, modified_time) + + for file in list(self.times.keys()): + exists = await file.exists() + if not exists: + logger.warning(f'Detected deleted file: {file}') + del self.times[file] + self.deleted.append(file) + + async def monitor(self): + while True: + try: + await self.update_async() + except KeyboardInterrupt: + # if a KeyboardInterrupt happens during the update, pass it through + raise + except Exception as e: + # if anything unexpected happens, log it and break the loop + logger.exception(e) + break + else: + await asyncio.sleep(1.0) + + +if __name__ == '__main__': + import logging.config + + logging.config.dictConfig( + { + 'version': 1, + 'disable_existing_loggers': False, + 'formatters': {'basic': {'style': '{', 'format': '{message}'}}, + 'handlers': { + 'rich': { + '()': 'rich.logging.RichHandler', + 'omit_repeated_times': False, + 'highlighter': None, + } + }, + 'loggers': {__name__: {'level': 'DEBUG', 'handlers': ['rich']}}, + } + ) + + dm = DirectoryMonitor(Path('../conf/apps'), '*.py') + + try: + asyncio.run(dm.monitor()) + except KeyboardInterrupt: + logger.error('KeyboardInterrupt')