Phase 3 — Batching (Reducer / Aggregator / Collector)¶
Status: Done.
Phase 3 is where the producer earns its keep. The Sender and Retrier glue around it are mechanical; the batching stages are where every philosophical bullet in the README turns into running code: deadlines drive flushes, the shard is the unit of optimization, each stage owns one responsibility.
Why three stages, not one¶
A naïve producer interleaves two unrelated jobs into a single batcher:
"group user records into a single aggregated payload" and "group
aggregated payloads into a PutRecords call". The C++ KPL deliberately
separates them, and so does aiokpl, because their constraints are
different:
| Stage | Input | Output | Hard limits | Why the split |
|---|---|---|---|---|
| Aggregator | UserRecord |
AggregatedRecord per shard |
4 294 967 295 records, 51 200 B | One predicted shard's payload — sharing the wire-format dedup tables. |
| Collector | AggregatedRecord |
PutRecordsBatch |
500 records, 5 MiB total, 256 KiB per shard | One Kinesis API call — independent of shard identity. |
| Reducer | generic (item, batch) |
closed batch | caller-supplied | The deadline-driven mechanism reused by both. |
The Aggregator can never produce a batch that exceeds a shard's share
of the API call (it would violate Kinesis's per-shard throughput
ceiling). The Collector can never know the wire-format dedup tables (it
sees opaque AggregatedRecords). Splitting them is what makes each
stage a hundred-line file with a single test surface.
Reducer's contract¶
aiokpl.reducer.Reducer is the
generic batcher both higher stages stand on. Its contract:
- Deadline-driven. Every batchable item exposes a monotonic
deadline: float. The Reducer reprograms ananyiotimer to the current batch's earliest deadline on everyadd; when the timer fires, the batch is closed and handed toon_deadline. - FIFO-by-deadline packing. When a hard limit (count or size) is hit
or the optional
flush_predicatefires, the Reducer closes the current batch and packs whatever it can FIFO-by-deadline. Excess items are re-injected into a fresh active batch — never dropped. - Hard limits use
>=semantics.count >= count_limitorsize >= size_limittriggers a synchronous return fromadd. This matches the C++ KPL behaviour and keeps a batch that exactly meets the limit from sitting on the timer. - Cancellation-safe. Each timer lives in a per-timer
anyio.CancelScope; reschedules cancel the previous scope without taking down siblings. - Single-item-too-big. If one item alone exceeds the limits, the Reducer surfaces it as a one-item closed batch rather than silently dropping. Downstream (Kinesis) will reject with a clear error.
Cross-reference: aws/kinesis/core/reducer.h and
aws/kinesis/core/reducer.cc in the C++ KPL.
Aggregator¶
aiokpl.aggregator.Aggregator
owns one Reducer[_BufferedRecord, AggregatedBatch] per predicted
shard. The lookup is a dict[int | None, Reducer] guarded by an
anyio.Lock; creation is lazy on first put.
- Routing.
submit(user_record)computes the hash key (md5_hash_key(partition_key)orparse_explicit_hash_key) and asks the injectedShardMapfor the predicted shard. The buffered record is routed to that shard's reducer. - Single-record fallback. When the
ShardMapis notREADY(or returnsNonefor the hash key), the record routes to a catch-allNone-shard reducer in single-record mode (count_limit=1— every record flushes as its own batch). The same path is taken whenaggregation_enabled=False. This is what keeps the pipeline flowing on cold start before the firstListShardsreturns. - Size estimation. Each
AggregatedBatchmaintains a monotonic running total of proto framing overhead plus deduped partition-key and explicit-hash-key references. The exact byte count is computed only atto_blob()time. Mirrors theaccurate_size/estimated_sizedistinction inaws/kinesis/core/kinesis_record.cc. - Retry re-entry.
put_buffered(buffered)is the Retrier's re-injection point. The buffered record keeps its accumulatedAttempthistory and originalarrival_time; the predicted shard may differ from the previous attempt because the cachedShardMapmay have refreshed in between.
Cross-reference: aws/kinesis/core/aggregator.h.
Collector¶
aiokpl.collector.Collector is a
single Reducer[AggregatedBatch, PutRecordsBatch] with three closure
triggers:
- 500 records. Kinesis's
PutRecordscap. - 5 MiB total. Kinesis's per-call payload cap.
- 256 KiB per shard short-circuit. When any single shard's share of
the in-flight
PutRecordsBatchreaches 256 KiB, the batch is closed immediately even if the global caps are nowhere near. This is the per-shard latency guard: a hot shard cannot starve the rest of the batch by piling up bytes while the timer ticks.
The 256 KiB threshold is enforced via the Reducer's flush_predicate
hook — the predicate sees the just-added item and the current batch and
returns True when current.per_shard_bytes(item.predicted_shard) >=
256 KiB. Cross-reference: aws/kinesis/core/collector.h.
End-to-end shape¶
The same shape repeats one stage down: the Collector wraps a single
Reducer whose input is AggregatedBatch and whose output is
PutRecordsBatch. The Reducer code is reused verbatim — only the
container type and the flush predicate change.
Testing surface¶
- Unit. Reducer deadline ordering under sustained
adds, FIFO-by-deadline pack fairness onflush, single-item-too-big edge, cancellation under shutdown. All run on bothasyncioandtriovia the parametrisedanyio_backendfixture. - Aggregator. Per-shard routing correctness, single-record fallback
when the
ShardMapis notREADY, dedup-table accounting on the size estimator. - Collector. 256 KiB short-circuit, 5 MiB cap, 500-record cap, all three intersecting on the same input stream.
- Integration. Phase 6's
tests/integration/test_producer_integration.pydrives end-to-end againstetspaceman/kinesis-mockwith 100 random-partition-key records — exercises Aggregator and Collector in the same task tree as every other stage.