Logger Backends API
Backend implementations for different output destinations.
Backend Interface
BackendInterface
BackendInterface
Bases: ABC
Abstract interface for logging backends following Interface Segregation.
Source code in src/jinpy_utils/logger/backends.py
| class BackendInterface(ABC):
"""
Abstract interface for logging backends following Interface Segregation.
"""
name: str
@abstractmethod
async def write_async(self, entry: LogEntry) -> None:
"""Write log entry asynchronously."""
pass
@abstractmethod
def write_sync(self, entry: LogEntry) -> None:
"""Write log entry synchronously."""
pass
@abstractmethod
async def flush(self) -> None:
"""Flush pending entries."""
pass
@abstractmethod
async def close(self) -> None:
"""Close backend and cleanup resources."""
pass
@abstractmethod
def is_healthy(self) -> bool:
"""Check if backend is healthy."""
pass
@abstractmethod
def get_stats(self) -> dict[str, Any]:
"""Get backend statistics."""
pass
|
BaseBackend
BaseBackend
Bases: BackendInterface
Base backend implementation with common functionality.
Source code in src/jinpy_utils/logger/backends.py
| class BaseBackend(BackendInterface):
"""Base backend implementation with common functionality."""
def __init__(self, config: BackendConfig):
"""Initialize base backend."""
self.config = config
self.name = config.name
self._closed = False
self._healthy = True
self._stats: dict[str, Any] = {
"messages_written": 0,
"messages_failed": 0,
"bytes_written": 0,
"last_write": None,
"last_error": None,
}
self._buffer: list[LogEntry] = []
self._buffer_lock = asyncio.Lock()
self._flush_task: asyncio.Task | None = None
# Start periodic flush
self._start_flush_timer()
def _start_flush_timer(self) -> None:
"""Start periodic flush timer.
This implementation avoids calling potentially mocked async methods
(which could create un-awaited coroutines in tests) by verifying that
the returned loop is a real ``AbstractEventLoop`` instance, and only
then consulting its ``is_closed()`` method.
"""
if self.config.flush_interval > 0:
try:
loop = asyncio.get_running_loop()
if isinstance(loop, asyncio.AbstractEventLoop) and not loop.is_closed():
self._flush_task = asyncio.create_task(self._periodic_flush())
else:
self._flush_task = None
except RuntimeError:
# No running event loop - this is fine for sync-only usage
self._flush_task = None
async def _periodic_flush(self) -> None:
"""Periodic flush coroutine."""
while not self._closed:
try:
await asyncio.sleep(self.config.flush_interval)
await self.flush()
except asyncio.CancelledError:
break
except Exception as e:
self._handle_error(e)
def _handle_error(self, error: Exception) -> None:
"""Handle backend errors."""
self._stats["messages_failed"] = self._stats["messages_failed"] + 1
self._stats["last_error"] = str(error)
self._healthy = False
# Log to stderr to avoid recursion
print(f"Backend {self.name} error: {error}", file=sys.stderr)
def _should_write(self, entry: LogEntry) -> bool:
"""Check if entry should be written based on level."""
return entry.level >= self.config.level
def _format_entry(self, entry: LogEntry) -> str:
"""Format log entry based on configuration."""
if self.config.format == LogFormat.JSON:
return entry.to_json()
elif self.config.format == LogFormat.PLAIN:
return f"{entry.timestamp.isoformat()} [{entry.level.value.upper()}] {entry.logger_name}: {entry.message}"
elif self.config.format == LogFormat.STRUCTURED:
context_str = (
json.dumps(entry.context, separators=(",", ":"))
if entry.context
else "{}"
)
return f"{entry.timestamp.isoformat()} [{entry.level.value.upper()}] {entry.logger_name}: {entry.message} {context_str}"
else:
return entry.to_json() # Default to JSON
async def _add_to_buffer(self, entry: LogEntry) -> None:
"""Add entry to buffer."""
async with self._buffer_lock:
self._buffer.append(entry)
if len(self._buffer) >= self.config.buffer_size:
await self._flush_buffer()
async def _flush_buffer(self) -> None:
"""Flush buffer contents."""
if not self._buffer:
return
try:
await self._write_batch(self._buffer.copy())
self._buffer.clear()
self._healthy = True
except Exception as e:
self._handle_error(e)
async def flush(self) -> None:
"""Flush pending entries."""
async with self._buffer_lock:
await self._flush_buffer()
def is_healthy(self) -> bool:
"""Check if backend is healthy."""
return self._healthy and not self._closed
def get_stats(self) -> dict[str, Any]:
"""Get backend statistics."""
return self._stats.copy()
async def close(self) -> None:
"""Close backend and cleanup resources."""
self._closed = True
if self._flush_task:
self._flush_task.cancel()
from contextlib import suppress as _suppress # noqa: PLC0415
with _suppress(asyncio.CancelledError):
await self._flush_task
await self.flush()
async def write_async(self, entry: LogEntry) -> None:
"""Write log entry asynchronously."""
if self._should_write(entry):
await self._add_to_buffer(entry)
# ================================================= #
# ================ Not Implemented ================ #
# ================================================= #
def write_sync(self, entry: LogEntry) -> None:
"""Write log entry synchronously."""
# Default implementation that subclasses should override
raise NotImplementedError(
f"write_sync not implemented for {self.__class__.__name__}. "
"Subclasses must implement this method."
)
async def _write_batch(self, entries: list[LogEntry]) -> None:
"""Write batch of entries (default implementation)."""
# Default implementation that subclasses should override
raise NotImplementedError(
f"_write_batch not implemented for {self.__class__.__name__}. "
"Subclasses must implement this method."
)
|
Functions
__init__
Initialize base backend.
Source code in src/jinpy_utils/logger/backends.py
| def __init__(self, config: BackendConfig):
"""Initialize base backend."""
self.config = config
self.name = config.name
self._closed = False
self._healthy = True
self._stats: dict[str, Any] = {
"messages_written": 0,
"messages_failed": 0,
"bytes_written": 0,
"last_write": None,
"last_error": None,
}
self._buffer: list[LogEntry] = []
self._buffer_lock = asyncio.Lock()
self._flush_task: asyncio.Task | None = None
# Start periodic flush
self._start_flush_timer()
|
Console Backend
ConsoleBackend
ConsoleBackend
Bases: BaseBackend
High-performance console backend with color support.
Source code in src/jinpy_utils/logger/backends.py
| class ConsoleBackend(BaseBackend):
"""High-performance console backend with color support."""
def __init__(self, config: ConsoleBackendConfig):
super().__init__(config)
self.config: ConsoleBackendConfig = config
self.stream = sys.stdout if config.stream == "stdout" else sys.stderr
# Color codes for different log levels
self._colors = {
LogLevel.TRACE: "\033[90m", # Dark gray
LogLevel.DEBUG: "\033[36m", # Cyan
LogLevel.INFO: "\033[32m", # Green
LogLevel.WARNING: "\033[33m", # Yellow
LogLevel.ERROR: "\033[31m", # Red
LogLevel.CRITICAL: "\033[35m", # Magenta
}
self._reset = "\033[0m"
def _format_console_entry(self, entry: LogEntry) -> str:
"""Format entry for console with colors."""
if self.config.format == LogFormat.CONSOLE:
color = self._colors.get(entry.level, "") if self.config.colors else ""
level_str = (
f"{color}[{entry.level.value.upper()}]{self._reset}"
if self.config.colors
else f"[{entry.level.value.upper()}]"
)
formatted = f"{entry.timestamp.strftime('%H:%M:%S.%f')[:-3]} {level_str} {entry.logger_name}: {entry.message}"
if entry.context:
context_str = json.dumps(
entry.context, default=str, separators=(",", ":")
)
formatted += f" {context_str}"
return formatted
else:
return self._format_entry(entry)
async def write_async(self, entry: LogEntry) -> None:
"""Write entry asynchronously."""
if self._should_write(entry):
await self._add_to_buffer(entry)
def write_sync(self, entry: LogEntry) -> None:
"""Write entry synchronously."""
if self._should_write(entry):
try:
formatted = self._format_console_entry(entry)
self.stream.write(formatted + "\n")
self.stream.flush()
self._stats["messages_written"] = self._stats["messages_written"] + 1
self._stats["bytes_written"] = self._stats["bytes_written"] + len(
formatted.encode()
)
self._stats["last_write"] = get_current_datetime()
except Exception as e:
self._handle_error(e)
async def _write_batch(self, entries: list[LogEntry]) -> None:
"""Write batch of entries to console."""
try:
output_lines = []
total_bytes = 0
for entry in entries:
formatted = self._format_console_entry(entry)
output_lines.append(formatted)
total_bytes += len(formatted.encode())
output = "\n".join(output_lines) + "\n"
self.stream.write(output)
self.stream.flush()
self._stats["messages_written"] = self._stats["messages_written"] + len(
entries
)
self._stats["bytes_written"] = self._stats["bytes_written"] + total_bytes
self._stats["last_write"] = get_current_datetime()
except Exception as e:
raise JPYLoggerBackendError(
message=f"Console write failed: {e}",
backend_type=self.config.backend_type.value,
backend_config={"stream": self.config.stream},
) from e
|
Functions
__init__
Source code in src/jinpy_utils/logger/backends.py
| def __init__(self, config: ConsoleBackendConfig):
super().__init__(config)
self.config: ConsoleBackendConfig = config
self.stream = sys.stdout if config.stream == "stdout" else sys.stderr
# Color codes for different log levels
self._colors = {
LogLevel.TRACE: "\033[90m", # Dark gray
LogLevel.DEBUG: "\033[36m", # Cyan
LogLevel.INFO: "\033[32m", # Green
LogLevel.WARNING: "\033[33m", # Yellow
LogLevel.ERROR: "\033[31m", # Red
LogLevel.CRITICAL: "\033[35m", # Magenta
}
self._reset = "\033[0m"
|
File Backend
FileBackend
FileBackend
Bases: BaseBackend
High-performance async file backend with rotation.
Source code in src/jinpy_utils/logger/backends.py
| class FileBackend(BaseBackend):
"""High-performance async file backend with rotation."""
def __init__(self, config: FileBackendConfig):
super().__init__(config)
self.config: FileBackendConfig = config
self._file_lock = asyncio.Lock()
# Ensure log directory exists
self.config.file_path.parent.mkdir(parents=True, exist_ok=True)
def _get_compression_value(self) -> str:
"""Get compression value handling both enum and string types."""
if hasattr(self.config.compression, "value"):
return self.config.compression.value
return str(self.config.compression)
async def _rotate_if_needed(self) -> None:
"""Rotate log file if size limit is reached."""
if not self.config.max_size_mb:
return
try:
if self.config.file_path.exists():
size_mb = self.config.file_path.stat().st_size / (1024 * 1024)
if size_mb >= self.config.max_size_mb:
await self._rotate_file()
except Exception as e:
self._handle_error(e)
async def _rotate_file(self) -> None:
"""Rotate log file."""
try:
# Move existing backup files (use filename.ext.N scheme)
for i in range(self.config.backup_count - 1, 0, -1):
old_file = Path(str(self.config.file_path) + f".{i}")
new_file = Path(str(self.config.file_path) + f".{i + 1}")
if old_file.exists():
old_file.rename(new_file)
# Move current file to .1 (filename.ext.1)
if self.config.file_path.exists():
backup_file = Path(str(self.config.file_path) + ".1")
self.config.file_path.rename(backup_file)
# Compress if configured
compression_value = self._get_compression_value()
if compression_value != "none":
await self._compress_file(backup_file)
except Exception as e:
raise JPYLoggerBackendError(
message=f"File rotation failed: {e}",
backend_type=self.config.backend_type.value,
backend_config={"file_path": str(self.config.file_path)},
) from e
async def _compress_file(self, file_path: Path) -> None:
"""Compress log file based on configuration."""
# Implementation would depend on compression type
# For now, just a placeholder
pass
async def write_async(self, entry: LogEntry) -> None:
"""Write entry asynchronously."""
if self._should_write(entry):
await self._add_to_buffer(entry)
def write_sync(self, entry: LogEntry) -> None:
"""Write entry synchronously."""
if self._should_write(entry):
try:
formatted = self._format_entry(entry)
with open(
self.config.file_path, "a", encoding=self.config.encoding
) as f:
f.write(formatted + "\n")
f.flush()
self._stats["messages_written"] = self._stats["messages_written"] + 1
self._stats["bytes_written"] = self._stats["bytes_written"] + len(
formatted.encode()
)
self._stats["last_write"] = get_current_datetime()
except Exception as e:
self._handle_error(e)
async def _write_batch(self, entries: list[LogEntry]) -> None:
"""Write batch of entries to file."""
async with self._file_lock:
try:
await self._rotate_if_needed()
lines = []
total_bytes = 0
for entry in entries:
formatted = self._format_entry(entry)
lines.append(formatted)
total_bytes += len(formatted.encode())
content = "\n".join(lines) + "\n"
async with aiofiles.open(
self.config.file_path, "a", encoding=self.config.encoding
) as f:
await f.write(content)
await f.flush()
self._stats["messages_written"] += len(entries)
self._stats["bytes_written"] += total_bytes
self._stats["last_write"] = get_current_datetime()
except Exception as e:
raise JPYLoggerBackendError(
message=f"File write failed: {e}",
backend_type=self.config.backend_type.value,
backend_config={"file_path": str(self.config.file_path)},
) from e
|
Functions
__init__
Source code in src/jinpy_utils/logger/backends.py
| def __init__(self, config: FileBackendConfig):
super().__init__(config)
self.config: FileBackendConfig = config
self._file_lock = asyncio.Lock()
# Ensure log directory exists
self.config.file_path.parent.mkdir(parents=True, exist_ok=True)
|
_rotate_file
async
Rotate log file.
Source code in src/jinpy_utils/logger/backends.py
| async def _rotate_file(self) -> None:
"""Rotate log file."""
try:
# Move existing backup files (use filename.ext.N scheme)
for i in range(self.config.backup_count - 1, 0, -1):
old_file = Path(str(self.config.file_path) + f".{i}")
new_file = Path(str(self.config.file_path) + f".{i + 1}")
if old_file.exists():
old_file.rename(new_file)
# Move current file to .1 (filename.ext.1)
if self.config.file_path.exists():
backup_file = Path(str(self.config.file_path) + ".1")
self.config.file_path.rename(backup_file)
# Compress if configured
compression_value = self._get_compression_value()
if compression_value != "none":
await self._compress_file(backup_file)
except Exception as e:
raise JPYLoggerBackendError(
message=f"File rotation failed: {e}",
backend_type=self.config.backend_type.value,
backend_config={"file_path": str(self.config.file_path)},
) from e
|
REST API Backend
RestApiBackend
RestApiBackend
Bases: BaseBackend
REST API backend with advanced retry and security.
Source code in src/jinpy_utils/logger/backends.py
| class RestApiBackend(BaseBackend):
"""REST API backend with advanced retry and security."""
def __init__(self, config: RestApiBackendConfig):
super().__init__(config)
self.config: RestApiBackendConfig = config
self._session: aiohttp.ClientSession | None = None
self._connection_state = ConnectionState.DISCONNECTED
async def _get_session(self) -> aiohttp.ClientSession:
"""Get or create HTTP session with security settings."""
if self._session is None or self._session.closed:
headers = self.config.headers.copy()
# Add authentication headers
if self.config.security.api_key:
headers["Authorization"] = f"Bearer {self.config.security.api_key}"
elif self.config.security.oauth2_token:
headers["Authorization"] = f"Bearer {self.config.security.oauth2_token}"
headers["Content-Type"] = "application/json"
headers["User-Agent"] = f"jinpy-utils-logger/{self.name}"
# SSL configuration
connector = None
if not self.config.security.verify_ssl:
connector = aiohttp.TCPConnector(ssl=False)
timeout = aiohttp.ClientTimeout(total=self.config.timeout)
if connector:
self._session = aiohttp.ClientSession(
headers=headers,
timeout=timeout,
connector=connector,
)
else:
self._session = aiohttp.ClientSession(
headers=headers,
timeout=timeout,
)
return self._session
async def _send_batch(self, entries: list[LogEntry]) -> None:
"""Send batch of entries to API."""
try:
self._connection_state = ConnectionState.CONNECTING
session = await self._get_session()
url = urljoin(self.config.base_url, self.config.endpoint)
payload = {
"logs": [entry.to_dict() for entry in entries],
"timestamp": get_current_datetime().isoformat(),
"source": self.name,
"count": len(entries),
}
# Make the HTTP request. ``aiohttp.ClientSession.request`` might
# return an awaitable context manager (normal) or, when mocked,
# could return the context manager directly. Handle both.
request_ctx_any = session.request(self.config.method, url, json=payload)
request_ctx_cm = (
await request_ctx_any
if inspect.isawaitable(request_ctx_any)
else request_ctx_any
)
async with request_ctx_cm as response:
if response.status >= 400: # noqa: PLR2004
error_text = await response.text()
raise JPYLoggerConnectionError(
message=f"API request failed: {response.status}",
endpoint=url,
connection_type="http",
context={
"status": response.status,
"response": error_text,
"method": self.config.method,
},
)
self._connection_state = ConnectionState.CONNECTED
self._stats["messages_written"] = self._stats["messages_written"] + len(
entries
)
self._stats["last_write"] = get_current_datetime()
except JPYLoggerConnectionError:
# Preserve connection-specific errors for tests to assert on
self._connection_state = ConnectionState.FAILED
raise
except aiohttp.ClientError as e:
self._connection_state = ConnectionState.FAILED
raise JPYLoggerConnectionError(
message=f"HTTP client error: {e}",
endpoint=self.config.base_url,
connection_type="http",
) from e
except Exception as e:
self._connection_state = ConnectionState.FAILED
raise JPYLoggerBackendError(
message=f"REST API backend error: {e}",
backend_type=self.config.backend_type.value,
backend_config={"base_url": self.config.base_url},
) from e
async def write_async(self, entry: LogEntry) -> None:
"""Write entry asynchronously."""
if self._should_write(entry):
await self._add_to_buffer(entry)
def write_sync(self, entry: LogEntry) -> None:
"""Write entry synchronously (queues for async processing)."""
if self._should_write(entry):
# Store reference for RUF006 compliance (intentionally unused)
# Assign to a module-level sink to satisfy RUF006
_BACKGROUND_TASK_SINK.append(asyncio.create_task(self.write_async(entry)))
async def _write_batch(self, entries: list[LogEntry]) -> None:
"""Write batch of entries to REST API."""
await self._send_batch(entries)
async def close(self) -> None:
"""Close REST API backend."""
await super().close()
if self._session and not self._session.closed:
await self._session.close()
self._connection_state = ConnectionState.CLOSED
|
Functions
__init__
Source code in src/jinpy_utils/logger/backends.py
| def __init__(self, config: RestApiBackendConfig):
super().__init__(config)
self.config: RestApiBackendConfig = config
self._session: aiohttp.ClientSession | None = None
self._connection_state = ConnectionState.DISCONNECTED
|
_send_batch
async
_send_batch(entries: list[LogEntry]) -> None
Send batch of entries to API.
Source code in src/jinpy_utils/logger/backends.py
| async def _send_batch(self, entries: list[LogEntry]) -> None:
"""Send batch of entries to API."""
try:
self._connection_state = ConnectionState.CONNECTING
session = await self._get_session()
url = urljoin(self.config.base_url, self.config.endpoint)
payload = {
"logs": [entry.to_dict() for entry in entries],
"timestamp": get_current_datetime().isoformat(),
"source": self.name,
"count": len(entries),
}
# Make the HTTP request. ``aiohttp.ClientSession.request`` might
# return an awaitable context manager (normal) or, when mocked,
# could return the context manager directly. Handle both.
request_ctx_any = session.request(self.config.method, url, json=payload)
request_ctx_cm = (
await request_ctx_any
if inspect.isawaitable(request_ctx_any)
else request_ctx_any
)
async with request_ctx_cm as response:
if response.status >= 400: # noqa: PLR2004
error_text = await response.text()
raise JPYLoggerConnectionError(
message=f"API request failed: {response.status}",
endpoint=url,
connection_type="http",
context={
"status": response.status,
"response": error_text,
"method": self.config.method,
},
)
self._connection_state = ConnectionState.CONNECTED
self._stats["messages_written"] = self._stats["messages_written"] + len(
entries
)
self._stats["last_write"] = get_current_datetime()
except JPYLoggerConnectionError:
# Preserve connection-specific errors for tests to assert on
self._connection_state = ConnectionState.FAILED
raise
except aiohttp.ClientError as e:
self._connection_state = ConnectionState.FAILED
raise JPYLoggerConnectionError(
message=f"HTTP client error: {e}",
endpoint=self.config.base_url,
connection_type="http",
) from e
except Exception as e:
self._connection_state = ConnectionState.FAILED
raise JPYLoggerBackendError(
message=f"REST API backend error: {e}",
backend_type=self.config.backend_type.value,
backend_config={"base_url": self.config.base_url},
) from e
|
WebSocket Backend
WebSocketBackend
WebSocketBackend
Bases: BaseBackend
WebSocket backend for real-time log streaming.
Source code in src/jinpy_utils/logger/backends.py
| class WebSocketBackend(BaseBackend):
"""WebSocket backend for real-time log streaming."""
def __init__(self, config: WebSocketBackendConfig):
super().__init__(config)
self.config: WebSocketBackendConfig = config
self._websocket: websockets.ClientConnection | None = None
self._connection_state = ConnectionState.DISCONNECTED
self._reconnect_task: asyncio.Task | None = None
self._ping_task: asyncio.Task | None = None
# Start connection management
self._start_connection_management()
def _start_connection_management(self) -> None:
"""Start connection management tasks."""
try:
# Only start if we have an event loop
asyncio.get_running_loop()
self._reconnect_task = asyncio.create_task(self._connection_manager())
except RuntimeError:
# No event loop, will be started later
self._reconnect_task = None
async def _connection_manager(self) -> None:
"""Manage WebSocket connection with auto-reconnect."""
while not self._closed:
try:
if self._connection_state in [
ConnectionState.DISCONNECTED,
ConnectionState.FAILED,
]:
await self._connect()
await asyncio.sleep(self.config.reconnect_interval)
except asyncio.CancelledError:
break
except Exception as e:
self._handle_error(e)
await asyncio.sleep(self.config.reconnect_interval)
async def _connect(self) -> None:
"""Connect to WebSocket server."""
try:
self._connection_state = ConnectionState.CONNECTING
# Build connection kwargs
kwargs: dict[str, Any] = {
"max_size": self.config.max_message_size,
"ping_interval": self.config.ping_interval,
"ping_timeout": self.config.timeout,
}
# Add SSL context if needed
if (
self.config.ws_url.startswith("wss://")
and not self.config.security.verify_ssl
):
kwargs["ssl"] = ssl._create_unverified_context()
# Add authentication headers
extra_headers: dict[str, str] = {}
if self.config.security.api_key:
extra_headers["Authorization"] = (
f"Bearer {self.config.security.api_key}"
)
if extra_headers:
kwargs["extra_headers"] = extra_headers
# Connect to WebSocket. The library normally returns an awaitable,
# but tests may stub it to return a websocket-like object directly.
ws_connect_result = websockets.connect(
self.config.ws_url,
**kwargs,
)
if inspect.isawaitable(ws_connect_result):
self._websocket = await ws_connect_result
else:
# When tests stub websockets.connect to return a connection-like object
self._websocket = cast("Any", ws_connect_result)
self._connection_state = ConnectionState.CONNECTED
# Start ping task
if self._ping_task:
self._ping_task.cancel()
self._ping_task = asyncio.create_task(self._ping_loop())
except Exception as e:
self._connection_state = ConnectionState.FAILED
raise JPYLoggerWebSocketError(
message=f"WebSocket connection failed: {e}",
ws_endpoint=self.config.ws_url,
ws_state=self._connection_state.value,
) from e
async def _ping_loop(self) -> None:
"""Send periodic ping messages."""
while not self._closed and self._websocket:
try:
await asyncio.sleep(self.config.ping_interval)
if (
self._websocket
and self._connection_state == ConnectionState.CONNECTED
and not self._closed
):
await self._websocket.ping()
except (ConnectionClosed, WebSocketException):
# Update connection state when connection is closed
self._connection_state = ConnectionState.DISCONNECTED
self._websocket = None
break
except asyncio.CancelledError:
break
except Exception as e:
self._handle_error(e)
break
async def _send_message(self, message: str) -> None:
"""Send message over WebSocket."""
if not self._websocket or self._connection_state != ConnectionState.CONNECTED:
raise JPYLoggerWebSocketError(
message="WebSocket not connected",
ws_endpoint=self.config.ws_url,
ws_state=self._connection_state.value,
)
try:
await self._websocket.send(message)
except (ConnectionClosed, WebSocketException) as e:
self._connection_state = ConnectionState.DISCONNECTED
raise JPYLoggerWebSocketError(
message=f"WebSocket send failed: {e}",
ws_endpoint=self.config.ws_url,
ws_state=self._connection_state.value,
) from e
async def write_async(self, entry: LogEntry) -> None:
"""Write entry asynchronously."""
if self._should_write(entry):
await self._add_to_buffer(entry)
def write_sync(self, entry: LogEntry) -> None:
"""Write entry synchronously (queues for async processing)."""
if self._should_write(entry):
# Store reference for RUF006 compliance (intentionally unused)
# Assign to a module-level sink to satisfy RUF006
_BACKGROUND_TASK_SINK.append(asyncio.create_task(self.write_async(entry)))
async def _write_batch(self, entries: list[LogEntry]) -> None:
"""Write batch of entries over WebSocket."""
if self._connection_state != ConnectionState.CONNECTED:
raise JPYLoggerWebSocketError(
message="WebSocket not connected for batch write",
ws_endpoint=self.config.ws_url,
ws_state=self._connection_state.value,
)
try:
# Send as batch message
batch_message = {
"type": "log_batch",
"timestamp": get_current_datetime().isoformat(),
"source": self.name,
"count": len(entries),
"logs": [entry.to_dict() for entry in entries],
}
message = json.dumps(
batch_message,
default=str,
separators=(",", ":"),
)
await self._send_message(message)
self._stats["messages_written"] += len(entries)
self._stats["bytes_written"] += len(message.encode())
self._stats["last_write"] = get_current_datetime()
except Exception as e:
raise JPYLoggerWebSocketError(
message=f"WebSocket batch write failed: {e}",
ws_endpoint=self.config.ws_url,
ws_state=self._connection_state.value,
) from e
def is_healthy(self) -> bool:
"""Check if WebSocket backend is healthy."""
return bool(
super().is_healthy()
and self._connection_state == ConnectionState.CONNECTED
and self._websocket
)
async def close(self) -> None:
"""Close WebSocket backend."""
await super().close()
if self._ping_task:
from typing import Any as _Any # noqa: PLC0415
cancel_result_ping: _Any = self._ping_task.cancel()
# Await cancel() if it returned an awaitable (e.g., AsyncMock)
import inspect as _inspect # noqa: PLC0415 # local import to avoid top-level cost
from contextlib import suppress as _suppress # noqa: PLC0415
if _inspect.isawaitable(cancel_result_ping): # mypy: treat as runtime guard
with _suppress(Exception):
await cancel_result_ping
if isinstance(self._ping_task, asyncio.Task | asyncio.Future):
with _suppress(asyncio.CancelledError):
await self._ping_task
if self._reconnect_task:
from typing import Any as _Any # noqa: PLC0415
cancel_result_reconnect: _Any = self._reconnect_task.cancel()
import inspect as _inspect # noqa: PLC0415 # local import to avoid top-level cost
from contextlib import suppress as _suppress # noqa: PLC0415
if _inspect.isawaitable(
cancel_result_reconnect
): # mypy: treat as runtime guard
with _suppress(Exception):
await cancel_result_reconnect
if isinstance(self._reconnect_task, asyncio.Task | asyncio.Future):
with _suppress(asyncio.CancelledError):
await self._reconnect_task
if self._websocket and self._connection_state != ConnectionState.CLOSED:
await self._websocket.close()
self._connection_state = ConnectionState.CLOSED
|
Functions
__init__
Source code in src/jinpy_utils/logger/backends.py
| def __init__(self, config: WebSocketBackendConfig):
super().__init__(config)
self.config: WebSocketBackendConfig = config
self._websocket: websockets.ClientConnection | None = None
self._connection_state = ConnectionState.DISCONNECTED
self._reconnect_task: asyncio.Task | None = None
self._ping_task: asyncio.Task | None = None
# Start connection management
self._start_connection_management()
|
_connect
async
Connect to WebSocket server.
Source code in src/jinpy_utils/logger/backends.py
| async def _connect(self) -> None:
"""Connect to WebSocket server."""
try:
self._connection_state = ConnectionState.CONNECTING
# Build connection kwargs
kwargs: dict[str, Any] = {
"max_size": self.config.max_message_size,
"ping_interval": self.config.ping_interval,
"ping_timeout": self.config.timeout,
}
# Add SSL context if needed
if (
self.config.ws_url.startswith("wss://")
and not self.config.security.verify_ssl
):
kwargs["ssl"] = ssl._create_unverified_context()
# Add authentication headers
extra_headers: dict[str, str] = {}
if self.config.security.api_key:
extra_headers["Authorization"] = (
f"Bearer {self.config.security.api_key}"
)
if extra_headers:
kwargs["extra_headers"] = extra_headers
# Connect to WebSocket. The library normally returns an awaitable,
# but tests may stub it to return a websocket-like object directly.
ws_connect_result = websockets.connect(
self.config.ws_url,
**kwargs,
)
if inspect.isawaitable(ws_connect_result):
self._websocket = await ws_connect_result
else:
# When tests stub websockets.connect to return a connection-like object
self._websocket = cast("Any", ws_connect_result)
self._connection_state = ConnectionState.CONNECTED
# Start ping task
if self._ping_task:
self._ping_task.cancel()
self._ping_task = asyncio.create_task(self._ping_loop())
except Exception as e:
self._connection_state = ConnectionState.FAILED
raise JPYLoggerWebSocketError(
message=f"WebSocket connection failed: {e}",
ws_endpoint=self.config.ws_url,
ws_state=self._connection_state.value,
) from e
|
_send_message
async
_send_message(message: str) -> None
Send message over WebSocket.
Source code in src/jinpy_utils/logger/backends.py
| async def _send_message(self, message: str) -> None:
"""Send message over WebSocket."""
if not self._websocket or self._connection_state != ConnectionState.CONNECTED:
raise JPYLoggerWebSocketError(
message="WebSocket not connected",
ws_endpoint=self.config.ws_url,
ws_state=self._connection_state.value,
)
try:
await self._websocket.send(message)
except (ConnectionClosed, WebSocketException) as e:
self._connection_state = ConnectionState.DISCONNECTED
raise JPYLoggerWebSocketError(
message=f"WebSocket send failed: {e}",
ws_endpoint=self.config.ws_url,
ws_state=self._connection_state.value,
) from e
|
Backend Factory
BackendFactory
BackendFactory
Factory for creating backend instances.
Source code in src/jinpy_utils/logger/backends.py
| class BackendFactory:
"""Factory for creating backend instances."""
_backend_classes: ClassVar[dict[str, type["BackendInterface"]]] = {
"console": ConsoleBackend,
"file": FileBackend,
"rest_api": RestApiBackend,
"websocket": WebSocketBackend,
}
@classmethod
def create_backend(cls, config: BackendConfig) -> BackendInterface:
"""Create backend instance from configuration."""
# Handle both enum and string values
backend_type = (
config.backend_type.value
if hasattr(config.backend_type, "value")
else config.backend_type
)
backend_class = cls._backend_classes.get(backend_type)
if not backend_class:
raise JPYLoggerConfigurationError(
message=f"Unsupported backend type: {backend_type}",
config_section="backend_type",
config_value=str(backend_type),
)
return backend_class(config) # type: ignore
@classmethod
def register_backend(cls, backend_type: str, backend_class: Any) -> None:
"""Register new backend type."""
cls._backend_classes[backend_type] = backend_class
@classmethod
def get_supported_backends(cls) -> list[str]:
"""Get list of supported backend types."""
return list(cls._backend_classes.keys())
|
Functions
create_backend
classmethod
Create backend instance from configuration.
Source code in src/jinpy_utils/logger/backends.py
| @classmethod
def create_backend(cls, config: BackendConfig) -> BackendInterface:
"""Create backend instance from configuration."""
# Handle both enum and string values
backend_type = (
config.backend_type.value
if hasattr(config.backend_type, "value")
else config.backend_type
)
backend_class = cls._backend_classes.get(backend_type)
if not backend_class:
raise JPYLoggerConfigurationError(
message=f"Unsupported backend type: {backend_type}",
config_section="backend_type",
config_value=str(backend_type),
)
return backend_class(config) # type: ignore
|
Examples
Console Backend Usage
from jinpy_utils.logger import get_logger, LoggerConfig, ConsoleBackendConfig, LogLevel
config = LoggerConfig(
backends=[
ConsoleBackendConfig(
level=LogLevel.DEBUG,
use_colors=True,
show_source=True,
timestamp_format="%H:%M:%S.%f"
)
]
)
logger = get_logger("console_app", config)
logger.info("This will be colored and formatted nicely")
File Backend with Rotation
from jinpy_utils.logger import get_logger, LoggerConfig, FileBackendConfig, LogLevel
config = LoggerConfig(
backends=[
FileBackendConfig(
level=LogLevel.INFO,
file_path="logs/app-{date}.log",
max_size_mb=100,
backup_count=10,
compression=True,
encoding="utf-8"
)
]
)
logger = get_logger("file_app", config)
logger.info("This will be written to a rotating log file")
REST API Backend
from jinpy_utils.logger import get_logger, LoggerConfig, RestApiBackendConfig, LogLevel
import os
config = LoggerConfig(
backends=[
RestApiBackendConfig(
level=LogLevel.ERROR,
endpoint="https://logs.example.com/api/v1/logs",
headers={
"Authorization": f"Bearer {os.environ['LOG_API_TOKEN']}",
"Content-Type": "application/json"
},
timeout_seconds=30,
retry_count=3,
batch_size=50,
batch_timeout=10.0
)
]
)
logger = get_logger("api_app", config)
logger.error("This error will be sent to the REST API")
WebSocket Backend
from jinpy_utils.logger import get_logger, LoggerConfig, WebSocketBackendConfig, LogLevel
config = LoggerConfig(
backends=[
WebSocketBackendConfig(
level=LogLevel.INFO,
endpoint="wss://logs.example.com/ws",
headers={
"Authorization": "Bearer YOUR_TOKEN"
},
reconnect_interval=5.0,
max_reconnect_attempts=10
)
]
)
logger = get_logger("websocket_app", config)
logger.info("This will be sent via WebSocket")
Multiple Backends
from jinpy_utils.logger import (
get_logger, LoggerConfig,
ConsoleBackendConfig, FileBackendConfig, RestApiBackendConfig,
LogLevel
)
config = LoggerConfig(
backends=[
# Console for development
ConsoleBackendConfig(
level=LogLevel.DEBUG,
use_colors=True
),
# File for persistence
FileBackendConfig(
level=LogLevel.INFO,
file_path="logs/app.log",
max_size_mb=50,
backup_count=5
),
# REST API for centralized logging
RestApiBackendConfig(
level=LogLevel.ERROR,
endpoint="https://logs.company.com/api/logs",
batch_size=10
)
]
)
logger = get_logger("multi_backend_app", config)
# DEBUG: only to console
logger.debug("Debug information")
# INFO: to console and file
logger.info("Application started")
# ERROR: to all backends
logger.error("Critical error occurred")
Custom Backend
from jinpy_utils.logger import BaseBackend, LogEntry, BackendFactory
from typing import Any
class CustomBackend(BaseBackend):
"""Custom backend implementation."""
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.custom_config = kwargs
def handle(self, entry: LogEntry) -> None:
"""Handle a log entry."""
if self.should_handle(entry):
formatted = self.format_entry(entry)
# Your custom logic here
self._send_to_custom_destination(formatted)
def _send_to_custom_destination(self, message: str) -> None:
"""Send message to custom destination."""
# Implement your custom sending logic
pass
def shutdown(self) -> None:
"""Clean up resources."""
# Implement cleanup logic
pass
# Register the custom backend
BackendFactory.register_backend_type("custom", CustomBackend)
# Use the custom backend
from jinpy_utils.logger import get_logger, LoggerConfig
config = LoggerConfig(
backends=[
{
"type": "custom",
"level": "INFO",
# Custom configuration parameters
"custom_param": "value"
}
]
)
logger = get_logger("custom_app", config)
Async Backend Operations
import asyncio
from jinpy_utils.logger import get_logger, create_production_config
async def async_logging_example():
"""Example of async logging operations."""
config = create_production_config()
logger = get_logger("async_app", config)
# These operations are non-blocking when async is enabled
logger.info("Starting async operation")
await asyncio.sleep(0.1)
logger.info("Async operation completed")
# Ensure all logs are flushed before exit
await logger.shutdown()
asyncio.run(async_logging_example())
Backend Configuration
from jinpy_utils.logger import LoggerConfig, FileBackendConfig
# High-performance file backend
high_perf_config = LoggerConfig(
backends=[
FileBackendConfig(
file_path="logs/high-perf.log",
buffer_size=65536, # 64KB buffer
flush_interval=5.0, # Flush every 5 seconds
async_enabled=True, # Enable async I/O
compression=False # Disable compression for speed
)
]
)
Error Handling
from jinpy_utils.logger import LoggerConfig, RestApiBackendConfig
# Robust REST API backend
robust_config = LoggerConfig(
backends=[
RestApiBackendConfig(
endpoint="https://logs.example.com/api/logs",
timeout_seconds=30,
retry_count=5,
retry_backoff_factor=2.0, # Exponential backoff
max_queue_size=10000, # Large queue for resilience
fallback_to_file=True, # Fallback to file on failure
fallback_file="logs/api_fallback.log"
)
]
)
Security Configuration
from jinpy_utils.logger import LoggerConfig, RestApiBackendConfig
# Secure API backend
secure_config = LoggerConfig(
backends=[
RestApiBackendConfig(
endpoint="https://logs.example.com/api/logs",
headers={
"Authorization": "Bearer YOUR_TOKEN",
"X-API-Key": "YOUR_API_KEY"
},
verify_ssl=True, # Verify SSL certificates
client_cert="client.pem", # Client certificate
client_key="client.key", # Client private key
encrypt_payload=True, # Encrypt log payloads
encryption_key="your_key" # Encryption key
)
]
)
Backend Lifecycle
Initialization
from jinpy_utils.logger import BackendFactory, ConsoleBackendConfig
# Backends are created by the factory
backend_config = ConsoleBackendConfig(level="INFO")
backend = BackendFactory.create_backend(backend_config)
Resource Management
from jinpy_utils.logger import get_logger, create_production_config
# Proper resource cleanup
config = create_production_config()
logger = get_logger("app", config)
try:
# Use logger
logger.info("Application running")
finally:
# Ensure proper shutdown
logger.shutdown()
Health Monitoring
from jinpy_utils.logger import get_logger
logger = get_logger("app")
# Check backend health
for backend in logger.get_backends():
if hasattr(backend, 'is_healthy'):
health_status = backend.is_healthy()
logger.info("Backend health",
backend_type=type(backend).__name__,
healthy=health_status)