Step-by-step guide to streaming AWS CloudWatch Logs into Amazon OpenSearch Service using subscription filters, Lambda, and Firehose - with complete Terraform and CLI examples.
Getting CloudWatch Logs into Amazon OpenSearch Service is one of the first things teams tackle when building centralized log analytics on AWS. The integration has been available for years, but the setup involves several moving parts - IAM roles, Lambda functions, subscription filters, and VPC networking - that are easy to get wrong. This guide walks through the three main approaches, with working Terraform and CLI examples you can adapt to your environment.
How CloudWatch Logs Reach OpenSearch
AWS provides three paths for getting CloudWatch Logs data into OpenSearch Service:
- Subscription filter with Lambda - CloudWatch invokes a Lambda function that transforms and pushes log events to OpenSearch. This is the approach AWS wires up through the console.
- Subscription filter with Amazon Data Firehose - CloudWatch sends logs to a Firehose delivery stream, which buffers and delivers them to OpenSearch with automatic retry and S3 backup for failed records.
- OpenSearch Ingestion - A managed pipeline (based on Data Prepper) that pulls from CloudWatch Logs without requiring Lambda or Firehose.
All three receive data through CloudWatch Logs subscription filters. The log data arrives base64-encoded and gzip-compressed in every case.
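That encoding is easy to verify locally. The sketch below (standard-library Python only) builds a payload the way CloudWatch Logs does - JSON, gzip-compressed, then base64-encoded - and decodes it the way any subscriber must; the log group, stream, and message values are made-up samples:

```python
import base64
import gzip
import json

# Simulate what CloudWatch Logs sends to a subscription destination:
# a JSON document that is gzip-compressed, then base64-encoded.
sample = {
    "messageType": "DATA_MESSAGE",
    "owner": "123456789012",
    "logGroup": "/aws/lambda/my-app",          # hypothetical log group
    "logStream": "2026/03/30/[$LATEST]abcd",   # hypothetical stream name
    "logEvents": [
        {"id": "1", "timestamp": 1700000000000, "message": "hello"},
    ],
}
encoded = base64.b64encode(gzip.compress(json.dumps(sample).encode())).decode()

# A subscriber reverses the two steps: base64-decode, then gunzip.
decoded = json.loads(gzip.decompress(base64.b64decode(encoded)))
print(decoded["logEvents"][0]["message"])  # → hello
```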
Setting Up the Lambda Path with Terraform
The Lambda-based approach is the most common. When you create a subscription filter targeting OpenSearch through the AWS Console, it automatically provisions a Lambda function called LogsToElasticsearch_<domain-name>. Below, we replicate that setup in Terraform for reproducibility.
IAM Role for Lambda
The Lambda function needs permission to write to your OpenSearch domain and, if the domain lives in a VPC, to manage network interfaces:
resource "aws_iam_role" "lambda_to_opensearch" {
  name = "cwl-to-opensearch-lambda"

  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [{
      Action    = "sts:AssumeRole"
      Effect    = "Allow"
      Principal = { Service = "lambda.amazonaws.com" }
    }]
  })
}
resource "aws_iam_role_policy" "lambda_opensearch_access" {
  name = "opensearch-write"
  role = aws_iam_role.lambda_to_opensearch.id

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Effect   = "Allow"
        Action   = ["es:ESHttpPost", "es:ESHttpPut"]
        Resource = "${aws_opensearch_domain.logs.arn}/*"
      },
      {
        Effect = "Allow"
        Action = [
          "logs:CreateLogGroup",
          "logs:CreateLogStream",
          "logs:PutLogEvents"
        ]
        Resource = "arn:aws:logs:*:*:*"
      }
    ]
  })
}

# Required only for VPC-based OpenSearch domains
resource "aws_iam_role_policy_attachment" "lambda_vpc_access" {
  role       = aws_iam_role.lambda_to_opensearch.name
  policy_arn = "arn:aws:iam::aws:policy/service-role/AWSLambdaVPCAccessExecutionRole"
}
Lambda Function
The function receives batches of log events, decompresses them, and bulk-indexes into OpenSearch. The index pattern follows the convention cwl-YYYY.MM.DD:
import base64
import gzip
import json
import os
import urllib.request
from datetime import datetime

OPENSEARCH_ENDPOINT = os.environ["OPENSEARCH_ENDPOINT"]

def handler(event, context):
    compressed = base64.b64decode(event["awslogs"]["data"])
    payload = json.loads(gzip.decompress(compressed))
    if payload["messageType"] == "CONTROL_MESSAGE":
        return

    index = f"cwl-{datetime.utcnow():%Y.%m.%d}"
    bulk_body = ""
    for log_event in payload["logEvents"]:
        action = json.dumps({"index": {"_index": index}})
        doc = json.dumps({
            "@timestamp": datetime.utcfromtimestamp(
                log_event["timestamp"] / 1000
            ).isoformat() + "Z",
            "message": log_event["message"],
            "log_group": payload["logGroup"],
            "log_stream": payload["logStream"],
            "owner": payload["owner"],
        })
        bulk_body += f"{action}\n{doc}\n"

    # Note: this request is unsigned, which assumes the domain's access
    # policy permits it (e.g. fine-grained access control with another auth
    # method). For IAM-authenticated domains, sign the request with SigV4
    # (e.g. botocore's SigV4Auth) instead.
    url = f"https://{OPENSEARCH_ENDPOINT}/_bulk"
    req = urllib.request.Request(
        url,
        data=bulk_body.encode(),
        headers={"Content-Type": "application/x-ndjson"},
        method="POST",
    )
    with urllib.request.urlopen(req) as resp:
        result = json.loads(resp.read())
        if result.get("errors"):
            raise Exception(f"Bulk indexing errors: {json.dumps(result)}")
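The _bulk body the handler assembles is newline-delimited JSON: one action line naming the target index, then the document itself, repeated per event, with a trailing newline. A standalone sketch of that framing (the index name and documents are samples):

```python
import json

def bulk_lines(index: str, docs: list) -> str:
    """Frame documents for the OpenSearch _bulk API: an {"index": ...}
    action line before each document, NDJSON, trailing newline."""
    body = ""
    for doc in docs:
        body += json.dumps({"index": {"_index": index}}) + "\n"
        body += json.dumps(doc) + "\n"
    return body

body = bulk_lines("cwl-2026.03.30", [{"message": "a"}, {"message": "b"}])
print(body.count("\n"))  # → 4 (two action lines, two documents)
```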
Terraform Resources for Lambda and Subscription Filter
data "archive_file" "lambda_zip" {
  type        = "zip"
  source_file = "${path.module}/cwl_to_opensearch.py"
  output_path = "${path.module}/cwl_to_opensearch.zip"
}

resource "aws_lambda_function" "cwl_to_opensearch" {
  filename         = data.archive_file.lambda_zip.output_path
  function_name    = "cwl-to-opensearch"
  role             = aws_iam_role.lambda_to_opensearch.arn
  handler          = "cwl_to_opensearch.handler"
  runtime          = "python3.12"
  timeout          = 60
  source_code_hash = data.archive_file.lambda_zip.output_base64sha256

  environment {
    variables = {
      OPENSEARCH_ENDPOINT = aws_opensearch_domain.logs.endpoint
    }
  }

  # Include vpc_config only for VPC-based domains
  vpc_config {
    subnet_ids         = var.private_subnet_ids
    security_group_ids = [aws_security_group.lambda_sg.id]
  }
}

resource "aws_lambda_permission" "allow_cloudwatch" {
  statement_id  = "AllowCloudWatchInvoke"
  action        = "lambda:InvokeFunction"
  function_name = aws_lambda_function.cwl_to_opensearch.function_name
  principal     = "logs.amazonaws.com"
  source_arn    = "${aws_cloudwatch_log_group.app.arn}:*"
}

resource "aws_cloudwatch_log_subscription_filter" "to_opensearch" {
  name            = "cwl-to-opensearch"
  log_group_name  = aws_cloudwatch_log_group.app.name
  filter_pattern  = "" # Empty = all events
  destination_arn = aws_lambda_function.cwl_to_opensearch.arn
}
An empty filter_pattern forwards every log event. To reduce volume and cost, specify a pattern - for example, "{ $.level = \"ERROR\" }" to send only error-level JSON logs.
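If you prefer to keep the subscription filter empty and drop events in code instead, the same test can live inside the Lambda before the bulk body is built. A minimal sketch of that pre-filter - the top-level "level" field is an assumption about your log format:

```python
import json

def keep_event(message: str) -> bool:
    """Emulate the { $.level = "ERROR" } pattern in code: keep only
    JSON log lines whose top-level "level" field equals "ERROR"."""
    try:
        doc = json.loads(message)
    except json.JSONDecodeError:
        return False  # non-JSON lines would not match the JSON pattern either
    return doc.get("level") == "ERROR"

events = [
    '{"level": "ERROR", "msg": "boom"}',
    '{"level": "INFO", "msg": "ok"}',
    "plain text line",
]
print([m for m in events if keep_event(m)])  # keeps only the ERROR line
```

Filtering at the subscription filter is still cheaper, since dropped events never invoke the function at all.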
The Firehose Alternative
For higher throughput or when you want built-in S3 backup of failed records, Firehose is the better choice. It handles buffering, retry (up to 7,200 seconds), and dead-letter delivery to S3 without custom code.
AWS CLI Setup
Create the delivery stream targeting OpenSearch:
aws firehose create-delivery-stream \
  --delivery-stream-name cwl-to-opensearch \
  --delivery-stream-type DirectPut \
  --amazonopensearchservice-destination-configuration '{
    "RoleARN": "arn:aws:iam::123456789012:role/FirehoseToOpenSearch",
    "DomainARN": "arn:aws:es:us-east-1:123456789012:domain/logs",
    "IndexName": "cwl",
    "IndexRotationPeriod": "OneDay",
    "BufferingHints": {
      "IntervalInSeconds": 60,
      "SizeInMBs": 5
    },
    "RetryOptions": {
      "DurationInSeconds": 300
    },
    "S3BackupMode": "FailedDocumentsOnly",
    "S3Configuration": {
      "RoleARN": "arn:aws:iam::123456789012:role/FirehoseToOpenSearch",
      "BucketARN": "arn:aws:s3:::my-opensearch-backup",
      "Prefix": "failed-logs/"
    }
  }'
Then create the subscription filter pointing to Firehose:
aws logs put-subscription-filter \
  --log-group-name "/aws/lambda/my-app" \
  --filter-name "to-firehose" \
  --filter-pattern "" \
  --destination-arn "arn:aws:firehose:us-east-1:123456789012:deliverystream/cwl-to-opensearch" \
  --role-arn "arn:aws:iam::123456789012:role/CWLtoFirehose"
The IndexRotationPeriod of OneDay appends a date suffix to the index name automatically (e.g., cwl-2026-03-30), which keeps index sizes manageable and simplifies retention with index lifecycle policies.
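The mapping from a record's arrival time to a daily index name can be sketched like this - the yyyy-MM-dd suffix format follows the example above, and the helper name is our own:

```python
from datetime import datetime, timezone

def rotated_index(base: str, epoch_ms: int) -> str:
    """Daily-rotated index name: base name plus a UTC yyyy-MM-dd
    suffix derived from the record's epoch-millisecond timestamp."""
    day = datetime.fromtimestamp(epoch_ms / 1000, tz=timezone.utc)
    return f"{base}-{day:%Y-%m-%d}"

# 1767052800000 ms = 2025-12-30T00:00:00Z
print(rotated_index("cwl", 1767052800000))  # → cwl-2025-12-30
```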
Comparison: Lambda vs Firehose vs OpenSearch Ingestion
| Aspect | Lambda | Firehose | OpenSearch Ingestion |
|---|---|---|---|
| Custom transformation | Full control (your code) | Limited (Firehose transforms) | Data Prepper pipelines |
| Failed record handling | Manual (DLQ or retry logic) | Automatic S3 backup | Pipeline dead-letter queue |
| Buffering | None (event-driven) | Configurable (60-900s, 1-100 MB) | Configurable |
| VPC support | Requires VPC config + ENIs | Requires VPC config + ENIs | Managed VPC endpoints |
| Index rotation | Custom logic in code | Built-in (hourly/daily/weekly/monthly) | Pipeline configuration |
| Scaling | Lambda concurrency limits | Automatic | Managed auto-scaling |
| Cost model | Per invocation + duration | Per GB ingested | Per OCU-hour |
| Setup effort | Medium (IAM + Lambda + filter) | Medium (IAM + stream + filter) | Low (pipeline definition) |
For most log-shipping use cases, Firehose is the lowest-maintenance option. Lambda is worth the extra code when you need to enrich, filter, or reshape log data before indexing. OpenSearch Ingestion removes the most operational overhead, but it is the newest of the three and not yet available in every region.
Common Pitfalls
VPC connectivity. If your OpenSearch domain uses VPC access (and it should for production), the Lambda function or Firehose stream must be configured with subnets and security groups in the same VPC. The security group needs outbound HTTPS (port 443) to the OpenSearch domain's security group, and the OpenSearch security group needs a corresponding inbound rule. Forgetting this is the most common reason the integration silently drops logs.
IAM permission scope. Avoid using es:* in production policies. Scope permissions to es:ESHttpPost and es:ESHttpPut on the specific domain resource. For Firehose, the role needs both the Firehose permissions and the OpenSearch write permissions.
Lambda concurrency. A high-volume log group can trigger thousands of concurrent Lambda invocations, which can overwhelm your OpenSearch cluster. Set a reserved concurrency limit on the function (reserved_concurrent_executions in Terraform) - start with 10-50 and increase based on your cluster's bulk indexing capacity.
Index management. Daily indices accumulate fast. Configure an ISM (Index State Management) policy to delete or move old indices to warm/cold storage. Without this, you will eventually hit storage limits or shard count limits.
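As a concrete starting point, a delete-after-30-days ISM policy body looks like the sketch below; it would be PUT to the domain's _plugins/_ism/policies/<name> endpoint. The 30-day threshold, index pattern, and description are placeholders to adapt - the sketch only builds the JSON:

```python
import json

# ISM policy body: indices matching cwl-* transition to a delete state
# once they are 30 days old. Thresholds and patterns are placeholders.
policy = {
    "policy": {
        "description": "Delete daily CloudWatch log indices after 30 days",
        "default_state": "hot",
        "states": [
            {
                "name": "hot",
                "actions": [],
                "transitions": [
                    {
                        "state_name": "delete",
                        "conditions": {"min_index_age": "30d"},
                    }
                ],
            },
            {"name": "delete", "actions": [{"delete": {}}], "transitions": []},
        ],
        "ism_template": [{"index_patterns": ["cwl-*"], "priority": 1}],
    }
}
print(json.dumps(policy, indent=2)[:80])
```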
Subscription filter limits. Each CloudWatch log group supports a maximum of two subscription filters. If you need to send logs to more than two destinations, route through a Kinesis Data Stream first, then fan out from there.
Summary
Streaming CloudWatch Logs to Amazon OpenSearch Service is a well-supported integration pattern on AWS, but the details matter. The Lambda-based path gives you full control over data transformation and indexing logic. Firehose reduces operational overhead with built-in buffering, retry, and S3 dead-letter support. OpenSearch Ingestion is the most hands-off option if your region supports it.
Whichever path you choose, plan for VPC networking from the start, set sensible concurrency limits, and put index lifecycle policies in place before you ship your first log event. If you need help designing or scaling your OpenSearch logging infrastructure, reach out to our team - we have been running OpenSearch and Elasticsearch clusters in production for over a decade.