# FastAPI producer
A common use case for aiokpl is a REST front-end that accepts events from clients (web app, mobile, server-to-server webhooks) and forwards them to a Kinesis data stream. FastAPI is the natural pairing. Both are asyncio-native. Both are built around typed data models: dataclasses in aiokpl, Pydantic models in FastAPI. FastAPI's lifespan handler is also an async context manager, which is exactly the shape `Producer.__aenter__`/`__aexit__` were designed for.

The full example lives at `examples/fastapi_producer.py` in the repo. This page walks through the design decisions.
## Two endpoints, two patterns
REST producers usually want one of two delivery semantics, and most real apps expose both:
| Semantics | Endpoint | When to use |
|---|---|---|
| Confirmed: the client waits until Kinesis has the record | `POST /events/confirmed` → 200 with `shard_id`/`sequence_number` | Webhooks. Anything where the caller will retry on non-2xx and you want at-least-once delivery with proof. |
| Fire-and-forget: return immediately, log failures server-side | `POST /events` → 202 Accepted | Hot ingest. Mobile telemetry. Anything where latency dominates and the upstream can't usefully retry. |
Both endpoints use the same `Producer` instance. There is no separate "async producer" vs "fast producer". The only difference is whether the request handler awaits the `Outcome` itself or hands it to a background task.
## Lifespan: one `Producer` per app
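```python
import os
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager

from fastapi import FastAPI

# Config and Producer come from aiokpl; see examples/fastapi_producer.py for the exact imports.


@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncIterator[None]:
    cfg = Config(
        region=os.environ.get("AIOKPL_REGION", "us-east-1"),
        endpoint_url=os.environ.get("AIOKPL_ENDPOINT_URL") or None,
        record_max_buffered_time_ms=100.0,
        record_ttl_ms=30_000.0,
        max_outstanding_records=10_000,
    )
    # One Producer for the whole app; leaving the block drains in-flight records.
    async with Producer(cfg) as producer:
        app.state.producer = producer
        app.state.stream = os.environ["AIOKPL_STREAM"]
        yield


app = FastAPI(lifespan=lifespan)
```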
`async with Producer(cfg)` opens the aiobotocore Kinesis client, spawns the per-stage background tasks (aggregator timer, limiter drain loop, shard-map refresh), and on exit drains in-flight records before tearing the client down. FastAPI runs the code after `yield` when the ASGI server (e.g. uvicorn) shuts down gracefully, so a SIGTERM from your orchestrator triggers a clean flush instead of dropped records, provided the drain finishes before the orchestrator's kill deadline.
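The enter/drain/teardown lifecycle above is the standard async-context-manager pattern. The sketch below is a hypothetical, stdlib-only `ToyProducer`, not aiokpl's implementation: an `asyncio.Queue` stands in for the pipeline and a single drain task stands in for the per-stage background tasks.

```python
from __future__ import annotations

import asyncio


class ToyProducer:
    """Hypothetical stand-in for an async producer; NOT the aiokpl implementation."""

    def __init__(self) -> None:
        self._queue: asyncio.Queue[bytes] = asyncio.Queue()
        self._worker: asyncio.Task[None] | None = None
        self.sent: list[bytes] = []

    async def __aenter__(self) -> ToyProducer:
        # Start the background task that plays the role of the pipeline stages.
        self._worker = asyncio.create_task(self._drain_loop())
        return self

    async def __aexit__(self, *exc_info: object) -> None:
        # Drain first: wait until every enqueued record has been processed...
        await self._queue.join()
        # ...then tear down the background task.
        assert self._worker is not None
        self._worker.cancel()
        try:
            await self._worker
        except asyncio.CancelledError:
            pass

    async def put_record(self, data: bytes) -> None:
        await self._queue.put(data)

    async def _drain_loop(self) -> None:
        while True:
            data = await self._queue.get()
            await asyncio.sleep(0.01)  # simulate the network call to Kinesis
            self.sent.append(data)
            self._queue.task_done()


async def main() -> list[bytes]:
    async with ToyProducer() as producer:
        for i in range(5):
            await producer.put_record(f"event-{i}".encode())
        # Exiting here, with records still queued, still delivers all five.
    return producer.sent


if __name__ == "__main__":
    print(asyncio.run(main()))
```

The key design choice is ordering inside `__aexit__`: join the queue before cancelling the worker, otherwise shutdown would discard whatever was still buffered.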
**One Producer handles every stream.** The `Producer` creates per-stream pipelines lazily on the first `put_record(stream=…)`. You don't need (and shouldn't have) one `Producer` per stream: that wastes connections, and each instance gets its own backpressure semaphore, so `max_outstanding_records` no longer bounds the process as a whole.
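To see why a single instance matters, here is a hypothetical sketch (not aiokpl's internals) of a router that creates per-stream pipelines lazily and gates every stream with one shared `asyncio.Semaphore`. `Router`, `Pipeline` and `max_outstanding` are illustrative names.

```python
from __future__ import annotations

import asyncio
from dataclasses import dataclass, field


@dataclass
class Pipeline:
    """Hypothetical per-stream pipeline; here it only collects records."""

    stream: str
    records: list[bytes] = field(default_factory=list)


class Router:
    """Hypothetical: many lazily created pipelines behind ONE outstanding-record cap."""

    def __init__(self, max_outstanding: int) -> None:
        # One semaphore shared by every stream bounds the whole process.
        self._slots = asyncio.Semaphore(max_outstanding)
        self.pipelines: dict[str, Pipeline] = {}

    def _pipeline(self, stream: str) -> Pipeline:
        # Created on the first record for a stream, reused afterwards.
        if stream not in self.pipelines:
            self.pipelines[stream] = Pipeline(stream)
        return self.pipelines[stream]

    async def put_record(self, stream: str, data: bytes) -> None:
        await self._slots.acquire()  # waits while the global cap is reached
        self._pipeline(stream).records.append(data)

    def complete(self) -> None:
        # Called when a record finishes (success or terminal failure).
        self._slots.release()


async def main() -> tuple[bool, list[str]]:
    router = Router(max_outstanding=2)
    await router.put_record("clicks", b"1")
    await router.put_record("orders", b"2")
    # A third record blocks even though it targets a brand-new stream.
    third = asyncio.create_task(router.put_record("signups", b"3"))
    await asyncio.sleep(0.05)
    was_blocked = not third.done()
    router.complete()  # one record finishes, freeing a slot
    await third
    return was_blocked, sorted(router.pipelines)


if __name__ == "__main__":
    print(asyncio.run(main()))
```

With two separate `Router` instances, each would own a semaphore, so the effective cap would double without anyone having configured it.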
## Confirmed mode: `await outcome.wait()`
```python
from fastapi import HTTPException

# Producer comes from aiokpl; EventIn, EventConfirmed and _serialize are
# defined in examples/fastapi_producer.py.


@app.post("/events/confirmed", response_model=EventConfirmed)
async def post_event_confirmed(evt: EventIn) -> EventConfirmed:
    producer: Producer = app.state.producer
    # Enqueue: returns an Outcome as soon as the record is in the pipeline.
    outcome = await producer.put_record(
        stream=app.state.stream,
        partition_key=evt.partition_key,
        data=_serialize(evt.payload),
    )
    # Confirm: wait for success or terminal failure (retries included).
    result = await outcome.wait()
    if not result.success:
        last = result.attempts[-1] if result.attempts else None
        raise HTTPException(
            status_code=502,
            detail={
                "error_code": last.error_code if last else "Unknown",
                "error_message": last.error_message if last else "",
                "attempts": len(result.attempts),
            },
        )
    return EventConfirmed(
        shard_id=result.shard_id or "",
        sequence_number=result.sequence_number or "",
        attempts=len(result.attempts),
    )
```
Two awaits worth noticing:
- `await producer.put_record(...)` returns the `Outcome` quickly; it only blocks if the backpressure semaphore is full. Once it returns, the record is in the pipeline.
- `await outcome.wait()` blocks until the Retrier marks the record done (success or terminal failure). With `record_max_buffered_time_ms=100` and a healthy stream, this typically resolves in 100-200 ms.
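The split between the two awaits is the ordinary future pattern: the first await hands back a handle, the second waits for it to resolve. A hypothetical stdlib sketch, with an `asyncio.Future` standing in for aiokpl's `Outcome`, a `FakeResult` dataclass standing in for `RecordResult`, and a delayed callback standing in for the Retrier:

```python
from __future__ import annotations

import asyncio
import time
from dataclasses import dataclass


@dataclass
class FakeResult:
    """Hypothetical stand-in for aiokpl's RecordResult."""

    success: bool
    shard_id: str | None = None


async def put_record(data: bytes, latency_s: float) -> asyncio.Future[FakeResult]:
    """Hypothetical: enqueue and return a future immediately (the "Outcome")."""
    loop = asyncio.get_running_loop()
    fut: asyncio.Future[FakeResult] = loop.create_future()
    # Stand-in for the Retrier resolving the record some time later.
    loop.call_later(latency_s, fut.set_result, FakeResult(True, "shardId-000000000000"))
    return fut


async def main() -> tuple[float, float, FakeResult]:
    t0 = time.perf_counter()
    outcome = await put_record(b"hello", latency_s=0.15)
    t_enqueued = time.perf_counter() - t0  # first await: near-instant
    result = await outcome                 # second await: waits for resolution
    t_done = time.perf_counter() - t0
    return t_enqueued, t_done, result


if __name__ == "__main__":
    print(asyncio.run(main()))
```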
On failure the handler returns 502 Bad Gateway, because the upstream (Kinesis) rejected the record. The body carries the last attempt's error code and message plus the attempt count, so an operator can tell a throttle from an expired record from a transient AWS hiccup.
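Pulling the error mapping into a pure function makes it easy to unit-test without Kinesis. A hypothetical sketch: `Attempt` mirrors only the two fields the handler reads (`error_code`, `error_message`), and aiokpl's real attempt type may carry more.

```python
from __future__ import annotations

from collections.abc import Sequence
from dataclasses import dataclass
from typing import Any


@dataclass
class Attempt:
    """Hypothetical: only the fields the confirmed-mode handler reads."""

    error_code: str | None = None
    error_message: str | None = None


def failure_detail(attempts: Sequence[Attempt]) -> dict[str, Any]:
    """Build the 502 body from the attempt history; the last attempt wins."""
    last = attempts[-1] if attempts else None
    return {
        "error_code": last.error_code if last else "Unknown",
        "error_message": last.error_message if last else "",
        "attempts": len(attempts),
    }
```

In the handler this would read `raise HTTPException(status_code=502, detail=failure_detail(result.attempts))`, assuming `result.attempts` is a sequence of objects with those two attributes, as the handler above implies.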
**Long-tail latency.** A record's outcome resolves only after all of its retries. A persistently throttled shard can keep a record in the pipeline for up to `record_ttl_ms` (default 30 s). If your confirmed clients have a tighter HTTP timeout, lower `record_ttl_ms` to match, or use fire-and-forget.
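Matching `record_ttl_ms` to the client's timeout is simple arithmetic; leaving some headroom for buffering and for the handler itself is prudent. A hypothetical helper (its name and the 500 ms default headroom are assumptions, not aiokpl API):

```python
def ttl_for_client_timeout(
    client_timeout_s: float,
    record_max_buffered_time_ms: float = 100.0,
    headroom_ms: float = 500.0,
) -> float:
    """Hypothetical: pick record_ttl_ms so a record expires before the client gives up."""
    # Conservatively subtract the buffering delay and a safety margin.
    budget_ms = client_timeout_s * 1000.0 - record_max_buffered_time_ms - headroom_ms
    if budget_ms <= 0:
        raise ValueError("client timeout too tight for buffering delay plus headroom")
    return budget_ms
```

For a 10 s client timeout this yields 9400.0, so an expired record resolves the `Outcome` as a failure and the handler answers with its own 502 before the client's timeout fires.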
## Fire-and-forget: `BackgroundTasks` + outcome observer
```python
import logging

from fastapi import BackgroundTasks

# Producer, Outcome and RecordResult come from aiokpl; EventIn, EventAccepted
# and _serialize are defined in examples/fastapi_producer.py.

log = logging.getLogger(__name__)


@app.post("/events", status_code=202, response_model=EventAccepted)
async def post_event(evt: EventIn, bg: BackgroundTasks) -> EventAccepted:
    producer: Producer = app.state.producer
    outcome = await producer.put_record(
        stream=app.state.stream,
        partition_key=evt.partition_key,
        data=_serialize(evt.payload),
    )
    # Observe the outcome after the 202 has been sent.
    bg.add_task(_log_failure_if_any, outcome, evt.partition_key)
    return EventAccepted()


async def _log_failure_if_any(outcome: Outcome[RecordResult], partition_key: str) -> None:
    result = await outcome.wait()
    if not result.success:
        last = result.attempts[-1] if result.attempts else None
        log.error(
            "aiokpl record failed pk=%s code=%s message=%s attempts=%d",
            partition_key,
            last.error_code if last else "Unknown",
            last.error_message if last else "",
            len(result.attempts),
        )
```
The pattern:
1. `await put_record` to enqueue.
2. Register a background task that will `await outcome.wait()` after the response is sent.
3. Return 202 immediately.
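The ordering (respond first, observe later) can be simulated without FastAPI. In this hypothetical sketch a plain list of zero-argument coroutine factories stands in for `BackgroundTasks`, and a pre-resolved `asyncio.Future[bool]` stands in for a failed `Outcome`. The only point is that the 202 is produced before the failure is observed and logged.

```python
from __future__ import annotations

import asyncio
import logging
from collections.abc import Awaitable, Callable

log = logging.getLogger("sketch")
events: list[str] = []  # records the order in which things happen


async def log_failure_if_any(outcome: asyncio.Future[bool], pk: str) -> None:
    ok = await outcome  # runs only after the "response" has gone out
    if not ok:
        events.append("logged failure")
        log.error("record failed pk=%s", pk)


async def handler(bg: list[Callable[[], Awaitable[None]]], pk: str) -> int:
    outcome: asyncio.Future[bool] = asyncio.get_running_loop().create_future()
    outcome.set_result(False)  # pretend Kinesis rejected the record
    bg.append(lambda: log_failure_if_any(outcome, pk))
    events.append("responded 202")
    return 202


async def serve_one(pk: str) -> int:
    bg: list[Callable[[], Awaitable[None]]] = []
    status = await handler(bg, pk)
    # Like Starlette, run background tasks only after the response is sent.
    for task in bg:
        await task()
    return status


if __name__ == "__main__":
    print(asyncio.run(serve_one("user-42")), events)
```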
A few details that matter:
- `BackgroundTasks` runs the function after the response is sent, so the `await` inside `_log_failure_if_any` happens off the request's critical path.
- Failures are never dropped silently. Even though the client gets a 202, a failure shows up in your logs with the partition key. Pipe those logs into your existing alerting and you have observability without coupling the client to Kinesis.
- Backpressure is not bypassed. If `max_outstanding_records` is saturated, `put_record` still blocks. The client experiences this as request latency, not as a 5xx. That's the right tradeoff: better to push load shedding upstream than to silently drop records.
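The backpressure point in the last bullet is easy to demonstrate with a bare `asyncio.Semaphore` standing in for `max_outstanding_records` (a hypothetical model, not aiokpl's code): once the cap is reached, the next enqueue simply waits until a record completes, so the caller sees latency rather than an error.

```python
from __future__ import annotations

import asyncio
import time


async def main() -> float:
    slots = asyncio.Semaphore(2)  # stands in for max_outstanding_records=2

    async def complete_later(delay_s: float) -> None:
        await asyncio.sleep(delay_s)
        slots.release()  # a record finished, freeing a slot

    # Two records fill the cap; each completes after 0.2 s.
    for _ in range(2):
        await slots.acquire()
    releases = [asyncio.create_task(complete_later(0.2)) for _ in range(2)]

    t0 = time.perf_counter()
    await slots.acquire()  # third "put_record": waits, raises nothing
    waited = time.perf_counter() - t0
    slots.release()
    await asyncio.gather(*releases)
    return waited


if __name__ == "__main__":
    print(f"third enqueue waited {asyncio.run(main()):.2f}s")
```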
## Health endpoint
```python
@app.get("/health")
async def health() -> dict[str, int | bool]:
    producer: Producer = app.state.producer
    # Live count of records currently inside the pipeline.
    return {"ok": True, "outstanding_records": producer.outstanding_records}
```
`producer.outstanding_records` is the live count of records inside the pipeline. Plot it. When it approaches `max_outstanding_records`, the producer is applying backpressure: Kinesis is your bottleneck, and either the stream needs more shards (each shard accepts up to 1 MB/s or 1,000 records/s of writes) or your partition keys are concentrating traffic on a few hot shards.
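If you alert on this rather than only plotting it, utilization against the cap is the number to watch. A hypothetical helper; the 0.8 warning threshold is an arbitrary example, not an aiokpl recommendation:

```python
def backpressure_state(outstanding: int, max_outstanding: int, warn_at: float = 0.8) -> str:
    """Hypothetical: classify producer.outstanding_records against the cap."""
    if max_outstanding <= 0:
        raise ValueError("max_outstanding must be positive")
    utilization = outstanding / max_outstanding
    if utilization >= 1.0:
        return "saturated"       # put_record is blocking callers right now
    if utilization >= warn_at:
        return "backpressuring"  # approaching the cap: check shard capacity
    return "ok"
```

You could return this next to `outstanding_records` from `/health`, passing the same `max_outstanding_records` value you gave `Config`.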
## Configuration knobs worth tuning per workload
| Knob | Default | Change it if… |
|---|---|---|
| `record_max_buffered_time_ms` | 100 | You want lower latency in confirmed mode: drop to 20-50. Or you want higher throughput on bursty traffic: raise to 250. |
| `record_ttl_ms` | 30_000 | Confirmed clients have tight timeouts. Match it. |
| `max_outstanding_records` | 100_000 | `outstanding_records` sits near the cap. Raise it if you have memory headroom, OR add shards. |
| `aggregation_enabled` | `True` | Your records are already > 50 KB each, so aggregation overhead doesn't pay off. Set to `False`. |
| `fail_if_throttled` | `False` | You'd rather surface throttle errors to the client immediately than retry silently. Set to `True`. |
The full table lives in Configuration.
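As a starting point, the table can be read as two profiles. The values below are simply the table's suggestions written out as keyword arguments (the profile names are illustrative); you would pass one as `Config(region=..., **profile)` and then benchmark against your own traffic.

```python
# Hypothetical profiles built only from the knobs in the table above.
LOW_LATENCY_CONFIRMED = {
    "record_max_buffered_time_ms": 30.0,   # table suggests 20-50 for confirmed mode
    "record_ttl_ms": 9_000.0,              # e.g. under a 10 s client timeout
    "fail_if_throttled": True,             # optional: surface throttles instead of retrying
}

BURSTY_HIGH_THROUGHPUT = {
    "record_max_buffered_time_ms": 250.0,  # longer buffering on bursty traffic
    "record_ttl_ms": 30_000.0,             # the default
    "aggregation_enabled": True,           # the default; pays off for small records
}
```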
## Running locally against kinesis-mock
```bash
# 1. Spin up kinesis-mock
docker run -d --name kinesis-mock -p 4567:4567 -p 4568:4568 \
  ghcr.io/etspaceman/kinesis-mock:0.5.2

# 2. Create the stream
AWS_ACCESS_KEY_ID=t AWS_SECRET_ACCESS_KEY=t AWS_DEFAULT_REGION=us-east-1 \
  aws --endpoint-url https://localhost:4567 --no-verify-ssl \
  kinesis create-stream --stream-name events --shard-count 4

# 3. Run the app
AIOKPL_REGION=us-east-1 \
AIOKPL_STREAM=events \
AIOKPL_ENDPOINT_URL=https://localhost:4567 \
AIOKPL_VERIFY_SSL=false \
AWS_ACCESS_KEY_ID=t AWS_SECRET_ACCESS_KEY=t \
  uvicorn examples.fastapi_producer:app --reload

# 4. Push an event
curl -X POST http://127.0.0.1:8000/events/confirmed \
  -H 'content-type: application/json' \
  -d '{"partition_key":"user-42","payload":{"event":"signup"}}'
```
## What's not in this example (yet)
- Authentication / authorization. Add your usual FastAPI dependencies.
- Schema versioning. The example serializes the request payload as JSON verbatim; in production you'd want a versioned envelope.
- Metrics. Pair the producer with a `MetricsSink` (see Custom sinks). For a FastAPI app, `OpenTelemetrySink` composes naturally with FastAPI's OTel instrumentation.
- Sync workers. If part of your stack is sync (Celery, RQ), `SyncProducer` is a drop-in.
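For the schema-versioning item, one common shape is a small envelope around the payload. This is a hypothetical replacement for the example's `_serialize`, whose real signature isn't shown on this page; the field names `v`, `type` and `ts` are assumptions.

```python
from __future__ import annotations

import json
import time
from typing import Any

SCHEMA_VERSION = 1  # bump when the payload contract changes


def serialize_envelope(event_type: str, payload: dict[str, Any]) -> bytes:
    """Hypothetical: wrap the payload so consumers can dispatch on version."""
    envelope = {
        "v": SCHEMA_VERSION,
        "type": event_type,
        "ts": time.time(),
        "payload": payload,
    }
    # Compact separators keep each Kinesis record a little smaller.
    return json.dumps(envelope, separators=(",", ":")).encode("utf-8")


def parse_envelope(data: bytes) -> dict[str, Any]:
    """Hypothetical consumer side: reject versions it doesn't understand."""
    envelope = json.loads(data)
    if envelope.get("v") != SCHEMA_VERSION:
        raise ValueError(f"unsupported schema version: {envelope.get('v')!r}")
    return envelope
```

Keeping the version check on the consumer side means producers can roll out a new version only after every consumer knows how to read it.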