31.2 Producer and Consumer APIs: PutRecord, GetRecords, and Enhanced Fan-Out
Alright, let’s talk about getting data in and out of Kinesis. This is where the rubber meets the road, or more accurately, where your events meet the stream. The API surface here is deceptively simple, which is both a blessing and a curse. A blessing because you can get started in minutes; a curse because the real devil is in the details of scaling, error handling, and not accidentally setting your wallet on fire with the bill for Enhanced Fan-Out.
The core operations are brutally straightforward: you PutRecord (or a batch of them with PutRecords) to add data, and you GetRecords to, well, get it. That’s the simple story. The complex story is everything that happens around that.
The Humble PutRecord and Its Smarter Sibling PutRecords
You could use PutRecord to send one event at a time. Please, for the love of all that is holy, don't do this in production unless you have a phenomenally low throughput requirement. The problem isn't really the per-call bill: Kinesis charges for shard capacity and data volume (shard-hours and PUT payload units in provisioned mode), not for the number of requests. The problem is that every single record pays for a full network roundtrip, which is a recipe for sadness and poor performance.
The designers got this one right. You almost always want PutRecords. It's a batch operation that can take up to 500 records per request, so a single roundtrip carries hundreds of events. This is your number one tool for achieving high throughput and keeping your producers lean.
import boto3
import json
client = boto3.client('kinesis')
# Imagine we have a list of events we want to send
events = [{'user_id': i, 'action': 'click'} for i in range(100)]
# Prepare the records for the batch put
records = [
    {
        'Data': json.dumps(event).encode('utf-8'),  # Data is a blob: serialize to JSON, then encode to bytes.
        'PartitionKey': str(event['user_id'])  # This determines which shard it goes to!
    }
    for event in events
]
# Send them in one shot (up to 500 records per call)
response = client.put_records(Records=records, StreamName='my-click-stream')
# But wait! You MUST check for failures.
# PutRecords can partially fail: the call itself succeeds, but individual entries carry an ErrorCode.
# The response's 'Records' list lines up one-to-one, in order, with the records you sent.
if response['FailedRecordCount'] > 0:
    for i, record in enumerate(response['Records']):
        if 'ErrorCode' in record:
            print(f"Record {i} failed: {record['ErrorCode']} - {record.get('ErrorMessage')}")
    # You need a retry strategy for these failed records.
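The comment at the end of that example asks for a retry strategy. Here's a minimal sketch of one, assuming the documented limit of 500 records per PutRecords call: it splits a list into batches, resubmits only the entries that came back with an ErrorCode, and backs off exponentially with jitter between attempts. The function names, attempt count, and delays are illustrative choices, not AWS defaults. It doesn't enforce the per-request byte limit or catch exceptions raised for a whole call, and because only the failed entries are resent, a retried record can land after records that were originally queued behind it.

```python
import random
import time
from typing import Any, Dict, List

MAX_RECORDS_PER_CALL = 500  # documented PutRecords per-request record limit


def chunked(items: List[Any], size: int) -> List[List[Any]]:
    """Split a list into consecutive chunks of at most `size` items."""
    return [items[i:i + size] for i in range(0, len(items), size)]


def put_records_with_retry(client, stream_name: str, records: List[Dict[str, Any]],
                           max_attempts: int = 5, base_delay: float = 0.1) -> List[Dict[str, Any]]:
    """Send records in batches of at most 500 and retry only the entries that failed.

    Returns the records that still failed after `max_attempts` tries
    (an empty list means everything was written).
    """
    gave_up: List[Dict[str, Any]] = []
    for batch in chunked(records, MAX_RECORDS_PER_CALL):
        pending = batch
        for attempt in range(max_attempts):
            response = client.put_records(Records=pending, StreamName=stream_name)
            if response['FailedRecordCount'] == 0:
                pending = []
                break
            # Response entries line up one-to-one, in order, with the request entries,
            # so zip them to find which records need another try.
            pending = [rec for rec, result in zip(pending, response['Records'])
                       if 'ErrorCode' in result]
            if attempt < max_attempts - 1:
                # Exponential backoff with full jitter, so retries don't hit the shard in lockstep.
                time.sleep(random.uniform(0, base_delay * 2 ** attempt))
        gave_up.extend(pending)
    return gave_up
```

With the client and records from the example, you'd call `failed = put_records_with_retry(client, 'my-click-stream', records)`. Anything left in `failed` has exhausted its retries and needs a dead-letter path or an alert, not silence.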
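Why the PartitionKey? This is the most important concept. Kinesis runs the key through MD5 to get a 128-bit hash and uses that hash to decide which shard in your stream gets the record. All records with the same key go to the same shard, where they're assigned increasing sequence numbers and read back in that order. If you want all events for user_id=123 to be processed in order, you must use the same partition key for all of them. One catch: PutRecords itself doesn't guarantee ordering, because a failed entry doesn't stop the entries after it, so a retried record can land behind newer ones. If strict per-key order matters, the AWS documentation points you to PutRecord instead. And choose the key deliberately: a random key spreads load nicely but throws ordering away, while a key with only a handful of distinct values keeps ordering but piles all your traffic onto a few hot shards. Both are common rookie mistakes.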
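To make the key-to-shard mapping concrete, here's a minimal sketch. It assumes the MD5 digest is read as an unsigned big-endian integer and that the stream's shards divide the 128-bit hash space into equal ranges, which holds for a freshly created stream but not after uneven resharding (for a real stream, read each shard's HashKeyRange from ListShards instead). The function names are hypothetical.

```python
import hashlib

HASH_SPACE = 2 ** 128  # Kinesis hash keys are 128-bit unsigned integers


def partition_key_to_hash(partition_key: str) -> int:
    """MD5 the partition key and read the 16-byte digest as an unsigned big-endian integer."""
    digest = hashlib.md5(partition_key.encode('utf-8')).digest()
    return int.from_bytes(digest, 'big')


def shard_for_key(partition_key: str, shard_count: int) -> int:
    """Index of the shard that owns this key, ASSUMING the hash space is divided
    into `shard_count` equal, contiguous ranges (no uneven resharding)."""
    range_size = HASH_SPACE // shard_count
    # min() folds any remainder at the top of the hash space into the last shard.
    return min(partition_key_to_hash(partition_key) // range_size, shard_count - 1)


# Example: how evenly do 1,000 user IDs spread over a 4-shard stream?
counts = [0] * 4
for user_id in range(1000):
    counts[shard_for_key(str(user_id), 4)] += 1
print(counts)
```

Running the same tally over a sample of your real partition keys is a cheap way to spot a hot shard before production traffic finds it for you.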
The Workhorse: GetRecords
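On the other side, consumers use GetRecords. You first call GetShardIterator to get a shard iterator (a pointer to a position in a shard), then repeatedly call GetRecords with it. The iterator itself doesn't move: each GetRecords response hands you a NextShardIterator, and you must pass that one to the next call. Iterators also expire five minutes after they're issued, so you can't stash one and come back to it later.
The catch? This is a pull model. You're constantly polling, and the limits are per shard, not per consumer: each shard serves at most 5 GetRecords calls and 2 MB of data per second, shared by every standard consumer reading it. Add a second application and each one gets roughly half. You also have to manage the shard iterators yourself, especially when shards split or merge: the parent shard closes, and it's on you to discover its children and start reading them. It's functional, but it's a bit like using a manual transmission in traffic: effective, but it requires your constant attention.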
import time  # json and client come from the producer example above
# First, get a shard iterator. LATEST starts at the tip of the shard: only records written after this call.
# (TRIM_HORIZON would start from the oldest record still retained.)
shard_iterator_response = client.get_shard_iterator(
    StreamName='my-click-stream',
    ShardId='shardId-000000000000',  # You have to know which shard you're reading from!
    ShardIteratorType='LATEST'
)
shard_iterator = shard_iterator_response['ShardIterator']
# Now, loop and get records. In real life, this would be a long-running process.
# NextShardIterator comes back empty once a closed shard (after a split or merge) is fully read.
while shard_iterator:
    records_response = client.get_records(ShardIterator=shard_iterator, Limit=10000)
    records = records_response['Records']
    shard_iterator = records_response.get('NextShardIterator')  # Critical: always use the new iterator!
    for record in records:
        data = json.loads(record['Data'].decode('utf-8'))  # boto3 returns Data as bytes
        # ... process your record ...
    # Be kind to the API: a shard allows only 5 GetRecords calls per second, shared by
    # every standard consumer. Poll gently, and back off further when the shard is idle.
    time.sleep(0.2 if records else 1)
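The loop above hardcodes shardId-000000000000 and starts from LATEST every time, which only works for a single-shard stream you never restart. Here's a hedged sketch of the two missing pieces: enumerating shards with ListShards (list_shards in boto3) and resuming from a saved sequence number with AFTER_SEQUENCE_NUMBER. The helper names are hypothetical, and where you persist checkpoints is up to you. It doesn't handle resharding order: to keep per-key ordering you'd finish a closed parent shard (its children point back to it via ParentShardId) before reading the children.

```python
from typing import Any, Dict, List, Optional


def list_all_shards(client, stream_name: str) -> List[Dict[str, Any]]:
    """Return every shard in the stream, following ListShards pagination."""
    response = client.list_shards(StreamName=stream_name)
    shards = list(response['Shards'])
    while response.get('NextToken'):
        # Follow-up pages must pass NextToken WITHOUT StreamName; ListShards rejects both together.
        response = client.list_shards(NextToken=response['NextToken'])
        shards.extend(response['Shards'])
    return shards


def is_open(shard: Dict[str, Any]) -> bool:
    """Open shards have no EndingSequenceNumber; closed (split or merged) parents do."""
    return 'EndingSequenceNumber' not in shard['SequenceNumberRange']


def iterator_args(stream_name: str, shard_id: str,
                  checkpoint: Optional[str] = None) -> Dict[str, str]:
    """Build get_shard_iterator kwargs: resume just after a saved sequence number,
    or start from the oldest retained record when there is no checkpoint yet."""
    args = {'StreamName': stream_name, 'ShardId': shard_id}
    if checkpoint is None:
        args['ShardIteratorType'] = 'TRIM_HORIZON'
    else:
        args['ShardIteratorType'] = 'AFTER_SEQUENCE_NUMBER'
        args['StartingSequenceNumber'] = checkpoint
    return args
```

Usage would look like `client.get_shard_iterator(**iterator_args('my-click-stream', shard['ShardId'], saved.get(shard['ShardId'])))` for each shard returned by `list_all_shards`, where `saved` is a hypothetical dict of checkpoints you update with each processed record's SequenceNumber.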
The Luxury Sedan: Enhanced Fan-Out (EFO)
Here's where AWS addressed the biggest pain point of GetRecords. What if you have multiple applications that need to read the same stream at high speed, but none of them can keep up because they're all competing for the same per-shard read limits? Or what if you need lower latency than a polling loop can give you?
Enter Enhanced Fan-Out. It's a completely different paradigm. Instead of pulling, you register a consumer, subscribe to a shard with SubscribeToShard, and Kinesis pushes records to you over an HTTP/2 connection. It's fantastic. Latency plummets because you're not waiting for the next poll. And each registered consumer gets its own dedicated read throughput of up to 2 MB/s per shard, so consumers don't interfere with each other.
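The difference between the two models is easiest to see with a little arithmetic. This sketch assumes the documented per-shard limits: 2 MB/s and 5 GetRecords calls per second shared by all standard consumers, versus a dedicated 2 MB/s per shard for each EFO consumer. The function names are hypothetical, it ignores call latency and uneven load, and you should check the current service quotas before sizing anything real.

```python
# Per-shard read limits assumed here (check current Kinesis quotas before relying on them).
STANDARD_READ_MBPS_PER_SHARD = 2.0          # shared by ALL standard (GetRecords) consumers
STANDARD_GETRECORDS_TPS_PER_SHARD = 5       # GetRecords calls per second per shard, also shared
EFO_READ_MBPS_PER_CONSUMER_PER_SHARD = 2.0  # dedicated to EACH enhanced fan-out consumer


def read_mbps_per_consumer(shards: int, consumers: int, enhanced_fan_out: bool) -> float:
    """Best-case stream-wide read rate (MB/s) available to one consumer application."""
    if enhanced_fan_out:
        return EFO_READ_MBPS_PER_CONSUMER_PER_SHARD * shards
    return STANDARD_READ_MBPS_PER_SHARD * shards / consumers


def min_poll_interval_seconds(consumers: int) -> float:
    """Shortest per-shard polling interval each standard consumer can sustain
    if the shared GetRecords budget is split evenly among them."""
    return consumers / STANDARD_GETRECORDS_TPS_PER_SHARD
```

With 4 shards and 5 standard consumers, each application gets at most 1.6 MB/s and can poll each shard only once per second. With EFO, each gets 8 MB/s no matter how many other consumers are registered.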
The downside? It costs extra. Of course it does. You're essentially renting a dedicated line from each shard to each consumer, and in provisioned mode that's billed per consumer-shard-hour plus per GB retrieved. It's worth every penny for latency-sensitive applications, but for background processing, standard GetRecords is often fine.
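import time
# Register a consumer once (e.g. at deploy time), not every time your app starts.
consumer_response = client.register_stream_consumer(
    StreamARN='arn:aws:kinesis:us-east-1:123456789012:stream/my-click-stream',
    ConsumerName='my-high-speed-app'
)
consumer_arn = consumer_response['Consumer']['ConsumerARN']
# A new consumer starts out CREATING; you can't subscribe until it is ACTIVE.
while client.describe_stream_consumer(ConsumerARN=consumer_arn)['ConsumerDescription']['ConsumerStatus'] != 'ACTIVE':
    time.sleep(2)
# EFO doesn't use shard iterators or GetRecords. The same Kinesis client subscribes to a
# shard, and records are pushed to you over a dedicated HTTP/2 connection.
subscription = client.subscribe_to_shard(
    ConsumerARN=consumer_arn,
    ShardId='shardId-000000000000',
    StartingPosition={'Type': 'LATEST'}
)
continuation = None
for event in subscription['EventStream']:
    shard_event = event.get('SubscribeToShardEvent')
    if shard_event is None:
        continue
    for record in shard_event['Records']:
        data = json.loads(record['Data'].decode('utf-8'))
        # ... process your record ...
    # Save this: the next subscribe_to_shard call can resume from it.
    continuation = shard_event['ContinuationSequenceNumber']
# The event stream ends after at most 5 minutes; resubscribe with
# StartingPosition={'Type': 'AFTER_SEQUENCE_NUMBER', 'SequenceNumber': continuation}.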
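With Enhanced Fan-Out, reads go through SubscribeToShard (subscribe_to_shard in boto3), and each subscription lasts at most five minutes, so a long-running consumer has to resubscribe in a loop, picking up from the ContinuationSequenceNumber of the last event it saw. Here's a hedged sketch of that loop. consume_shard and handle_record are hypothetical names; treating a non-empty ChildShards list as "this shard is closed and fully read" is an assumption about the end-of-shard signal; and the default one-second pause is meant to stay under the documented limit of one SubscribeToShard call per second per consumer per shard.

```python
import time
from typing import Any, Callable, Dict, Optional


def starting_position(continuation: Optional[str]) -> Dict[str, str]:
    """LATEST on the first subscription; afterwards, resume just past the checkpoint."""
    if continuation is None:
        return {'Type': 'LATEST'}
    return {'Type': 'AFTER_SEQUENCE_NUMBER', 'SequenceNumber': continuation}


def consume_shard(client, consumer_arn: str, shard_id: str,
                  handle_record: Callable[[Dict[str, Any]], None],
                  max_subscriptions: Optional[int] = None,
                  resubscribe_delay: float = 1.0) -> Optional[str]:
    """Keep one EFO consumer subscribed to one shard, resubscribing each time the
    event stream ends. Returns the last continuation sequence number seen."""
    continuation: Optional[str] = None
    subscriptions = 0
    while max_subscriptions is None or subscriptions < max_subscriptions:
        response = client.subscribe_to_shard(
            ConsumerARN=consumer_arn,
            ShardId=shard_id,
            StartingPosition=starting_position(continuation),
        )
        subscriptions += 1
        shard_finished = False
        for event in response['EventStream']:
            shard_event = event.get('SubscribeToShardEvent')
            if shard_event is None:
                continue  # ignore anything that isn't a record batch
            for record in shard_event['Records']:
                handle_record(record)
            # Track progress even for empty batches, so a quiet shard still checkpoints.
            continuation = shard_event.get('ContinuationSequenceNumber') or continuation
            # ASSUMPTION: a non-empty ChildShards list means this shard is closed and fully read.
            if shard_event.get('ChildShards'):
                shard_finished = True
        if shard_finished:
            break
        # Pause before resubscribing to respect the per-consumer, per-shard call rate.
        time.sleep(resubscribe_delay)
    return continuation
```

In a real consumer you'd persist the continuation value somewhere durable as you go, handle errors such as a subscription already being in use, and move on to the child shards once a parent finishes.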
The key takeaway? Use PutRecords and mind your partition keys. Start with standard GetRecords for simplicity and cost. The moment you need high performance or multiple independent consumers, bite the bullet and use Enhanced Fan-Out. It feels like magic after wrestling with the pull API.