30.7 EventBridge Pipes: Point-to-Point Event Streaming with Enrichment
Alright, let’s talk about EventBridge Pipes. You’re probably looking at the AWS console, seeing yet another service, and thinking, “Great, another way to wire things together. How is this different from just slapping a Lambda between an SQS queue and a DynamoDB table?”
I hear you. But stick with me, because Pipes are one of those rare AWS features that genuinely reduces complexity instead of adding to it. Think of them as purpose-built, point-to-point plumbing for your events. They take a source, optionally filter and enrich the messages, and then shove them into a target. No routing nonsense, no fan-out. Just a straight pipe. It’s the service you use when you realize you’ve been using a full-featured orchestra (EventBridge Buses) to play a single note.
The magic is in its simplicity and cost. A standard EventBridge bus charges you per event published to it. A Pipe is billed per request that makes it past the filter (payloads are metered in 64 KB chunks), plus whatever the enrichment and target services themselves cost. Events the filter drops are not billed, so for high-volume, point-to-point streams this is often cheaper.
The Core Components: It’s a Three-Part Harmony
Every Pipe is built from three key parts, and you can’t skip the quiz on this.
Source: This is where your events come from. As of now, you can use DynamoDB Streams, Kinesis Data Streams, SQS Queues (both FIFO and Standard), Amazon MQ, Amazon MSK, and self-managed Apache Kafka. This is the well. You’re drawing water from it.
Enrichment (Optional): This is the secret sauce. Before the event reaches its target, you can intercept it and do something. You call a Lambda function, a Step Functions Express workflow, an API Gateway endpoint, or an API destination, and the Pipe forwards whatever that service returns to the target. Nothing technically stops you from reshaping the payload here, but the pattern that keeps a Pipe easy to reason about is to append: leave the original event intact and add information to it. Think of it as adding a delivery instruction to a package without changing the address.
Target: This is where the enriched (or not) event ends up. The list here is long: practically every AWS service you’d want to send an event to, including EventBridge itself if you’re feeling meta.
Why It Beats a DIY Lambda
“So why not just write a Lambda consumer that reads from the source and writes to the target?” Excellent question. You absolutely could. But then you’d have to write code to handle batching, partial failures, retries with backoff, and checkpointing. It’s a weekend project that turns into a month-long maintenance nightmare.
EventBridge Pipes does all that grunt work for you. It handles batching from your stream source, it automatically retries failed enrichments or target deliveries, and it manages the checkpointing so you don’t miss an event. Delivery is at-least-once, so an occasional duplicate is still possible, but you are no longer the one writing the retry loop. It’s the difference between building your own car from scratch and just turning the key in one that already works.
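To make that grunt work concrete, here is a minimal sketch of just one piece of the DIY version: partial-failure reporting in a Lambda that consumes from SQS. It assumes the event source mapping has ReportBatchItemFailures enabled, and deliver is a hypothetical stand-in for whatever writes to your target. Batching tuning, backoff, and checkpointing for stream sources would all be extra code on top of this.

```python
import json


def deliver(payload):
    """Hypothetical stand-in for the call that writes to your target."""
    if payload.get("poison"):
        raise ValueError("target rejected payload")


def lambda_handler(event, context):
    # Collect the IDs of the messages that failed so that only those
    # are returned to the queue for another attempt.
    failures = []
    for message in event["Records"]:
        try:
            deliver(json.loads(message["body"]))
        except Exception:
            failures.append({"itemIdentifier": message["messageId"]})
    # Lambda only honors this response shape when ReportBatchItemFailures
    # is enabled on the SQS event source mapping.
    return {"batchItemFailures": failures}
```

Without that return value, one bad message sends the whole batch back to the queue, which is exactly the kind of detail a Pipe handles for you.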
Let’s Build a Pipe: A Practical Example
Let’s say we have a DynamoDB table (UsersTable) that streams new items. We want to take any new user who signs up from a specific city and send a welcome message to an SNS topic, but we need to enrich the event first by calling a third-party API to get their local weather.
First, the enrichment Lambda. Notice how it takes the event, does its thing, and returns the entire event, now with new data grafted onto it.
# enrichment_lambda.py
import json
import os
import urllib.parse
import urllib.request


def lambda_handler(event, context):
    # Pipes invokes the enrichment with a JSON array: one element per
    # DynamoDB Stream record in the batch (there is no 'records' wrapper).
    api_key = os.environ['WEATHER_API_KEY']
    for record in event:
        # Parse the DynamoDB JSON format
        new_image = record['dynamodb']['NewImage']
        city = new_image['city']['S']
        # Call some silly weather API. Standard library only, so the code
        # can be deployed inline without packaging third-party dependencies.
        query = urllib.parse.urlencode({'key': api_key, 'q': city, 'aqi': 'no'})
        url = f'http://api.weatherapi.com/v1/current.json?{query}'
        with urllib.request.urlopen(url, timeout=5) as response:
            weather_data = json.load(response)
        # This is the key part: we add the enrichment data to the record itself.
        # The Pipe will pass this whole enriched structure to the target.
        record['weatherEnrichment'] = {
            'temp_c': weather_data['current']['temp_c'],
            'condition': weather_data['current']['condition']['text'],
        }
    # Return the entire modified batch
    return event
Now, the CloudFormation to wire it all up. This is where you see the elegance.
Resources:
  EnrichmentLambda:
    Type: AWS::Lambda::Function
    Properties:
      Handler: index.lambda_handler
      Runtime: python3.9
      Role: !GetAtt EnrichmentLambdaRole.Arn # Execution role, assumed to be defined elsewhere (like PipeRole)
      Code:
        ZipFile: |
          # ... the code from above ...
      Environment:
        Variables:
          WEATHER_API_KEY: 'your-api-key-here'
  WelcomePipe:
    Type: AWS::Pipes::Pipe
    Properties:
      Name: UserSignupWeatherPipe
      RoleArn: !GetAtt PipeRole.Arn # A role with permissions for source, enrichment, target
      Source: !GetAtt UsersTable.StreamArn
      SourceParameters:
        DynamoDBStreamParameters:
          StartingPosition: LATEST # Required for stream sources
        # Filtering: Only process newly inserted records for London
        FilterCriteria:
          Filters:
            - Pattern: '{ "eventName": [ "INSERT" ], "dynamodb": { "NewImage": { "city": { "S": [ "London" ] } } } }'
      Enrichment: !GetAtt EnrichmentLambda.Arn
      Target: !Ref WelcomeTopic
      TargetParameters:
        InputTemplate: |
          {
            "userId": <$.dynamodb.NewImage.userId.S>,
            "city": <$.dynamodb.NewImage.city.S>,
            "message": "Welcome! The weather in <$.dynamodb.NewImage.city.S> is <$.weatherEnrichment.condition> and <$.weatherEnrichment.temp_c>°C."
          }
  WelcomeTopic:
    Type: AWS::SNS::Topic
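Before deploying a filter like the city one above, it can help to sanity-check the nesting locally against a sample stream record. The sketch below is a deliberately simplified matcher and an assumption of mine, not the real EventBridge engine: it only understands nested objects and lists of exact values, so prefix, numeric, anything-but, and array-valued fields are out of scope. The sample records are made up for illustration.

```python
def matches(pattern, event):
    """Simplified matcher: nested dicts plus lists of exact values only."""
    for key, expected in pattern.items():
        if key not in event:
            return False
        actual = event[key]
        if isinstance(expected, dict):
            # Descend into nested objects on both sides.
            if not isinstance(actual, dict) or not matches(expected, actual):
                return False
        elif actual not in expected:
            # Leaf: 'expected' is the list of allowed values.
            return False
    return True


pattern = {"dynamodb": {"NewImage": {"city": {"S": ["London"]}}}}

# Hypothetical sample records in DynamoDB Streams shape.
london = {
    "eventName": "INSERT",
    "dynamodb": {"NewImage": {"userId": {"S": "u-1"}, "city": {"S": "London"}}},
}
paris = {
    "eventName": "INSERT",
    "dynamodb": {"NewImage": {"userId": {"S": "u-2"}, "city": {"S": "Paris"}}},
}

print(matches(pattern, london))
print(matches(pattern, paris))
```

This catches the most common mistake, a key placed at the wrong nesting level, but treat the console's pattern tester as the final word.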
The Sharp Edges and “Oh, Come On” Moments
It’s not all sunshine and enriched events. Pipes have their quirks.
- The Enrichment Contract: Whatever your enrichment Lambda returns is what the Pipe hands to the target. If your Lambda returns a completely different object, every JSON path in your InputTemplate (and anything downstream that expects the source format) stops matching. So treat the contract as: return the batch you received, in the same shape, with your extra fields added. Its job is to enrich, not transform. For transformation, use TargetParameters.InputTemplate, as I did above.
- Filtering is… Limited: You filter at the Pipe level, not in the enrichment. This is both good (cheaper, you don’t pay for Lambda invocations on filtered events) and bad. The filtering syntax is the same JSON-based event pattern syntax that EventBridge rules use, but trying to write a correct filter for a DynamoDB stream event will make you question all your life choices. The structure is deeply nested and unintuitive. Use the AWS console to test your patterns first; it will save you hours.
- Dead-Letter Queues Are Opt-In: Out of the box, a failing target does not send the event to a DLQ. For Kinesis and DynamoDB Streams, the default is to retry until delivery succeeds or the record expires from the stream, which blocks the shard in the meantime and eventually loses the event. Your mitigation is to set MaximumRetryAttempts or MaximumRecordAgeInSeconds, plus a DeadLetterConfig, in the stream's source parameters. For SQS sources, put a redrive policy on the source queue itself. Either way, always ensure your target is idempotent and can handle duplicate events, because retries will happen.
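What "idempotent" means in practice is easiest to see in code. Here is a minimal sketch of a consumer that deduplicates on the eventID field that DynamoDB Streams records carry. It assumes that field (or some other stable key, such as userId) survives to the consumer, and send_welcome is a hypothetical side effect. The in-memory set is for illustration only; a real deployment would need a durable store with an atomic "insert if absent" operation.

```python
sent = []              # records the side effects, for demonstration only
_seen_event_ids = set()  # in-memory dedupe store (assumption for this sketch)


def send_welcome(record):
    """Hypothetical side effect, e.g. sending the welcome message."""
    sent.append(record["eventID"])


def handle_once(record):
    """Process a record at most once per eventID; return True if processed."""
    event_id = record["eventID"]
    if event_id in _seen_event_ids:
        # A retry delivered a record we already handled: skip it.
        return False
    send_welcome(record)
    # Mark as seen only after the side effect succeeded, so a failure
    # here still lets the retry go through.
    _seen_event_ids.add(event_id)
    return True
```

Calling handle_once twice with the same record sends one welcome message, not two, which is the behavior you want when the Pipe retries a batch.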
The bottom line? Use EventBridge Pipes when you have a clear, high-volume, point-to-point stream to process. It offloads the undifferentiated heavy lifting of stream processing and is often cheaper than the alternatives. Just be prepared to wrestle with the filter criteria and remember: enrich, don’t transform.