aiokpl.aggregator

aggregator

UserRecord → AggregatedBatch reducer, one batch per predicted shard.

Mirrors aws/kinesis/core/aggregator.h in the C++ KPL: each predicted shard owns a private :class:Reducer whose container is an :class:AggregatedBatch. Records destined for the same shard pile into one aggregated payload; records destined for different shards never mix — the shard is the unit of optimization.
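
The per-shard grouping described above can be sketched as follows. This is a minimal illustration, not the aiokpl implementation: the two-shard layout in `SHARD_ENDS` and the helpers `predict_shard` and `group_by_shard` are hypothetical names introduced here, and records are reduced to `(hash_key, payload)` pairs.

```python
from bisect import bisect_left
from collections import defaultdict

MAX_HASH_KEY = 2**128 - 1  # hash keys are unsigned 128-bit integers

# Hypothetical two-shard layout: the inclusive upper bound of each shard's
# hash-key range, sorted ascending.
SHARD_ENDS = [MAX_HASH_KEY // 2, MAX_HASH_KEY]


def predict_shard(hash_key: int) -> int:
    """Return the index of the first shard whose upper bound covers hash_key."""
    return bisect_left(SHARD_ENDS, hash_key)


def group_by_shard(records: list[tuple[int, bytes]]) -> dict[int, list[bytes]]:
    """Bucket payloads by predicted shard; payloads for different shards never mix."""
    batches: dict[int, list[bytes]] = defaultdict(list)
    for hash_key, payload in records:
        batches[predict_shard(hash_key)].append(payload)
    return dict(batches)
```

Each resulting bucket corresponds to one aggregated payload, which is why the shard is the natural unit of batching.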

When the :class:ShardMap is not READY (or returns None for a hash key), records route to a catch-all None-shard batch in single-record mode. The same single-record path is taken when aggregation_enabled=False.

Size estimation follows the C++ KinesisRecord::accurate_size / estimated_size distinction: a cheap monotonic running total tracks the proto framing overhead plus deduped partition-key / explicit-hash-key references (see kinesis_record.cc:145-173). A fully accurate size is only needed at serialization time, which the consumer obtains via :meth:to_blob.
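
To make the idea of a cheap running estimate concrete, here is a rough sketch. It assumes a fixed per-record framing allowance (`FRAMING_OVERHEAD`, an illustrative value and not the constant used by the KPL) and only counts a partition key the first time it appears in a batch. The class name `SizeEstimator` is hypothetical, and the exact byte accounting in kinesis_record.cc is not reproduced here.

```python
from dataclasses import dataclass, field

# Illustrative per-record framing allowance; NOT the value used by the KPL.
FRAMING_OVERHEAD = 8


@dataclass
class SizeEstimator:
    """Running size estimate that only ever grows as records are added."""

    total: int = 0
    _seen_keys: set[str] = field(default_factory=set)

    def add(self, partition_key: str, data: bytes) -> int:
        """Account for one record and return the updated estimate."""
        self.total += FRAMING_OVERHEAD + len(data)
        if partition_key not in self._seen_keys:
            # The key is stored once per batch; later records with the same
            # key only reference it, so its bytes are counted a single time.
            self._seen_keys.add(partition_key)
            self.total += len(partition_key.encode("utf-8"))
        return self.total
```

Because the estimate is updated with a few additions per record, it can be checked against a size limit on every `add` without serializing the batch.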

Lifecycle is structured: enter the :class:Aggregator as an async context manager, which spawns an internal :class:anyio.abc.TaskGroup shared by every per-shard :class:Reducer.

AggregatedBatch dataclass

AggregatedBatch(
    predicted_shard: int | None,
    _items: list[_BufferedRecord] = list(),
    _size_estimate: int = 0,
    _pk_set: set[str] = set(),
    _ehk_set: set[str] = set(),
)

A per-shard accumulation of :class:UserRecords.

Serializes to a single Kinesis API record: raw bytes when count == 1 (the C++ KPL "single-record short-circuit"), or the KPL aggregated wire format when count > 1.

to_blob

to_blob() -> bytes

Encode to the on-the-wire Kinesis Data payload.

Source code in aiokpl/aggregator.py
def to_blob(self) -> bytes:
    """Encode to the on-the-wire Kinesis Data payload."""
    return encode_aggregated([it.user_record for it in self._items])

routing_partition_key

routing_partition_key() -> str

API-level partition key. "a" for aggregated batches; the single record's partition key otherwise.

Source code in aiokpl/aggregator.py
def routing_partition_key(self) -> str:
    """API-level partition key. ``"a"`` for aggregated batches; the
    single record's partition key otherwise.
    """
    if len(self._items) > 1:
        return "a"
    return self._items[0].user_record.partition_key

routing_explicit_hash_key

routing_explicit_hash_key() -> str | None

API-level explicit hash key.

For aggregated batches we anchor on the first record's hash key (mid-range of the predicted shard, guaranteed to route correctly). For singles we forward whatever the user provided, or None.

Source code in aiokpl/aggregator.py
def routing_explicit_hash_key(self) -> str | None:
    """API-level explicit hash key.

    For aggregated batches we anchor on the first record's hash key
    (mid-range of the predicted shard, guaranteed to route correctly).
    For singles we forward whatever the user provided, or ``None``.
    """
    if len(self._items) > 1:
        return str(self._items[0].hash_key)
    return self._items[0].user_record.explicit_hash_key
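
The two routing rules above can be exercised with a small stand-in. `Item` and `routing_keys` are hypothetical names used only for this sketch; `Item` flattens the buffered record and its user record into one object. Unlike the listings above, the sketch rejects an empty batch explicitly.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class Item:
    """Hypothetical stand-in for a buffered record."""

    partition_key: str
    hash_key: int
    explicit_hash_key: Optional[str] = None


def routing_keys(items: list[Item]) -> tuple[str, Optional[str]]:
    """Return (partition key, explicit hash key) for the API-level record."""
    if not items:
        raise ValueError("cannot route an empty batch")
    if len(items) > 1:
        # Aggregated batch: fixed partition key, anchored on the first hash key.
        return "a", str(items[0].hash_key)
    # Single record: forward the user's own keys unchanged.
    only = items[0]
    return only.partition_key, only.explicit_hash_key
```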

Aggregator

Aggregator(
    shard_map: _ShardLookup,
    *,
    on_batch_ready: Callable[
        [AggregatedBatch], Awaitable[None]
    ],
    aggregation_enabled: bool = True,
    record_max_buffered_time_ms: float = 100.0,
    aggregation_max_count: int = 4294967295,
    aggregation_max_size: int = 51200,
    clock: Callable[[], float] = time.monotonic,
    metrics: MetricsManager | None = None,
    stream_name: str | None = None,
)

Per-shard :class:Reducer orchestrator producing :class:AggregatedBatch.

The catch-all None reducer serves three cases: the shard map is not READY, the predicted shard falls outside the cached range, or aggregation is globally disabled. In the last case it is the only reducer and its count_limit is 1, so every record flushes as its own batch.

Enter via async with Aggregator(...) as agg:. The internal :class:anyio.abc.TaskGroup is shared with every per-shard :class:Reducer.

Source code in aiokpl/aggregator.py
def __init__(
    self,
    shard_map: _ShardLookup,
    *,
    on_batch_ready: Callable[[AggregatedBatch], Awaitable[None]],
    aggregation_enabled: bool = True,
    record_max_buffered_time_ms: float = 100.0,
    aggregation_max_count: int = 4_294_967_295,
    aggregation_max_size: int = 51_200,
    clock: Callable[[], float] = time.monotonic,
    metrics: MetricsManager | None = None,
    stream_name: str | None = None,
) -> None:
    self._shard_map = shard_map
    self._on_batch_ready = on_batch_ready
    self._aggregation_enabled = aggregation_enabled
    self._buffered_time = record_max_buffered_time_ms / 1000.0
    self._max_count = aggregation_max_count if aggregation_enabled else 1
    self._max_size = aggregation_max_size
    self._clock = clock
    self._metrics = metrics
    self._stream_name = stream_name

    self._reducers: dict[int | None, Reducer[_BufferedRecord, AggregatedBatch]] = {}
    self._lock = anyio.Lock()
    self._closed = False
    self._tg: TaskGroup | None = None

put async

put(user_record: UserRecord) -> None

Compute hash key, predict shard, route to the per-shard reducer.

If the reducer returns a closed batch (limit-trigger), dispatch it to on_batch_ready immediately. Deadline-triggered closures go through the same callback wired into each reducer.

Back-compat wrapper over :meth:submit that discards the buffered record handle.

Source code in aiokpl/aggregator.py
async def put(self, user_record: UserRecord) -> None:
    """Compute hash key, predict shard, route to the per-shard reducer.

    If the reducer returns a closed batch (limit-trigger), dispatch it to
    ``on_batch_ready`` immediately. Deadline-triggered closures go through
    the same callback wired into each reducer.

    Back-compat wrapper over :meth:`submit` that discards the buffered
    record handle.
    """
    await self.submit(user_record)
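
The limit-trigger path can be illustrated with a toy reducer. This is a sketch under stated assumptions: `CountLimitedReducer`, `route`, and `demo` are hypothetical, only a count limit is modelled (no size limit or deadline), and stdlib `asyncio` stands in for anyio.

```python
import asyncio
from typing import Awaitable, Callable, Optional


class CountLimitedReducer:
    """Hypothetical reducer that closes its batch once count_limit is reached."""

    def __init__(self, count_limit: int) -> None:
        self._count_limit = count_limit
        self._items: list[str] = []

    async def add(self, item: str) -> Optional[list[str]]:
        """Buffer item; return the closed batch if the limit was hit, else None."""
        self._items.append(item)
        if len(self._items) >= self._count_limit:
            closed, self._items = self._items, []
            return closed
        return None


async def route(
    reducer: CountLimitedReducer,
    item: str,
    on_batch_ready: Callable[[list[str]], Awaitable[None]],
) -> None:
    """Add item and dispatch immediately when the reducer hands back a batch."""
    closed = await reducer.add(item)
    if closed is not None:
        await on_batch_ready(closed)


async def demo(count_limit: int) -> list[list[str]]:
    """Route five items and collect every batch passed to the callback."""
    ready: list[list[str]] = []

    async def on_batch_ready(batch: list[str]) -> None:
        ready.append(batch)

    reducer = CountLimitedReducer(count_limit)
    for item in ["r1", "r2", "r3", "r4", "r5"]:
        await route(reducer, item, on_batch_ready)
    return ready
```

With a limit of 1, which is the value `__init__` assigns when `aggregation_enabled=False`, every record is dispatched as its own batch. With a larger limit, the trailing partial batch stays buffered until a deadline or an explicit flush.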

build_buffered

build_buffered(user_record: UserRecord) -> _BufferedRecord

Build the :class:_BufferedRecord for user_record without routing.

Used by the Producer so it can register a buffered → Outcome mapping before calling :meth:put_buffered. Splitting build from route closes a race where a synchronous flush would otherwise fire the terminal callback against a record we have not yet bound an Outcome to.

Source code in aiokpl/aggregator.py
def build_buffered(self, user_record: UserRecord) -> _BufferedRecord:
    """Build the :class:`_BufferedRecord` for ``user_record`` *without* routing.

    Used by the Producer so it can register a buffered → Outcome mapping
    before calling :meth:`put_buffered`. Splitting build from route closes
    a race where a synchronous flush would otherwise fire the terminal
    callback against a record we have not yet bound an Outcome to.
    """
    if user_record.explicit_hash_key is not None:
        hash_key = parse_explicit_hash_key(user_record.explicit_hash_key)
    else:
        hash_key = md5_hash_key(user_record.partition_key)
    now = self._clock()
    if self._metrics is not None:
        self._metrics.put(
            NAME_USER_RECORDS_RECEIVED,
            1.0,
            stream=self._stream_name,
        )
    return _BufferedRecord(
        user_record=user_record,
        deadline=now + self._buffered_time,
        hash_key=hash_key,
        arrival_time=now,
    )
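
For readers unfamiliar with Kinesis hash keys, the selection logic at the top of `build_buffered` can be sketched as below. The helper names are hypothetical stand-ins, not the aiokpl functions, and their validation behaviour is an assumption. What the sketch relies on is the Kinesis convention that a partition key maps to a 128-bit integer via MD5 and that an explicit hash key is a decimal string in the same range.

```python
import hashlib
from typing import Optional

MAX_HASH_KEY = 2**128 - 1  # hash keys are unsigned 128-bit integers


def hash_key_from_partition_key(partition_key: str) -> int:
    """Interpret the MD5 digest of the UTF-8 key as a big-endian integer."""
    digest = hashlib.md5(partition_key.encode("utf-8")).digest()
    return int.from_bytes(digest, "big")


def hash_key_from_explicit(value: str) -> int:
    """Parse a decimal explicit hash key, rejecting out-of-range values."""
    if not (value.isascii() and value.isdigit()):
        raise ValueError(f"explicit hash key must be a decimal string: {value!r}")
    hash_key = int(value)
    if hash_key > MAX_HASH_KEY:
        raise ValueError("explicit hash key exceeds 128 bits")
    return hash_key


def choose_hash_key(partition_key: str, explicit_hash_key: Optional[str]) -> int:
    """Prefer the explicit hash key; fall back to hashing the partition key."""
    if explicit_hash_key is not None:
        return hash_key_from_explicit(explicit_hash_key)
    return hash_key_from_partition_key(partition_key)
```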

submit async

submit(user_record: UserRecord) -> _BufferedRecord

Like :meth:put but returns the routed :class:_BufferedRecord.

Equivalent to put but exposes the buffered handle to the caller so the Producer can correlate the record with its user-facing :class:aiokpl.outcome.Outcome. The buffered record is built first, then routed via :meth:put_buffered.

Source code in aiokpl/aggregator.py
async def submit(self, user_record: UserRecord) -> _BufferedRecord:
    """Like :meth:`put` but returns the routed :class:`_BufferedRecord`.

    Equivalent to ``put`` but exposes the buffered handle to the caller
    so the Producer can correlate the record with its user-facing
    :class:`aiokpl.outcome.Outcome`. The buffered record is built first,
    then routed via :meth:`put_buffered`.
    """
    buffered = self.build_buffered(user_record)
    await self.put_buffered(buffered)
    return buffered

put_buffered async

put_buffered(buffered: _BufferedRecord) -> None

Re-entry point used by the Retrier to re-enqueue a record.

The buffered record keeps its accumulated :class:Attempt history and its original arrival_time so the absolute TTL is honoured across retries. The predicted shard may differ from the previous attempt because the cached :class:ShardMap may have refreshed in between.

Source code in aiokpl/aggregator.py
async def put_buffered(self, buffered: _BufferedRecord) -> None:
    """Re-entry point used by the Retrier to re-enqueue a record.

    The buffered record keeps its accumulated :class:`Attempt` history and
    its original ``arrival_time`` so the absolute TTL is honoured across
    retries. The predicted shard may differ from the previous attempt
    because the cached :class:`ShardMap` may have refreshed in between.
    """
    if self._shard_map.state is ShardMapState.READY:
        predicted = self._shard_map.predict(buffered.hash_key)
    else:
        predicted = None

    reducer = await self._get_or_create_reducer(predicted)
    closed = await reducer.add(buffered)
    if closed is not None:
        await self._on_batch_ready(closed)
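
Why keeping `arrival_time` matters across retries can be shown with a small sketch. Everything here is hypothetical: `Buffered` is a simplified stand-in for the buffered record, attempts are plain strings, and the `ttl` parameter is an assumption about how an absolute time-to-live might be checked.

```python
from dataclasses import dataclass, field


@dataclass
class Buffered:
    """Hypothetical stand-in: arrival_time is set once and never reset."""

    arrival_time: float
    attempts: list[str] = field(default_factory=list)


def record_retry(record: Buffered, error: str) -> Buffered:
    """Append to the attempt history; arrival_time is deliberately untouched."""
    record.attempts.append(error)
    return record


def is_expired(record: Buffered, now: float, ttl: float) -> bool:
    """Measure age from the first arrival, so retries do not extend the TTL."""
    return now - record.arrival_time >= ttl
```

If the arrival time were reset on each re-enqueue, a record that keeps failing could be retried indefinitely; anchoring on the first arrival bounds its total lifetime.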

flush async

flush() -> None

Close every per-shard batch and dispatch on_batch_ready.

Source code in aiokpl/aggregator.py
async def flush(self) -> None:
    """Close every per-shard batch and dispatch ``on_batch_ready``."""
    async with self._lock:
        reducers = list(self._reducers.values())
    for r in reducers:
        closed = await r.flush()
        if closed is not None:
            await self._on_batch_ready(closed)
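
The listing above takes the lock only long enough to snapshot the reducers, then flushes them outside it, so the callback does not run while the lock is held. A self-contained sketch of that pattern follows, using stdlib `asyncio` in place of anyio; `FakeReducer` and `MiniAggregator` are hypothetical stand-ins.

```python
import asyncio
from typing import Optional


class FakeReducer:
    """Hypothetical reducer whose flush hands back its items once."""

    def __init__(self, items: list[str]) -> None:
        self._items = list(items)

    async def flush(self) -> Optional[list[str]]:
        if not self._items:
            return None
        closed, self._items = self._items, []
        return closed


class MiniAggregator:
    def __init__(self, reducers: dict[Optional[int], FakeReducer]) -> None:
        self._reducers = reducers
        self._lock = asyncio.Lock()
        self.ready: list[list[str]] = []

    async def flush(self) -> None:
        async with self._lock:
            # Snapshot only; the lock is released before any flushing happens.
            reducers = list(self._reducers.values())
        for reducer in reducers:
            closed = await reducer.flush()
            if closed is not None:
                self.ready.append(closed)


async def demo() -> list[list[str]]:
    agg = MiniAggregator(
        {0: FakeReducer(["a", "b"]), 1: FakeReducer([]), None: FakeReducer(["c"])}
    )
    await agg.flush()
    await agg.flush()  # nothing left to close, so no new batches appear
    return agg.ready
```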

aclose async

aclose() -> None

Drop in-flight items; cancel every per-shard timer. Idempotent.

Source code in aiokpl/aggregator.py
async def aclose(self) -> None:
    """Drop in-flight items; cancel every per-shard timer. Idempotent."""
    async with self._lock:
        self._closed = True
        reducers = list(self._reducers.values())
        self._reducers.clear()
    for r in reducers:
        await r.aclose()
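
The idempotency claim follows from clearing the reducer map under the lock: a second call finds nothing left to close. A minimal sketch, again with stdlib `asyncio` standing in for anyio and with hypothetical `FakeReducer` and `MiniAggregator` classes:

```python
import asyncio
from typing import Optional


class FakeReducer:
    """Hypothetical reducer that counts how often it is closed."""

    def __init__(self) -> None:
        self.close_calls = 0

    async def aclose(self) -> None:
        self.close_calls += 1


class MiniAggregator:
    def __init__(self, reducers: dict[Optional[int], FakeReducer]) -> None:
        self._reducers = dict(reducers)
        self._lock = asyncio.Lock()
        self._closed = False

    async def aclose(self) -> None:
        async with self._lock:
            self._closed = True
            reducers = list(self._reducers.values())
            # Emptying the map is what makes a repeated call a no-op.
            self._reducers.clear()
        for reducer in reducers:
            await reducer.aclose()


async def demo() -> list[int]:
    reducers: dict[Optional[int], FakeReducer] = {0: FakeReducer(), None: FakeReducer()}
    agg = MiniAggregator(reducers)
    await agg.aclose()
    await agg.aclose()
    return [r.close_calls for r in reducers.values()]
```

Each reducer is closed exactly once even though `aclose` is awaited twice.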