Skip to content

Logger Core API

Core logging classes and factory functions with synchronous and asynchronous APIs, context binding, correlation IDs, and multiple pluggable backends.

Factory Functions

get_logger

get_logger

get_logger(
    name: str, config: LoggerConfig | None = None, force_new: bool = False
) -> Logger

Get a ``Logger`` instance.

Convenience wrapper around ``Logger.get_logger`` for callers that prefer module-level functions.

Parameters:

Name Type Description Default
name str

Name of the logger instance.

required
config LoggerConfig | None

Optional per-logger configuration.

None
force_new bool

When True, always create a new instance even if singleton mode is enabled.

False

Returns:

Name Type Description
Logger Logger

The logger instance.

Source code in src/jinpy_utils/logger/core.py
def get_logger(
    name: str, config: LoggerConfig | None = None, force_new: bool = False
) -> Logger:
    """Return a :class:`Logger` instance by name.

    Module-level convenience that delegates straight to
    :meth:`Logger.get_logger` for callers who prefer plain functions over
    class methods.

    Args:
        name: Name of the logger instance.
        config: Optional per-logger configuration.
        force_new: When ``True``, always create a new instance even if
            singleton mode is enabled.

    Returns:
        Logger: The logger instance.
    """
    return Logger.get_logger(name, config=config, force_new=force_new)

set_global_config

set_global_config

set_global_config(config: GlobalLoggerConfig) -> None

Set the global logger configuration.

This forwards to ``Logger.set_global_config`` for convenience.

Parameters:

Name Type Description Default
config GlobalLoggerConfig

The global configuration to apply.

required
Source code in src/jinpy_utils/logger/core.py
def set_global_config(config: GlobalLoggerConfig) -> None:
    """Apply ``config`` as the global logger configuration.

    Thin module-level wrapper that forwards to
    :meth:`Logger.set_global_config`.

    Args:
        config: The global configuration to apply.
    """
    Logger.set_global_config(config=config)

configure_from_env

configure_from_env

configure_from_env(env_prefix: str = 'LOGGER_') -> None

Configure the global logger from environment variables.

Parameters:

Name Type Description Default
env_prefix str

Prefix used to read environment variables for configuration (defaults to "LOGGER_").

'LOGGER_'
Source code in src/jinpy_utils/logger/core.py
def configure_from_env(env_prefix: str = "LOGGER_") -> None:
    """Configure the global logger from environment variables.

    Builds a :class:`GlobalLoggerConfig` from the environment and installs
    it via :func:`set_global_config`.

    Args:
        env_prefix: Prefix used to read environment variables for
            configuration (defaults to ``"LOGGER_"``).
    """
    set_global_config(GlobalLoggerConfig.from_env(env_prefix))

shutdown_all_loggers

shutdown_all_loggers

shutdown_all_loggers() -> None

Shut down all logger instances via ``LoggerManager``.

Source code in src/jinpy_utils/logger/core.py
def shutdown_all_loggers() -> None:
    """Shut down every registered logger instance.

    Delegates to :meth:`LoggerManager.shutdown_all`.
    """
    manager = LoggerManager()
    manager.shutdown_all()

Core Classes

Logger

Logger

High-performance structured logger with async and sync APIs.

The ``Logger`` encapsulates configuration, backends, and operational policy for structured logging. It supports context binding, correlation IDs, asynchronous processing (when supported by the configured backends), and exposes convenience helpers for both sync and async application flows.

Parameters:

Name Type Description Default
name str

Logical name of the logger instance, typically the module or component name.

required
config LoggerConfig

Per-instance configuration model.

required
global_config GlobalLoggerConfig

Global configuration applied to all loggers via ``LoggerManager``.

required

Attributes:

Name Type Description
name str

The logger name.

config LoggerConfig

The logger-specific configuration.

global_config GlobalLoggerConfig

The active global configuration.

_context dict[str, Any]

Baseline structured context bound to the logger.

_correlation_id str | None

The current correlation identifier.

_backends list[BackendInterface]

Active backend instances.

Examples:

Basic usage

>>> from jinpy_utils.logger import Logger, LoggerManager
>>> from jinpy_utils.logger.config import GlobalLoggerConfig
>>> global_config = GlobalLoggerConfig.from_env()
>>> LoggerManager().set_global_config(global_config)
>>> logger = Logger.get_logger("example")
>>> logger.info("Started", {"version": "1.0.0"})

Context binding

