Skip to content

aiokpl.sinks.cloudwatch

cloudwatch

:class:CloudWatchSink — first-party sink that uploads to CloudWatch.

Extracted from the original Phase 7 MetricsManager. The transport remains aiobotocore's asyncio CloudWatch client; the chunking, payload shape, and dimension naming are unchanged so existing dashboards keep working without edits.

The sink does not know about :class:aiokpl.metrics.MetricsManager; it takes pre-built :class:MetricSnapshot instances and posts them on export. Constructor arguments cover the same surface Config used to expose for CloudWatch (region, endpoint, credentials, namespace).

CloudWatchSink

CloudWatchSink(
    *,
    region: str,
    namespace: str = "aiokpl",
    endpoint_url: str | None = None,
    verify_ssl: bool = True,
    aws_access_key_id: str | None = None,
    aws_secret_access_key: str | None = None,
    aws_session_token: str | None = None,
    client_factory: Callable[
        [], AbstractAsyncContextManager[Any]
    ]
    | None = None,
)

Forwards aggregated :class:MetricSnapshot batches to CloudWatch.

Parameters

region: AWS region for the CloudWatch endpoint. namespace: CloudWatch namespace under which every datum is posted. endpoint_url: Override CloudWatch endpoint (for local stacks / tests). verify_ssl: Forwarded to aiobotocore. Set to False for self-signed certs. aws_access_key_id / aws_secret_access_key / aws_session_token: Explicit credentials. If None the default credential chain runs. client_factory: Override the default aiobotocore client factory. Used by tests to inject a fake CloudWatch client.

Source code in aiokpl/sinks/cloudwatch.py
def __init__(
    self,
    *,
    region: str,
    namespace: str = "aiokpl",
    endpoint_url: str | None = None,
    verify_ssl: bool = True,
    aws_access_key_id: str | None = None,
    aws_secret_access_key: str | None = None,
    aws_session_token: str | None = None,
    client_factory: Callable[[], AbstractAsyncContextManager[Any]] | None = None,
) -> None:
    self._region = region
    self._namespace = namespace
    self._endpoint_url = endpoint_url
    self._verify_ssl = verify_ssl
    self._aws_access_key_id = aws_access_key_id
    self._aws_secret_access_key = aws_secret_access_key
    self._aws_session_token = aws_session_token
    self._client_factory = client_factory or self._default_factory
    self._client: Any = None
    self._client_ctx: AbstractAsyncContextManager[Any] | None = None