added AppLoader
This commit is contained in:
@@ -6,10 +6,21 @@ from pathlib import Path
|
|||||||
from typing import AsyncIterable, Awaitable, Callable, Dict, Iterable, List
|
from typing import AsyncIterable, Awaitable, Callable, Dict, Iterable, List
|
||||||
|
|
||||||
import anyio
|
import anyio
|
||||||
|
import yaml
|
||||||
|
from pydantic import BaseModel, ConfigDict, Field, TypeAdapter, ValidationError
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class AppConfig(BaseModel):
|
||||||
|
module: str
|
||||||
|
class_: str = Field(validation_alias='class')
|
||||||
|
model_config = ConfigDict(validate_assignment=True)
|
||||||
|
|
||||||
|
|
||||||
|
AppConfigs = TypeAdapter(Dict[str, AppConfig])
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
class DirectoryMonitor:
|
class DirectoryMonitor:
|
||||||
dir: Path
|
dir: Path
|
||||||
@@ -90,18 +101,18 @@ class AppConfigMonitor:
|
|||||||
def __post_init__(self):
|
def __post_init__(self):
|
||||||
self.dir = Path(self.dir)
|
self.dir = Path(self.dir)
|
||||||
assert self.dir.exists()
|
assert self.dir.exists()
|
||||||
self.config_monitor = DirectoryMonitor(self.dir, '*.yaml')
|
|
||||||
self.module_monitor = DirectoryMonitor(self.dir, '*.py')
|
self.module_monitor = DirectoryMonitor(self.dir, '*.py')
|
||||||
|
self.config_monitor = DirectoryMonitor(self.dir, '*.yaml')
|
||||||
|
|
||||||
async def monitor(
|
async def monitor(
|
||||||
self,
|
self,
|
||||||
config_update_callback: Callable | Awaitable = None,
|
|
||||||
module_update_callback: Callable | Awaitable = None,
|
module_update_callback: Callable | Awaitable = None,
|
||||||
|
config_update_callback: Callable | Awaitable = None,
|
||||||
):
|
):
|
||||||
while True:
|
while True:
|
||||||
try:
|
try:
|
||||||
await self.config_monitor.update_async()
|
|
||||||
await self.module_monitor.update_async()
|
await self.module_monitor.update_async()
|
||||||
|
await self.config_monitor.update_async()
|
||||||
except KeyboardInterrupt:
|
except KeyboardInterrupt:
|
||||||
# if a KeyboardInterrupt happens during the update, pass it through
|
# if a KeyboardInterrupt happens during the update, pass it through
|
||||||
raise
|
raise
|
||||||
@@ -110,19 +121,80 @@ class AppConfigMonitor:
|
|||||||
logger.exception(e)
|
logger.exception(e)
|
||||||
break
|
break
|
||||||
else:
|
else:
|
||||||
if isinstance(config_update_callback, Callable):
|
module_result = module_update_callback(self.module_monitor)
|
||||||
config_update_callback(self.config_monitor)
|
if isinstance(module_result, Awaitable):
|
||||||
elif isinstance(config_update_callback, Awaitable):
|
logger.debug('Awaiting module result')
|
||||||
await config_update_callback(self.config_monitor)
|
await module_result
|
||||||
|
|
||||||
if isinstance(module_update_callback, Callable):
|
config_result = config_update_callback(self.config_monitor)
|
||||||
module_update_callback(self.module_monitor)
|
if isinstance(config_result, Awaitable):
|
||||||
elif isinstance(module_update_callback, Awaitable):
|
logger.debug('Awaiting config result')
|
||||||
await module_update_callback(self.module_monitor)
|
await config_result
|
||||||
|
|
||||||
await asyncio.sleep(self.poll_sleep_time)
|
await asyncio.sleep(self.poll_sleep_time)
|
||||||
|
|
||||||
|
|
||||||
|
def load_yaml(file: Path):
|
||||||
|
return yaml.safe_load(file)
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class AppLoader:
|
||||||
|
dir: str
|
||||||
|
acm: AppConfigMonitor = field(init=False)
|
||||||
|
|
||||||
|
def __post_init__(self):
|
||||||
|
self.dir = Path(self.dir)
|
||||||
|
self.acm = AppConfigMonitor(self.dir)
|
||||||
|
|
||||||
|
async def monitor(self):
|
||||||
|
await self.acm.monitor(
|
||||||
|
module_update_callback=self.load_reload_modules,
|
||||||
|
config_update_callback=self.load_reload_configs,
|
||||||
|
)
|
||||||
|
|
||||||
|
async def load_reload_modules(self, dm: DirectoryMonitor):
|
||||||
|
if dm.new or dm.changed:
|
||||||
|
modules_to_load = dm.new + dm.changed
|
||||||
|
dep_str = '\n'.join(f' {m}' for m in modules_to_load)
|
||||||
|
logger.debug('Resolving module dependencies for \n%s', dep_str)
|
||||||
|
await asyncio.sleep(0.5)
|
||||||
|
logger.debug('(Re)loading modules and packages from the inside out')
|
||||||
|
|
||||||
|
async def load_reload_configs(self, dm: DirectoryMonitor):
|
||||||
|
if dm.new or dm.changed:
|
||||||
|
configs_to_load = dm.new + dm.changed
|
||||||
|
app_str = '\n'.join(f' {m}' for m in configs_to_load)
|
||||||
|
logger.debug('Resolving app config dependencies for \n%s', app_str)
|
||||||
|
await asyncio.sleep(0.25)
|
||||||
|
logger.debug('Reloading app configs in dependency order')
|
||||||
|
|
||||||
|
app_configs = {
|
||||||
|
app_name: cfg async for app_name, cfg in self.gen_valid_app_configs(configs_to_load)
|
||||||
|
}
|
||||||
|
logger.debug(app_configs)
|
||||||
|
|
||||||
|
async def gen_raw_app_configs(self, configs_to_load: Iterable[Path]):
|
||||||
|
for config_file in configs_to_load:
|
||||||
|
try:
|
||||||
|
with Path(config_file).open('r') as f:
|
||||||
|
yield yaml.safe_load(f)
|
||||||
|
except Exception as e:
|
||||||
|
logger.exception(e)
|
||||||
|
logger.error(f'Error reading from {config_file}')
|
||||||
|
|
||||||
|
async def gen_valid_app_configs(self, configs_to_load: Iterable[Path]):
|
||||||
|
async for raw_full_cfg in self.gen_raw_app_configs(configs_to_load):
|
||||||
|
for app_name, raw_cfg in raw_full_cfg.items():
|
||||||
|
try:
|
||||||
|
valid_cfg = AppConfig.model_validate(raw_cfg)
|
||||||
|
except ValidationError as e:
|
||||||
|
logger.error(f'Error validating "{app_name}"')
|
||||||
|
logger.error(e)
|
||||||
|
else:
|
||||||
|
yield app_name, valid_cfg
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
import logging.config
|
import logging.config
|
||||||
|
|
||||||
@@ -155,7 +227,7 @@ if __name__ == '__main__':
|
|||||||
# except KeyboardInterrupt:
|
# except KeyboardInterrupt:
|
||||||
# logger.error('KeyboardInterrupt')
|
# logger.error('KeyboardInterrupt')
|
||||||
|
|
||||||
acm = AppConfigMonitor('../conf/apps')
|
acm = AppLoader('../conf/apps')
|
||||||
|
|
||||||
try:
|
try:
|
||||||
with console.status(f'Monitoring {acm.dir}'):
|
with console.status(f'Monitoring {acm.dir}'):
|
||||||
|
|||||||
Reference in New Issue
Block a user