>>> with logger.context(user_id=123):
...     logger.error("Something happened")
Source code in src/jinpy_utils/logger/core.py
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
class Logger:
    """High-performance structured logger with async and sync APIs.

    The :class:`Logger` encapsulates configuration, backends, and operational
    policy for structured logging. It supports context binding, correlation IDs,
    asynchronous processing (when supported by the configured backends), and
    exposes convenience helpers for both sync and async application flows.

    Args:
        name: Logical name of the logger instance, typically the module or
            component name.
        config: Per-instance configuration model.
        global_config: Global configuration applied to all loggers via
            :class:`LoggerManager`.

    Attributes:
        name (str): The logger name.
        config (LoggerConfig): The logger-specific configuration.
        global_config (GlobalLoggerConfig): The active global configuration.
        _context (dict[str, Any]): Baseline structured context bound to the
            logger.
        _correlation_id (str | None): The current correlation identifier.
        _backends (list[BackendInterface]): Active backend instances.

    Examples:
        Basic usage
        -----------
        >>> from jinpy_utils.logger import Logger, LoggerManager
        >>> from jinpy_utils.logger.config import GlobalLoggerConfig
        >>> global_config = GlobalLoggerConfig.from_env()
        >>> LoggerManager().set_global_config(global_config)
        >>> logger = Logger.get_logger("example")
        >>> logger.info("Started", {"version": "1.0.0"})

        Context binding
        ---------------
        >>> with logger.context(user_id=123):
        ...     logger.error("Something happened")
    """

    def __init__(
        self,
        name: str,
        config: LoggerConfig,
        global_config: GlobalLoggerConfig,
    ):
        """
        Initialize logger instance.

        Args:
            name: Logger name
            config: Logger-specific configuration
            global_config: Global logger configuration

        Raises:
            JPYLoggerError: If backend initialization fails.
        """
        self.name = name
        self.config = config
        self.global_config = global_config

        # State management
        self._closed = False
        self._context: dict[str, Any] = config.context.copy()
        self._correlation_id = config.correlation_id

        # Performance tracking
        self._stats: dict[str, Any] = {
            "messages_logged": 0,
            "messages_dropped": 0,
            "bytes_processed": 0,
            "start_time": get_current_datetime(),
            "last_log_time": None,
        }

        # Initialize backends (must happen before async setup, which
        # inspects self._backends for async capability)
        self._backends: list[BackendInterface] = []
        self._initialize_backends()

        # Async processing
        self._async_queue: asyncio.Queue | None = None
        self._processor_task: asyncio.Task | None = None
        self._setup_async_processing()

        # Security and sanitization
        self._sensitive_fields = set(global_config.sensitive_fields)

    def _initialize_backends(self) -> None:
        """Initialize backends based on configuration."""
        try:
            # Determine which backends to use
            if self.config.backends:
                # Use specific backends
                backend_configs = [
                    backend
                    for backend in self.global_config.backends
                    if backend.name in self.config.backends and backend.enabled
                ]
            else:
                # Use all enabled global backends
                backend_configs = [
                    backend
                    for backend in self.global_config.backends
                    if backend.enabled
                ]

            if not backend_configs:
                raise JPYLoggerConfigurationError(
                    message="No enabled backends found",
                    config_section="backends",
                )

            # Create backend instances
            for backend_config in backend_configs:
                try:
                    backend = BackendFactory.create_backend(backend_config)
                    self._backends.append(backend)
                except Exception as e:
                    raise JPYLoggerConfigurationError(
                        message=f"Failed to initialize backend {backend_config.name}: {e}",
                        config_section="backends",
                        config_value=backend_config.name,
                    ) from e

        except Exception as e:
            # Re-wrap every failure (including the configuration errors raised
            # above) so callers see a single JPYLoggerError surface.
            raise JPYLoggerError(
                message=f"Backend initialization failed: {e}",
                logger_name=self.name,
                operation="initialize_backends",
            ) from e

    def _setup_async_processing(self) -> None:
        """Setup async processing queue and background task if supported.

        When at least one backend reports async capability, a bounded queue is
        created and a background task is started if a running event loop is
        present. Otherwise, the task is created later on first async usage.
        """
        # NOTE(review): hasattr() only checks that the attribute exists; a
        # backend with `_async_capable = False` would still enable queue
        # setup -- confirm whether truthiness should be tested instead.
        if any(hasattr(backend, "_async_capable") for backend in self._backends):
            self._async_queue = asyncio.Queue(
                maxsize=self.global_config.async_queue_size
            )
            try:
                asyncio.get_running_loop()
                self._processor_task = asyncio.create_task(
                    self._process_async_queue(),
                )
            except RuntimeError:
                # No running loop; task will be created lazily by caller if needed
                self._processor_task = None

    async def _process_async_queue(self) -> None:
        """Process async logging queue."""
        while not self._closed:
            try:
                # Get entry with timeout to allow periodic cleanup
                if self._async_queue is None:
                    raise ValueError("self._async_queue can not be None")

                entry = await asyncio.wait_for(
                    self._async_queue.get(),
                    timeout=1.0,
                )

                await self._write_to_backends_async(entry)
                self._async_queue.task_done()

            except TimeoutError:
                # NOTE(review): on Python < 3.11, asyncio.wait_for raises
                # asyncio.TimeoutError, which is not builtins.TimeoutError --
                # confirm the minimum supported Python version.
                continue
            except Exception as e:
                self._handle_processing_error(e)

    def _handle_processing_error(self, error: Exception) -> None:
        """Handle async processing errors.

        Errors are counted and emitted to stderr to avoid recursive logging.
        """
        self._stats["messages_dropped"] = int(self._stats["messages_dropped"]) + 1

        # Log to stderr to avoid recursion
        print(f"Logger {self.name} processing error: {error}", file=sys.stderr)

    def _get_effective_level(self) -> LogLevel:
        """Return the effective log level considering global defaults."""
        return self.config.level or self.global_config.default_level

    def _should_log(self, level: LogLevel) -> bool:
        """Return whether a message at ``level`` should be emitted."""
        return level >= self._get_effective_level()

    def _sanitize_context(self, context: dict[str, Any]) -> dict[str, Any]:
        """Sanitize sensitive fields from the provided context.

        Sanitization behavior is controlled by the global configuration and
        will redact values for keys matching any configured sensitive token.

        Args:
            context: Arbitrary structured context to sanitize.

        Returns:
            dict[str, Any]: A sanitized copy of ``context``.
        """
        if not self.global_config.enable_sanitization:
            return context

        sanitized = {}
        for key, value in context.items():
            # Substring match: any sensitive token appearing anywhere in the
            # (lowercased) key triggers redaction of the whole value.
            if any(sensitive in key.lower() for sensitive in self._sensitive_fields):
                sanitized[key] = "[REDACTED]"
            else:
                sanitized[key] = value

        return sanitized

    def _create_log_entry(
        self,
        level: LogLevel,
        message: str,
        context: dict[str, Any] | None = None,
    ) -> LogEntry:
        """Create a structured :class:`LogEntry` with context and metadata.

        Merges bound context with provided context, applies sanitization, and
        enriches the entry with source information and a correlation ID.

        Args:
            level: Log level for the entry.
            message: Human-readable message for the entry.
            context: Optional structured context specific to this call.

        Returns:
            LogEntry: A fully-populated structured log entry.
        """
        # Merge contexts: instance -> method parameter
        merged_context = self._context.copy()
        if context:
            # Check context size limit
            context_str = str(context)
            if len(context_str) > self.global_config.max_context_size:
                context = {
                    "_context_truncated": True,
                    "_original_size": len(context_str),
                    "_truncated_at": get_current_datetime().isoformat(),
                }

            merged_context.update(context)

        # Sanitize context
        if merged_context:
            merged_context = self._sanitize_context(merged_context)

        # Generate correlation ID if needed
        correlation_id = self._correlation_id
        if self.global_config.enable_correlation_ids and not correlation_id:
            correlation_id = str(uuid.uuid4())

        # Get caller information
        # NOTE(review): assumes exactly two frames between here and the user's
        # call site (e.g. log() -> _create_log_entry()); convenience wrappers
        # such as info()/debug() add an extra frame, so the reported source
        # may point at the wrapper instead of the caller -- confirm.
        frame = inspect.currentframe()
        caller_frame = frame.f_back.f_back if frame and frame.f_back else None

        module_name = None
        function_name = None
        line_number = None

        if caller_frame:
            module_name = caller_frame.f_globals.get("__name__")
            function_name = caller_frame.f_code.co_name
            line_number = caller_frame.f_lineno

        return LogEntry(
            timestamp=get_current_datetime(),
            level=level,
            message=message,
            logger_name=self.name,
            correlation_id=correlation_id,
            context=merged_context,
            module=module_name,
            function=function_name,
            line_number=line_number,
        )

    async def _write_to_backends_async(self, entry: LogEntry) -> None:
        """Write entry to all backends asynchronously."""
        tasks = []
        for backend in self._backends:
            if backend.is_healthy():
                tasks.append(self._safe_backend_write_async(backend, entry))

        if tasks:
            await asyncio.gather(*tasks, return_exceptions=True)

    async def _safe_backend_write_async(
        self, backend: BackendInterface, entry: LogEntry
    ) -> None:
        """Safely write to backend with error handling."""
        try:
            await backend.write_async(entry)
        except Exception as e:
            self._stats["messages_dropped"] = int(self._stats["messages_dropped"]) + 1
            # Log backend error without recursion
            print(f"Backend {backend.name} error: {e}", file=sys.stderr)

    def _write_to_backends_sync(self, entry: LogEntry) -> None:
        """Write the entry to all configured backends synchronously."""
        for backend in self._backends:
            try:
                if backend.is_healthy():
                    backend.write_sync(entry)
            except Exception as e:
                self._stats["messages_dropped"] = (
                    int(self._stats["messages_dropped"]) + 1
                )
                # Log backend error without recursion
                print(f"Backend {backend.name} error: {e}", file=sys.stderr)

    def _update_stats(self, entry: LogEntry) -> None:
        """Update internal counters and performance metrics for ``entry``."""
        self._stats["messages_logged"] = int(self._stats["messages_logged"]) + 1
        self._stats["bytes_processed"] = int(self._stats["bytes_processed"]) + len(
            str(entry.to_dict())
        )
        self._stats["last_log_time"] = get_current_datetime()

        # Performance monitoring
        if self.global_config.enable_performance_metrics:
            current_time = get_current_datetime()
            if self._stats["last_log_time"] is not None:
                # NOTE(review): last_log_time was assigned just above, so this
                # difference is effectively zero and the alert below can never
                # fire as written -- confirm the intended baseline (previous
                # log time? per-entry write latency?).
                time_diff = (
                    current_time - self._stats["last_log_time"]
                ).total_seconds()
                if time_diff > 1.0:  # Alert if logging takes too long
                    raise JPYLoggerPerformanceError(
                        message="Logging performance degraded",
                        logger_name=self.name,
                        performance_metric="log_latency",
                        threshold_value=1.0,
                        actual_value=time_diff,
                    )

    # Synchronous logging methods
    def log(
        self,
        level: LogLevel,
        message: str,
        context: dict[str, Any] | None = None,
    ) -> None:
        """Log a message at the specified level synchronously.

        Args:
            level: The severity level for the message.
            message: The log message content.
            context: Optional structured context merged into the bound
                context for this call. Large contexts are truncated in a
                controlled manner.

        Raises:
            JPYLoggerError: If logging fails at runtime.
        """
        if not self._should_log(level) or self._closed:
            return

        try:
            entry = self._create_log_entry(level, message, context)
            self._write_to_backends_sync(entry)
            self._update_stats(entry)
        except Exception as e:
            raise JPYLoggerError(
                message=f"Synchronous logging failed: {e}",
                logger_name=self.name,
                operation="log",
            ) from e

    def trace(self, message: str, context: dict[str, Any] | None = None) -> None:
        """Log a TRACE-level message synchronously.

        Args:
            message: The log message content.
            context: Optional structured context for this call.
        """
        self.log(LogLevel.TRACE, message, context)

    def debug(self, message: str, context: dict[str, Any] | None = None) -> None:
        """Log a DEBUG-level message synchronously.

        Args:
            message: The log message content.
            context: Optional structured context for this call.
        """
        self.log(LogLevel.DEBUG, message, context)

    def info(self, message: str, context: dict[str, Any] | None = None) -> None:
        """Log an INFO-level message synchronously.

        Args:
            message: The log message content.
            context: Optional structured context for this call.
        """
        self.log(LogLevel.INFO, message, context)

    def warning(self, message: str, context: dict[str, Any] | None = None) -> None:
        """Log a WARNING-level message synchronously.

        Args:
            message: The log message content.
            context: Optional structured context for this call.
        """
        self.log(LogLevel.WARNING, message, context)

    def error(self, message: str, context: dict[str, Any] | None = None) -> None:
        """Log an ERROR-level message synchronously.

        Args:
            message: The log message content.
            context: Optional structured context for this call.
        """
        self.log(LogLevel.ERROR, message, context)

    def critical(self, message: str, context: dict[str, Any] | None = None) -> None:
        """Log a CRITICAL-level message synchronously.

        Args:
            message: The log message content.
            context: Optional structured context for this call.
        """
        self.log(LogLevel.CRITICAL, message, context)

    # Asynchronous logging methods
    async def alog(
        self, level: LogLevel, message: str, context: dict[str, Any] | None = None
    ) -> None:
        """Log a message at the specified level asynchronously.

        Args:
            level: The severity level for the message.
            message: The log message content.
            context: Optional structured context merged into the bound
                context for this call.

        Raises:
            JPYLoggerError: If logging fails at runtime.
        """
        if not self._should_log(level) or self._closed:
            return

        try:
            entry = self._create_log_entry(level, message, context)

            # Use async queue if available and not full
            if self._async_queue and not self._async_queue.full():
                await self._async_queue.put(entry)
            else:
                # Fallback to direct write
                await self._write_to_backends_async(entry)

            self._update_stats(entry)
        except Exception as e:
            raise JPYLoggerError(
                message=f"Asynchronous logging failed: {e}",
                logger_name=self.name,
                operation="alog",
            ) from e

    async def atrace(self, message: str, context: dict[str, Any] | None = None) -> None:
        """Log a TRACE-level message asynchronously."""
        await self.alog(LogLevel.TRACE, message, context)

    async def adebug(self, message: str, context: dict[str, Any] | None = None) -> None:
        """Log a DEBUG-level message asynchronously."""
        await self.alog(LogLevel.DEBUG, message, context)

    async def ainfo(self, message: str, context: dict[str, Any] | None = None) -> None:
        """Log an INFO-level message asynchronously."""
        await self.alog(LogLevel.INFO, message, context)

    async def awarning(
        self, message: str, context: dict[str, Any] | None = None
    ) -> None:
        """Log a WARNING-level message asynchronously."""
        await self.alog(LogLevel.WARNING, message, context)

    async def aerror(self, message: str, context: dict[str, Any] | None = None) -> None:
        """Log an ERROR-level message asynchronously."""
        await self.alog(LogLevel.ERROR, message, context)

    async def acritical(
        self, message: str, context: dict[str, Any] | None = None
    ) -> None:
        """Log a CRITICAL-level message asynchronously."""
        await self.alog(LogLevel.CRITICAL, message, context)

    # Context management
    @contextmanager
    def context(self, **kwargs: Any) -> Generator[None, None, None]:
        """Temporarily bind structured context for nested log calls.

        All keyword arguments provided are merged into the logger's bound
        context for the duration of the ``with`` block.

        Args:
            **kwargs: Key-value pairs to add to the bound context.

        Examples:
            >>> with logger.context(user_id=123, request_id="abcd"):
            ...     logger.info("processing")
        """
        old_context = self._context.copy()
        self._context.update(kwargs)
        try:
            yield
        finally:
            self._context = old_context

    @asynccontextmanager
    async def acontext(self, **kwargs: Any) -> AsyncGenerator[None, None]:
        """Temporarily bind structured context in asynchronous flows.

        Args:
            **kwargs: Key-value pairs to add to the bound context.

        Examples:
            >>> async with logger.acontext(user_id=123):
            ...     await logger.ainfo("processing")
        """
        # NOTE(review): bound context lives on the shared logger instance, so
        # concurrent tasks using the same logger can interleave or clobber
        # each other's context -- confirm this is acceptable.
        old_context = self._context.copy()
        self._context.update(kwargs)
        try:
            yield
        finally:
            self._context = old_context

    def bind(self, **kwargs: Any) -> "Logger":
        """Return a child logger with additional bound context.

        Args:
            **kwargs: Context key-value pairs to bind permanently to the child
                logger's baseline context.

        Returns:
            Logger: A new logger instance inheriting configuration and the
            current context plus the provided key-value pairs.
        """
        child_config = LoggerConfig(
            name=f"{self.name}.child",
            level=self.config.level,
            backends=self.config.backends,
            context={**self._context, **kwargs},
            correlation_id=self._correlation_id,
        )

        return Logger(child_config.name, child_config, self.global_config)

    def set_correlation_id(self, correlation_id: str) -> None:
        """Set a correlation identifier for this logger instance.

        Correlation IDs help tie together related log entries across
        components, requests, or processes.
        """
        self._correlation_id = correlation_id

    def get_correlation_id(self) -> str | None:
        """Return the current correlation identifier, if any."""
        return self._correlation_id

    def set_level(self, level: LogLevel) -> None:
        """Set the log level for this logger instance."""
        self.config.level = level

    def get_level(self) -> LogLevel:
        """Return the current effective log level for this instance."""
        return self._get_effective_level()

    def is_enabled_for(self, level: LogLevel) -> bool:
        """Return whether logging is enabled for the given ``level``."""
        return level >= self._get_effective_level()

    # Performance and maintenance
    async def flush(self) -> None:
        """Flush all pending log entries across async and backend buffers.

        When async processing is enabled, this waits for the internal queue to
        drain and then awaits any backend ``flush`` implementations.
        """
        if self._async_queue:
            # NOTE(review): join() waits for the queue to drain; if no
            # processor task was ever started (no running loop at init time),
            # this can wait indefinitely -- confirm a consumer always exists
            # whenever the queue does.
            await self._async_queue.join()

        # Flush all backends
        flush_tasks = []
        for backend in self._backends:
            if hasattr(backend, "flush"):
                flush_tasks.append(backend.flush())

        if flush_tasks:
            await asyncio.gather(*flush_tasks, return_exceptions=True)

    def get_stats(self) -> dict[str, Any]:
        """Return a snapshot of logger statistics.

        Returns:
            dict[str, Any]: Statistics including counters like
            ``messages_logged``, ``messages_dropped``, ``bytes_processed``, and
            derived metrics such as ``uptime_seconds`` and backend health
            information.
        """
        stats = self._stats.copy()
        stats["backend_count"] = len(self._backends)
        stats["healthy_backends"] = sum(1 for b in self._backends if b.is_healthy())
        start_time = self._stats["start_time"]
        stats["uptime_seconds"] = (get_current_datetime() - start_time).total_seconds()
        return stats

    def get_backend_stats(self) -> dict[str, dict[str, Any]]:
        """Return statistics for all configured backends keyed by name."""
        return {
            backend.name: backend.get_stats()
            for backend in self._backends
            if hasattr(backend, "get_stats")
        }

    def _update_global_config(self, config: GlobalLoggerConfig) -> None:
        """Update the logger instance when the global configuration changes."""
        self.global_config = config
        self._sensitive_fields = set(config.sensitive_fields)

    async def close(self) -> None:
        """Close the logger and cleanup resources.

        Cancels internal background tasks (if any), flushes all buffers, and
        closes each backend. Safe to call multiple times.
        """
        if self._closed:
            return

        self._closed = True

        # Cancel async processor
        if self._processor_task:
            self._processor_task.cancel()
            with suppress(asyncio.CancelledError):
                await self._processor_task

        # Flush and close all backends
        await self.flush()
        close_tasks = []
        for backend in self._backends:
            close_tasks.append(backend.close())

        if close_tasks:
            await asyncio.gather(*close_tasks, return_exceptions=True)

    # Class methods for convenience
    @classmethod
    def get_logger(
        cls, name: str, config: LoggerConfig | None = None, force_new: bool = False
    ) -> "Logger":
        """Get a logger instance via the global :class:`LoggerManager`.

        This is a convenience wrapper around
        :meth:`LoggerManager.get_logger`.

        Args:
            name: Name of the logger instance.
            config: Optional per-logger configuration.
            force_new: When ``True``, always create a new instance even if
                singleton mode is enabled.

        Returns:
            Logger: The logger instance.
        """
        return LoggerManager().get_logger(name, config, force_new)

    @classmethod
    def set_global_config(cls, config: GlobalLoggerConfig) -> None:
        """Set the global configuration via the :class:`LoggerManager`."""
        LoggerManager().set_global_config(config)

    @classmethod
    def from_env(cls, name: str, env_prefix: str = "LOGGER_") -> "Logger":
        """Create a :class:`Logger` from environment variables.

        Args:
            name: Name of the logger instance.
            env_prefix: Prefix used to read environment variables for
                configuration.

        Returns:
            Logger: The configured logger instance.
        """
        global_config = GlobalLoggerConfig.from_env(env_prefix)
        cls.set_global_config(global_config)
        return cls.get_logger(name)

    def __del__(self) -> None:
        """Avoid async work in finalizer.

        Resource cleanup should be explicit via ``close()`` or
        ``LoggerManager.shutdown_all()``. Doing asynchronous work in
        ``__del__`` is unreliable and may emit warnings depending on
        interpreter shutdown timing and event loop state.
        """
        return None

