Step-by-step guide to streaming AWS CloudWatch Logs into Amazon OpenSearch Service using subscription filters, Lambda, and Firehose - with complete Terraform and CLI examples.
Centralizing logs into Amazon OpenSearch Service usually starts with CloudWatch. The integration has been around for years, but the moving parts (IAM, Lambda, subscription filters, VPC networking) trip teams up every time. This post is the version of the guide I wish I had the first time I wired this up: three working patterns, with the Terraform and CLI you actually need, and the gotchas that cost us hours to debug.
How CloudWatch Logs Reach OpenSearch
There are three patterns worth knowing:
- Subscription filter → Lambda. CloudWatch invokes a Lambda function that transforms and pushes log events to OpenSearch. This is what the AWS Console wires up when you click "Subscription filter for OpenSearch".
- Subscription filter → Firehose → S3 → OpenSearch. Firehose buffers records to S3, and a second stage (Lambda on S3 events, or OpenSearch Ingestion) loads them. Note that Firehose's OpenSearch destination does not accept CloudWatch Logs subscription records directly - CloudWatch packs multiple events into one Firehose record and OpenSearch rejects the shape. The Firehose docs spell this out, but it's easy to miss until you watch records pile up in the error bucket.
- OpenSearch Ingestion (OSI). A managed Data Prepper pipeline. There's no CloudWatch Logs source plugin, so in practice OSI piggybacks on pattern 2 - CloudWatch to Firehose to S3, then OSI's s3 source (driven by SQS notifications) does the OpenSearch load.
All three start with CloudWatch Logs subscription filters. In every case the log data arrives base64-encoded and gzip-compressed - your code or pipeline has to decode and decompress before doing anything with it.
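To make the envelope concrete, here is a minimal sketch that builds a fake subscription payload and decodes it the way a Lambda consumer would. The sample values (log group, account ID, messages) are invented for illustration; the field names follow the CloudWatch Logs envelope used later in this post.

```python
import base64
import gzip
import json

# Hypothetical sample envelope; the values are invented for illustration.
sample_envelope = {
    "messageType": "DATA_MESSAGE",
    "owner": "123456789012",
    "logGroup": "/aws/lambda/my-app",
    "logStream": "2026/03/30/[$LATEST]abc123",
    "subscriptionFilters": ["cwl-to-opensearch"],
    "logEvents": [
        {"id": "1", "timestamp": 1774857600000, "message": "first event"},
        {"id": "2", "timestamp": 1774857601000, "message": "second event"},
    ],
}


def encode_subscription_data(envelope: dict) -> str:
    """Mimic what CloudWatch Logs puts in event["awslogs"]["data"]."""
    raw = json.dumps(envelope).encode("utf-8")
    return base64.b64encode(gzip.compress(raw)).decode("ascii")


def decode_subscription_data(data: str) -> dict:
    """Reverse the encoding: base64-decode, gunzip, then parse the JSON."""
    return json.loads(gzip.decompress(base64.b64decode(data)))


event = {"awslogs": {"data": encode_subscription_data(sample_envelope)}}
decoded = decode_subscription_data(event["awslogs"]["data"])

# One record carries several log events, which is why a record cannot be
# indexed as a single document without unpacking it first.
print(decoded["logGroup"], len(decoded["logEvents"]))
```

The point to notice is that a single record holds a list of log events, so any consumer has to fan them out before indexing.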
Setting Up the Lambda Path with Terraform
Most teams I work with end up here. The console will hand-spin a Lambda for you (named something like LogsToElasticsearch_<domain-name>), but if you want anything beyond a one-off prototype you'll want it in Terraform. The function below also signs its requests with SigV4 - most blog snippets skip this, which is fine for a public anonymous-access domain and broken for everything else.
IAM Role for Lambda
The Lambda function needs permission to write to your OpenSearch domain and, if the domain lives in a VPC, to manage network interfaces:
resource "aws_iam_role" "lambda_to_opensearch" {
  name = "cwl-to-opensearch-lambda"

  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [{
      Action    = "sts:AssumeRole"
      Effect    = "Allow"
      Principal = { Service = "lambda.amazonaws.com" }
    }]
  })
}

resource "aws_iam_role_policy" "lambda_opensearch_access" {
  name = "opensearch-write"
  role = aws_iam_role.lambda_to_opensearch.id

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Effect   = "Allow"
        Action   = ["es:ESHttpPost", "es:ESHttpPut"]
        Resource = "${aws_opensearch_domain.logs.arn}/*"
      },
      {
        Effect = "Allow"
        Action = [
          "logs:CreateLogGroup",
          "logs:CreateLogStream",
          "logs:PutLogEvents"
        ]
        Resource = "arn:aws:logs:*:*:*"
      }
    ]
  })
}

# Required only for VPC-based OpenSearch domains
resource "aws_iam_role_policy_attachment" "lambda_vpc_access" {
  role       = aws_iam_role.lambda_to_opensearch.name
  policy_arn = "arn:aws:iam::aws:policy/service-role/AWSLambdaVPCAccessExecutionRole"
}
Lambda Function
The function takes a batch of log events, decompresses them, and bulk-indexes into a daily index (cwl-YYYY.MM.DD). SigV4 signing uses the Lambda role's credentials; without it, a default-secured domain returns 403 and you'll spend an afternoon wondering why nothing shows up in OpenSearch.
import base64
import gzip
import json
import os
from datetime import datetime, timezone

import boto3
import urllib3
from botocore.auth import SigV4Auth
from botocore.awsrequest import AWSRequest

OPENSEARCH_ENDPOINT = os.environ["OPENSEARCH_ENDPOINT"]
REGION = os.environ["AWS_REGION"]  # set automatically by the Lambda runtime

# Created once per execution environment and reused across invocations
http = urllib3.PoolManager()
credentials = boto3.Session().get_credentials()


def handler(event, context):
    # Subscription payloads arrive base64-encoded and gzip-compressed
    compressed = base64.b64decode(event["awslogs"]["data"])
    payload = json.loads(gzip.decompress(compressed))

    # Control messages carry no log events; skip them
    if payload["messageType"] == "CONTROL_MESSAGE":
        return

    # Build the _bulk body: one action line plus one document line per event
    index = f"cwl-{datetime.now(timezone.utc):%Y.%m.%d}"
    bulk_body = ""
    for log_event in payload["logEvents"]:
        action = json.dumps({"index": {"_index": index}})
        doc = json.dumps({
            "@timestamp": datetime.fromtimestamp(
                log_event["timestamp"] / 1000, tz=timezone.utc
            ).isoformat(),
            "message": log_event["message"],
            "log_group": payload["logGroup"],
            "log_stream": payload["logStream"],
            "owner": payload["owner"],
        })
        bulk_body += f"{action}\n{doc}\n"

    # Sign the request with the Lambda role's credentials (SigV4, service "es")
    url = f"https://{OPENSEARCH_ENDPOINT}/_bulk"
    request = AWSRequest(
        method="POST",
        url=url,
        data=bulk_body,
        headers={"Content-Type": "application/x-ndjson"},
    )
    SigV4Auth(credentials, "es", REGION).add_auth(request)

    response = http.request(
        "POST",
        url,
        body=bulk_body.encode(),
        headers=dict(request.headers),
    )
    if response.status >= 300:
        raise Exception(f"OpenSearch returned {response.status}: {response.data!r}")

    # _bulk can return 200 even when individual documents were rejected
    result = json.loads(response.data)
    if result.get("errors"):
        raise Exception(f"Bulk indexing errors: {json.dumps(result)}")
boto3 and botocore ship with the Lambda Python runtime, and urllib3 rides along. Two variations worth knowing: for OpenSearch Serverless, switch the signing service from es to aoss; for a domain using FGAC's internal user database instead of IAM, swap SigV4 for HTTP basic auth and store the password in Secrets Manager.
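For the basic-auth variation, the signing step collapses into a single header. A minimal sketch, assuming the username and password have already been fetched (for example from Secrets Manager); `basic_auth_headers` and the sample credentials are hypothetical names for illustration, not part of the function above.

```python
import base64


def basic_auth_headers(username: str, password: str) -> dict:
    """Build headers for a _bulk request authenticated with HTTP basic auth."""
    token = base64.b64encode(f"{username}:{password}".encode("utf-8")).decode("ascii")
    return {
        "Content-Type": "application/x-ndjson",
        "Authorization": f"Basic {token}",
    }


# Placeholder credentials; in a real function these would be loaded from a
# secret store at cold start rather than hard-coded.
headers = basic_auth_headers("log-writer", "example-password")
print(headers["Authorization"])
```

These headers would take the place of `dict(request.headers)` in the `http.request` call, and the `AWSRequest` and `SigV4Auth` lines would no longer be needed.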
Terraform Resources for Lambda and Subscription Filter
data "archive_file" "lambda_zip" {
  type        = "zip"
  source_file = "${path.module}/cwl_to_opensearch.py"
  output_path = "${path.module}/cwl_to_opensearch.zip"
}

resource "aws_lambda_function" "cwl_to_opensearch" {
  filename         = data.archive_file.lambda_zip.output_path
  function_name    = "cwl-to-opensearch"
  role             = aws_iam_role.lambda_to_opensearch.arn
  handler          = "cwl_to_opensearch.handler"
  runtime          = "python3.12"
  timeout          = 60
  source_code_hash = data.archive_file.lambda_zip.output_base64sha256

  environment {
    variables = {
      OPENSEARCH_ENDPOINT = aws_opensearch_domain.logs.endpoint
    }
  }

  # Include vpc_config only for VPC-based domains
  vpc_config {
    subnet_ids         = var.private_subnet_ids
    security_group_ids = [aws_security_group.lambda_sg.id]
  }
}

resource "aws_lambda_permission" "allow_cloudwatch" {
  statement_id  = "AllowCloudWatchInvoke"
  action        = "lambda:InvokeFunction"
  function_name = aws_lambda_function.cwl_to_opensearch.function_name
  principal     = "logs.amazonaws.com"
  source_arn    = "${aws_cloudwatch_log_group.app.arn}:*"
}
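
resource "aws_cloudwatch_log_subscription_filter" "to_opensearch" {
  name            = "cwl-to-opensearch"
  log_group_name  = aws_cloudwatch_log_group.app.name
  filter_pattern  = "" # Empty = all events
  destination_arn = aws_lambda_function.cwl_to_opensearch.arn

  # Create the invoke permission first; otherwise CloudWatch Logs may fail
  # to validate the Lambda destination when the filter is created
  depends_on = [aws_lambda_permission.allow_cloudwatch]
}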
An empty filter_pattern forwards everything. For noisy log groups, narrow it down - e.g. "{ $.level = \"ERROR\" }" for JSON-formatted error logs. Your OpenSearch cluster (and your bill) will thank you.
The Firehose-to-S3 Path
When you need higher throughput, or a durable buffer you can replay from, route logs through Firehose - but to S3, not straight to OpenSearch. The shape is:
CloudWatch Logs --subscription filter--> Firehose --> S3 --S3 event--> Lambda or OSI --> OpenSearch
S3 is doing the heavy lifting here: durable buffering, replayability, and dead-letter handling for free. The second hop is either a Lambda that bulk-indexes new objects as they land, or an OpenSearch Ingestion pipeline using the s3 source with SQS notifications.
AWS CLI Setup
Create the delivery stream targeting S3:
aws firehose create-delivery-stream \
  --delivery-stream-name cwl-to-s3 \
  --delivery-stream-type DirectPut \
  --extended-s3-destination-configuration '{
    "RoleARN": "arn:aws:iam::123456789012:role/FirehoseToS3",
    "BucketARN": "arn:aws:s3:::my-cwl-archive",
    "Prefix": "cwl/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/",
    "ErrorOutputPrefix": "errors/",
    "BufferingHints": {
      "IntervalInSeconds": 60,
      "SizeInMBs": 5
    },
    "CompressionFormat": "GZIP"
  }'
Then create the subscription filter pointing to Firehose:
aws logs put-subscription-filter \
  --log-group-name "/aws/lambda/my-app" \
  --filter-name "to-firehose" \
  --filter-pattern "" \
  --destination-arn "arn:aws:firehose:us-east-1:123456789012:deliverystream/cwl-to-s3" \
  --role-arn "arn:aws:iam::123456789012:role/CWLtoFirehose"
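CloudWatch Logs hands each record to Firehose already gzip-compressed, in the standard CloudWatch Logs envelope ({"messageType": ..., "logEvents": [...], ...}). With CompressionFormat set to GZIP as above, the objects in S3 end up compressed twice, so either set it to UNCOMPRESSED or have the loader gunzip until it reaches JSON. Don't rely on a newline between envelopes either: Firehose writes records back to back unless you configure it to add a delimiter. The loader unwraps each envelope, fans out logEvents, and bulk-indexes them. Rotate the index daily (e.g. cwl-2026.03.30, the same scheme the Lambda path uses) so it plays nicely with ISM policies for retention.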
One footnote: Firehose's OpenSearch destination (which you'd use for non-CloudWatch sources) accepts BufferingHints.IntervalInSeconds 0-900, SizeInMBs 1-100, and RetryOptions.DurationInSeconds up to 7,200. Worth knowing if you reuse the same stream shape for app logs sent to Firehose directly.
Comparison: Lambda vs Firehose-to-S3 vs OpenSearch Ingestion
| Aspect | Lambda (direct) | Firehose to S3 + loader | OpenSearch Ingestion (S3 source) |
|---|---|---|---|
| Custom transformation | Full control (your code) | In the loader Lambda | Data Prepper processors |
| Failed record handling | Manual (DLQ or retry logic) | S3 acts as a durable buffer; replay by re-processing objects | Pipeline DLQ to S3 |
| Buffering | None (event-driven) | Firehose: 0-900s, 1-128 MB to S3 | Driven by S3 object size and SQS batch |
| VPC support | Requires VPC config + ENIs | Loader Lambda needs VPC config + ENIs | Managed VPC endpoints |
| Index rotation | Custom logic in code | Custom logic in loader | Pipeline configuration |
| Scaling | Lambda concurrency limits | Firehose automatic; loader scales with Lambda | Managed auto-scaling (OCUs) |
| Cost model | Per invocation + duration | Firehose per GB + S3 + loader Lambda | Per OCU-hour + S3 |
| Setup effort | Medium (IAM + Lambda + filter) | High (Firehose + S3 + loader + filter) | Medium (pipeline + S3 + filter) |
Rule of thumb: at low-to-medium volumes, the direct Lambda path is the simplest thing that works and you should default to it. Once you're shipping enough logs that you care about replay, batching, or backpressure - or you've already lost invocations to OpenSearch throttling and retries - move to Firehose-to-S3. OSI is worth the switch when you want the loader stage off your plate, but it's region-gated and you still need the CloudWatch-to-S3 hop in front of it.
What Trips People Up
VPC connectivity. If your OpenSearch domain is in a VPC (and for production, it should be), Lambda or the loader needs to run in subnets that can reach it (normally the same VPC), with a security group that allows outbound HTTPS (443) to the OpenSearch SG, and a matching inbound rule on the OpenSearch SG. Miss this and the index simply stays empty: nothing fails on the OpenSearch side, and the only evidence is the function timing out, which shows up in its own logs and Errors metric.
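When the index stays empty, a quick way to separate networking problems from auth problems is to test raw TCP reachability from inside the function. A minimal sketch using only the standard library; `can_reach` is a hypothetical helper, and the 3-second timeout is an arbitrary choice.

```python
import socket


def can_reach(host: str, port: int = 443, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port opens within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, DNS failures, and timeouts alike.
        return False
```

Logging `can_reach(OPENSEARCH_ENDPOINT)` at the top of the handler is one way to use it: a False result points at subnets, routes, or security groups, while a True followed by 403 responses points at IAM or the domain access policy.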
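IAM scope. Don't ship es:* to production. Use es:ESHttpPost and es:ESHttpPut against the specific domain ARN. On the Firehose path, keep the roles separate: the role CloudWatch Logs assumes needs permission to put records on the delivery stream, the Firehose role needs write access to the bucket, and the loader needs S3 read access plus the same OpenSearch write permissions.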
Lambda concurrency. A chatty log group can fan out into thousands of concurrent Lambda invocations and steamroll your cluster's bulk thread pool. Set a reserved concurrency limit - I usually start at 10-50 and raise it once I've watched bulk queue metrics for a day.
Index management. Daily indices pile up fast. Put an ISM policy in place from day one to roll or delete old indices. Skip this and you'll hit shard count limits long before storage limits, and the failure mode is ugly.
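A minimal sketch of such a policy, built as a Python dict so it can be serialized and sent to the domain. The 14-day retention, the `cwl-*` pattern, and the policy ID in the comment are assumptions for illustration; the structure follows the OpenSearch Index State Management policy format.

```python
import json


def build_retention_policy(index_pattern: str, retention_days: int) -> dict:
    """Build an ISM policy that deletes indices older than retention_days."""
    return {
        "policy": {
            "description": f"Delete {index_pattern} indices after {retention_days} days",
            "default_state": "hot",
            "states": [
                {
                    "name": "hot",
                    "actions": [],
                    "transitions": [
                        {
                            "state_name": "delete",
                            "conditions": {"min_index_age": f"{retention_days}d"},
                        }
                    ],
                },
                {"name": "delete", "actions": [{"delete": {}}], "transitions": []},
            ],
            # Attach the policy automatically to new indices matching the pattern.
            "ism_template": {"index_patterns": [index_pattern], "priority": 100},
        }
    }


policy = build_retention_policy("cwl-*", 14)
# The JSON below would be the body of: PUT _plugins/_ism/policies/cwl-retention
print(json.dumps(policy, indent=2))
```

Because of the `ism_template` block, new daily indices matching the pattern would pick the policy up on creation; indices that already exist would need it attached separately.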
Two subscription filters per log group, max. Hard limit. If you need to fan out to more than two destinations, put a Kinesis Data Stream in front and split from there.
Wrapping Up
CloudWatch Logs to OpenSearch is a well-supported pattern, but it's less plug-and-play than the marketing implies. Lambda gives you the most control and the fewest pieces to move. Firehose-to-S3 adds a real buffer at the cost of a second stage. OSI takes the loader off your hands - if your region supports it and you're willing to land logs in S3 first.
Whichever path you pick: sign your requests, plan VPC networking up front, cap Lambda concurrency, and have an ISM policy ready before the first log lands. If you'd like help designing or scaling OpenSearch logging at your org, take a look at our OpenSearch consulting services or drop us a line - we've been running OpenSearch and Elasticsearch in production for over a decade.