aiokpl.sinks

sinks

Vendor-neutral metrics sinks for aiokpl.

The library emits semantic events; the sink decides where they go. Sinks are plugged into aiokpl.metrics.MetricsManager via the MetricsSink Protocol.

Hot-path code only knows about MetricEvent and MetricSnapshot. First-party sinks (Null, InMemory, CloudWatch) ship in this package; the OpenTelemetry and Datadog implementations are gated behind the aiokpl[otel] and aiokpl[datadog] extras and live in their own modules (aiokpl.sinks.opentelemetry, aiokpl.sinks.datadog).

EventfulMetricsSink

Bases: Protocol

Optional extension Protocol for sinks that consume per-event data.

record is synchronous on purpose: it runs on the hot path inside MetricsManager.put (which is itself sync). Async work belongs in MetricsSink.export, which is dispatched on the upload loop.
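The split between a synchronous record and an asynchronous export can be sketched as follows. This is a minimal illustration rather than aiokpl's implementation: the MetricEvent stand-in only mirrors the documented fields, the BufferingSink class and the "records_put" metric name are hypothetical, and the record(event) and export(snapshots) signatures are assumptions.

```python
import asyncio
from collections import deque
from dataclasses import dataclass


# Stand-in mirroring the documented MetricEvent fields so the sketch runs
# without aiokpl installed.
@dataclass(frozen=True)
class MetricEvent:
    name: str
    value: float
    timestamp: float
    dimensions: tuple[tuple[str, str], ...] = ()


class BufferingSink:
    """Hypothetical sink: cheap sync record(), deferred work in async export()."""

    def __init__(self, max_events: int = 1024) -> None:
        # A bounded deque keeps record() O(1) and caps memory if export stalls;
        # once full, the oldest event is dropped on each append.
        self._pending: deque[MetricEvent] = deque(maxlen=max_events)
        self.shipped: list[MetricEvent] = []

    def record(self, event: MetricEvent) -> None:
        # Assumed signature. No I/O and no awaiting on the hot path.
        self._pending.append(event)

    async def export(self, snapshots: tuple = ()) -> None:
        # Assumed signature. Slow work (network, disk) would happen here.
        while self._pending:
            self.shipped.append(self._pending.popleft())
        await asyncio.sleep(0)  # placeholder for real async I/O


sink = BufferingSink(max_events=2)
for i in range(3):
    sink.record(MetricEvent("records_put", float(i), 1700000000.0 + i))
asyncio.run(sink.export())
```

With max_events=2, the first of the three recorded events is evicted before the export runs, so only the last two are shipped. Whether dropping old events is acceptable depends on the sink; the bound is a design choice of this sketch.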

MetricEvent dataclass

MetricEvent(
    name: str,
    value: float,
    timestamp: float,
    dimensions: tuple[tuple[str, str], ...] = (),
)

A single observation emitted by the library.

dimensions is a tuple of (name, value) pairs so the event is hashable and frozen. Common keys: "stream", "shard", "error_code".
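To illustrate why dimensions is a tuple of pairs rather than a dict, the sketch below rebuilds the dataclass from the signature above and uses events as set members. The class here is a local stand-in, and make_dimensions and the "records_put" metric name are hypothetical; the sorting step is an assumption about how a caller might normalize label order, not something the library is known to do.

```python
from dataclasses import dataclass


# Stand-in mirroring the documented signature; the real class is aiokpl's.
@dataclass(frozen=True)
class MetricEvent:
    name: str
    value: float
    timestamp: float
    dimensions: tuple[tuple[str, str], ...] = ()


def make_dimensions(**labels: str) -> tuple[tuple[str, str], ...]:
    """Hypothetical helper: sort by key so equal label sets compare equal."""
    return tuple(sorted(labels.items()))


a = MetricEvent(
    "records_put", 1.0, 1700000000.0,
    make_dimensions(stream="orders", shard="shardId-000"),
)
b = MetricEvent(
    "records_put", 1.0, 1700000000.0,
    make_dimensions(shard="shardId-000", stream="orders"),
)

# Frozen dataclasses with tuple fields are hashable, so events can be
# deduplicated in a set or used as dict keys.
unique_events = {a, b}
```

A dict-valued dimensions field would make the dataclass unhashable, which rules out this kind of keyed lookup.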

MetricSnapshot dataclass

MetricSnapshot(
    name: str,
    count: int,
    sum: float,
    min: float,
    max: float,
    dimensions: tuple[tuple[str, str], ...] = (),
    window_start: float = 0.0,
    window_end: float = 0.0,
)

A flushed aggregate over a window.

Produced by aiokpl.metrics.MetricsManager on every export tick. Sinks consume these in MetricsSink.export.
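As a rough illustration of what a snapshot carries, the sketch below folds a window of raw values into count, sum, min and max, then derives the mean from them. The dataclass is a local stand-in for the signature above; summarize and the "put_latency_ms" metric name are hypothetical, and the real aggregation inside MetricsManager may differ.

```python
from dataclasses import dataclass


# Stand-in mirroring the documented signature; the real class is aiokpl's.
@dataclass(frozen=True)
class MetricSnapshot:
    name: str
    count: int
    sum: float
    min: float
    max: float
    dimensions: tuple[tuple[str, str], ...] = ()
    window_start: float = 0.0
    window_end: float = 0.0


def summarize(
    name: str, values: list[float], window_start: float, window_end: float
) -> MetricSnapshot:
    """Hypothetical helper: aggregate one window of observations."""
    if not values:
        raise ValueError("cannot summarize an empty window")
    return MetricSnapshot(
        name=name,
        count=len(values),
        sum=sum(values),
        min=min(values),
        max=max(values),
        window_start=window_start,
        window_end=window_end,
    )


snap = summarize("put_latency_ms", [12.0, 8.0, 10.0], 1700000000.0, 1700000060.0)
mean = snap.sum / snap.count  # the mean is derivable, so it is not stored
```

Storing count and sum instead of a mean also lets a consumer merge snapshots from adjacent windows without losing accuracy.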

MetricsSink

Bases: Protocol

Vendor-neutral sink for metric snapshots.

The aiokpl.metrics.MetricsManager batches observations into MetricSnapshot windows and calls export on a schedule. Sinks that want per-event resolution can additionally implement EventfulMetricsSink, and the manager will dispatch every observation through EventfulMetricsSink.record.
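Because MetricsSink is a Protocol, a custom sink should only need to match its shape rather than inherit from it. The sketch below shows that idea with local stand-ins: the export signature (async, taking a sequence of snapshots) is an assumption, and LineSink and the "put_latency_ms" metric name are hypothetical.

```python
import asyncio
from dataclasses import dataclass
from typing import Protocol, Sequence, runtime_checkable


# Stand-in mirroring the documented MetricSnapshot signature.
@dataclass(frozen=True)
class MetricSnapshot:
    name: str
    count: int
    sum: float
    min: float
    max: float
    dimensions: tuple[tuple[str, str], ...] = ()
    window_start: float = 0.0
    window_end: float = 0.0


@runtime_checkable
class MetricsSink(Protocol):
    # Assumed signature; the real Protocol is defined by aiokpl.
    async def export(self, snapshots: Sequence[MetricSnapshot]) -> None: ...


class LineSink:
    """Hypothetical sink that formats each snapshot as one text line."""

    def __init__(self) -> None:
        self.lines: list[str] = []

    async def export(self, snapshots: Sequence[MetricSnapshot]) -> None:
        for s in snapshots:
            mean = s.sum / s.count if s.count else 0.0
            self.lines.append(f"{s.name} count={s.count} mean={mean:.2f}")


sink = LineSink()
# Structural match: LineSink never subclasses MetricsSink.
is_sink = isinstance(sink, MetricsSink)
asyncio.run(sink.export([MetricSnapshot("put_latency_ms", 4, 100.0, 10.0, 40.0)]))
```

Note that the runtime isinstance check only verifies that an export attribute exists; a static type checker is what validates the signature.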

CloudWatchSink

CloudWatchSink(
    *,
    region: str,
    namespace: str = "aiokpl",
    endpoint_url: str | None = None,
    verify_ssl: bool = True,
    aws_access_key_id: str | None = None,
    aws_secret_access_key: str | None = None,
    aws_session_token: str | None = None,
    client_factory: Callable[
        [], AbstractAsyncContextManager[Any]
    ]
    | None = None,
)

Forwards aggregated MetricSnapshot batches to CloudWatch.

Parameters

region: AWS region for the CloudWatch endpoint.
namespace: CloudWatch namespace under which every datum is posted.
endpoint_url: Override CloudWatch endpoint (for local stacks / tests).
verify_ssl: Forwarded to aiobotocore. Set to False for self-signed certs.
aws_access_key_id / aws_secret_access_key / aws_session_token: Explicit credentials. If None the default credential chain runs.
client_factory: Override the default aiobotocore client factory. Used by tests to inject a fake CloudWatch client.

Source code in aiokpl/sinks/cloudwatch.py
def __init__(
    self,
    *,
    region: str,
    namespace: str = "aiokpl",
    endpoint_url: str | None = None,
    verify_ssl: bool = True,
    aws_access_key_id: str | None = None,
    aws_secret_access_key: str | None = None,
    aws_session_token: str | None = None,
    client_factory: Callable[[], AbstractAsyncContextManager[Any]] | None = None,
) -> None:
    self._region = region
    self._namespace = namespace
    self._endpoint_url = endpoint_url
    self._verify_ssl = verify_ssl
    self._aws_access_key_id = aws_access_key_id
    self._aws_secret_access_key = aws_secret_access_key
    self._aws_session_token = aws_session_token
    self._client_factory = client_factory or self._default_factory
    self._client: Any = None
    self._client_ctx: AbstractAsyncContextManager[Any] | None = None
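The client_factory parameter takes a zero-argument callable that returns an async context manager, so a test double can be built with contextlib.asynccontextmanager. The sketch below is one possible shape, under assumptions: FakeCloudWatchClient and fake_client_factory are hypothetical names, and the fake exposes put_metric_data (the CloudWatch API operation for posting data) on the guess that this is the call the sink makes. The demo drives the fake directly so the snippet runs without aiokpl installed.

```python
import asyncio
from contextlib import AbstractAsyncContextManager, asynccontextmanager
from typing import Any, AsyncIterator


class FakeCloudWatchClient:
    """Hypothetical test double that records calls instead of hitting AWS."""

    def __init__(self) -> None:
        self.calls: list[dict[str, Any]] = []

    async def put_metric_data(self, **kwargs: Any) -> dict[str, Any]:
        # Assumption: the sink posts data through this operation.
        self.calls.append(kwargs)
        return {}


fake_client = FakeCloudWatchClient()


@asynccontextmanager
async def fake_client_factory() -> AsyncIterator[FakeCloudWatchClient]:
    # Matches Callable[[], AbstractAsyncContextManager[Any]].
    yield fake_client


factory_ok = isinstance(fake_client_factory(), AbstractAsyncContextManager)


async def demo() -> None:
    # Stands in for what a sink might do with the injected factory.
    async with fake_client_factory() as client:
        await client.put_metric_data(
            Namespace="aiokpl",
            MetricData=[{"MetricName": "records_put", "Value": 1.0}],
        )


asyncio.run(demo())

# With aiokpl installed, the factory would be passed in like this:
# sink = CloudWatchSink(region="us-east-1", client_factory=fake_client_factory)
```

After an export, a test can then assert on fake_client.calls instead of mocking the network layer.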

InMemorySink

InMemorySink()

Captures every batch of snapshots passed to export.

Order is preserved (insertion order). Both exports and the convenience accessors return immutable tuples.

Source code in aiokpl/sinks/memory.py
def __init__(self) -> None:
    self._exports: list[tuple[MetricSnapshot, ...]] = []

exports property

exports: tuple[tuple[MetricSnapshot, ...], ...]

Every batch of snapshots ever exported, in call order.

all_snapshots property

all_snapshots: tuple[MetricSnapshot, ...]

Flatten every batch into a single tuple, preserving order.

by_name

by_name(name: str) -> tuple[MetricSnapshot, ...]

Return every snapshot whose name matches the argument.

Source code in aiokpl/sinks/memory.py
def by_name(self, name: str) -> tuple[MetricSnapshot, ...]:
    """Return every snapshot whose ``name`` matches the argument."""
    return tuple(s for s in self.all_snapshots if s.name == name)
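A typical test-side use of these accessors might look like the sketch below. To keep it runnable without aiokpl, it defines a stand-in InMemorySink that follows the source excerpts and signatures above; the async export method and its signature are assumptions, and the metric names are hypothetical.

```python
import asyncio
from dataclasses import dataclass
from typing import Iterable


# Stand-in mirroring the documented MetricSnapshot signature.
@dataclass(frozen=True)
class MetricSnapshot:
    name: str
    count: int
    sum: float
    min: float
    max: float
    dimensions: tuple[tuple[str, str], ...] = ()
    window_start: float = 0.0
    window_end: float = 0.0


class InMemorySink:
    """Stand-in following the excerpts above; export() is an assumption."""

    def __init__(self) -> None:
        self._exports: list[tuple[MetricSnapshot, ...]] = []

    async def export(self, snapshots: Iterable[MetricSnapshot]) -> None:
        self._exports.append(tuple(snapshots))

    @property
    def exports(self) -> tuple[tuple[MetricSnapshot, ...], ...]:
        return tuple(self._exports)

    @property
    def all_snapshots(self) -> tuple[MetricSnapshot, ...]:
        return tuple(s for batch in self._exports for s in batch)

    def by_name(self, name: str) -> tuple[MetricSnapshot, ...]:
        return tuple(s for s in self.all_snapshots if s.name == name)


async def run_two_ticks(sink: InMemorySink) -> None:
    # Two export ticks, as a manager might issue them.
    await sink.export([
        MetricSnapshot("records_put", 3, 3.0, 1.0, 1.0),
        MetricSnapshot("put_latency_ms", 3, 30.0, 8.0, 12.0),
    ])
    await sink.export([MetricSnapshot("records_put", 5, 5.0, 1.0, 1.0)])


sink = InMemorySink()
asyncio.run(run_two_ticks(sink))
put_counts = [s.count for s in sink.by_name("records_put")]
```

by_name returns matches across all batches in export order, which makes assertions on per-window values straightforward.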

NullSink

Discards every metric. Default sink when config.metrics_sink is None.