implemented async queue
This commit is contained in:
@@ -48,6 +48,8 @@ class RCHighlighter(RegexHighlighter):
|
|||||||
def load_rich_config(
|
def load_rich_config(
|
||||||
room: str = None, component: str = None, level: str = 'INFO'
|
room: str = None, component: str = None, level: str = 'INFO'
|
||||||
) -> logging.LoggerAdapter:
|
) -> logging.LoggerAdapter:
|
||||||
|
"""Loads the config from the .rich_logging.yaml file, adds some bits, and returns a LoggerAdapter
|
||||||
|
"""
|
||||||
logger_name = f'Appdaemon.{room}'
|
logger_name = f'Appdaemon.{room}'
|
||||||
|
|
||||||
if component is not None:
|
if component is not None:
|
||||||
|
|||||||
@@ -4,6 +4,35 @@ from logging import Handler, LogRecord
|
|||||||
import aiohttp
|
import aiohttp
|
||||||
|
|
||||||
|
|
||||||
|
class AsyncQueueHandler(Handler):
|
||||||
|
loop: asyncio.BaseEventLoop
|
||||||
|
queue: asyncio.Queue
|
||||||
|
|
||||||
|
def __init__(self, loop: asyncio.BaseEventLoop, queue: asyncio.Queue):
|
||||||
|
super().__init__()
|
||||||
|
self.loop = loop
|
||||||
|
self.queue = queue
|
||||||
|
|
||||||
|
def emit(self, record: LogRecord):
|
||||||
|
self.loop.create_task(self.send_to_loki(record))
|
||||||
|
|
||||||
|
async def send_to_loki(self, record: LogRecord):
|
||||||
|
message = self.format(record)
|
||||||
|
ns = round(record.created * 1_000_000_000)
|
||||||
|
|
||||||
|
# https://grafana.com/docs/loki/latest/reference/loki-http-api/#ingest-logs
|
||||||
|
payload = {
|
||||||
|
'streams': [
|
||||||
|
{
|
||||||
|
'stream': {'app': 'loki'},
|
||||||
|
'values': [[str(ns), message]],
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
|
||||||
|
await self.queue.put(payload)
|
||||||
|
|
||||||
|
|
||||||
class LokiHandler(Handler):
|
class LokiHandler(Handler):
|
||||||
loop: asyncio.BaseEventLoop
|
loop: asyncio.BaseEventLoop
|
||||||
loki_url: str
|
loki_url: str
|
||||||
|
|||||||
@@ -4,7 +4,7 @@ from typing import Annotated, Dict, List, Optional, Self
|
|||||||
|
|
||||||
import yaml
|
import yaml
|
||||||
from astral import SunDirection
|
from astral import SunDirection
|
||||||
from pydantic import BaseModel, BeforeValidator, Field, root_validator
|
from pydantic import BaseModel, BeforeValidator, Field, model_validator
|
||||||
from pydantic_core import PydanticCustomError
|
from pydantic_core import PydanticCustomError
|
||||||
from rich.console import Console, ConsoleOptions, RenderResult
|
from rich.console import Console, ConsoleOptions, RenderResult
|
||||||
from rich.table import Column, Table
|
from rich.table import Column, Table
|
||||||
@@ -51,7 +51,7 @@ class ControllerStateConfig(BaseModel):
|
|||||||
off_duration: Optional[OffDuration] = None
|
off_duration: Optional[OffDuration] = None
|
||||||
scene: dict[str, State] | str
|
scene: dict[str, State] | str
|
||||||
|
|
||||||
@root_validator(pre=True)
|
@model_validator(mode='before')
|
||||||
def check_args(cls, values):
|
def check_args(cls, values):
|
||||||
time, elevation = values.get('time'), values.get('elevation')
|
time, elevation = values.get('time'), values.get('elevation')
|
||||||
if time is not None and elevation is not None:
|
if time is not None and elevation is not None:
|
||||||
|
|||||||
Reference in New Issue
Block a user