Getting started¶
Everything you need to put records into Kinesis with aiokpl — installation,
first program, the configuration knobs, the per-record outcome, metrics
wiring, and the troubleshooting tree for the questions that actually come up.
Installation¶
aiokpl requires Python 3.10+ and a recent pip.
# Base install — async Producer, SyncProducer, CloudWatchSink, InMemorySink.
pip install aiokpl
# With the OpenTelemetry sink.
pip install "aiokpl[otel]"
# With the Datadog sink.
pip install "aiokpl[datadog]"
# Everything (tests, lint, docs, both optional sinks). For contributors.
pip install "aiokpl[dev]"
The extras only gate optional metrics sinks; the producer itself is fully functional with the base install. Each extra in one sentence:
otel— pullsopentelemetry-apiandopentelemetry-sdk, unlocksaiokpl.sinks.opentelemetry.OpenTelemetrySink.datadog— pullsdatadog-api-client, unlocksaiokpl.sinks.datadog.DatadogSink.dev— everything contributors need: test runner, lint, type checker, docs toolchain, integration deps, both optional sinks.
aiobotocore is asyncio-only
The network edge (Producer, Sender, Retrier, CloudWatchSink) runs
on asyncio because aiobotocore is asyncio-only. The codec, ShardMap,
Reducer, Aggregator, Collector, Limiter, and TokenBucket are
backend-agnostic and the test suite exercises them on both asyncio and
trio.
Quick start (async)¶
import anyio
from aiokpl import Producer, Config
async def main():
cfg = Config(
region="us-east-1",
aggregation_enabled=True,
record_max_buffered_time_ms=100,
record_ttl_ms=30_000,
fail_if_throttled=False,
)
async with Producer(cfg) as producer:
outcome = await producer.put_record(
stream="my-stream",
partition_key="user-123",
data=b"hello",
)
result = await outcome.wait()
if result.success:
print(result.shard_id, result.sequence_number)
else:
print("failed:", result.attempts[-1].error_code)
anyio.run(main)
put_record() returns immediately with an Outcome; the
record itself is queued, aggregated with siblings going to the same shard,
rate-limited, and sent in the background. outcome.wait() blocks until the
record reaches a terminal state (success or final failure) and carries the
full attempt history.
Quick start (synchronous)¶
For Flask/Django request handlers, Celery tasks, plain scripts, or notebooks
that don't run an event loop, use SyncProducer. Same
shape as the async API; internally it owns an anyio BlockingPortal running
the async Producer on a single background thread.
from aiokpl import Config, SyncProducer
cfg = Config(region="us-east-1")
with SyncProducer(cfg) as producer:
outcome = producer.put_record(
stream="my-stream",
partition_key="user-123",
data=b"hello",
)
try:
result = outcome.wait(timeout=5.0)
except TimeoutError:
# The record didn't reach a terminal state within 5 seconds.
# Possible causes: backpressure, network stall, very large
# record_ttl_ms. Outcome is still tracked — try again or cancel.
outcome.cancel()
raise
if result.success:
print(result.shard_id, result.sequence_number)
put_record is thread-safe — you can call it from any OS thread.
outcome.wait(timeout=) and producer.flush(timeout=) both raise
TimeoutError if the deadline elapses; the in-flight Kinesis request is
not stopped, only the local handle is released.
Configuration¶
Every tunable lives on Config. It's a frozen dataclass —
construct once, hand to the producer, never mutate.
| Field | Default | One-line meaning |
|---|---|---|
region |
(required) | AWS region for the Kinesis client. |
endpoint_url |
None |
Override the Kinesis endpoint (LocalStack, kinesis-mock, VPC endpoint). |
verify_ssl |
True |
Set False for self-signed test certs. |
aws_access_key_id |
None |
Explicit credential; otherwise the default chain runs. |
aws_secret_access_key |
None |
Pair to the above. |
aws_session_token |
None |
Session credentials (STS, SSO). |
aggregation_enabled |
True |
Pack many user records into one Kinesis record (KPL wire format). |
aggregation_max_count |
4_294_967_295 |
Hard cap on user records per aggregated record. |
aggregation_max_size |
51_200 |
Bytes; the aggregated record stops growing past this size. |
record_max_buffered_time_ms |
100.0 |
Soft deadline an aggregator/collector batch waits for siblings. |
record_ttl_ms |
30_000.0 |
Hard expiration; once exceeded the record fails with Expired. |
collection_max_count |
500 |
Kinesis hard limit: 500 records per PutRecords. |
collection_max_size |
5_242_880 |
Kinesis hard limit: 5 MiB per PutRecords. |
rate_limit_records_per_sec_per_shard |
1_000.0 |
Kinesis hard limit, per shard. |
rate_limit_bytes_per_sec_per_shard |
1_048_576.0 |
1 MiB/s, per shard. |
drain_interval_ms |
25.0 |
How often the limiter polls its per-shard queues. |
fail_if_throttled |
False |
If True, throttled records fail immediately instead of retrying. |
retry_deadline_ms |
50.0 |
Bumps a record's batching deadline this much on retry. |
max_outstanding_records |
100_000 |
Backpressure cap; put_record blocks when saturated. |
metrics_level |
MetricsLevel.NONE |
NONE / SUMMARY / DETAILED. |
metrics_sink |
None |
Any MetricsSink; defaults to NullSink. |
metrics_upload_interval_ms |
60_000.0 |
How often the MetricsManager flushes to the sink. |
AWS endpoints. Leave endpoint_url / credentials at their defaults in
production — aiobotocore resolves credentials via the standard chain
(env vars, shared config, instance metadata). The overrides exist for tests
against kinesis-mock or LocalStack, and for VPC-private endpoints.
Aggregation. Defaults match the C++ KPL: aggregation on, capped at 50 KiB
per aggregated record. The high aggregation_max_count ceiling is intentional
— size and time deadlines are the actual brakes. Disable aggregation
(aggregation_enabled=False) only if your individual records are already
large (> ~100 KiB) or your consumer can't deaggregate.
Batching deadlines. record_max_buffered_time_ms = 100 is the C++ KPL
default and a good sweet spot: long enough to amortize a PutRecords round
trip over many records, short enough that latency-sensitive callers don't
notice. record_ttl_ms = 30_000 matches the C++ KPL — records that age past
that fail with Expired rather than blocking indefinitely.
Collector. collection_max_count and collection_max_size are Kinesis
hard limits, not preferences. They exist on Config so tests can shrink
them, not so production callers can change them.
Per-shard rate limiting. 1000 records/s + 1 MiB/s per shard are the
Kinesis hard caps. The limiter throttles predicted shards, not the stream
as a whole — that's the entire point of the shard-aware design. Lower the
records-per-second knob if you're sharing a shard with other producers and
want headroom.
Retrier policy. fail_if_throttled=False matches the C++ KPL: throttles
are treated as transient. Flip to True only if you'd rather see the failure
than spend the record's TTL retrying.
Backpressure. max_outstanding_records=100_000 is a soft memory cap.
put_record awaits an internal semaphore released by the retrier's terminal
callback; lower this if you're embedded in a process with tight memory.
Metrics. Default is NONE + NullSink — zero overhead, zero allocations.
Enable by setting both metrics_level and metrics_sink. See
Metrics below.
Working with the awaitable outcome¶
put_record returns an
Outcome[RecordResult] — a backend-agnostic one-shot value.
Outcome.wait() blocks until the pipeline classifies the record as terminal
(success or final failure).
The terminal value is a RecordResult:
@dataclass(slots=True, frozen=True)
class RecordResult:
success: bool
shard_id: str | None
sequence_number: str | None
attempts: tuple[Attempt, ...]
Each Attempt carries started_at / ended_at, success,
and either (shard_id, sequence_number) on success or
(error_code, error_message) on failure.
Happy path¶
outcome = await producer.put_record(
stream="events", partition_key="u-1", data=b"payload"
)
result = await outcome.wait()
assert result.success
print(result.shard_id, result.sequence_number)
Failure path¶
result = await outcome.wait()
if not result.success:
last = result.attempts[-1]
print(f"final failure: {last.error_code} — {last.error_message}")
print(f"retried {len(result.attempts)} time(s)")
for i, a in enumerate(result.attempts):
kind = "OK" if a.success else f"{a.error_code}"
print(f" attempt {i}: {kind} ({a.ended_at - a.started_at:.3f}s)")
Common terminal error_code values: ProvisionedThroughputExceededException
(after final retry, when fail_if_throttled=True), Expired (TTL elapsed
in the limiter or aggregator), Wrong Shard (split-driven; the record was
re-enqueued and a later attempt won), Record Count Mismatch (Kinesis
returned a per-record list of the wrong length — the whole batch fails).
Submitting many records concurrently¶
The producer is built for fan-in. Stuff records in from many tasks; the aggregator groups them by predicted shard.
import anyio
async def push(producer, i):
outcome = await producer.put_record(
stream="events",
partition_key=f"user-{i}",
data=f"event-{i}".encode(),
)
return await outcome.wait()
async with Producer(cfg) as producer:
async with anyio.create_task_group() as tg:
results: list = [None] * 10_000
async def one(i):
results[i] = await push(producer, i)
for i in range(10_000):
tg.start_soon(one, i)
failed = sum(1 for r in results if not r.success)
print(f"{10_000 - failed}/{10_000} succeeded")
Because put_record only blocks on the backpressure semaphore, fan-out
scales until you hit either max_outstanding_records or the per-shard rate
limit.
Metrics: pluggable sinks¶
The library knows nothing about CloudWatch, Datadog, or OpenTelemetry. It
emits semantic events to a MetricsSink you plug in.
Sink design rationale lives in Custom sinks.
Two knobs together turn metrics on:
from aiokpl import Config, MetricsLevel
cfg = Config(
region="us-east-1",
metrics_level=MetricsLevel.SUMMARY, # or DETAILED
metrics_sink=..., # one of the sinks below
)
NullSink (default)¶
Zero overhead, zero allocations. Used automatically when metrics_sink=None.
from aiokpl import Config, NullSink
cfg = Config(region="us-east-1", metrics_sink=NullSink()) # equivalent to None
InMemorySink (test + embed)¶
Captures every exported snapshot batch in process. Handy in tests and in embedded scenarios where the host already publishes its own metrics.
from aiokpl import Config, InMemorySink, MetricsLevel, Producer
sink = InMemorySink()
cfg = Config(region="us-east-1", metrics_level=MetricsLevel.SUMMARY,
metrics_sink=sink)
async with Producer(cfg) as producer:
... # exercise the producer
# Inspect after shutdown:
for snapshot in sink.by_name("UserRecordsPut"):
print(snapshot.sum, snapshot.dimensions)
CloudWatchSink (production AWS)¶
Bundled — no extra needed, since aiobotocore is already a runtime dep.
from aiokpl import CloudWatchSink, Config, MetricsLevel, Producer
sink = CloudWatchSink(region="us-east-1", namespace="myapp/kinesis")
cfg = Config(
region="us-east-1",
metrics_level=MetricsLevel.SUMMARY,
metrics_sink=sink,
metrics_upload_interval_ms=60_000,
)
async with Producer(cfg) as producer:
...
Metric names match the C++ KPL constants verbatim
(UserRecordsReceived, UserRecordsPut, KinesisRecordsPut,
BufferedTime, RequestTime, RetriesPerRecord, ErrorsByCode,
UserRecordsPending, …) — existing C++ KPL dashboards keep working.
OpenTelemetrySink (gated by [otel])¶
# pip install "aiokpl[otel]"
from opentelemetry import metrics
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import (
OTLPMetricExporter,
)
from aiokpl import Config, MetricsLevel, OpenTelemetrySink, Producer
reader = PeriodicExportingMetricReader(
OTLPMetricExporter(endpoint="http://collector:4317", insecure=True),
)
metrics.set_meter_provider(MeterProvider(metric_readers=[reader]))
sink = OpenTelemetrySink(instrument_prefix="aiokpl.")
cfg = Config(
region="us-east-1",
metrics_level=MetricsLevel.SUMMARY,
metrics_sink=sink,
)
async with Producer(cfg) as producer:
...
Counts become OTel Counters, distributions become Histograms, gauges
become UpDownCounters. The sink only emits instruments — exporters,
batching, and resource attribution live on your OTel SDK configuration.
DatadogSink (gated by [datadog])¶
# pip install "aiokpl[datadog]"
# Reads DD_API_KEY / DD_APP_KEY from the environment by default.
import os
from aiokpl import Config, DatadogSink, MetricsLevel, Producer
sink = DatadogSink(
api_key=os.environ["DD_API_KEY"],
site="datadoghq.com",
metric_prefix="aiokpl.",
)
cfg = Config(
region="us-east-1",
metrics_level=MetricsLevel.SUMMARY,
metrics_sink=sink,
)
async with Producer(cfg) as producer:
...
Counts map to Datadog count, distributions to Datadog distribution,
gauges to Datadog gauge.
Multi-stream usage¶
One Producer handles every stream. Per-stream pipelines (ShardMap,
Aggregator, Limiter, Collector, Sender, Retrier) are created lazily the
first time you call put_record(stream=...) for a new stream name; they
share the same aiobotocore Kinesis client (one HTTP connection pool).
async with Producer(cfg) as producer:
await producer.put_record(stream="events", partition_key="u1", data=b"...")
await producer.put_record(stream="audit", partition_key="u1", data=b"...")
await producer.put_record(stream="metrics", partition_key="u1", data=b"...")
There's no streams= constructor argument and no need to declare them
upfront — the first record for a stream creates the pipeline, and the
producer's __aexit__ tears every pipeline down.
Graceful shutdown¶
async with Producer(cfg) as producer: is the only correct way to drive the
producer. The context manager guarantees:
- On entry:
aiobotocoresession opens, the background task group starts, the metrics manager spins up its flush task. - On exit:
flush()runs — every aggregator, limiter, and collector drains into the network — then the task group quiesces, the metrics manager uploads one last batch, and theaiobotocoreclient closes.
flush() is exposed for callers that want to drain without tearing the
producer down (typical in long-lived services at checkpoint time):
async with Producer(cfg) as producer:
for record in batch:
await producer.put_record(stream="events",
partition_key=record.key,
data=record.payload)
await producer.flush()
# producer is still alive; you can keep going.
Don't forget to await
Forgetting to use the async with (or with for SyncProducer) leaks
every per-stream pipeline's task group and the aiobotocore HTTP pool.
Producer.__init__ is cheap and pure — every side effect lives in
__aenter__.
Patterns¶
A short tasting menu of the levers you'll actually pull.
Backpressure tuning¶
# Tight memory? Drop the cap. put_record will block when saturated; the
# producer surfaces backpressure as latency, never as silent drops.
cfg = Config(region="us-east-1", max_outstanding_records=10_000)
You can inspect producer.outstanding_records at any time for a live count.
Time-bounded delivery¶
# 25 ms batching window (low-latency dashboards), but tolerate up to 5 s of
# retries before giving up.
cfg = Config(
region="us-east-1",
record_max_buffered_time_ms=25,
record_ttl_ms=5_000,
)
Lowering record_max_buffered_time_ms shrinks the aggregator/collector
deadlines uniformly; lowering record_ttl_ms makes the retrier give up
sooner on stragglers.
Throttle policy¶
# Treat every ProvisionedThroughputExceededException as terminal.
cfg = Config(region="us-east-1", fail_if_throttled=True)
Useful for batch jobs that prefer failing fast (so an upstream queue can back off) over filling the record's TTL with retries.
Aggregation off¶
# Records already > 100 KiB each — aggregation overhead beats its benefit.
cfg = Config(region="us-east-1", aggregation_enabled=False)
The Aggregator falls back to single-record mode: one user record per Kinesis record, original partition key preserved. The Collector still batches and the Limiter still rate-limits.
Troubleshooting¶
"I get RuntimeError: Producer is closed"¶
You called put_record after __aexit__ ran (e.g. outside the async with
block). The producer is single-use — wrap your work in async with.
"All my records are going to one shard"¶
Your partition keys lack entropy. aiokpl predicts the shard from
md5(partition_key) — if you pass the same key (or a small set of keys),
records concentrate on the same predicted shard and you hit the 1000 rec/s
/ 1 MiB/s limit. Fix: use a per-record key (user id, event id, hash of the
payload). If you must control routing yourself, pass explicit_hash_key= —
the producer uses that integer directly.
"Throughput is lower than I expected"¶
Three usual suspects, in order:
- Aggregation is off when it didn't have to be — flip
aggregation_enabled=True. With small records (< 1 KiB), aggregation pushes per-PutRecordsthroughput up by 10–50×. - Shard count is the cap. Kinesis is 1000 rec/s + 1 MiB/s per shard. Predicted shards that route to the same physical shard share that budget. Add shards or improve key entropy.
max_outstanding_recordsis too low. Ifput_recordcalls are blocking on the semaphore, raise it.
"Records are timing out"¶
Look at result.attempts[-1].error_code == "Expired". The record aged past
record_ttl_ms before the Sender could push it. Either raise record_ttl_ms,
add shards, or reduce upstream rate.
"Outcomes never resolve"¶
The most common cause: you exited the async with block without awaiting
the outcomes. Once __aexit__ returns, the background task group is gone
and no further classification happens. Either await outcome.wait() inside
the block, or hold an asyncio.gather/anyio.create_task_group over all
outcomes before exiting.
"OTel/Datadog sink import error"¶
ImportError: OpenTelemetrySink requires … or the Datadog equivalent. You
installed the base aiokpl without the extra. Fix:
"kinesis-mock works in tests but real AWS doesn't"¶
Three things differ between the emulator and prod:
- Credentials. Real AWS expects a valid principal; the default chain needs an env var, profile, IRSA role, or instance metadata. The test harness uses hard-coded fakes.
- Region.
kinesis-mockignores it; real AWS does not. Match the region of your stream. endpoint_url. Leave itNoneagainst real AWS; it only exists for emulators and VPC endpoints.
"Memory grows over time"¶
A leak typically points at max_outstanding_records. If a record never
reaches a terminal state (network black hole, no flush(), very large
record_ttl_ms), its Outcome stays in the pending dict. Either lower
record_ttl_ms so stuck records expire, or lower
max_outstanding_records so the semaphore caps the queue length.