Functions

__init__

__init__(name: str, config: LoggerConfig, global_config: GlobalLoggerConfig)

Initialize logger instance.

Parameters:

Name Type Description Default
name str

Logger name

required
config LoggerConfig

Logger-specific configuration

required
global_config GlobalLoggerConfig

Global logger configuration

required
Source code in src/jinpy_utils/logger/core.py
def __init__(
    self,
    name: str,
    config: LoggerConfig,
    global_config: GlobalLoggerConfig,
):
    """
    Initialize logger instance.

    Args:
        name: Logger name
        config: Logger-specific configuration
        global_config: Global logger configuration
    """
    self.name = name
    self.config = config
    self.global_config = global_config

    # State management: open/closed flag plus the per-logger bound context
    # (copied so later mutations don't leak back into the config object).
    self._closed = False
    self._context: dict[str, Any] = config.context.copy()
    self._correlation_id = config.correlation_id

    # Performance tracking counters consumed by get_stats().
    self._stats: dict[str, Any] = {
        "messages_logged": 0,
        "messages_dropped": 0,
        "bytes_processed": 0,
        "start_time": get_current_datetime(),
        "last_log_time": None,
    }

    # Initialize backends before async setup so the processor (if enabled)
    # already has targets to write to.
    self._backends: list[BackendInterface] = []
    self._initialize_backends()

    # Async processing: queue + consumer task, populated only when the
    # configuration enables async mode (see _setup_async_processing).
    self._async_queue: asyncio.Queue | None = None
    self._processor_task: asyncio.Task | None = None
    self._setup_async_processing()

    # Security and sanitization: field names whose values must be masked.
    self._sensitive_fields = set(global_config.sensitive_fields)

