A dive into AWS EventBridge Pipes
Bird’s eye view
EventBridge Pipes was announced at re:Invent 2022 in Werner Vogels’ keynote. It’s pitched as a new way to make integrating AWS services easier, with the goal of reducing or removing the amount of ‘glue’ code required to take messages from an event producer to an event consumer.
Let’s imagine an e-commerce application where new orders are put onto an SQS queue and later processed asynchronously by a Step Functions workflow.
Prior to EventBridge Pipes being released, the way to do this would’ve been to have a Lambda function process the messages from the SQS queue and then start a new execution of the Step Functions workflow using the SDK. With EventBridge Pipes, this can now be accomplished without writing any custom code. Keep reading to find out how!
Dive deep
EventBridge Pipe components
An EventBridge Pipe is made up of 4 components, 2 of them required.
It starts with the Source - at present, 6 different sources are supported, including SQS queues, DynamoDB streams and Kinesis streams. The Source is a required component of a pipe.
After an event enters a pipe from the Source, there is an opportunity to filter events using the Filtering stage. This is a way to prevent certain events from being processed, avoiding unnecessary cost. As per the documentation, filter patterns are defined in the same way as event patterns for matching events with an EventBridge rule. This stage is optional.
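To make that concrete, below is a minimal sketch (my own illustration, not part of the example we build later) of how a filter could be attached to a pipe in CloudFormation. It assumes an SQS source whose message body is valid JSON, and the pattern itself - only passing through messages whose body contains a Customer.Id field - is hypothetical; the resource name is made up too, and the property names are as I understand the AWS::Pipes::Pipe resource, so check them against the documentation before relying on this.

MyFilteredPipe:
  Type: "AWS::Pipes::Pipe"
  Properties:
    # RoleArn, Source, Enrichment and Target omitted for brevity
    SourceParameters:
      FilterCriteria:
        Filters:
          # Same pattern syntax as an EventBridge rule event pattern,
          # matched against the (JSON) body of each SQS message
          - Pattern: '{"body": {"Customer": {"Id": [{"exists": true}]}}}'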
After having the opportunity to pass events through the Filtering stage, data can be enhanced in the Enrichment stage. Sticking with the e-commerce example we talked about earlier, an event could come in with minimal order details, including an ID for a customer rather than the full customer details. This stage could then fetch details about the customer, enriching the data in the event before it gets passed to the next stage. There are 4 ways in which an event can be enriched: a Lambda function, a Step Functions Express workflow, an API Gateway endpoint or an EventBridge API destination. This stage is optional.
At the end of a pipe sits the Target. There are 15 different choices when it comes to delivering an event somewhere. The usual choices that you’d expect are there, such as Lambda functions and Step Functions workflows; however, there are also some less expected options such as SageMaker pipelines and Redshift Data API queries. The Target is a required component of a pipe.
Building it with CloudFormation
Let’s get hands-on with EventBridge Pipes. As we talked about at the start, we’re going to build out an example where a message is put onto an SQS queue to signify that a new order has been created on our e-commerce site. We want to enrich the events with some additional customer information so that, at the end of the Step Functions workflow that is our Target, we can send an email to inform the customer that their order has been received.
First up, let’s create the Source of our pipe. For this, we’ll be using an SQS queue. When creating this, we don’t have to do anything special. We’ll also set up a second SQS queue as our dead-letter queue (DLQ) to store messages that the pipe fails to process.
Resources:
  OrdersQueue:
    Type: "AWS::SQS::Queue"
    Properties:
      QueueName: "orders-to-be-processed"
      RedrivePolicy:
        maxReceiveCount: 1
        deadLetterTargetArn: !GetAtt "OrdersDeadLetterQueue.Arn"
  OrdersDeadLetterQueue:
    Type: "AWS::SQS::Queue"
    Properties:
      QueueName: "orders-to-be-processed-dlq"
Now that we’ve got our SQS queues, we’ll create an SES (Simple Email Service) email identity. This allows us to use SES to send emails at the end of our Step Functions workflow to inform the customer that their order has been received. When setting up an SES identity, there are two things to keep in mind: you’ll need to add some CNAME records to your domain in order to verify ownership (one way to do this is sketched after the snippet below), and if SES is not enabled for production use in the region and account you’re operating in, you’ll need to raise a support ticket to resolve this.
Parameters:
  pEmailDomainName:
    Type: "String"
    Description: "The domain name to verify for the SES email identity"
Resources:
  SesEmailIdentity:
    Type: "AWS::SES::EmailIdentity"
    Properties:
      EmailIdentity: !Ref "pEmailDomainName"
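As an aside, if your domain happens to be hosted in Route 53 (an assumption on my part - it isn’t required for this example), the DKIM CNAME records used for verification can be created in the same template using the attributes that AWS::SES::EmailIdentity exposes. A minimal sketch, with pHostedZoneId as a hypothetical extra parameter:

Parameters:
  pHostedZoneId:
    Type: "String"
    Description: "Route 53 hosted zone ID for the domain (assumes the domain is hosted in Route 53)"
Resources:
  SesDkimRecord1:
    Type: "AWS::Route53::RecordSet"
    Properties:
      HostedZoneId: !Ref "pHostedZoneId"
      Name: !GetAtt "SesEmailIdentity.DkimDNSTokenName1"
      Type: "CNAME"
      TTL: "300"
      ResourceRecords:
        - !GetAtt "SesEmailIdentity.DkimDNSTokenValue1"
  # Repeat for DkimDNSTokenName2/Value2 and DkimDNSTokenName3/Value3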
Next, let’s create the DynamoDB table where our customer and order data will be stored, as well as the Step Functions workflow that’s going to process our orders. We’re using a form of data modelling called single table design - this is why our table definition uses generic PK and SK attributes rather than specific names. This allows us to store different kinds of data flexibly in the same table.
Setting up a Step Functions workflow in CloudFormation (with SAM) is pretty straightforward; most of the hard work is done in the definition. What we do here is tell SAM where to find our definition, what values to substitute (this allows us to use values in the Step Functions workflow definition that are available to CloudFormation) and which IAM policies to attach to the workflow execution role. We’re only going to be interacting with two other services in this workflow, DynamoDB and SES, so let’s add permissions to use the PutItem API in DynamoDB and the SendEmail API in SES, scoped to our specific resources.
Resources:
  ShopDynamoDBTable:
    Type: "AWS::DynamoDB::Table"
    Properties:
      BillingMode: "PAY_PER_REQUEST"
      AttributeDefinitions:
        - AttributeName: "PK"
          AttributeType: "S"
        - AttributeName: "SK"
          AttributeType: "S"
      KeySchema:
        - AttributeName: "PK"
          KeyType: "HASH"
        - AttributeName: "SK"
          KeyType: "RANGE"
  ProcessOrderStateMachine:
    Type: "AWS::Serverless::StateMachine"
    Properties:
      Type: "EXPRESS"
      DefinitionUri: "./state-machines/process-order.asl.yaml"
      DefinitionSubstitutions:
        ShopTableName: !Ref "ShopDynamoDBTable"
        FromEmailAddress: !Join ["@", ["shop", !Ref "pEmailDomainName"]]
      Policies:
        - Statement:
            - Sid: "WriteAccessToDynamoDbTable"
              Effect: "Allow"
              Action:
                - "dynamodb:PutItem"
              Resource:
                - !GetAtt "ShopDynamoDBTable.Arn"
            - Sid: "SendEmailViaSES"
              Effect: "Allow"
              Action:
                - "ses:SendEmail"
              Resource:
                - !Sub "arn:aws:ses:${AWS::Region}:${AWS::AccountId}:identity/${SesEmailIdentity}"
Although we haven’t yet set up the EventBridge Pipe, we can plan for what data will be passed to our target, which enables us to build the pieces in isolation. Below is an example of one of the order objects that’ll be passed into the Step Functions workflow - because a pipe can batch multiple events into a single execution, the actual workflow input will be an array of objects shaped like this. Keep this in mind when we look at the definition next.
{
  "Customer": {
    "Name": "Joe Bloggs",
    "Id": "MYUNIQUECUSTOMERID",
    "EmailAddress": "success@simulator.amazonses.com"
  },
  "Order": {
    "Timestamp": "2023-01-08T16:52:00Z",
    "Items": [
      {"Id": "Item1", "Title": "Product", "CostInMinorUnit": "10000"}
    ],
    "TotalCostInMinorUnit": "10000",
    "TotalCostInMajorUnit": "100.00"
  }
}
Now let’s look at the definition of the Step Functions workflow, defined in Amazon States Language. This is quite a simple workflow with just three states inside a Map state. The reason it’s in a Map state is that EventBridge Pipes supports batching multiple items into a single execution, so let’s build for the future. The first state is responsible for generating a unique identifier (UUIDv4) that we will assign to the order. Using this ID, we can then use the DynamoDB putItem integration to store information about the order in our table. The final state in the workflow uses the AWS SDK integration to send an email via SES to the customer email address that is passed into the workflow.
StartAt: "Process order"
States:
  Process order:
    Type: "Map"
    ItemProcessor:
      StartAt: "Generate order ID"
      States:
        Generate order ID:
          Type: "Pass"
          Parameters:
            Generated.$: "States.UUID()"
          ResultPath: "$.Order.Id"
          Next: "Store order"
        Store order:
          Type: "Task"
          Resource: "arn:aws:states:::dynamodb:putItem"
          Parameters:
            TableName: "${ShopTableName}"
            # PutItem expects each attribute as a typed DynamoDB value (S, N, etc.)
            Item:
              PK:
                S.$: "States.Format('CUST#{}', $.Customer.Id)"
              SK:
                S.$: "States.Format('ORDER#{}', $.Order.Id.Generated)"
              Timestamp:
                S.$: "$.Order.Timestamp"
              Items:
                S.$: "States.JsonToString($.Order.Items)"
              TotalCostInMinorUnit:
                S.$: "$.Order.TotalCostInMinorUnit"
              Status:
                S: "RECEIVED"
          ResultPath: null
          Next: "Send order notification"
        Send order notification:
          Type: "Task"
          Resource: "arn:aws:states:::aws-sdk:sesv2:sendEmail"
          Parameters:
            Destination:
              ToAddresses.$: "States.Array($.Customer.EmailAddress)"
            FromEmailAddress: "${FromEmailAddress}"
            Content:
              Simple:
                Subject:
                  Data.$: "States.Format('Order {} has been received', $.Order.Id.Generated)"
                Body:
                  Text:
                    Data.$: "States.Format('Hello {}, thank you for your order totalling {}. You will receive another email confirming dispatch once items have been processed in our warehouse', $.Customer.Name, $.Order.TotalCostInMajorUnit)"
          End: true
    End: true
The final step before we get onto creating our EventBridge Pipe is to create a Lambda function that will be used for enriching the order object that comes from the SQS queue. Our order object will just contain a customer ID rather than all the customer’s details. Our Enrichment stage Lambda function will retrieve the customer details from the DynamoDB table and add them into the object.
So that we’ve got some data to retrieve from DynamoDB, add the following item to the DynamoDB table that we created earlier. We use the SES mailbox simulator address for the EmailAddress attribute so that we can test the solution without having to wait for AWS to approve our account for sending mail via SES.
{
  "PK": "CUST#1",
  "SK": "META",
  "Name": "Alex Kearns",
  "EmailAddress": "success@simulator.amazonses.com"
}
With the data in our DynamoDB table, we can get on with creating the Lambda function. Adding an AWS::Serverless::Function resource to our CloudFormation template makes it nice and easy to build and deploy. We pass the name of the DynamoDB table in as an environment variable and also grant permission for the function’s execution role to perform the dynamodb:GetItem action.
Resources:
  CustomerDataEnrichmentFunction:
    Type: "AWS::Serverless::Function"
    Properties:
      CodeUri: "./lambda-functions/customer-data-enrichment"
      Handler: "app.lambda_handler"
      Runtime: "python3.9"
      Environment:
        Variables:
          SHOP_DYNAMODB_TABLE: !Ref "ShopDynamoDBTable"
      Policies:
        - Statement:
            - Sid: "ReadDynamoDbTable"
              Effect: "Allow"
              Action:
                - "dynamodb:GetItem"
              Resource:
                - !GetAtt "ShopDynamoDBTable.Arn"
There’s not an awful lot of Python code required to enrich our object. What we’re doing in the Lambda function is getting the item from DynamoDB that matches the customer ID in the object, then adding some further data to the existing event before returning it. Enrichment at its simplest. If you were using this in production, you might want to add better error handling for the scenario where no customer exists for a given ID.
import json
import os

import boto3

# Environment variables
SHOP_TABLE = os.environ["SHOP_DYNAMODB_TABLE"]

# Set up boto3 Table resource
ddb = boto3.resource("dynamodb")
table = ddb.Table(SHOP_TABLE)


def lambda_handler(event, context):
    response = []

    for item in event:
        # Each record from the SQS source carries its message body as a JSON string
        item = json.loads(item["body"])
        customer_id = item["Customer"]["Id"]

        # Look up the customer record in DynamoDB
        result = table.get_item(
            Key={
                "PK": f"CUST#{customer_id}",
                "SK": "META"
            }
        )
        result = result["Item"]

        # Enrich the order with the customer's details
        item["Customer"]["Name"] = result["Name"]
        item["Customer"]["EmailAddress"] = result["EmailAddress"]

        response.append(item)

    return response
The final piece of this puzzle is to set up the EventBridge Pipe itself. This is where we tie everything together. First we define the execution role for the pipe; this role needs access to our SQS queue source, the enrichment Lambda function and the Step Functions workflow target. Next, we can define the resource for the pipe itself. Other than specifying the execution role, source, enrichment and target, there’s nothing that we have to do. There is the option to configure how the pipe interacts with the source (e.g. how many messages from the queue to batch into each invocation), how it interacts with the enrichment (e.g. HTTP headers if the enrichment is API Gateway) and how it invokes the target (e.g. whether to start a sync or async execution of the Step Functions workflow). Pretty flexible, huh? A sketch of these optional parameters follows the template below.
Resources:
  OrderProcessingPipeRole:
    Type: "AWS::IAM::Role"
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: "Allow"
            Principal:
              Service: "pipes.amazonaws.com"
            Action: "sts:AssumeRole"
      Policies:
        - PolicyName: "BaseInlinePolicy"
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Sid: "AccessToCloudWatch"
                Effect: "Allow"
                Action:
                  - "logs:CreateLogGroup"
                  - "logs:CreateLogStream"
                  - "logs:PutLogEvents"
                Resource: "*"
              - Sid: "UseSqsAsSource"
                Effect: "Allow"
                Action:
                  - "sqs:ReceiveMessage"
                  - "sqs:DeleteMessage"
                  - "sqs:GetQueueAttributes"
                Resource: !GetAtt "OrdersQueue.Arn"
              - Sid: "InvokeEnrichmentLambdaFunction"
                Effect: "Allow"
                Action:
                  - "lambda:InvokeFunction"
                Resource: !GetAtt "CustomerDataEnrichmentFunction.Arn"
              - Sid: "ExecuteSfnWorkflowAsTarget"
                Effect: "Allow"
                Action:
                  - "states:StartSyncExecution"
                Resource: !Ref "ProcessOrderStateMachine"
  OrderProcessingPipe:
    Type: "AWS::Pipes::Pipe"
    Properties:
      RoleArn: !GetAtt "OrderProcessingPipeRole.Arn"
      Source: !GetAtt "OrdersQueue.Arn"
      Enrichment: !GetAtt "CustomerDataEnrichmentFunction.Arn"
      Target: !Ref "ProcessOrderStateMachine"
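To give a flavour of those optional settings, here’s a sketch of how the pipe above could be tuned - batching up to 5 messages per invocation with a 10 second batching window on the source side, and explicitly requesting a synchronous execution of the Express workflow on the target side (matching the states:StartSyncExecution permission we granted). The values are illustrative and the property names are as I understand the AWS::Pipes::Pipe resource, so double-check them against the documentation before relying on this.

Resources:
  OrderProcessingPipe:
    Type: "AWS::Pipes::Pipe"
    Properties:
      RoleArn: !GetAtt "OrderProcessingPipeRole.Arn"
      Source: !GetAtt "OrdersQueue.Arn"
      Enrichment: !GetAtt "CustomerDataEnrichmentFunction.Arn"
      Target: !Ref "ProcessOrderStateMachine"
      # Optional: control how messages are read from the SQS source
      SourceParameters:
        SqsQueueParameters:
          BatchSize: 5
          MaximumBatchingWindowInSeconds: 10
      # Optional: control how the Step Functions target is invoked
      TargetParameters:
        StepFunctionStateMachineParameters:
          InvocationType: "REQUEST_RESPONSE"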
To test this, just send the following payload to the SQS queue (feel free to use the AWS Management Console for this) and within a few moments you should see a new item in DynamoDB representing the order that’s just been processed!
{
  "Customer": {
    "Id": "1"
  },
  "Order": {
    "Timestamp": "2023-01-08T16:52:00Z",
    "Items": [
      {
        "Id": "Item1",
        "Title": "Product",
        "CostInMinorUnit": "10000"
      }
    ],
    "TotalCostInMinorUnit": "10000",
    "TotalCostInMajorUnit": "100.00"
  }
}
Rounding up
Summary
In summary, I think EventBridge Pipes is a really useful addition to the AWS ecosystem. Anything that can be done to simplify integration between services is a great step forwards. I’m looking forward to using this on customer projects; I can see it being an easy pattern to bring into designs for both greenfield and migration projects.
I really enjoyed trying it out properly and hope that this article is useful to some of you. Let me know your thoughts!