Step-by-step guide to streaming AWS CloudWatch Logs into Amazon OpenSearch Service using subscription filters, Lambda, and Firehose - with complete Terraform and CLI examples.

Getting CloudWatch Logs into Amazon OpenSearch Service is one of the first things teams tackle when building centralized log analytics on AWS. The integration has been available for years, but the setup involves several moving parts - IAM roles, Lambda functions, subscription filters, and VPC networking - that are easy to get wrong. This guide walks through the three main approaches, with working Terraform and CLI examples you can adapt to your environment.

How CloudWatch Logs Reach OpenSearch

AWS provides three paths for getting CloudWatch Logs data into OpenSearch Service:

  1. Subscription filter with Lambda - CloudWatch invokes a Lambda function that transforms and pushes log events to OpenSearch. This is the approach AWS wires up through the console.
  2. Subscription filter with Amazon Data Firehose - CloudWatch sends logs to a Firehose delivery stream, which buffers and delivers them to OpenSearch with automatic retry and S3 backup for failed records.
  3. OpenSearch Ingestion - A managed pipeline (based on Data Prepper) that pulls from CloudWatch Logs without requiring Lambda or Firehose.

All three receive data through CloudWatch Logs subscription filters. The log data arrives base64-encoded and gzip-compressed in every case.
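Decoding that payload is the same regardless of destination. A minimal Python sketch of the round trip (the field names match what CloudWatch actually sends in a DATA_MESSAGE; the sample values are fabricated for illustration):

```python
import base64
import gzip
import json

def decode_cwl_payload(data: str) -> dict:
    """Decode the base64-encoded, gzip-compressed payload that a
    CloudWatch Logs subscription filter delivers to its destination."""
    return json.loads(gzip.decompress(base64.b64decode(data)))

# Simulate what CloudWatch sends: compress and base64-encode a payload
sample = {
    "messageType": "DATA_MESSAGE",
    "owner": "123456789012",
    "logGroup": "/aws/lambda/my-app",
    "logStream": "2026/03/30/[$LATEST]abc123",
    "logEvents": [
        {"id": "1", "timestamp": 1700000000000, "message": "hello"}
    ],
}
encoded = base64.b64encode(gzip.compress(json.dumps(sample).encode())).decode()

decoded = decode_cwl_payload(encoded)
print(decoded["logEvents"][0]["message"])  # hello
```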

Setting Up the Lambda Path with Terraform

The Lambda-based approach is the most common. When you create a subscription filter targeting OpenSearch through the AWS Console, it automatically provisions a Lambda function called LogsToElasticsearch_<domain-name>. Below, we replicate that setup in Terraform for reproducibility.

IAM Role for Lambda

The Lambda function needs permission to write to your OpenSearch domain and, if the domain lives in a VPC, to manage network interfaces:

resource "aws_iam_role" "lambda_to_opensearch" {
  name = "cwl-to-opensearch-lambda"

  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [{
      Action    = "sts:AssumeRole"
      Effect    = "Allow"
      Principal = { Service = "lambda.amazonaws.com" }
    }]
  })
}

resource "aws_iam_role_policy" "lambda_opensearch_access" {
  name = "opensearch-write"
  role = aws_iam_role.lambda_to_opensearch.id

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Effect   = "Allow"
        Action   = ["es:ESHttpPost", "es:ESHttpPut"]
        Resource = "${aws_opensearch_domain.logs.arn}/*"
      },
      {
        Effect = "Allow"
        Action = [
          "logs:CreateLogGroup",
          "logs:CreateLogStream",
          "logs:PutLogEvents"
        ]
        Resource = "arn:aws:logs:*:*:*"
      }
    ]
  })
}

# Required only for VPC-based OpenSearch domains
resource "aws_iam_role_policy_attachment" "lambda_vpc_access" {
  role       = aws_iam_role.lambda_to_opensearch.name
  policy_arn = "arn:aws:iam::aws:policy/service-role/AWSLambdaVPCAccessExecutionRole"
}

Lambda Function

The function receives batches of log events, decompresses them, and bulk-indexes into OpenSearch. The index pattern follows the convention cwl-YYYY.MM.DD:

import base64
import gzip
import json
import os
import urllib.request
from datetime import datetime, timezone

OPENSEARCH_ENDPOINT = os.environ["OPENSEARCH_ENDPOINT"]

def handler(event, context):
    # Subscription filter data arrives base64-encoded and gzip-compressed
    compressed = base64.b64decode(event["awslogs"]["data"])
    payload = json.loads(gzip.decompress(compressed))

    # CloudWatch sends a CONTROL_MESSAGE to verify the destination; skip it
    if payload["messageType"] == "CONTROL_MESSAGE":
        return

    index = f"cwl-{datetime.now(timezone.utc):%Y.%m.%d}"
    bulk_body = ""
    for log_event in payload["logEvents"]:
        action = json.dumps({"index": {"_index": index}})
        doc = json.dumps({
            "@timestamp": datetime.fromtimestamp(
                log_event["timestamp"] / 1000, tz=timezone.utc
            ).isoformat(),
            "message": log_event["message"],
            "log_group": payload["logGroup"],
            "log_stream": payload["logStream"],
            "owner": payload["owner"],
        })
        bulk_body += f"{action}\n{doc}\n"

    # Note: this request is unsigned, which only works if the domain's
    # access policy allows it. Production code should sign requests with
    # SigV4 so the policy can authorize the Lambda execution role.
    url = f"https://{OPENSEARCH_ENDPOINT}/_bulk"
    req = urllib.request.Request(
        url,
        data=bulk_body.encode(),
        headers={"Content-Type": "application/x-ndjson"},
        method="POST",
    )
    with urllib.request.urlopen(req) as resp:
        result = json.loads(resp.read())
        if result.get("errors"):
            raise Exception(f"Bulk indexing errors: {json.dumps(result)}")

Terraform Resources for Lambda and Subscription Filter

data "archive_file" "lambda_zip" {
  type        = "zip"
  source_file = "${path.module}/cwl_to_opensearch.py"
  output_path = "${path.module}/cwl_to_opensearch.zip"
}

resource "aws_lambda_function" "cwl_to_opensearch" {
  filename         = data.archive_file.lambda_zip.output_path
  function_name    = "cwl-to-opensearch"
  role             = aws_iam_role.lambda_to_opensearch.arn
  handler          = "cwl_to_opensearch.handler"
  runtime          = "python3.12"
  timeout          = 60
  source_code_hash = data.archive_file.lambda_zip.output_base64sha256

  environment {
    variables = {
      OPENSEARCH_ENDPOINT = aws_opensearch_domain.logs.endpoint
    }
  }

  # Include vpc_config only for VPC-based domains
  vpc_config {
    subnet_ids         = var.private_subnet_ids
    security_group_ids = [aws_security_group.lambda_sg.id]
  }
}

resource "aws_lambda_permission" "allow_cloudwatch" {
  statement_id  = "AllowCloudWatchInvoke"
  action        = "lambda:InvokeFunction"
  function_name = aws_lambda_function.cwl_to_opensearch.function_name
  principal     = "logs.amazonaws.com"
  source_arn    = "${aws_cloudwatch_log_group.app.arn}:*"
}

resource "aws_cloudwatch_log_subscription_filter" "to_opensearch" {
  name            = "cwl-to-opensearch"
  log_group_name  = aws_cloudwatch_log_group.app.name
  filter_pattern  = "" # Empty pattern forwards all events
  destination_arn = aws_lambda_function.cwl_to_opensearch.arn

  # Ensure the invoke permission exists before the filter is created,
  # otherwise CloudWatch rejects the destination
  depends_on = [aws_lambda_permission.allow_cloudwatch]
}

An empty filter_pattern forwards every log event. To reduce volume and cost, specify a pattern - for example, "{ $.level = \"ERROR\" }" to send only error-level JSON logs.
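To get a feel for what that pattern would keep, here is a rough client-side approximation in Python. CloudWatch evaluates patterns server-side and its pattern syntax is much richer than this; the function below only illustrates the effect of `{ $.level = "ERROR" }` on JSON log lines:

```python
import json

def matches_error_level(message: str) -> bool:
    """Approximate the filter pattern { $.level = "ERROR" }:
    keep only events whose JSON body has level == "ERROR"."""
    try:
        return json.loads(message).get("level") == "ERROR"
    except json.JSONDecodeError:
        # Non-JSON events never match a JSON property selector
        return False

events = [
    '{"level": "INFO", "msg": "started"}',
    '{"level": "ERROR", "msg": "boom"}',
    "plain text line",
]
errors = [e for e in events if matches_error_level(e)]
print(len(errors))  # 1
```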

The Firehose Alternative

For higher throughput or when you want built-in S3 backup of failed records, Firehose is the better choice. It handles buffering, retry (up to 7,200 seconds), and dead-letter delivery to S3 without custom code.

AWS CLI Setup

Create the delivery stream targeting OpenSearch:

aws firehose create-delivery-stream \
    --delivery-stream-name cwl-to-opensearch \
    --delivery-stream-type DirectPut \
    --amazon-opensearch-service-destination-configuration '{
      "RoleARN": "arn:aws:iam::123456789012:role/FirehoseToOpenSearch",
      "DomainARN": "arn:aws:es:us-east-1:123456789012:domain/logs",
      "IndexName": "cwl",
      "IndexRotationPeriod": "OneDay",
      "BufferingHints": {
        "IntervalInSeconds": 60,
        "SizeInMBs": 5
      },
      "RetryOptions": {
        "DurationInSeconds": 300
      },
      "S3BackupMode": "FailedDocumentsOnly",
      "S3Configuration": {
        "RoleARN": "arn:aws:iam::123456789012:role/FirehoseToOpenSearch",
        "BucketARN": "arn:aws:s3:::my-opensearch-backup",
        "Prefix": "failed-logs/"
      }
    }'
  

Then create the subscription filter pointing to Firehose:

aws logs put-subscription-filter \
    --log-group-name "/aws/lambda/my-app" \
    --filter-name "to-firehose" \
    --filter-pattern "" \
    --destination-arn "arn:aws:firehose:us-east-1:123456789012:deliverystream/cwl-to-opensearch" \
    --role-arn "arn:aws:iam::123456789012:role/CWLtoFirehose"
  

The IndexRotationPeriod of OneDay appends a date suffix to the index name automatically (e.g., cwl-2026-03-30), which keeps index sizes manageable and simplifies retention with index lifecycle policies.

Comparison: Lambda vs Firehose vs OpenSearch Ingestion

| Aspect | Lambda | Firehose | OpenSearch Ingestion |
|---|---|---|---|
| Custom transformation | Full control (your code) | Limited (Firehose transforms) | Data Prepper pipelines |
| Failed record handling | Manual (DLQ or retry logic) | Automatic S3 backup | Pipeline dead-letter queue |
| Buffering | None (event-driven) | Configurable (60-900s, 1-128 MB) | Configurable |
| VPC support | Requires VPC config + ENIs | Requires VPC config + ENIs | Managed VPC endpoints |
| Index rotation | Custom logic in code | Built-in (hourly/daily/weekly/monthly) | Pipeline configuration |
| Scaling | Lambda concurrency limits | Automatic | Managed auto-scaling |
| Cost model | Per invocation + duration | Per GB ingested | Per OCU-hour |
| Setup effort | Medium (IAM + Lambda + filter) | Medium (IAM + stream + filter) | Low (pipeline definition) |

For most log-shipping use cases, Firehose is the lowest-maintenance option. Lambda is worth it when you need to enrich, filter, or reshape log data before indexing. OpenSearch Ingestion is the newest option and removes the most operational overhead, but check regional availability before committing to it.

Common Pitfalls

VPC connectivity. If your OpenSearch domain uses VPC access (and it should for production), the Lambda function or Firehose stream must be configured with subnets and security groups in the same VPC. The security group needs outbound HTTPS (port 443) to the OpenSearch domain's security group, and the OpenSearch security group needs a corresponding inbound rule. Forgetting this is the most common reason the integration silently drops logs.
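That rule pair can be expressed in Terraform along these lines (resource names are illustrative, and a `vpc_id` variable and an existing `aws_security_group.opensearch_sg` are assumed):

```hcl
resource "aws_security_group" "lambda_sg" {
  name   = "cwl-lambda-sg"
  vpc_id = var.vpc_id

  # Outbound HTTPS to the OpenSearch domain's security group
  egress {
    from_port       = 443
    to_port         = 443
    protocol        = "tcp"
    security_groups = [aws_security_group.opensearch_sg.id]
  }
}

# Matching inbound rule on the OpenSearch side
resource "aws_security_group_rule" "opensearch_from_lambda" {
  type                     = "ingress"
  from_port                = 443
  to_port                  = 443
  protocol                 = "tcp"
  security_group_id        = aws_security_group.opensearch_sg.id
  source_security_group_id = aws_security_group.lambda_sg.id
}
```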

IAM permission scope. Avoid using es:* in production policies. Scope permissions to es:ESHttpPost and es:ESHttpPut on the specific domain resource. For Firehose, the role needs both the Firehose permissions and the OpenSearch write permissions.
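A sketch of a scoped policy for the Firehose role, using the example account, region, and bucket names from the CLI setup above (exact actions depend on your configuration; the AWS Firehose documentation lists the full set for the OpenSearch destination):

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "es:DescribeDomain",
        "es:ESHttpGet",
        "es:ESHttpPost",
        "es:ESHttpPut"
      ],
      "Resource": [
        "arn:aws:es:us-east-1:123456789012:domain/logs",
        "arn:aws:es:us-east-1:123456789012:domain/logs/*"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
        "s3:AbortMultipartUpload",
        "s3:GetBucketLocation",
        "s3:ListBucket",
        "s3:PutObject"
      ],
      "Resource": [
        "arn:aws:s3:::my-opensearch-backup",
        "arn:aws:s3:::my-opensearch-backup/*"
      ]
    }
  ]
}
```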

Lambda concurrency. A high-volume log group can trigger thousands of concurrent Lambda invocations, which can overwhelm your OpenSearch cluster. Set a reserved concurrency limit on the function - start with 10-50 and increase based on your cluster's bulk indexing capacity.
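In Terraform, that is a single attribute on the function defined earlier (the value of 20 is an illustrative starting point, not a recommendation for every cluster):

```hcl
resource "aws_lambda_function" "cwl_to_opensearch" {
  # ... existing configuration ...

  # Cap concurrent executions so bursts of log traffic cannot
  # overwhelm the OpenSearch cluster's bulk indexing capacity
  reserved_concurrent_executions = 20
}
```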

Index management. Daily indices accumulate fast. Configure an ISM (Index State Management) policy to delete or move old indices to warm/cold storage. Without this, you will eventually hit storage limits or shard count limits.
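A minimal ISM policy sketch that deletes `cwl-*` indices after 30 days (the retention period and priority are illustrative; adjust to your compliance requirements):

```json
{
  "policy": {
    "description": "Delete cwl-* indices after 30 days",
    "default_state": "hot",
    "states": [
      {
        "name": "hot",
        "actions": [],
        "transitions": [
          { "state_name": "delete", "conditions": { "min_index_age": "30d" } }
        ]
      },
      {
        "name": "delete",
        "actions": [{ "delete": {} }],
        "transitions": []
      }
    ],
    "ism_template": [{ "index_patterns": ["cwl-*"], "priority": 1 }]
  }
}
```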

Subscription filter limits. Each CloudWatch log group supports a maximum of two subscription filters. If you need to send logs to more than two destinations, route through a Kinesis Data Stream first, then fan out from there.

Summary

Streaming CloudWatch Logs to Amazon OpenSearch Service is a well-supported integration pattern on AWS, but the details matter. The Lambda-based path gives you full control over data transformation and indexing logic. Firehose reduces operational overhead with built-in buffering, retry, and S3 dead-letter support. OpenSearch Ingestion is the most hands-off option if your region supports it.

Whichever path you choose, plan for VPC networking from the start, set sensible concurrency limits, and put index lifecycle policies in place before you ship your first log event. If you need help designing or scaling your OpenSearch logging infrastructure, reach out to our team - we have been running OpenSearch and Elasticsearch clusters in production for over a decade.