debug

debug(message: str, context: dict[str, Any] | None = None) -> None

Log a DEBUG-level message synchronously.

Parameters:

Name Type Description Default
message str

The log message content.

required
context dict[str, Any] | None

Optional structured context for this call.

None
Source code in src/jinpy_utils/logger/core.py
def debug(self, message: str, context: dict[str, Any] | None = None) -> None:
    """Emit ``message`` at DEBUG severity via :meth:`log`.

    Args:
        message: The log message content.
        context: Optional structured context for this single call.
    """
    self.log(LogLevel.DEBUG, message, context=context)

info

info(message: str, context: dict[str, Any] | None = None) -> None

Log an INFO-level message synchronously.

Parameters:

Name Type Description Default
message str

The log message content.

required
context dict[str, Any] | None

Optional structured context for this call.

None
Source code in src/jinpy_utils/logger/core.py
def info(self, message: str, context: dict[str, Any] | None = None) -> None:
    """Emit ``message`` at INFO severity via :meth:`log`.

    Args:
        message: The log message content.
        context: Optional structured context for this single call.
    """
    self.log(LogLevel.INFO, message, context=context)

warning

warning(message: str, context: dict[str, Any] | None = None) -> None

Log a WARNING-level message synchronously.

Parameters:

Name Type Description Default
message str

The log message content.

required
context dict[str, Any] | None

Optional structured context for this call.

None
Source code in src/jinpy_utils/logger/core.py
def warning(self, message: str, context: dict[str, Any] | None = None) -> None:
    """Emit ``message`` at WARNING severity via :meth:`log`.

    Args:
        message: The log message content.
        context: Optional structured context for this single call.
    """
    self.log(LogLevel.WARNING, message, context=context)

error

error(message: str, context: dict[str, Any] | None = None) -> None

Log an ERROR-level message synchronously.

Parameters:

Name Type Description Default
message str

The log message content.

required
context dict[str, Any] | None

Optional structured context for this call.

