Step-by-step guide to streaming AWS CloudWatch Logs into Amazon OpenSearch Service using subscription filters, Lambda, and Firehose - with complete Terraform and CLI examples.

CloudWatch Logs to OpenSearch: A Practical Setup Guide with Terraform and AWS CLI

Centralizing logs into Amazon OpenSearch Service usually starts with CloudWatch. The integration has been around for years, but the moving parts (IAM, Lambda, subscription filters, VPC networking) trip teams up every time. This post is the version of the guide I wish I had the first time I wired this up: three working patterns, with the Terraform and CLI you actually need, and the gotchas that cost us hours to debug.

How CloudWatch Logs Reach OpenSearch

There are three patterns worth knowing:

  1. Subscription filter → Lambda. CloudWatch invokes a Lambda function that transforms and pushes log events to OpenSearch. This is what the AWS Console wires up when you click "Subscription filter for OpenSearch".
  2. Subscription filter → Firehose → S3 → OpenSearch. Firehose buffers records to S3, and a second stage (Lambda on S3 events, or OpenSearch Ingestion) loads them. Note that Firehose's OpenSearch destination does not accept CloudWatch Logs subscription records directly - CloudWatch packs multiple events into one Firehose record and OpenSearch rejects the shape. The Firehose docs spell this out, but it's easy to miss until you watch records pile up in the error bucket.
  3. OpenSearch Ingestion (OSI). A managed Data Prepper pipeline. There's no CloudWatch Logs source plugin, so in practice OSI piggybacks on pattern 2 - CloudWatch to Firehose to S3, then OSI's s3 source (driven by SQS notifications) does the OpenSearch load.

All three start with a CloudWatch Logs subscription filter. In every case the payload is gzip-compressed, and in the Lambda event it is also base64-encoded - your code or pipeline has to decode and decompress it before doing anything else.
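
To make the decode step concrete, here is a minimal Python sketch of the round trip for the Lambda event shape. The helper names (decode_awslogs, encode_awslogs) and the sample envelope are illustrative and not part of any AWS SDK; the encoder exists only so the decoder can be exercised locally without a real log group.

```python
import base64
import gzip
import json


def decode_awslogs(event):
    """Return the CloudWatch Logs envelope carried by a Lambda subscription event."""
    compressed = base64.b64decode(event["awslogs"]["data"])
    return json.loads(gzip.decompress(compressed))


def encode_awslogs(envelope):
    """Build a synthetic event in the same shape, for local testing only."""
    raw = json.dumps(envelope).encode("utf-8")
    data = base64.b64encode(gzip.compress(raw)).decode("ascii")
    return {"awslogs": {"data": data}}


# Hypothetical sample envelope; values are placeholders.
sample = {
    "messageType": "DATA_MESSAGE",
    "owner": "123456789012",
    "logGroup": "/aws/lambda/my-app",
    "logStream": "example-stream",
    "subscriptionFilters": ["cwl-to-opensearch"],
    "logEvents": [{"id": "1", "timestamp": 1700000000000, "message": "hello"}],
}

decoded = decode_awslogs(encode_awslogs(sample))
print(decoded["logEvents"][0]["message"])
```

Feeding encode_awslogs output into a handler is a cheap way to test the transformation logic before attaching a real subscription filter.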

Setting Up the Lambda Path with Terraform

Most teams I work with end up here. The console will hand-spin a Lambda for you (named something like LogsToElasticsearch_<domain-name>), but if you want anything beyond a one-off prototype you'll want it in Terraform. The function below also signs its requests with SigV4 - most blog snippets skip this, which is fine for a public anonymous-access domain and broken for everything else.

IAM Role for Lambda

The Lambda function needs permission to write to your OpenSearch domain and, if the domain lives in a VPC, to manage network interfaces:

resource "aws_iam_role" "lambda_to_opensearch" {
  name = "cwl-to-opensearch-lambda"

  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [{
      Action = "sts:AssumeRole"
      Effect = "Allow"
      Principal = { Service = "lambda.amazonaws.com" }
    }]
  })
}

resource "aws_iam_role_policy" "lambda_opensearch_access" {
  name = "opensearch-write"
  role = aws_iam_role.lambda_to_opensearch.id

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Effect   = "Allow"
        Action   = ["es:ESHttpPost", "es:ESHttpPut"]
        Resource = "${aws_opensearch_domain.logs.arn}/*"
      },
      {
        Effect = "Allow"
        Action = [
          "logs:CreateLogGroup",
          "logs:CreateLogStream",
          "logs:PutLogEvents"
        ]
        Resource = "arn:aws:logs:*:*:*"
      }
    ]
  })
}

# Required only for VPC-based OpenSearch domains
resource "aws_iam_role_policy_attachment" "lambda_vpc_access" {
  role       = aws_iam_role.lambda_to_opensearch.name
  policy_arn = "arn:aws:iam::aws:policy/service-role/AWSLambdaVPCAccessExecutionRole"
}
  

Lambda Function

The function takes a batch of log events, decompresses them, and bulk-indexes into a daily index (cwl-YYYY.MM.DD). SigV4 signing uses the Lambda role's credentials; without it, a default-secured domain returns 403 and you'll spend an afternoon wondering why nothing shows up in OpenSearch.

import base64
import gzip
import json
import os
from datetime import datetime, timezone

import boto3
import urllib3
from botocore.auth import SigV4Auth
from botocore.awsrequest import AWSRequest

OPENSEARCH_ENDPOINT = os.environ["OPENSEARCH_ENDPOINT"]
REGION = os.environ["AWS_REGION"]  # Set automatically by the Lambda runtime

# Created once per execution environment and reused across invocations
http = urllib3.PoolManager()
credentials = boto3.Session().get_credentials()


def handler(event, context):
    # Subscription payloads arrive base64-encoded and gzip-compressed
    compressed = base64.b64decode(event["awslogs"]["data"])
    payload = json.loads(gzip.decompress(compressed))

    # Control messages carry no log data, so there is nothing to index
    if payload["messageType"] == "CONTROL_MESSAGE":
        return

    # Daily index, named by ingestion date (UTC)
    index = f"cwl-{datetime.now(timezone.utc):%Y.%m.%d}"
    bulk_body = ""
    for log_event in payload["logEvents"]:
        action = json.dumps({"index": {"_index": index}})
        doc = json.dumps({
            "@timestamp": datetime.fromtimestamp(
                log_event["timestamp"] / 1000, tz=timezone.utc
            ).isoformat(),
            "message": log_event["message"],
            "log_group": payload["logGroup"],
            "log_stream": payload["logStream"],
            "owner": payload["owner"],
        })
        bulk_body += f"{action}\n{doc}\n"

    # Sign the request with the Lambda role's credentials (service name "es")
    url = f"https://{OPENSEARCH_ENDPOINT}/_bulk"
    request = AWSRequest(
        method="POST",
        url=url,
        data=bulk_body,
        headers={"Content-Type": "application/x-ndjson"},
    )
    SigV4Auth(credentials, "es", REGION).add_auth(request)

    response = http.request(
        "POST",
        url,
        body=bulk_body.encode(),
        headers=dict(request.headers),
    )
    if response.status >= 300:
        raise Exception(f"OpenSearch returned {response.status}: {response.data!r}")
    # A 2xx response can still report per-item failures
    result = json.loads(response.data)
    if result.get("errors"):
        raise Exception(f"Bulk indexing errors: {json.dumps(result)}")
  

boto3 and botocore ship with the Lambda Python runtime, and urllib3 rides along. Two variations worth knowing: for OpenSearch Serverless, switch the signing service from es to aoss; for a domain using FGAC's internal user database instead of IAM, swap SigV4 for HTTP basic auth and store the password in Secrets Manager.
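
For the basic-auth variation, a minimal sketch of the header construction follows. The function name and the placeholder credentials are hypothetical; in a real function the password would be fetched from Secrets Manager rather than written in code.

```python
import base64


def basic_auth_headers(username, password):
    """Headers for a _bulk request authenticated with HTTP basic auth."""
    token = base64.b64encode(f"{username}:{password}".encode("utf-8")).decode("ascii")
    return {
        "Authorization": f"Basic {token}",
        "Content-Type": "application/x-ndjson",
    }


# Placeholder credentials for illustration only.
headers = basic_auth_headers("log-writer", "example-password")
print(sorted(headers))
```

Under this assumption the returned dict would take the place of the signed headers in the http.request call, and the AWSRequest and SigV4Auth lines would no longer be needed.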

Terraform Resources for Lambda and Subscription Filter

data "archive_file" "lambda_zip" {
  type        = "zip"
  source_file = "${path.module}/cwl_to_opensearch.py"
  output_path = "${path.module}/cwl_to_opensearch.zip"
}

resource "aws_lambda_function" "cwl_to_opensearch" {
  filename         = data.archive_file.lambda_zip.output_path
  function_name    = "cwl-to-opensearch"
  role             = aws_iam_role.lambda_to_opensearch.arn
  handler          = "cwl_to_opensearch.handler"
  runtime          = "python3.12"
  timeout          = 60
  source_code_hash = data.archive_file.lambda_zip.output_base64sha256

  environment {
    variables = {
      OPENSEARCH_ENDPOINT = aws_opensearch_domain.logs.endpoint
    }
  }

  # Include vpc_config only for VPC-based domains
  vpc_config {
    subnet_ids         = var.private_subnet_ids
    security_group_ids = [aws_security_group.lambda_sg.id]
  }
}

resource "aws_lambda_permission" "allow_cloudwatch" {
  statement_id  = "AllowCloudWatchInvoke"
  action        = "lambda:InvokeFunction"
  function_name = aws_lambda_function.cwl_to_opensearch.function_name
  principal     = "logs.amazonaws.com"
  source_arn    = "${aws_cloudwatch_log_group.app.arn}:*"
}

resource "aws_cloudwatch_log_subscription_filter" "to_opensearch" {
  name            = "cwl-to-opensearch"
  log_group_name  = aws_cloudwatch_log_group.app.name
  filter_pattern  = "" # Empty = all events
  destination_arn = aws_lambda_function.cwl_to_opensearch.arn

  # The invoke permission must exist before the filter is created,
  # otherwise the apply can fail on a fresh deployment
  depends_on = [aws_lambda_permission.allow_cloudwatch]
}
  

An empty filter_pattern forwards everything. For noisy log groups, narrow it down - e.g. "{ $.level = \"ERROR\" }" for JSON-formatted error logs. Your OpenSearch cluster (and your bill) will thank you.

The Firehose-to-S3 Path

When you need higher throughput, or a durable buffer you can replay from, route logs through Firehose - but to S3, not straight to OpenSearch. The shape is:

CloudWatch Logs --subscription filter--> Firehose --> S3 --S3 event--> Lambda or OSI --> OpenSearch
  

S3 is doing the heavy lifting here: durable buffering, replayability, and dead-letter handling for free. The second hop is either a Lambda that bulk-indexes new objects as they land, or an OpenSearch Ingestion pipeline using the s3 source with SQS notifications.

AWS CLI Setup

Create the delivery stream targeting S3:

aws firehose create-delivery-stream \
  --delivery-stream-name cwl-to-s3 \
  --delivery-stream-type DirectPut \
  --extended-s3-destination-configuration '{
    "RoleARN": "arn:aws:iam::123456789012:role/FirehoseToS3",
    "BucketARN": "arn:aws:s3:::my-cwl-archive",
    "Prefix": "cwl/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/",
    "ErrorOutputPrefix": "errors/",
    "BufferingHints": {
      "IntervalInSeconds": 60,
      "SizeInMBs": 5
    },
    "CompressionFormat": "UNCOMPRESSED"
  }'
  

Then create the subscription filter pointing to Firehose:

aws logs put-subscription-filter \
    --log-group-name "/aws/lambda/my-app" \
    --filter-name "to-firehose" \
    --filter-pattern "" \
    --destination-arn "arn:aws:firehose:us-east-1:123456789012:deliverystream/cwl-to-s3" \
    --role-arn "arn:aws:iam::123456789012:role/CWLtoFirehose"
  

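CloudWatch Logs gzips each record before handing it to Firehose, so the stream itself doesn't need to compress again. Each record is one JSON object in the standard CloudWatch Logs envelope ({"messageType": ..., "logEvents": [...], ...}), and Firehose writes records into the S3 object back to back without adding newlines between them. The loader decompresses the object, splits the concatenated envelopes, fans out logEvents, and bulk-indexes them. Rotate the index daily (e.g. cwl-2026.03.30, matching the Lambda path) so it plays nicely with ISM policies for retention.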
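
The unwrap step in the loader can be sketched as follows. This is a minimal example under stated assumptions: the object body has already been read from S3 as bytes, the function names are hypothetical, and the parser is written to tolerate both back-to-back JSON objects and newline-separated ones, as well as an extra gzip layer if the delivery stream also compresses.

```python
import gzip
import json

GZIP_MAGIC = b"\x1f\x8b"


def iter_envelopes(blob):
    """Yield each CloudWatch Logs envelope found in one S3 object body."""
    # Peel gzip layers until plain text remains.
    while blob[:2] == GZIP_MAGIC:
        blob = gzip.decompress(blob)
    text = blob.decode("utf-8")
    decoder = json.JSONDecoder()
    pos = 0
    while pos < len(text):
        # raw_decode does not skip leading whitespace, so do it here.
        if text[pos].isspace():
            pos += 1
            continue
        envelope, pos = decoder.raw_decode(text, pos)
        yield envelope


def iter_log_events(blob):
    """Yield (envelope, log_event) pairs, skipping control messages."""
    for envelope in iter_envelopes(blob):
        if envelope.get("messageType") != "DATA_MESSAGE":
            continue
        for log_event in envelope.get("logEvents", []):
            yield envelope, log_event


def _record(message_type, messages):
    """Build one gzip-compressed record the way the sketch expects it (test data)."""
    envelope = {
        "messageType": message_type,
        "logGroup": "/aws/lambda/my-app",
        "logStream": "example-stream",
        "logEvents": [{"id": str(i), "timestamp": 0, "message": m}
                      for i, m in enumerate(messages)],
    }
    return gzip.compress(json.dumps(envelope).encode("utf-8"))


# Two data records and one control record, concatenated with no delimiter.
body = _record("DATA_MESSAGE", ["one"]) + _record("CONTROL_MESSAGE", []) \
    + _record("DATA_MESSAGE", ["two", "three"])
messages = [event["message"] for _, event in iter_log_events(body)]
print(messages)
```

Each yielded pair carries enough context (log group, log stream, event) to build the same bulk action and document lines used in the direct Lambda path.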

One footnote: Firehose's OpenSearch destination (which you'd use for non-CloudWatch sources) accepts BufferingHints.IntervalInSeconds 0-900, SizeInMBs 1-100, and RetryOptions.DurationInSeconds up to 7,200. Worth knowing if you reuse the same stream shape for app logs sent to Firehose directly.

Comparison: Lambda vs Firehose-to-S3 vs OpenSearch Ingestion

| Aspect | Lambda (direct) | Firehose to S3 + loader | OpenSearch Ingestion (S3 source) |
| --- | --- | --- | --- |
| Custom transformation | Full control (your code) | In the loader Lambda | Data Prepper processors |
| Failed record handling | Manual (DLQ or retry logic) | S3 acts as a durable buffer; replay by re-processing objects | Pipeline DLQ to S3 |
| Buffering | None (event-driven) | Firehose: 0-900s, 1-128 MB to S3 | Driven by S3 object size and SQS batch |
| VPC support | Requires VPC config + ENIs | Loader Lambda needs VPC config + ENIs | Managed VPC endpoints |
| Index rotation | Custom logic in code | Custom logic in loader | Pipeline configuration |
| Scaling | Lambda concurrency limits | Firehose automatic; loader scales with Lambda | Managed auto-scaling (OCUs) |
| Cost model | Per invocation + duration | Firehose per GB + S3 + loader Lambda | Per OCU-hour + S3 |
| Setup effort | Medium (IAM + Lambda + filter) | High (Firehose + S3 + loader + filter) | Medium (pipeline + S3 + filter) |

Rule of thumb: at low-to-medium volumes, the direct Lambda path is the simplest thing that works and you should default to it. Once you're shipping enough logs that you care about replay, batching, or backpressure - or you've already burned a few Lambdas to OpenSearch backoff - move to Firehose-to-S3. OSI is worth the switch when you want the loader stage off your plate, but it's region-gated and you still need the CloudWatch-to-S3 hop in front of it.

What Trips People Up

VPC connectivity. If your OpenSearch domain is in a VPC (and for production, it should be), Lambda or the loader needs to be in the same VPC, with a security group that allows outbound HTTPS to the OpenSearch SG, and a matching inbound rule on the OpenSearch SG. Miss this and the integration appears to drop logs silently: nothing errors on the OpenSearch side, the index just stays empty, and the only evidence is timeouts in the function's own logs and error metrics.
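
One way to make a networking problem obvious is a cheap TCP check before the bulk request. The sketch below is an assumption about how you might wire that in, not part of the original function; can_reach is a hypothetical helper name, and the 3-second timeout is an arbitrary starting point.

```python
import socket


def can_reach(host, port=443, timeout=3.0):
    """Return True if a TCP connection to host:port opens within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers timeouts, refused connections, and DNS failures.
        return False
```

Calling can_reach(OPENSEARCH_ENDPOINT) at the top of the handler and raising a descriptive error when it returns False turns a long, ambiguous timeout into a clear log line that points at security groups or subnets.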

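IAM scope. Don't ship es:* to production. Use es:ESHttpPost and es:ESHttpPut against the specific domain ARN. On the Firehose path the permissions are split across roles: the role CloudWatch Logs assumes needs to put records on the delivery stream, the Firehose role needs write access to the bucket, and the loader needs read access to the bucket plus the same OpenSearch write actions.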

Lambda concurrency. A chatty log group can fan out into thousands of concurrent Lambda invocations and steamroll your cluster's write (bulk) thread pool. Set a reserved concurrency limit - I usually start at 10-50 and raise it once I've watched the write queue and rejection metrics for a day.
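
As a rough sketch, the cap can be applied from Python through the Lambda API's put_function_concurrency call. The wrapper name cap_concurrency is hypothetical, and the client is passed in so the function can be exercised without AWS credentials.

```python
def cap_concurrency(lambda_client, function_name, limit):
    """Set reserved concurrency on a function and return the applied value."""
    if limit < 0:
        raise ValueError("limit must be zero or greater")
    response = lambda_client.put_function_concurrency(
        FunctionName=function_name,
        ReservedConcurrentExecutions=limit,
    )
    return response["ReservedConcurrentExecutions"]


# Usage with boto3 (requires AWS credentials):
#   import boto3
#   cap_concurrency(boto3.client("lambda"), "cwl-to-opensearch", 10)
```

If the function is managed in Terraform, setting reserved_concurrent_executions on the aws_lambda_function resource keeps the limit in code instead.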

Index management. Daily indices pile up fast. Put an ISM policy in place from day one to roll or delete old indices. Skip this and you'll hit shard count limits long before storage limits, and the failure mode is ugly.
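
A minimal sketch of such a policy, built as a Python dict so it can be sent with the same signed-request code as the bulk call. The 30-day retention, the state names, and the function name are assumptions for illustration; adjust them to your own retention requirements.

```python
import json


def retention_policy(index_pattern="cwl-*", delete_after="30d"):
    """Build an ISM policy body that deletes matching indices after a set age."""
    return {
        "policy": {
            "description": f"Delete {index_pattern} indices after {delete_after}",
            "default_state": "hot",
            "states": [
                {
                    "name": "hot",
                    "actions": [],
                    "transitions": [
                        {
                            "state_name": "delete",
                            "conditions": {"min_index_age": delete_after},
                        }
                    ],
                },
                {"name": "delete", "actions": [{"delete": {}}], "transitions": []},
            ],
            # Attach the policy automatically to new indices matching the pattern.
            "ism_template": [{"index_patterns": [index_pattern], "priority": 100}],
        }
    }


print(json.dumps(retention_policy(), indent=2))
```

The resulting body would be sent as a PUT to _plugins/_ism/policies/<policy-id> on the domain; the role making that call needs more than the write-only actions used for indexing.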

Two subscription filters per log group, max. Hard limit. If you need to fan out to more than two destinations, put a Kinesis Data Stream in front and split from there.

Wrapping Up

CloudWatch Logs to OpenSearch is a well-supported pattern, but it's less plug-and-play than the marketing implies. Lambda gives you the most control and the fewest pieces to move. Firehose-to-S3 adds a real buffer at the cost of a second stage. OSI takes the loader off your hands - if your region supports it and you're willing to land logs in S3 first.

Whichever path you pick: sign your requests, plan VPC networking up front, cap Lambda concurrency, and have an ISM policy ready before the first log lands. If you'd like help designing or scaling OpenSearch logging at your org, take a look at our OpenSearch consulting services or drop us a line - we've been running OpenSearch and Elasticsearch in production for over a decade.