Phase 6 — Producer + lifecycle (v0.1)¶
Status: Done. First usable release.
What ships¶
aiokpl.producer.Producer— top-level orchestrator.async with Producer(config)lifecycle, per-stream pipelines composed lazily on firstput_record, single sharedaiobotocoreKinesis client owned by anAsyncExitStack.aiokpl.config.Config— frozen dataclass with every tunable: aggregation, deadlines, rate limits, retry policy, backpressure, AWS endpoint overrides.aiokpl.outcome.Outcome— anyio-friendly one-shot value-bearing event. Replacesasyncio.Futurefor cross-backend portability — works on bothasyncioandtrio.put_recordreturns one; the callerawaits it for the terminalRecordResult.
aiobotocore is asyncio-only, so the public Producer API is asyncio-only.
Phases 1–4 remain backend-agnostic and continue to be tested on both
backends.
Pipeline composition¶
Each stream gets its own _StreamPipeline wired through six private
callbacks, one per stage transition:
UserRecord
│ Producer.put_record()
▼
Aggregator ──on_batch_ready──► Limiter.put
▼
Limiter ──on_admit────────► Collector.put
──on_expired───────► synthesize SendOutcome("Expired") ► Retrier.handle
▼
Collector ──on_batch_ready──► tg.start_soon(Sender.send + Retrier.handle)
▼
Sender ─ aiobotocore.put_records ─► Kinesis
▼
Retrier ──on_finish────────► Outcome.set_value(RecordResult), release semaphore
──on_retry─────────► Aggregator.put_buffered
The dispatch from Collector to Sender is tg.start_soon-ed in the
Producer's background task group so collecting more records is not blocked
on the network round-trip. This mirrors the C++ KPL comment in
pipeline.h:206 about not hammering downstream from SDK callback threads.
Expired-batch unification¶
When the Limiter's TTL elapses before tokens become available, it calls
on_expired(batch, reason). The Producer synthesizes a SendOutcome with
request_error=("Expired", reason) and routes it through Retrier.handle.
That keeps a single classification code path: every record's attempt list
and terminal RecordResult is assembled the same way whether the failure
came from the wire or from the rate limiter.
Backpressure¶
Config.max_outstanding_records caps the number of records in the pipeline
between put_record and outcome resolution. The Producer's
anyio.Semaphore is acquired on submit and released by the Retrier's
on_finish (the terminal callback). When saturated, put_record blocks —
the user gets bounded memory by default.
Get started¶
import anyio
from aiokpl import Producer, Config
async def main() -> None:
cfg = Config(
region="us-east-1",
aggregation_enabled=True,
record_max_buffered_time_ms=100,
record_ttl_ms=30_000,
fail_if_throttled=False,
)
async with Producer(cfg) as producer:
outcome = await producer.put_record(
stream="my-stream",
partition_key="user-123",
data=b"hello",
)
result = await outcome.wait()
if result.success:
print(result.shard_id, result.sequence_number)
else:
print("failed:", result.attempts[-1].error_code)
anyio.run(main)
Coverage¶
100% line + branch on every module in aiokpl/. Producer is exercised
end-to-end against etspaceman/kinesis-mock with 100 random-partition-key
records (tests/integration/test_producer_integration.py).