None
Source code in src/jinpy_utils/logger/core.py
def error(self, message: str, context: dict[str, Any] | None = None) -> None:
    """Emit ``message`` at ERROR severity via :meth:`log`.

    Args:
        message: The log message content.
        context: Optional structured context for this single call.
    """
    self.log(LogLevel.ERROR, message, context=context)

critical

critical(message: str, context: dict[str, Any] | None = None) -> None

Log a CRITICAL-level message synchronously.

Parameters:

Name Type Description Default
message str

The log message content.

required
context dict[str, Any] | None

Optional structured context for this call.

None
Source code in src/jinpy_utils/logger/core.py
def critical(self, message: str, context: dict[str, Any] | None = None) -> None:
    """Emit ``message`` at CRITICAL severity via :meth:`log`.

    Args:
        message: The log message content.
        context: Optional structured context for this single call.
    """
    self.log(LogLevel.CRITICAL, message, context=context)

log

log(
    level: LogLevel, message: str, context: dict[str, Any] | None = None
) -> None

Log a message at the specified level synchronously.

Parameters:

Name Type Description Default
level LogLevel

The severity level for the message.

required
message str

The log message content.

required
context dict[str, Any] | None

Optional structured context merged into the bound context for this call. Large contexts are truncated in a controlled manner.

None

Raises:

Type Description
JPYLoggerError

If logging fails at runtime.

Source code in src/jinpy_utils/logger/core.py
def log(
    self,
    level: LogLevel,
    message: str,
    context: dict[str, Any] | None = None,
) -> None:
    """Synchronously emit ``message`` at ``level``.

    Args:
        level: Severity for this record.
        message: The log message content.
        context: Optional structured context merged with the bound context
            for this single call.

    Raises:
        JPYLoggerError: If creating or writing the entry fails at runtime.
    """
    # Guard clauses: drop below-threshold messages, then anything sent to a
    # logger that has already been closed.
    if not self._should_log(level):
        return
    if self._closed:
        return

    try:
        entry = self._create_log_entry(level, message, context)
        self._write_to_backends_sync(entry)
        self._update_stats(entry)
    except Exception as e:
        raise JPYLoggerError(
            message=f"Synchronous logging failed: {e}",
            logger_name=self.name,
            operation="log",
        ) from e

adebug async

adebug(message: str, context: dict[str, Any] | None = None) -> None

Log a DEBUG-level message asynchronously.

Source code in src/jinpy_utils/logger/core.py
async def adebug(self, message: str, context: dict[str, Any] | None = None) -> None:
    """Emit ``message`` at DEBUG severity via :meth:`alog`."""
    await self.alog(LogLevel.DEBUG, message, context=context)

ainfo async

ainfo(message: str, context: dict[str, Any] | None = None) -> None

Log an INFO-level message asynchronously.

Source code in src/jinpy_utils/logger/core.py
async def ainfo(self, message: str, context: dict[str, Any] | None = None) -> None:
    """Emit ``message`` at INFO severity via :meth:`alog`."""
    await self.alog(LogLevel.INFO, message, context=context)

awarning async

awarning(message: str, context: dict[str, Any] | None = None) -> None

Log a WARNING-level message asynchronously.

Source code in src/jinpy_utils/logger/core.py
async def awarning(
    self, message: str, context: dict[str, Any] | None = None
) -> None:
    """Emit ``message`` at WARNING severity via :meth:`alog`."""
    await self.alog(LogLevel.WARNING, message, context=context)

aerror async

aerror(message: str, context: dict[str, Any] | None = None) -> None

Log an ERROR-level message asynchronously.

Source code in src/jinpy_utils/logger/core.py
async def aerror(self, message: str, context: dict[str, Any] | None = None) -> None:
    """Emit ``message`` at ERROR severity via :meth:`alog`."""
    await self.alog(LogLevel.ERROR, message, context=context)

acritical async

acritical(message: str, context: dict[str, Any] | None = None) -> None

Log a CRITICAL-level message asynchronously.

Source code in src/jinpy_utils/logger/core.py
async def acritical(
    self, message: str, context: dict[str, Any] | None = None
) -> None:
    """Emit ``message`` at CRITICAL severity via :meth:`alog`."""
    await self.alog(LogLevel.CRITICAL, message, context=context)

set_level

set_level(level: LogLevel) -> None

Set the log level for this logger instance.

Source code in src/jinpy_utils/logger/core.py
def set_level(self, level: LogLevel) -> None:
    """Override this logger's per-instance log level."""
    self.config.level = level

get_level

get_level() -> LogLevel

Return the current effective log level for this instance.

Source code in src/jinpy_utils/logger/core.py
def get_level(self) -> LogLevel:
    """Report the level currently in effect for this logger."""
    effective = self._get_effective_level()
    return effective

is_enabled_for

is_enabled_for(level: LogLevel) -> bool

Return whether logging is enabled for the given level.

Source code in src/jinpy_utils/logger/core.py
def is_enabled_for(self, level: LogLevel) -> bool:
    """Tell whether a message at ``level`` would actually be emitted."""
    threshold = self._get_effective_level()
    return level >= threshold

get_stats

get_stats() -> dict[str, Any]

Return a snapshot of logger statistics.

Returns:

Type Description
dict[str, Any]

dict[str, Any]: Statistics including counters like

dict[str, Any]

messages_logged, messages_dropped, bytes_processed, and

dict[str, Any]

derived metrics such as uptime_seconds and backend health

dict[str, Any]

information.

Source code in src/jinpy_utils/logger/core.py
def get_stats(self) -> dict[str, Any]:
    """Return a point-in-time snapshot of logger statistics.

    Returns:
        dict[str, Any]: Raw counters (``messages_logged``,
        ``messages_dropped``, ``bytes_processed``) plus derived values:
        ``backend_count``, ``healthy_backends`` and ``uptime_seconds``.
    """
    snapshot = dict(self._stats)
    snapshot["backend_count"] = len(self._backends)
    healthy = [b for b in self._backends if b.is_healthy()]
    snapshot["healthy_backends"] = len(healthy)
    elapsed = get_current_datetime() - self._stats["start_time"]
    snapshot["uptime_seconds"] = elapsed.total_seconds()
    return snapshot

get_backend_stats

get_backend_stats() -> dict[str, dict[str, Any]]

Return statistics for all configured backends keyed by name.

Source code in src/jinpy_utils/logger/core.py
def get_backend_stats(self) -> dict[str, dict[str, Any]]:
    """Collect per-backend statistics, keyed by backend name.

    Backends without a ``get_stats`` attribute are skipped silently.
    """
    stats_by_name: dict[str, dict[str, Any]] = {}
    for backend in self._backends:
        if hasattr(backend, "get_stats"):
            stats_by_name[backend.name] = backend.get_stats()
    return stats_by_name

bind

bind(**kwargs: Any) -> Logger

Return a child logger with additional bound context.

Parameters:

Name Type Description Default
**kwargs Any

Context key-value pairs to bind permanently to the child logger's baseline context.

{}

Returns:

Name Type Description
Logger Logger

A new logger instance inheriting configuration and the

Logger

current context plus the provided key-value pairs.

Source code in src/jinpy_utils/logger/core.py
def bind(self, **kwargs: Any) -> "Logger":
    """Create a child logger whose baseline context includes ``kwargs``.

    Args:
        **kwargs: Key-value pairs merged on top of this logger's current
            bound context; they become permanent for the child.

    Returns:
        Logger: A new instance named ``"<name>.child"`` sharing this
        logger's level, backends and correlation id.
    """
    merged_context = dict(self._context)
    merged_context.update(kwargs)
    child_config = LoggerConfig(
        name=f"{self.name}.child",
        level=self.config.level,
        backends=self.config.backends,
        context=merged_context,
        correlation_id=self._correlation_id,
    )
    return Logger(child_config.name, child_config, self.global_config)

