
aiokpl.collector

collector

AggregatedBatch → PutRecordsBatch reducer.

Mirrors aws/kinesis/core/collector.h in the C++ KPL: a single Reducer over a stream of AggregatedBatch objects, with a should_flush short-circuit that closes the batch as soon as any single shard's share exceeds per_shard_short_circuit_bytes (256 KiB by default). Combined with the hard caps the Kinesis PutRecords API imposes (500 records and 5 MiB per call, exposed as collection_max_count and collection_max_size), this keeps per-shard latency bounded even under skewed write patterns.
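To make the flush rule concrete, here is a minimal sketch of the decision as a pure function over a batch's running totals. The name should_flush_sketch, its parameters, and the exact comparisons (>= for the hard caps, > for the per-shard threshold) are assumptions for illustration rather than aiokpl's actual code; the defaults mirror the Collector constructor.

```python
from __future__ import annotations

from collections.abc import Mapping

MAX_COUNT = 500                   # PutRecords limit: records per call
MAX_SIZE = 5 * 1024 * 1024        # PutRecords limit: bytes per call
SHARD_SHORT_CIRCUIT = 256 * 1024  # Collector default per-shard threshold


def should_flush_sketch(
    count: int,
    size_bytes: int,
    per_shard_bytes: Mapping[int | None, int],
    *,
    max_count: int = MAX_COUNT,
    max_size: int = MAX_SIZE,
    shard_threshold: int = SHARD_SHORT_CIRCUIT,
) -> bool:
    """Return True when the pending batch should be sent now (hypothetical helper)."""
    # Hard caps imposed by the PutRecords API.
    if count >= max_count or size_bytes >= max_size:
        return True
    # Short-circuit: a single shard already holds more than its share of bytes.
    return any(b > shard_threshold for b in per_shard_bytes.values())
```

With these defaults, a batch of three records where shard 0 holds 270 KiB is flushed early even though it is far below both hard caps.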

The lifecycle follows structured concurrency: entering the Collector as an async context manager spawns an internal anyio.abc.TaskGroup, which its single Reducer uses.

PutRecordsBatch dataclass

PutRecordsBatch(
    _items: list[AggregatedBatch] = list(),
    _size_bytes: int = 0,
    _per_shard_bytes: dict[int | None, int] = dict(),
)

A collection of AggregatedBatch objects destined for a single PutRecords call.
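A dataclass cannot use a bare list or dict as a default, so the list() and dict() shown in the signature presumably stand for field(default_factory=...). The sketch below shows one way such a batch could keep its totals in sync as items arrive; the add method and the FakeAggregatedBatch type with shard_id and size_bytes fields are hypothetical stand-ins, not aiokpl's API.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass(frozen=True)
class FakeAggregatedBatch:
    """Hypothetical stand-in carrying only the fields this sketch needs."""

    shard_id: int | None  # predicted shard, None if unknown
    size_bytes: int       # serialized size of the aggregated record


@dataclass
class PutRecordsBatchSketch:
    # default_factory gives every instance its own list and dict.
    _items: list[FakeAggregatedBatch] = field(default_factory=list)
    _size_bytes: int = 0
    _per_shard_bytes: dict[int | None, int] = field(default_factory=dict)

    def add(self, item: FakeAggregatedBatch) -> None:
        # Update running totals alongside the item list so flush decisions
        # never need to re-scan every item.
        self._items.append(item)
        self._size_bytes += item.size_bytes
        self._per_shard_bytes[item.shard_id] = (
            self._per_shard_bytes.get(item.shard_id, 0) + item.size_bytes
        )

    def __len__(self) -> int:
        return len(self._items)
```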

Collector

Collector(
    *,
    on_batch_ready: Callable[
        [PutRecordsBatch], Awaitable[None]
    ],
    collection_max_count: int = 500,
    collection_max_size: int = 5 * 1024 * 1024,
    per_shard_short_circuit_bytes: int = 256 * 1024,
    clock: Callable[[], float] = time.monotonic,
)

Wraps a single Reducer that packs aggregated batches into PutRecords calls.

Source code in aiokpl/collector.py
def __init__(
    self,
    *,
    on_batch_ready: Callable[[PutRecordsBatch], Awaitable[None]],
    collection_max_count: int = 500,
    collection_max_size: int = 5 * 1024 * 1024,
    per_shard_short_circuit_bytes: int = 256 * 1024,
    clock: Callable[[], float] = time.monotonic,
) -> None:
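    # Awaited with each completed PutRecordsBatch.
    self._on_batch_ready = on_batch_ready
    # Per-shard byte count that triggers the should_flush short-circuit.
    self._threshold = per_shard_short_circuit_bytes
    # Caps for a single PutRecords call (record count, total bytes).
    self._collection_max_count = collection_max_count
    self._collection_max_size = collection_max_size
    self._clock = clock

    # Set up when the collector is entered as an async context manager.
    self._reducer: Reducer[AggregatedBatch, PutRecordsBatch] | None = None
    self._tg: TaskGroup | None = None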