# Phase 5 — Sender + Retrier
Status: Done.
## What ships
- `aiokpl.sender`: sends a `PutRecordsBatch` to Kinesis via the duck-typed `_KinesisClient` Protocol (aiobotocore in production, fakes in tests). It captures timing, classifies request-level errors and count mismatches, and surfaces per-record outcomes aligned with the input batch.
- `aiokpl.retrier`: implements the classification table below. It distinguishes throttle, transient, wrong-shard, and expired outcomes, and calls back into either the Aggregator (`on_retry`) or the user-facing future resolver (`on_finish`).
- `aiokpl.result`: `Attempt` and `RecordResult` dataclasses (frozen, slotted) that carry the full attempt history a caller sees on the user-facing future.
`aiobotocore` is now a runtime dependency; the `integration` extra keeps
only `docker` and `urllib3`. Because aiobotocore is asyncio-only, Sender
and Retrier lose trio support, while Phase 4 and earlier stay backend-agnostic.
## Pipeline placement
```text
PutRecordsBatch
      │
      ▼
Sender ─── aiobotocore.put_records ──► Kinesis
      │
      ▼
SendOutcome (timing + per-record slots + batch_items)
      │
      ▼
Retrier ── classify ── on_finish(buffered, RecordResult)
                 └─ on_retry(buffered) ─► Aggregator.put_buffered
```
`SendOutcome.batch_items` is always populated, so the Retrier can iterate
over the original `AggregatedBatch` list even on the request-error path,
where `per_record` is empty.
## Classification table
The Retrier mirrors `aws/kinesis/core/retrier.cc` in the C++ KPL. In the table, "UR" means a UserRecord.
| Outcome | Action |
|---|---|
| Request-level error, code is `ProvisionedThroughputExceededException`, `fail_if_throttled=True` | Fail every UR with that code/message |
| Request-level error, any other code or `fail_if_throttled=False` | Retry every UR |
| Per-record success, `predicted == actual` or `predicted is None` | `on_finish(success)` |
| Per-record success, hash key lies inside the actual shard's range (child after split) | `on_finish(success)` + `ShardMap.invalidate` |
| Per-record success, hash key lies outside the actual shard's range | `on_retry("Wrong Shard")` + `ShardMap.invalidate` |
| Per-record failure, code is `ProvisionedThroughputExceededException`, `fail_if_throttled=True` | Fail |
| Per-record failure, any other code or `fail_if_throttled=False` | Retry |
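The table can be read as two pure functions. This is a sketch, not the `aiokpl.retrier` code: the `Action` enum and both function names are hypothetical, hash keys are assumed to be plain Python ints, and the shard's hash key range is assumed inclusive at both ends.

```python
from __future__ import annotations

from enum import Enum

THROTTLE = "ProvisionedThroughputExceededException"


class Action(Enum):
    FAIL = "fail"
    RETRY = "retry"
    FINISH_OK = "finish_ok"
    FINISH_OK_INVALIDATE = "finish_ok_invalidate"  # success + ShardMap.invalidate
    RETRY_WRONG_SHARD = "retry_wrong_shard"        # on_retry + ShardMap.invalidate


def classify_request_error(code: str, fail_if_throttled: bool) -> Action:
    """Request-level error: one action applies to every UR in the batch."""
    if code == THROTTLE and fail_if_throttled:
        return Action.FAIL
    return Action.RETRY


def classify_record(
    *,
    error_code: str | None,
    predicted: str | None,
    actual: str | None,
    hash_key: int,
    actual_range: tuple[int, int],
    fail_if_throttled: bool,
) -> Action:
    """Per-record outcome for a single UR."""
    if error_code is not None:
        # Per-record failure uses the same throttle rule as the request level.
        return classify_request_error(error_code, fail_if_throttled)
    if predicted is None or predicted == actual:
        return Action.FINISH_OK
    start, end = actual_range  # assumed inclusive
    if start <= hash_key <= end:
        # Landed in a child shard after a split: accept, but refresh the map.
        return Action.FINISH_OK_INVALIDATE
    return Action.RETRY_WRONG_SHARD
```

Keeping classification side-effect free makes every row of the table testable without a Kinesis client; the caller then performs the callbacks and invalidation.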
`retry_not_expired` body, in order:

- Append `Attempt(success=False, code, message)` to the buffered record's `attempts` list.
- If `clock() - arrival_time > record_ttl`, append a second `Attempt(code="Expired")` and call `on_finish` with a failed `RecordResult`; `on_retry` is not called.
- Otherwise, bump the buffered record's deadline by `retry_deadline_ms / 2` (matches `retrier.cc:160`) and call `on_retry(buffered)`. The Aggregator's `put_buffered` re-enqueues the same `_BufferedRecord`, so the attempt history and `arrival_time` survive across retries.
For multi-record aggregated batches, the aggregated-record-level (AR)
`ShardMap.invalidate` fires exactly once, even though each UserRecord is
classified individually (matches `retrier.cc:108-115`).
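One way to get the once-per-AR behaviour is to let each per-UR classification report whether it saw a shard mismatch, and invalidate after the loop. This is a hypothetical sketch; `handle_aggregated_record` and both callback names are illustrative, not the `aiokpl` API.

```python
from __future__ import annotations

from typing import Callable, Iterable, TypeVar

T = TypeVar("T")


def handle_aggregated_record(
    user_records: Iterable[T],
    classify_one: Callable[[T], bool],  # returns True if this UR wants an invalidate
    invalidate: Callable[[], None],
) -> None:
    needs_invalidate = False
    for ur in user_records:
        # Every UR is still classified (and finished or retried) individually.
        needs_invalidate |= classify_one(ur)
    if needs_invalidate:
        invalidate()  # at most once per aggregated record
```

All URs in one AR share a predicted shard, so a mismatch seen by one is seen by all; collapsing the N identical requests into one keeps the shard map from being refreshed repeatedly for a single response.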
## Attempt history
Every trip through Sender + Retrier appends one `Attempt`. The terminal
`RecordResult` snapshots `attempts` as an immutable tuple. Callers
receive the full history through `RecordResult.attempts`, which lets
them tell a single-try transient error apart from a 30-second
throttle storm.
## Sender error mapping
- `botocore.exceptions.ClientError` is surfaced as `(response["Error"]["Code"], response["Error"]["Message"])`.
- Any other exception (including `BaseException` subclasses that are not `Exception`s) is caught and surfaced as `("Internal", str(exc))`, matching the C++ retrier's "Internal" code for unclassifiable service errors.
- `len(response["Records"]) != batch.count` becomes `("RecordCountMismatch", "...")`, and every UR is treated as a request-level failure. Matches `retrier.cc:170-180`.
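The mapping rules can be sketched as two small functions. To stay runnable without botocore installed, `map_exception` duck-types on the `response` attribute that botocore's `ClientError` carries instead of importing the class; the defaults used when `Code`/`Message` are missing and the mismatch message wording are placeholders of this sketch, not taken from `aiokpl`.

```python
from __future__ import annotations

from typing import Any


def map_exception(exc: BaseException) -> tuple[str, str]:
    """Map an exception raised by put_records to (code, message)."""
    response = getattr(exc, "response", None)  # set on botocore ClientError
    if isinstance(response, dict) and isinstance(response.get("Error"), dict):
        err = response["Error"]
        # Fallback values are assumptions of this sketch.
        return err.get("Code", "Internal"), err.get("Message", "")
    # Anything else, including non-Exception BaseExceptions, is "Internal".
    return "Internal", str(exc)


def check_count(response: dict[str, Any], expected: int) -> tuple[str, str] | None:
    """Return a request-level error if Kinesis returned the wrong number of slots."""
    got = len(response.get("Records", []))
    if got != expected:
        # Placeholder message; the real wording is not specified here.
        return "RecordCountMismatch", f"expected {expected} records, got {got}"
    return None
```

A `None` from `check_count` means the per-record slots can be zipped against the batch; any tuple sends the whole batch down the request-level row of the classification table.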