context

context(**kwargs: Any) -> Generator[None, None, None]

Temporarily bind structured context for nested log calls.

All keyword arguments provided are merged into the logger's bound context for the duration of the with block.

Parameters:

Name Type Description Default
**kwargs Any

Key-value pairs to add to the bound context.

{}

Examples:

>>> with logger.context(user_id=123, request_id="abcd"):
...     logger.info("processing")
Source code in src/jinpy_utils/logger/core.py
@contextmanager
def context(self, **kwargs: Any) -> Generator[None, None, None]:
    """Bind extra structured context for the duration of a ``with`` block.

    Args:
        **kwargs: Key-value pairs merged into the bound context while the
            block runs.

    Examples:
        >>> with logger.context(user_id=123, request_id="abcd"):
        ...     logger.info("processing")
    """
    snapshot = self._context.copy()
    self._context.update(kwargs)
    try:
        yield
    finally:
        # Restore the pre-block context even if the body raised.
        self._context = snapshot

acontext async

acontext(**kwargs: Any) -> AsyncGenerator[None, None]

Temporarily bind structured context in asynchronous flows.

Parameters:

Name Type Description Default
**kwargs Any

Key-value pairs to add to the bound context.

{}

Examples:

>>> async with logger.acontext(user_id=123):
...     await logger.ainfo("processing")
Source code in src/jinpy_utils/logger/core.py
@asynccontextmanager
async def acontext(self, **kwargs: Any) -> AsyncGenerator[None, None]:
    """Bind extra structured context inside an ``async with`` block.

    Args:
        **kwargs: Key-value pairs merged into the bound context while the
            block runs.

    Examples:
        >>> async with logger.acontext(user_id=123):
        ...     await logger.ainfo("processing")
    """
    snapshot = self._context.copy()
    self._context.update(kwargs)
    try:
        yield
    finally:
        # Restore the pre-block context even if the body raised.
        self._context = snapshot

flush async

flush() -> None

Flush all pending log entries across async and backend buffers.

When async processing is enabled, this waits for the internal queue to drain and then awaits any backend flush implementations.

Source code in src/jinpy_utils/logger/core.py
async def flush(self) -> None:
    """Drain pending log entries and flush every backend buffer.

    If the async pipeline is active, wait for its queue to empty first;
    then await each backend's ``flush``. Backend failures are collected
    rather than raised (``return_exceptions=True``).
    """
    if self._async_queue:
        await self._async_queue.join()

    pending = [b.flush() for b in self._backends if hasattr(b, "flush")]
    if pending:
        await asyncio.gather(*pending, return_exceptions=True)

close async

close() -> None

Close the logger and cleanup resources.

Cancels internal background tasks (if any), flushes all buffers, and closes each backend. Safe to call multiple times.

Source code in src/jinpy_utils/logger/core.py
async def close(self) -> None:
    """Shut the logger down: flush buffers, then close every backend.

    Idempotent — subsequent calls return immediately. Any background
    processor task is cancelled and awaited before the final flush so no
    writes race the shutdown.
    """
    if self._closed:
        return
    self._closed = True

    # Stop the async consumer first; swallow its CancelledError.
    if self._processor_task:
        self._processor_task.cancel()
        with suppress(asyncio.CancelledError):
            await self._processor_task

    # Flush everything that is still buffered, then close each backend,
    # collecting (not raising) individual failures.
    await self.flush()
    shutdown_ops = [backend.close() for backend in self._backends]
    if shutdown_ops:
        await asyncio.gather(*shutdown_ops, return_exceptions=True)

LoggerManager

LoggerManager

Manage global logger configuration and instances.

The manager coordinates a process-wide registry of :class:Logger instances and provides a single place to set and retrieve the global configuration. It supports optional singleton behavior so repeated calls to :meth:get_logger for the same name return the same instance when enabled in the global configuration.

Thread-safety

All registry mutations are guarded by an internal lock. Reading operations are safe for concurrent use.

Source code in src/jinpy_utils/logger/core.py
class LoggerManager:
    """Manage global logger configuration and instances.

    The manager coordinates a process-wide registry of :class:`Logger`
    instances and provides a single place to set and retrieve the global
    configuration. It supports optional singleton behavior so repeated calls
    to :meth:`get_logger` for the same name return the same instance when
    enabled in the global configuration.

    Thread-safety:
        All registry mutations are guarded by an internal lock. Reading
        operations are safe for concurrent use.
    """

    # Singleton slot plus the class-level lock guarding both the slot and
    # the shared name -> Logger registry below.
    _instance: Optional["LoggerManager"] = None
    _lock = threading.Lock()
    # Registry shared by every reference to the manager (ClassVar).
    _instances: ClassVar[dict[str, "Logger"]] = {}
    _global_config: GlobalLoggerConfig | None = None

    def __new__(cls) -> "LoggerManager":
        """Create or return the singleton instance.

        Returns:
            LoggerManager: The process-wide singleton instance.
        """
        # Double-checked locking: the unlocked read is a fast path; the
        # locked re-check ensures only one thread constructs the instance.
        if not cls._instance:
            with cls._lock:
                if not cls._instance:
                    cls._instance = super().__new__(cls)
                    cls._instance._initialized = False
        return cls._instance

    def __init__(self) -> None:
        """Initialize the manager on first construction.

        This method is idempotent and safe to call multiple times; the
        initialization block runs only once per process.
        """
        # __init__ runs on every LoggerManager() call; the flag set in
        # __new__ makes the body run only once.
        if not getattr(self, "_initialized", False):
            self._initialized = True
            self._cleanup_registry: set[weakref.ReferenceType] = set()
            self._background_tasks: set[asyncio.Task] = set()

    def set_global_config(self, config: GlobalLoggerConfig) -> None:
        """Set the global logger configuration.

        Args:
            config: The global configuration to apply to the manager and all
                current and future :class:`Logger` instances.
        """
        with self._lock:
            self._global_config = config

            # Update existing loggers if needed
            for logger in self._instances.values():
                logger._update_global_config(config)

    def get_global_config(self) -> GlobalLoggerConfig | None:
        """Return the current global configuration if set.

        Returns:
            GlobalLoggerConfig | None: The active global configuration, or
            ``None`` if not configured yet.
        """
        return self._global_config

    def get_logger(
        self,
        name: str,
        config: LoggerConfig | None = None,
        force_new: bool = False,
    ) -> "Logger":
        """Get or create a :class:`Logger` instance.

        Behavior depends on the global configuration:
        - If singleton mode is enabled and ``force_new`` is ``False`` (default),
          a cached instance for ``name`` is returned if present.
        - Otherwise a new instance is created.

        Args:
            name: Name of the logger instance.
            config: Optional per-logger configuration. If omitted, a default
                :class:`LoggerConfig` is created with the given ``name``.
            force_new: When ``True``, always create a new instance even if
                singleton mode is enabled.

        Returns:
            Logger: The logger instance.

        Raises:
            JPYLoggerConfigurationError: If the global configuration is not set
                via :meth:`set_global_config` prior to this call, or if no
                enabled backends are available.
        """
        if not self._global_config:
            raise JPYLoggerConfigurationError(
                message="Global configuration not set. Call set_global_config() first.",
                config_section="global",
            )

        # Check if singleton is enabled and instance exists.
        # NOTE(review): this membership read happens outside self._lock;
        # concurrent creators may race — confirm this is acceptable.
        if (
            self._global_config.enable_singleton
            and not force_new
            and name in self._instances
        ):
            return self._instances[name]

        # Create new instance
        logger = Logger(
            name,
            config or LoggerConfig(name=name),
            self._global_config,
        )

        # Store if singleton is enabled or explicitly requested.
        # NOTE(review): with singleton disabled and force_new=False this
        # still stores — presumably so shutdown_all() can reach every
        # logger; verify that is the intent.
        if self._global_config.enable_singleton or not force_new:
            with self._lock:
                self._instances[name] = logger

                # Register for cleanup
                ref = weakref.ref(logger, self._cleanup_instance)
                self._cleanup_registry.add(ref)

        return logger

    def _cleanup_instance(self, ref: weakref.ReferenceType) -> None:
        """Cleanup dead references.

        Args:
            ref: A weak reference to a :class:`Logger` instance that has been
                garbage collected.
        """
        self._cleanup_registry.discard(ref)

    def shutdown_all(self) -> None:
        """Shutdown all logger instances.

        If a running event loop is detected, asynchronous ``close`` operations
        are scheduled for each logger; otherwise, they are executed using
        ``asyncio.run``. This ensures cleanup occurs in both synchronous and
        asynchronous application contexts.

        This method is idempotent and safe to call multiple times.
        """
        with self._lock:
            for logger in list(self._instances.values()):
                try:
                    loop = asyncio.get_running_loop()
                except RuntimeError:
                    # No running loop: drive each close() to completion.
                    asyncio.run(logger.close())
                else:
                    # Running loop: schedule close() and hold a strong
                    # reference so the task isn't garbage-collected early.
                    task = loop.create_task(logger.close())
                    self._background_tasks.add(task)
                    task.add_done_callback(self._background_tasks.discard)
                    # best-effort: do not await here
            self._instances.clear()
            self._cleanup_registry.clear()

    def get_all_loggers(self) -> list["Logger"]:
        """Return a snapshot of all active logger instances.

        Returns:
            list[Logger]: A list of currently registered loggers.
        """
        with self._lock:
            return list(self._instances.values())

