
aiokpl.metrics

metrics

In-process metric accumulator plus a scheduled flush onto a MetricsSink.

Mirrors aws/metrics/metrics_manager.{h,cc} plus aws/metrics/accumulator.h of the C++ KPL. The data model is intentionally simple: each named metric owns a rolling 60-second window of (count, sum, min, max) bucketed by integer monotonic second. Metric names follow the C++ KPL constants verbatim so operators reading aiokpl dashboards next to native-KPL dashboards see the same labels.
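The accumulator behind each metric is private, and its code is not shown here. To illustrate the data model just described, here is a minimal sketch of a 60-second rolling window of (count, sum, min, max) bucketed by integer second. RollingWindow and its methods are hypothetical names, not aiokpl's _Accumulator API, and eviction on access is an assumed strategy.

```python
from __future__ import annotations

import time
from typing import Callable

WINDOW_SECONDS = 60  # width of the rolling window described above


class RollingWindow:
    """Hypothetical stand-in for aiokpl's private _Accumulator.

    Observations are bucketed by int(clock()); buckets that fall out of the
    last WINDOW_SECONDS seconds are discarded lazily on each put/stats call.
    """

    def __init__(self, clock: Callable[[], float] = time.monotonic) -> None:
        self._clock = clock
        # second -> [count, sum, min, max]
        self._buckets: dict[int, list[float]] = {}

    def _evict(self, now_s: int) -> None:
        cutoff = now_s - WINDOW_SECONDS  # keep seconds in (cutoff, now_s]
        for second in [s for s in self._buckets if s <= cutoff]:
            del self._buckets[second]

    def put(self, value: float) -> None:
        now_s = int(self._clock())
        self._evict(now_s)
        bucket = self._buckets.get(now_s)
        if bucket is None:
            self._buckets[now_s] = [1, value, value, value]
        else:
            bucket[0] += 1
            bucket[1] += value
            bucket[2] = min(bucket[2], value)
            bucket[3] = max(bucket[3], value)

    def stats(self) -> tuple[int, float, float, float] | None:
        """Return (count, sum, min, max) over the live window, or None if empty."""
        self._evict(int(self._clock()))
        if not self._buckets:
            return None
        buckets = self._buckets.values()
        return (
            int(sum(b[0] for b in buckets)),
            sum(b[1] for b in buckets),
            min(b[2] for b in buckets),
            max(b[3] for b in buckets),
        )
```

Injecting the clock, as Metric and MetricsManager also allow, is what lets a test advance time without actually sleeping.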

The MetricsManager is off by default: when level == MetricsLevel.NONE, put returns immediately without allocating anything, and __aenter__ spawns no upload task. The manager flushes aggregated snapshots onto an aiokpl.sinks.MetricsSink. The library itself knows nothing about CloudWatch, OpenTelemetry, or Datadog; integrations for those are first-party sinks shipped under aiokpl.sinks.

MetricsLevel

Bases: Enum

How much detail MetricsManager records.

NONE is the zero-overhead default: MetricsManager.put returns immediately, no upload task is spawned, and no sink is entered. SUMMARY keeps only global and per-stream metrics: the shard_id and error_code dimensions are dropped. DETAILED keeps every dimension.
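A sketch of how each level shapes what gets recorded, mirroring the branch in MetricsManager.put and the dimension order in MetricKey.dimensions. Level is a local stand-in for MetricsLevel (its member values are assumed), and recorded_dimensions is a hypothetical helper.

```python
from __future__ import annotations

import enum


class Level(enum.Enum):
    """Local stand-in for aiokpl's MetricsLevel; member values are assumed."""

    NONE = enum.auto()
    SUMMARY = enum.auto()
    DETAILED = enum.auto()


def recorded_dimensions(
    level: Level,
    *,
    stream: str | None = None,
    shard_id: str | None = None,
    error_code: str | None = None,
) -> tuple[tuple[str, str], ...] | None:
    """Return the dimensions a put() would keep, or None if nothing is recorded."""
    if level is Level.NONE:
        return None  # zero-overhead path: nothing is recorded at all
    if level is Level.SUMMARY:
        shard_id = error_code = None  # SUMMARY drops the fine-grained dimensions
    dims: list[tuple[str, str]] = []
    if stream is not None:
        dims.append(("stream", stream))
    if shard_id is not None:
        dims.append(("shard", shard_id))  # note: emitted as "shard", not "shard_id"
    if error_code is not None:
        dims.append(("error_code", error_code))
    return tuple(dims)
```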

MetricKey dataclass

MetricKey(
    name: str,
    stream: str | None = None,
    shard_id: str | None = None,
    error_code: str | None = None,
)

The identity of a metric: name plus optional dimensions.

Dimensions are independent: two metrics with the same name but a different stream are distinct keys. The dataclass is frozen, so instances are hashable and safe to use as dict keys.
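A short sketch of why the frozen dataclass matters for the registry. MetricKey here is a local stand-in with the same four fields, and "UserRecordsPut" is only an example metric name.

```python
from __future__ import annotations

from dataclasses import FrozenInstanceError, dataclass


@dataclass(frozen=True)
class MetricKey:
    """Stand-in with the same fields as aiokpl's MetricKey."""

    name: str
    stream: str | None = None
    shard_id: str | None = None
    error_code: str | None = None


registry: dict[MetricKey, int] = {}

# Same name, different stream: two distinct registry entries.
for stream in ("orders", "payments", "orders"):
    key = MetricKey(name="UserRecordsPut", stream=stream)
    registry[key] = registry.get(key, 0) + 1

# Frozen: mutating a key raises instead of silently corrupting the dict.
try:
    MetricKey(name="x").stream = "y"  # type: ignore[misc]
    mutated = True
except FrozenInstanceError:
    mutated = False
```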

dimensions

dimensions() -> tuple[tuple[str, str], ...]

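Render the dimensions that are set as a tuple of (name, value) pairs for sinks to consume, in the order stream, shard, error_code. Note that shard_id is emitted under the dimension name "shard".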

Source code in aiokpl/metrics.py
def dimensions(self) -> tuple[tuple[str, str], ...]:
    """Render this key as the ``(name, value)`` tuple sinks consume."""
    dims: list[tuple[str, str]] = []
    if self.stream is not None:
        dims.append(("stream", self.stream))
    if self.shard_id is not None:
        dims.append(("shard", self.shard_id))
    if self.error_code is not None:
        dims.append(("error_code", self.error_code))
    return tuple(dims)

Metric

Metric(
    key: MetricKey,
    clock: Callable[[], float] = time.monotonic,
)

A named counter or distribution backed by a single _Accumulator.

Source code in aiokpl/metrics.py
def __init__(
    self,
    key: MetricKey,
    clock: Callable[[], float] = time.monotonic,
) -> None:
    self.key = key
    self._acc = _Accumulator(clock=clock)

MetricsManager

MetricsManager(
    *,
    level: MetricsLevel = MetricsLevel.NONE,
    sink: MetricsSink | None = None,
    upload_interval_ms: float = 60000.0,
    clock: Callable[[], float] = time.monotonic,
    sleep_fn: Callable[
        [float], Awaitable[None]
    ] = anyio.sleep,
)

Owns the metric registry and schedules flushes onto a MetricsSink.

When level == MetricsLevel.NONE: put is a no-op, no upload task is spawned, and the sink is not entered. This is the zero-overhead path the Producer relies on in the default configuration.

When level == MetricsLevel.SUMMARY: shard_id and error_code dimensions are dropped before registry lookup so only coarse keys exist.

When level == MetricsLevel.DETAILED: every dimension is preserved.

The sink owns its own transport lifecycle; the manager enters and exits it as part of its own async with block, so callers manage only one context.
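A rough sketch of the single-context design. It assumes a sink is an async context manager with an async export(snapshots) method; RecordingSink and OneContextManager are hypothetical stand-ins, not aiokpl classes, and the real manager's upload task is omitted.

```python
from __future__ import annotations

import asyncio
from contextlib import AsyncExitStack
from typing import Any, Sequence


class RecordingSink:
    """Hypothetical sink with the assumed shape: async context manager
    plus an async export(snapshots) method."""

    def __init__(self) -> None:
        self.events: list[str] = []
        self.exported: list[Sequence[Any]] = []

    async def __aenter__(self) -> RecordingSink:
        self.events.append("open")  # e.g. open an HTTP client or socket here
        return self

    async def __aexit__(self, *exc_info: object) -> None:
        self.events.append("close")  # release the transport

    async def export(self, snapshots: Sequence[Any]) -> None:
        self.exported.append(snapshots)


class OneContextManager:
    """Hypothetical stand-in for the manager's lifecycle: entering it enters
    the sink, so callers write a single ``async with``."""

    def __init__(self, sink: RecordingSink) -> None:
        self._sink = sink
        self._stack = AsyncExitStack()

    async def __aenter__(self) -> OneContextManager:
        await self._stack.enter_async_context(self._sink)
        return self

    async def __aexit__(self, *exc_info: object) -> None:
        await self._stack.aclose()  # exits the sink


async def main(sink: RecordingSink) -> None:
    async with OneContextManager(sink):
        sink.events.append("work")  # producer activity would happen here
```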

Source code in aiokpl/metrics.py
def __init__(
    self,
    *,
    level: MetricsLevel = MetricsLevel.NONE,
    sink: MetricsSink | None = None,
    upload_interval_ms: float = 60_000.0,
    clock: Callable[[], float] = time.monotonic,
    sleep_fn: Callable[[float], Awaitable[None]] = anyio.sleep,
) -> None:
    self._level = level
    self._sink: MetricsSink = sink if sink is not None else NullSink()
    self._upload_interval = upload_interval_ms / 1000.0
    self._clock = clock
    self._sleep_fn = sleep_fn

    self._registry: dict[MetricKey, Metric] = {}
    self._tg: TaskGroup | None = None
    self._state: _UploadState | None = None
    self._closed = False
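The constructor stores the interval in seconds and an injectable sleep_fn. The upload task itself (_UploadState) is not shown on this page, so the following is a hypothetical sketch of a periodic uploader, assuming it sleeps for the interval and then flushes; upload_loop and its max_ticks parameter are illustrative only.

```python
from __future__ import annotations

import asyncio
from typing import Awaitable, Callable


async def upload_loop(
    flush: Callable[[], Awaitable[None]],
    sleep_fn: Callable[[float], Awaitable[None]],
    upload_interval_ms: float = 60_000.0,
    max_ticks: int | None = None,
) -> None:
    """Hypothetical periodic uploader: sleep for the interval, then flush.

    max_ticks exists only so the sketch terminates; a real loop would run
    until its task group is cancelled.
    """
    interval_s = upload_interval_ms / 1000.0  # same conversion as __init__
    ticks = 0
    while max_ticks is None or ticks < max_ticks:
        await sleep_fn(interval_s)
        await flush()
        ticks += 1


# A fake sleep that records the requested delay and returns immediately.
slept: list[float] = []
flushes: list[int] = []


async def fake_sleep(seconds: float) -> None:
    slept.append(seconds)


async def fake_flush() -> None:
    flushes.append(len(flushes))


asyncio.run(upload_loop(fake_flush, fake_sleep, upload_interval_ms=250.0, max_ticks=3))
```

Passing a fake sleep_fn like this lets a test step through several upload cycles instantly instead of waiting a full interval per cycle.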

put

put(
    name: str,
    value: float = 1.0,
    *,
    stream: str | None = None,
    shard_id: str | None = None,
    error_code: str | None = None,
) -> None

Record an observation. No-op when level == NONE.

When level == SUMMARY the shard_id and error_code dimensions are dropped at lookup so the registry stays coarse.

When the underlying sink implements EventfulMetricsSink, its record method is invoked synchronously with a MetricEvent.

Source code in aiokpl/metrics.py
def put(
    self,
    name: str,
    value: float = 1.0,
    *,
    stream: str | None = None,
    shard_id: str | None = None,
    error_code: str | None = None,
) -> None:
    """Record an observation. No-op when ``level == NONE``.

    When ``level == SUMMARY`` the ``shard_id`` and ``error_code``
    dimensions are dropped at lookup so the registry stays coarse.

    When the underlying sink implements :class:`EventfulMetricsSink`,
    :meth:`record` is invoked synchronously with a :class:`MetricEvent`.
    """
    if self._level is MetricsLevel.NONE:
        return
    if self._level is MetricsLevel.SUMMARY:
        key = MetricKey(name=name, stream=stream)
    else:
        key = MetricKey(
            name=name,
            stream=stream,
            shard_id=shard_id,
            error_code=error_code,
        )
    metric = self._registry.get(key)
    if metric is None:
        metric = Metric(key, clock=self._clock)
        self._registry[key] = metric
    metric.put(value)
    sink = self._sink
    if isinstance(sink, EventfulMetricsSink):
        sink.record(
            MetricEvent(
                name=name,
                value=value,
                timestamp=self._clock(),
                dimensions=key.dimensions(),
            )
        )
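Because record runs inline inside put, an eventful sink should do only cheap, non-blocking work there (for example, appending to a buffer). The sketch below assumes EventfulMetricsSink behaves like a runtime-checkable protocol with a synchronous record method; MetricEvent is a local stand-in with the fields put passes, and Eventful, ListSink, PlainSink, and dispatch are hypothetical.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Protocol, runtime_checkable


@dataclass(frozen=True)
class MetricEvent:
    """Stand-in with the fields put() passes: name, value, timestamp, dimensions."""

    name: str
    value: float
    timestamp: float
    dimensions: tuple[tuple[str, str], ...]


@runtime_checkable
class Eventful(Protocol):
    """Assumed shape of EventfulMetricsSink: a synchronous record() method."""

    def record(self, event: MetricEvent) -> None: ...


class ListSink:
    """record() runs inline in put(), so it only appends and never blocks."""

    def __init__(self) -> None:
        self.events: list[MetricEvent] = []

    def record(self, event: MetricEvent) -> None:
        self.events.append(event)


class PlainSink:
    """A batch-only sink without record(); put() would not call it per event."""


def dispatch(sink: object, event: MetricEvent) -> bool:
    """Mirror of the isinstance check in put(); returns True if record() ran."""
    if isinstance(sink, Eventful):
        sink.record(event)
        return True
    return False
```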

snapshot

snapshot() -> dict[
    MetricKey, tuple[int, float, float, float]
]

Return (count, sum, min, max) per metric for the live window.

Kept for backward compatibility with in-process inspectors (tests, embedded callers). Sinks consume snapshots() instead.

Source code in aiokpl/metrics.py
def snapshot(self) -> dict[MetricKey, tuple[int, float, float, float]]:
    """Return ``(count, sum, min, max)`` per metric for the live window.

    Kept for backward compatibility with in-process inspectors (tests,
    embedded callers). Sinks consume :meth:`snapshots` instead.
    """
    result: dict[MetricKey, tuple[int, float, float, float]] = {}
    for key, metric in self._registry.items():
        stats = metric.stats()
        if stats is not None:
            result[key] = stats
    return result
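Since snapshot returns plain (count, sum, min, max) tuples, tests can reduce it directly. A hypothetical helper, means, that derives the average per key:

```python
from __future__ import annotations

from collections.abc import Hashable, Mapping
from typing import TypeVar

K = TypeVar("K", bound=Hashable)


def means(snapshot: Mapping[K, tuple[int, float, float, float]]) -> dict[K, float]:
    """Mean value per key from a {key: (count, sum, min, max)} snapshot."""
    return {
        key: total / count
        for key, (count, total, _min, _max) in snapshot.items()
        if count > 0  # defensive; empty windows should already be omitted
    }
```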

snapshots

snapshots() -> tuple[MetricSnapshot, ...]

Build one MetricSnapshot per metric that has data in the live window; metrics with no observations are skipped.

Source code in aiokpl/metrics.py
def snapshots(self) -> tuple[MetricSnapshot, ...]:
    """Build :class:`MetricSnapshot` instances for the live window."""
    out: list[MetricSnapshot] = []
    for key, metric in self._registry.items():
        stats = metric.stats()
        if stats is None:
            continue
        count, total, mn, mx = stats
        ws, we = metric.window_bounds()
        out.append(
            MetricSnapshot(
                name=key.name,
                count=count,
                sum=total,
                min=mn,
                max=mx,
                dimensions=key.dimensions(),
                window_start=ws,
                window_end=we,
            )
        )
    return tuple(out)
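Each snapshot carries its window bounds, so a sink can derive a rate as well as a mean. MetricSnapshot below is a local stand-in with the same field names; the assumption that window_start and window_end are seconds on the manager's clock is unconfirmed, and to_row is a hypothetical helper.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class MetricSnapshot:
    """Stand-in with the fields snapshots() fills in.

    Assumption: window_start/window_end are seconds on the manager's clock.
    """

    name: str
    count: int
    sum: float
    min: float
    max: float
    dimensions: tuple[tuple[str, str], ...]
    window_start: float
    window_end: float


def to_row(snap: MetricSnapshot) -> dict[str, object]:
    """Flatten a snapshot into a dict, adding mean and per-second rate."""
    span = snap.window_end - snap.window_start
    return {
        "name": snap.name,
        **dict(snap.dimensions),  # e.g. {"stream": "orders"}
        "count": snap.count,
        "mean": snap.sum / snap.count if snap.count else 0.0,
        "rate_per_s": snap.count / span if span > 0 else None,
        "min": snap.min,
        "max": snap.max,
    }
```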

flush async

flush() -> None

Build snapshots and await sink.export with them. No-op when level is NONE or when no metric has data in the live window.

Source code in aiokpl/metrics.py
async def flush(self) -> None:
    """Build a snapshot and call ``sink.export``. No-op when level is NONE."""
    if self._level is MetricsLevel.NONE:
        return
    snaps = self.snapshots()
    if not snaps:
        return
    await self._sink.export(snaps)
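Because flush returns without calling the sink when there is nothing to send, a sink's export never receives an empty batch. A hypothetical sketch that reproduces those two guards with a stand-in sink (CountingSink and flush_like are illustrative names, not aiokpl API):

```python
from __future__ import annotations

import asyncio
from typing import Any, Callable, Sequence


class CountingSink:
    """Stand-in sink that records every batch it is asked to export."""

    def __init__(self) -> None:
        self.batches: list[Sequence[Any]] = []

    async def export(self, snapshots: Sequence[Any]) -> None:
        self.batches.append(snapshots)


async def flush_like(
    enabled: bool,
    build_snapshots: Callable[[], tuple[Any, ...]],
    sink: CountingSink,
) -> None:
    """Same guards as MetricsManager.flush: skip when metrics are off or
    when there is nothing to send, otherwise await sink.export once."""
    if not enabled:
        return
    snaps = build_snapshots()
    if not snaps:
        return  # the sink never sees an empty batch
    await sink.export(snaps)


async def demo(sink: CountingSink) -> None:
    await flush_like(False, lambda: ("s1",), sink)  # level NONE: skipped
    await flush_like(True, lambda: (), sink)  # no data: skipped
    await flush_like(True, lambda: ("s1", "s2"), sink)  # exported
```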