aiokpl.aggregator
UserRecord → AggregatedBatch reducer, one batch per predicted shard.
Mirrors aws/kinesis/core/aggregator.h in the C++ KPL: each predicted shard
owns a private Reducer whose container is an AggregatedBatch.
Records destined for the same shard pile into one aggregated payload; records
destined for different shards never mix — the shard is the unit of
optimization.
When the ShardMap is not READY (or returns None for a hash key),
records route to a catch-all None-shard batch in single-record mode. The
same single-record path is taken when aggregation_enabled=False.
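The routing rule above can be sketched as follows. This is a minimal illustration, not aiokpl's implementation: ToyShardMap, hash_key_for and route are hypothetical names. The sketch assumes the standard Kinesis mapping of a partition key to a 128-bit integer via MD5 (an explicit hash key, if given, is used as-is) and a shard map held as ending hash keys sorted in ascending order.

```python
from __future__ import annotations

import bisect
import hashlib


def hash_key_for(partition_key: str, explicit_hash_key: str | None = None) -> int:
    """128-bit hash key: the explicit key if given, else MD5(partition_key) as a big-endian int."""
    if explicit_hash_key is not None:
        return int(explicit_hash_key)
    digest = hashlib.md5(partition_key.encode("utf-8")).digest()
    return int.from_bytes(digest, "big")


class ToyShardMap:
    """Hypothetical stand-in for ShardMap: (ending_hash_key, shard_id) pairs sorted by end key."""

    def __init__(self, ranges: list[tuple[int, int]] | None) -> None:
        self.ready = ranges is not None          # None models a map that is not READY
        self._ends = [end for end, _ in ranges or []]
        self._ids = [shard for _, shard in ranges or []]

    def shard_id(self, hash_key: int) -> int | None:
        if not self.ready:
            return None
        i = bisect.bisect_left(self._ends, hash_key)          # first shard whose end >= hash_key
        return self._ids[i] if i < len(self._ids) else None  # outside the cached range


def route(shard_map: ToyShardMap, partition_key: str,
          explicit_hash_key: str | None = None,
          aggregation_enabled: bool = True) -> int | None:
    """Return the predicted shard, or None for the catch-all single-record batch."""
    if not aggregation_enabled:
        return None
    return shard_map.shard_id(hash_key_for(partition_key, explicit_hash_key))
```

Every path that cannot name a shard (map not READY, hash key beyond the cached ranges, aggregation disabled) collapses to the same None key, which is what lets a single catch-all batch absorb them.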
Size estimation follows the C++ KinesisRecord::accurate_size /
estimated_size distinction: a cheap monotonic running total tracks the
proto framing overhead plus deduped partition-key / explicit-hash-key
references (see kinesis_record.cc:145-173). A fully accurate size is only
needed at serialization time, when the consumer calls to_blob.
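The running total can be illustrated with a minimal sketch that counts protobuf bytes incrementally. It assumes the published KPL aggregation layout: a 4-byte magic prefix, an AggregatedRecord message with partition_key_table = 1, explicit_hash_key_table = 2 and records = 3 (each Record carrying partition_key_index = 1, explicit_hash_key_index = 2 and data = 3), then a 16-byte MD5 digest. RunningSizeEstimate and its helpers are hypothetical, not aiokpl's code, and record tags are ignored.

```python
from __future__ import annotations


def varint_len(n: int) -> int:
    """Bytes needed to encode a non-negative int as a protobuf varint."""
    length = 1
    while n >= 0x80:
        n >>= 7
        length += 1
    return length


def field_len(payload_len: int) -> int:
    """Length-delimited field with a field number < 16: 1-byte tag + varint length + payload."""
    return 1 + varint_len(payload_len) + payload_len


class RunningSizeEstimate:
    """Hypothetical running total; it only ever grows as records are added."""

    FIXED_OVERHEAD = 4 + 16  # assumed KPL magic prefix + trailing MD5 digest

    def __init__(self) -> None:
        self.total = self.FIXED_OVERHEAD
        self._pk_index: dict[str, int] = {}
        self._ehk_index: dict[str, int] = {}

    @staticmethod
    def _index(table: dict[str, int], key: str) -> tuple[int, int]:
        """Return (index into the key table, bytes the table grew by)."""
        if key in table:
            return table[key], 0                   # deduped: already referenced
        table[key] = len(table)
        return table[key], field_len(len(key.encode("utf-8")))

    def add(self, partition_key: str, data: bytes,
            explicit_hash_key: str | None = None) -> int:
        pk_idx, delta = self._index(self._pk_index, partition_key)
        inner = 1 + varint_len(pk_idx)             # Record.partition_key_index (field 1)
        if explicit_hash_key is not None:
            ehk_idx, added = self._index(self._ehk_index, explicit_hash_key)
            delta += added
            inner += 1 + varint_len(ehk_idx)       # Record.explicit_hash_key_index (field 2)
        inner += field_len(len(data))              # Record.data (field 3)
        delta += field_len(inner)                  # AggregatedRecord.records entry (field 3)
        self.total += delta
        return delta
```

A key costs table bytes only the first time it is seen, so repeating a partition key adds just the per-record framing. That deduplication is what keeps the estimate cheap and monotonic.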
Lifecycle is structured: enter the Aggregator as an async context
manager, which spawns an internal anyio.abc.TaskGroup shared by
every per-shard Reducer.
AggregatedBatch (dataclass)
AggregatedBatch(
predicted_shard: int | None,
_items: list[_BufferedRecord] = list(),
_size_estimate: int = 0,
_pk_set: set[str] = set(),
_ehk_set: set[str] = set(),
)
A per-shard accumulation of UserRecords.
Serializes to a single Kinesis API record: raw bytes when count == 1
(the C++ KPL "single-record short-circuit"), or the KPL aggregated wire
format when count > 1.
to_blob
routing_partition_key
API-level partition key. "a" for aggregated batches; the
single record's partition key otherwise.
routing_explicit_hash_key
API-level explicit hash key.
For aggregated batches we anchor on the first record's hash key
(mid-range of the predicted shard, guaranteed to route correctly).
For singles we forward whatever the user provided, or None.
Source code in aiokpl/aggregator.py
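The two routing properties reduce to a small decision. In this hypothetical sketch (routing_keys is not part of aiokpl), records are (partition_key, explicit_hash_key) pairs, and the first record's hash key is assumed to be its explicit hash key if present, otherwise the decimal form of MD5(partition_key) read as a 128-bit integer.

```python
from __future__ import annotations

import hashlib


def _hash_key(pk: str, ehk: str | None) -> str:
    """Decimal 128-bit hash key: the explicit key if given, else MD5 of the partition key."""
    if ehk is not None:
        return ehk
    return str(int.from_bytes(hashlib.md5(pk.encode("utf-8")).digest(), "big"))


def routing_keys(records: list[tuple[str, str | None]]) -> tuple[str, str | None]:
    """Return the API-level (PartitionKey, ExplicitHashKey) for one batch."""
    if len(records) == 1:
        pk, ehk = records[0]
        return pk, ehk                               # single: forward what the user gave
    first_pk, first_ehk = records[0]
    return "a", _hash_key(first_pk, first_ehk)       # aggregated: dummy key, pinned hash key
```

Pinning the explicit hash key matters because the dummy partition key "a" would otherwise hash to whichever shard owns MD5("a"), not to the predicted one.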
Aggregator
Aggregator(
shard_map: _ShardLookup,
*,
on_batch_ready: Callable[
[AggregatedBatch], Awaitable[None]
],
aggregation_enabled: bool = True,
record_max_buffered_time_ms: float = 100.0,
aggregation_max_count: int = 4294967295,
aggregation_max_size: int = 51200,
clock: Callable[[], float] = time.monotonic,
metrics: MetricsManager | None = None,
stream_name: str | None = None,
)
Per-shard Reducer orchestrator that produces AggregatedBatch instances.
The catch-all None reducer is used when the shard map is not READY,
when the predicted shard falls outside the cached range, and as the only
reducer when aggregation is globally disabled (in which case its
count_limit is 1 — every record flushes as its own batch).
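Reducer selection can be sketched as a dictionary keyed by predicted shard. ToyReducer and ReducerTable are hypothetical; only the count limit is modelled (the size limit, deadline timer and TaskGroup are left out), and the default count mirrors aggregation_max_count from the signature above.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class ToyReducer:
    count_limit: int
    items: list[bytes] = field(default_factory=list)

    def add(self, data: bytes) -> list[bytes] | None:
        """Append a record; hand back the closed batch once count_limit is reached."""
        self.items.append(data)
        if len(self.items) >= self.count_limit:
            batch, self.items = self.items, []
            return batch
        return None


class ReducerTable:
    """Hypothetical map from predicted shard (or None) to its reducer."""

    def __init__(self, aggregation_enabled: bool = True,
                 aggregation_max_count: int = 4294967295) -> None:
        self.aggregation_enabled = aggregation_enabled
        self.aggregation_max_count = aggregation_max_count
        self.reducers: dict[int | None, ToyReducer] = {}

    def reducer_for(self, shard: int | None) -> ToyReducer:
        if not self.aggregation_enabled:
            shard = None                        # the catch-all is the only reducer
        if shard not in self.reducers:
            # The None reducer runs in single-record mode: every record is its own batch.
            limit = 1 if shard is None else self.aggregation_max_count
            self.reducers[shard] = ToyReducer(limit)
        return self.reducers[shard]
```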
Enter it via async with Aggregator(...) as agg. The internal
anyio.abc.TaskGroup is shared with every per-shard Reducer.
put (async)
Compute hash key, predict shard, route to the per-shard reducer.
If the reducer returns a closed batch (limit-trigger), dispatch it to
on_batch_ready immediately. Deadline-triggered closures go through
the same callback wired into each reducer.
Back-compat wrapper over submit that discards the buffered
record handle.
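The two closing triggers can be sketched for a single shard with asyncio. ToyShardBuffer is hypothetical: the first buffered item arms a deadline (standing in for record_max_buffered_time_ms), reaching the count limit closes the batch immediately, and both paths hand the batch to the same on_batch_ready coroutine.

```python
from __future__ import annotations

import asyncio
from typing import Any, Awaitable, Callable


class ToyShardBuffer:
    """Hypothetical per-shard buffer with a count limit and a deadline."""

    def __init__(self, on_batch_ready: Callable[[list[Any]], Awaitable[None]],
                 count_limit: int, max_buffered_s: float) -> None:
        self.on_batch_ready = on_batch_ready
        self.count_limit = count_limit
        self.max_buffered_s = max_buffered_s
        self.items: list[Any] = []
        self._timer: asyncio.Task[None] | None = None

    async def put(self, item: Any) -> None:
        self.items.append(item)
        if len(self.items) == 1:                      # first item arms the deadline
            self._timer = asyncio.create_task(self._deadline())
        if len(self.items) >= self.count_limit:       # limit trigger: dispatch now
            await self._close(from_timer=False)

    async def _deadline(self) -> None:
        await asyncio.sleep(self.max_buffered_s)
        await self._close(from_timer=True)            # deadline trigger

    async def _close(self, from_timer: bool) -> None:
        if self._timer is not None and not from_timer:
            self._timer.cancel()                      # batch closed early: disarm
        self._timer = None
        batch, self.items = self.items, []
        if batch:
            await self.on_batch_ready(batch)          # same callback for both triggers


async def demo() -> list[list[str]]:
    batches: list[list[str]] = []

    async def on_batch_ready(batch: list[str]) -> None:
        batches.append(batch)

    buf = ToyShardBuffer(on_batch_ready, count_limit=2, max_buffered_s=0.01)
    await buf.put("a")
    await buf.put("b")          # count limit reached: dispatched immediately
    await buf.put("c")          # left for the deadline timer
    await asyncio.sleep(0.1)
    return batches
```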
build_buffered
Build the _BufferedRecord for user_record without routing it.
Used by the Producer so it can register a buffered → Outcome mapping
before calling put_buffered. Splitting build from route closes
a race where a synchronous flush would otherwise fire the terminal
callback against a record we have not yet bound an Outcome to.
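The ordering constraint can be sketched with a toy aggregator that, like a count_limit of 1, completes the record synchronously inside put_buffered. ToyAggregator, ToyProducer and the dict standing in for _BufferedRecord are hypothetical, and the asyncio Future stands in for an Outcome.

```python
from __future__ import annotations

import asyncio
from typing import Any, Callable


class ToyAggregator:
    """Hypothetical aggregator that completes a record inside put_buffered."""

    def __init__(self, on_record_done: Callable[[dict[str, Any]], None]) -> None:
        self.on_record_done = on_record_done

    def build_buffered(self, payload: bytes) -> dict[str, Any]:
        return {"payload": payload}               # stand-in for _BufferedRecord

    async def put_buffered(self, buffered: dict[str, Any]) -> None:
        self.on_record_done(buffered)             # terminal callback fires before we return


class ToyProducer:
    def __init__(self) -> None:
        self.outcomes: dict[int, asyncio.Future[str]] = {}
        self.agg = ToyAggregator(self._on_record_done)

    def _on_record_done(self, buffered: dict[str, Any]) -> None:
        # Would raise KeyError if the Future were registered only after routing.
        self.outcomes.pop(id(buffered)).set_result("sent")

    async def put(self, payload: bytes) -> str:
        buffered = self.agg.build_buffered(payload)          # 1. build, no routing yet
        fut = asyncio.get_running_loop().create_future()
        self.outcomes[id(buffered)] = fut                    # 2. bind the Outcome
        await self.agg.put_buffered(buffered)                # 3. route (may complete at once)
        return await fut
```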
submit (async)
Like put, but returns the routed _BufferedRecord.
Exposing the buffered handle lets the Producer correlate the record
with its user-facing aiokpl.outcome.Outcome. The buffered record is
built first, then routed via put_buffered.
put_buffered (async)
Re-entry point used by the Retrier to re-enqueue a record.
The buffered record keeps its accumulated Attempt history and
its original arrival_time so the absolute TTL is honoured across
retries. The predicted shard may differ from the previous attempt
because the cached ShardMap may have refreshed in between.
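The absolute-TTL rule can be sketched as follows. ToyBuffered, requeue and the ttl_s parameter are hypothetical (no TTL setting appears in the Aggregator signature above); the sketch only shows that the deadline is measured from the original arrival_time while the attempt history grows and the shard is re-predicted.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Callable


@dataclass
class ToyBuffered:
    hash_key: int
    arrival_time: float                            # set once; never reset on retry
    attempts: list[str] = field(default_factory=list)


def requeue(rec: ToyBuffered, error: str, now: float, ttl_s: float,
            predict_shard: Callable[[int], int | None]) -> tuple[bool, int | None]:
    """Return (accepted, new_predicted_shard) for a failed record."""
    rec.attempts.append(error)                     # history survives the retry
    if now - rec.arrival_time >= ttl_s:            # absolute TTL from first arrival
        return False, None
    return True, predict_shard(rec.hash_key)       # shard map may have refreshed
```

The explicit accepted flag keeps an expired record distinguishable from one that is accepted but routed to the None shard.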
flush (async)
Close every per-shard batch and dispatch each one to on_batch_ready.
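A minimal sketch of flush, assuming a hypothetical dict that maps each predicted shard (None included) to its list of buffered items. flush_all is not aiokpl's API, and the (shard, items) tuple passed to on_batch_ready stands in for an AggregatedBatch.

```python
from __future__ import annotations

import asyncio
from typing import Any, Awaitable, Callable


async def flush_all(buffers: dict[int | None, list[Any]],
                    on_batch_ready: Callable[[tuple[int | None, list[Any]]], Awaitable[None]]) -> int:
    """Close every non-empty per-shard buffer and dispatch it; return the batch count."""
    dispatched = 0
    for shard in list(buffers):                # snapshot keys: callbacks may mutate the dict
        items = buffers.get(shard)
        if not items:
            continue
        buffers[shard] = []                    # close the batch before awaiting
        await on_batch_ready((shard, items))
        dispatched += 1
    return dispatched
```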
aclose (async)
Drop in-flight items; cancel every per-shard timer. Idempotent.
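The contrast with flush can be sketched as follows: aclose discards buffered items instead of dispatching them, cancels pending timers, and returns early on a second call. ToyTimers is hypothetical and uses loop.call_later handles in place of aiokpl's TaskGroup-managed timers.

```python
from __future__ import annotations

import asyncio
from typing import Any


class ToyTimers:
    """Hypothetical: per-shard buffers plus one pending deadline handle per shard."""

    def __init__(self) -> None:
        self.items: dict[int | None, list[Any]] = {}
        self.timers: dict[int | None, asyncio.TimerHandle] = {}
        self.closed = False

    def arm(self, shard: int | None, delay_s: float) -> None:
        loop = asyncio.get_running_loop()
        self.timers[shard] = loop.call_later(delay_s, self.items.pop, shard, None)

    async def aclose(self) -> None:
        if self.closed:
            return                              # idempotent: later calls do nothing
        self.closed = True
        for handle in self.timers.values():
            handle.cancel()                     # no deadline fires after close
        self.timers.clear()
        self.items.clear()                      # drop buffered items, no dispatch
```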