Functions

get_logger

get_logger(
    name: str, config: LoggerConfig | None = None, force_new: bool = False
) -> Logger

Get or create a :class:Logger instance.

Behavior depends on the global configuration: - If singleton mode is enabled and force_new is False (default), a cached instance for name is returned if present. - Otherwise a new instance is created.

Parameters:

Name Type Description Default
name str

Name of the logger instance.

required
config LoggerConfig | None

Optional per-logger configuration. If omitted, a default :class:LoggerConfig is created with the given name.

None
force_new bool

When True, always create a new instance even if singleton mode is enabled.

False

Returns:

Name Type Description
Logger Logger

The logger instance.

Raises:

Type Description
JPYLoggerConfigurationError

If the global configuration is not set via :meth:set_global_config prior to this call, or if no enabled backends are available.

Source code in src/jinpy_utils/logger/core.py
def get_logger(
    self,
    name: str,
    config: LoggerConfig | None = None,
    force_new: bool = False,
) -> "Logger":
    """Get or create a :class:`Logger` instance.

    Behavior depends on the global configuration:
    - If singleton mode is enabled and ``force_new`` is ``False`` (default),
      a cached instance for ``name`` is returned if present.
    - Otherwise a new instance is created.

    Args:
        name: Name of the logger instance.
        config: Optional per-logger configuration. If omitted, a default
            :class:`LoggerConfig` is created with the given ``name``.
        force_new: When ``True``, always create a new instance even if
            singleton mode is enabled.

    Returns:
        Logger: The logger instance.

    Raises:
        JPYLoggerConfigurationError: If the global configuration is not set
            via :meth:`set_global_config` prior to this call, or if no
            enabled backends are available.
    """
    # Fail fast: a global configuration is a hard prerequisite.
    if not self._global_config:
        raise JPYLoggerConfigurationError(
            message="Global configuration not set. Call set_global_config() first.",
            config_section="global",
        )

    # Check if singleton is enabled and instance exists.
    # NOTE(review): this membership read is not under self._lock — confirm
    # the race with concurrent creators is acceptable.
    if (
        self._global_config.enable_singleton
        and not force_new
        and name in self._instances
    ):
        return self._instances[name]

    # Create new instance
    logger = Logger(
        name,
        config or LoggerConfig(name=name),
        self._global_config,
    )

    # Store if singleton is enabled or explicitly requested
    if self._global_config.enable_singleton or not force_new:
        with self._lock:
            self._instances[name] = logger

            # Register for cleanup (weakref callback prunes the registry
            # when the logger is garbage collected).
            ref = weakref.ref(logger, self._cleanup_instance)
            self._cleanup_registry.add(ref)

    return logger

set_global_config

set_global_config(config: GlobalLoggerConfig) -> None

Set the global logger configuration.

Parameters:

Name Type Description Default
config GlobalLoggerConfig

The global configuration to apply to the manager and all current and future :class:Logger instances.

required
Source code in src/jinpy_utils/logger/core.py
def set_global_config(self, config: GlobalLoggerConfig) -> None:
    """Install ``config`` as the global configuration.

    Args:
        config: Configuration stored on the manager and pushed to every
            already-registered :class:`Logger` instance.
    """
    with self._lock:
        self._global_config = config

        # Propagate so existing loggers pick up the new settings.
        for existing in self._instances.values():
            existing._update_global_config(config)

shutdown_all

shutdown_all() -> None

Shutdown all logger instances.

If a running event loop is detected, asynchronous close operations are scheduled for each logger; otherwise, they are executed using asyncio.run. This ensures cleanup occurs in both synchronous and asynchronous application contexts.

This method is idempotent and safe to call multiple times.

Source code in src/jinpy_utils/logger/core.py
def shutdown_all(self) -> None:
    """Close every registered logger and empty the registry.

    When called inside a running event loop, each ``close()`` is scheduled
    as a fire-and-forget background task (a strong reference is retained
    until completion); without a loop, each coroutine is driven to
    completion with ``asyncio.run``.

    Idempotent and safe to call multiple times.
    """
    with self._lock:
        for instance in list(self._instances.values()):
            try:
                running_loop = asyncio.get_running_loop()
            except RuntimeError:
                # No loop: run the close coroutine synchronously.
                asyncio.run(instance.close())
            else:
                # Hold a strong reference so the task isn't collected
                # before it finishes; best-effort, not awaited here.
                pending = running_loop.create_task(instance.close())
                self._background_tasks.add(pending)
                pending.add_done_callback(self._background_tasks.discard)
        self._instances.clear()
        self._cleanup_registry.clear()

get_all_loggers

get_all_loggers() -> list[Logger]

Return a snapshot of all active logger instances.

Returns:

Type Description
list[Logger]

list[Logger]: A list of currently registered loggers.

Source code in src/jinpy_utils/logger/core.py
def get_all_loggers(self) -> list["Logger"]:
    """Return the currently registered loggers as a fresh list.

    Returns:
        list[Logger]: Snapshot of the registry taken under the lock.
    """
    with self._lock:
        return [*self._instances.values()]

Data Models

LogEntry

LogEntry

Structured log entry with optimized serialization.

