Writing a custom MetricsSink¶
aiokpl emits semantic events; the destination is yours. To plug a new
backend in, satisfy the MetricsSink Protocol.
Minimal sink¶
from collections.abc import Sequence
from aiokpl.sinks import MetricSnapshot
class StdoutSink:
async def export(self, snapshots: Sequence[MetricSnapshot]) -> None:
for s in snapshots:
print(f"{s.name} count={s.count} sum={s.sum} dims={s.dimensions}")
async def __aenter__(self) -> "StdoutSink":
return self
async def __aexit__(self, *_exc: object) -> None:
return None
Plug it into the Producer:
from aiokpl import Config, MetricsLevel, Producer
cfg = Config(
region="us-east-1",
metrics_level=MetricsLevel.DETAILED,
metrics_sink=StdoutSink(),
metrics_upload_interval_ms=10_000,
)
async with Producer(cfg) as producer:
...
Per-event resolution¶
If you want every observation (not just aggregated snapshots), implement
EventfulMetricsSink too — add a
synchronous record(event) method:
from aiokpl.sinks import MetricEvent
class EventfulStdoutSink(StdoutSink):
def record(self, event: MetricEvent) -> None:
print(f"event {event.name}={event.value} dims={event.dimensions}")
MetricsManager checks isinstance(sink, EventfulMetricsSink) on every
put and calls record(event) when the check passes. record is sync on
purpose: it runs inside the hot path. Async I/O belongs in export.
Lifecycle¶
MetricsManager.__aenter__ enters the sink and starts the upload loop.
__aexit__ runs a final flush() before tearing the sink down, so no
window is dropped. Sinks should release their transports in __aexit__.
Dimensions¶
MetricSnapshot.dimensions is a tuple[tuple[str, str], ...]. aiokpl uses
the keys "stream", "shard", "error_code". Translate to whatever your
backend expects (tags, attributes, dimensions, labels). The CloudWatch sink
uses a small lookup table; OpenTelemetry passes dimensions through as
attributes; Datadog renders them as key:value tags. Unknown keys are
forwarded verbatim.