Source code in src/jinpy_utils/logger/backends.py
class LogEntry:
    """A single structured log record with slot-based, low-overhead storage."""

    __slots__ = (
        "context",
        "correlation_id",
        "function",
        "level",
        "line_number",
        "logger_name",
        "message",
        "module",
        "timestamp",
    )

    def __init__(  # noqa: PLR0913
        self,
        level: LogLevel,
        message: str,
        logger_name: str,
        timestamp: datetime | None = None,
        correlation_id: str | None = None,
        context: dict[str, Any] | None = None,
        module: str | None = None,
        function: str | None = None,
        line_number: int | None = None,
    ):
        """Populate the record; ``timestamp`` defaults to the current time."""
        self.level = level
        self.message = message
        self.logger_name = logger_name
        # Fall back to "now" only when the caller supplied no stamp.
        self.timestamp = timestamp or get_current_datetime()
        self.correlation_id = correlation_id
        self.context = context or {}
        self.module = module
        self.function = function
        self.line_number = line_number

    def to_dict(self) -> dict[str, Any]:
        """Render the record as a plain dict ready for serialization."""
        return {
            "timestamp": self.timestamp.isoformat(),
            "level": self.level.value,
            "message": self.message,
            "logger_name": self.logger_name,
            "correlation_id": self.correlation_id,
            "context": self.context,
            "module": self.module,
            "function": self.function,
            "line_number": self.line_number,
        }

    def to_json(self) -> str:
        """Serialize the record to compact JSON (non-JSON types via str)."""
        payload = self.to_dict()
        return json.dumps(payload, default=str, separators=(",", ":"))

Functions

__init__

__init__(
    level: LogLevel,
    message: str,
    logger_name: str,
    timestamp: datetime | None = None,
    correlation_id: str | None = None,
    context: dict[str, Any] | None = None,
    module: str | None = None,
    function: str | None = None,
    line_number: int | None = None,
)

Initialize log entry with optimized memory usage.

Source code in src/jinpy_utils/logger/backends.py
def __init__(  # noqa: PLR0913
    self,
    level: LogLevel,
    message: str,
    logger_name: str,
    timestamp: datetime | None = None,
    correlation_id: str | None = None,
    context: dict[str, Any] | None = None,
    module: str | None = None,
    function: str | None = None,
    line_number: int | None = None,
):
    """Populate the entry; ``timestamp`` defaults to the current time."""
    self.level = level
    self.message = message
    self.logger_name = logger_name
    # Fall back to "now" only when the caller supplied no stamp.
    self.timestamp = timestamp or get_current_datetime()
    self.correlation_id = correlation_id
    self.context = context or {}
    self.module = module
    self.function = function
    self.line_number = line_number

to_dict

to_dict() -> dict[str, Any]

Convert to dictionary for serialization.

Source code in src/jinpy_utils/logger/backends.py
def to_dict(self) -> dict[str, Any]:
    """Render this entry as a serialization-ready dictionary."""
    return dict(
        timestamp=self.timestamp.isoformat(),
        level=self.level.value,
        message=self.message,
        logger_name=self.logger_name,
        correlation_id=self.correlation_id,
        context=self.context,
        module=self.module,
        function=self.function,
        line_number=self.line_number,
    )

to_json

to_json() -> str

Convert to JSON string.

Source code in src/jinpy_utils/logger/backends.py
def to_json(self) -> str:
    """Serialize this entry to compact JSON (non-JSON types fall back to str)."""
    payload = self.to_dict()
    return json.dumps(payload, default=str, separators=(",", ":"))

Examples

Basic Usage

from jinpy_utils.logger import get_logger

# Get a logger instance
logger = get_logger("my_app")

# Log messages with structured data
logger.info("User logged in", {"user_id": 123, "ip": "192.168.1.1"})
logger.warning("High memory usage", {"usage_percent": 85.2, "threshold": 80})
logger.error("Database error", {"error_code": "DB001", "retry_count": 3})

Advanced Configuration

from jinpy_utils.logger import get_logger, LoggerConfig, ConsoleBackendConfig, LogLevel

# Custom configuration
config = LoggerConfig(
    level=LogLevel.DEBUG,
    backends=[
        ConsoleBackendConfig(
            level=LogLevel.DEBUG,
            use_colors=True,
            show_source=True
        )
    ]
)

logger = get_logger("advanced_app", config)

Async Usage

import asyncio
from jinpy_utils.logger import get_logger

async def async_operation():
    logger = get_logger("async_app")

    logger.info("Starting async operation")
    await asyncio.sleep(1)
    logger.info("Async operation completed")

# The logger automatically handles async contexts
asyncio.run(async_operation())

Context Management

from jinpy_utils.logger import get_logger
from contextlib import contextmanager

@contextmanager
def log_operation(operation_name: str):
    logger = get_logger("operations")
    logger.info(f"Starting {operation_name}")
    try:
        yield
        logger.info(f"Completed {operation_name}")
    except Exception as e:
        logger.error(f"Failed {operation_name}", error=str(e))
        raise

# Usage
with log_operation("user_registration"):
    # Your operation here
    pass

Global Configuration

from jinpy_utils.logger import set_global_config, get_logger, create_production_config

# Set global configuration
set_global_config(create_production_config())

# All subsequent loggers will use the global config
logger1 = get_logger("app1")
logger2 = get_logger("app2")

Environment-Based Setup

import os
from jinpy_utils.logger import configure_from_env, get_logger

# Set environment variables
os.environ['JINPY_LOG_LEVEL'] = 'DEBUG'
os.environ['JINPY_LOG_FORMAT'] = 'json'

# Configure from environment
config = configure_from_env()
logger = get_logger("env_app", config)

Cleanup

from jinpy_utils.logger import shutdown_all_loggers

# At application shutdown
shutdown_all_loggers()

Performance Considerations

Lazy Evaluation

The logger uses lazy evaluation for message formatting:

# Efficient - formatting only happens if DEBUG is enabled
logger.debug("Processing item %d of %d", current_item, total_items)

# Less efficient - formatting happens regardless
logger.debug(f"Processing item {current_item} of {total_items}")

Structured Logging Performance

# Use keyword arguments for structured fields
logger.info("User action", user_id=123, action="login")  # Fast

# Avoid complex objects as values
logger.info("User action", user=user_object)  # Slower (serialization)

Async Performance

# For high-throughput applications, enable async mode
config = LoggerConfig(async_enabled=True, buffer_size=10000)
logger = get_logger("high_perf", config)

Thread Safety

All logger operations are thread-safe:

import threading
from jinpy_utils.logger import get_logger

logger = get_logger("threaded_app")

def worker(worker_id: int):
    logger.info("Worker started", worker_id=worker_id)
    # Safe to use from multiple threads

# Start multiple threads
threads = [threading.Thread(target=worker, args=(i,)) for i in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()

Additional Use Cases

HTTP microservice with request-scoped context

from contextlib import asynccontextmanager
from jinpy_utils.logger import get_logger

logger = get_logger("api")

@asynccontextmanager
async def request_context(request_id: str, user_id: str | None):
    async with logger.acontext(request_id=request_id, user_id=user_id):
        yield

async def handle_request(req):
    async with request_context(req.id, req.user_id):
        await logger.ainfo("request_received", {"path": req.path})
        # ... application logic ...
        await logger.ainfo("request_completed")

Background worker with batching backends

import asyncio
from jinpy_utils.logger import get_logger

logger = get_logger("worker")

async def worker_loop():
    while True:
        await logger.ainfo("heartbeat")
        await asyncio.sleep(5)

asyncio.run(worker_loop())

Multi-tenant applications with bound context

from jinpy_utils.logger import get_logger

logger = get_logger("multi_tenant")
tenant_logger = logger.bind(tenant_id="t-1234")

tenant_logger.info("tenant_initialized")