Practical Guide: Stream Data from Kafka to OpenSearch in Kubernetes

In this article, I'll show you how to automatically send data from a Kafka topic to OpenSearch. We'll use Kafka Connect and Strimzi in Kubernetes.

By Anatoly Zelenin

Kafka Connect is a framework for reliable and scalable data transfer between Apache Kafka and other systems. In our case, we want to transfer transaction data from a Kafka topic to an OpenSearch index. The architecture looks like this:

  1. Kafka Broker - contains our transaction data

  2. Kafka Connect - handles the transfer

  3. OpenSearch - stores the data for search and analysis

We use Strimzi, a Kubernetes operator for Apache Kafka, to manage these components.

Prerequisites

  • A running Kubernetes cluster

  • kubectl installed and configured

  • Strimzi operator already installed

  • A Kafka cluster in the namespace kafka with a topic db-transactions (a minimal example for creating the topic follows below)
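
If the topic does not exist yet and your cluster is managed by Strimzi's Topic Operator, a minimal KafkaTopic resource along these lines creates it (this sketch assumes the Kafka cluster resource is named kafka):

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: db-transactions
  namespace: kafka
  labels:
    # Must match the name of your Kafka cluster resource
    strimzi.io/cluster: kafka
spec:
  partitions: 1
  replicas: 1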

1. Install OpenSearch in Kubernetes

First, we create a separate namespace for OpenSearch and set it as the default context:

kubectl create namespace opensearch
kubectl config set-context --current --namespace=opensearch

Next, we define the OpenSearch deployment and the associated service:

opensearch.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: opensearch
  namespace: opensearch
spec:
  replicas: 1
  selector:
    matchLabels:
      app: opensearch
  template:
    metadata:
      labels:
        app: opensearch
    spec:
      containers:
      - name: opensearch
        image: opensearchproject/opensearch:latest
        ports:
        - containerPort: 9200
        env:
        - name: discovery.type
          value: "single-node"
        - name: plugins.security.disabled
          value: "true"
---
apiVersion: v1
kind: Service
metadata:
  name: opensearch-service
  namespace: opensearch
spec:
  selector:
    app: opensearch
  ports:
  - port: 9200
    targetPort: 9200

Do not use this configuration in production! Security is disabled, OpenSearch runs as a single node, and there is no persistent storage; this is a proof of concept, not a production-ready solution.

Save the above definition in a file opensearch.yaml and apply it:

kubectl apply -f opensearch.yaml

Wait until the OpenSearch pod is running:

kubectl get pods -w

Once the pod is running, we set up port forwarding to access it from our local system. Run this command in a separate terminal:

kubectl port-forward svc/opensearch-service 9200:9200

Test with a simple call to check if OpenSearch is working:
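
curl -X GET "http://localhost:9200" | jq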

Now we create an index for our transaction data:

curl -X PUT "http://localhost:9200/db-transactions" | jq
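
To verify that the index was created, list all indices:

curl -X GET "http://localhost:9200/_cat/indices?v"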

2. Set up Kafka Connect Cluster

Now we define our Kafka Connect cluster that will establish the connection between Kafka and OpenSearch:

kafka-connect.yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: connect-cluster
  namespace: opensearch
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  version: 3.7.0
  replicas: 1
  bootstrapServers: kafka-kafka-bootstrap.kafka.svc.cluster.local:9092
  config:
    group.id: connect-cluster
    # We need the JSON Converter with Schema-Support
    key.converter: org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable: true
    value.converter: org.apache.kafka.connect.json.JsonConverter
    value.converter.schemas.enable: true
  # My image with integrated OpenSearch-Connector
  # Instructions on how to create this image yourself can be found below
  image: ghcr.io/kafka-trainings/example-project/opensearch-connector:latest

Note the image setting, which points to a Docker image with the installed OpenSearch connector. We’re using a pre-built image here that I’ve provided for this tutorial.

Save the definition in kafka-connect.yaml, apply it, and wait until the pod is started:

kubectl apply -f kafka-connect.yaml
# It takes up to 2 minutes until the pod is ready
kubectl get pods -w
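
Instead of watching the pods, you can also wait directly for the KafkaConnect resource to report readiness:

kubectl wait kafkaconnect/connect-cluster --for=condition=Ready --timeout=300s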

3. Create OpenSearch Connector

An important note: The standard Kafka Connect images don’t include the OpenSearch connector out of the box. There are two ways to add it:

  1. Build your own container image that contains the connector plugin and reference it in the image field of the KafkaConnect resource.

  2. Let Strimzi build such an image for you by declaring the connector as a plugin in the build section of the KafkaConnect resource.

Personally, I find option 1 much cleaner and more flexible. For this example, we’re using a pre-built image that I’ve created for this article and that includes the OpenSearch connector.

Here’s the Dockerfile for the image. You can use it to create your own image with the OpenSearch connector:

Dockerfile
# Builder image
FROM gradle:7.6 as builder
# We're using an older version so the blog post doesn't become outdated too quickly
RUN wget https://github.com/Aiven-Open/opensearch-connector-for-apache-kafka/archive/refs/tags/v3.1.1.tar.gz
RUN tar -xzf v3.1.1.tar.gz && \
    cd opensearch-connector-for-apache-kafka-3.1.1 && \
    ./gradlew installDist

# Image for Kafka Connect
# We're using Kafka 3.7.0 here. Please use the latest version!
FROM quay.io/strimzi/kafka:latest-kafka-3.7.0
USER root:root
# Copy connector
COPY --from=builder /home/gradle/opensearch-connector-for-apache-kafka-3.1.1/build/install/opensearch-connector-for-apache-kafka/ /opt/kafka/plugins/
USER 1001

The Dockerfile has two important phases:

  1. Builder Phase: Since the Aiven OpenSearch Connector is only available as source code on GitHub, we first need to build it with Gradle. This phase uses the gradle:7.6 image, downloads the source code, unpacks it, and builds the connector.

  2. Final Image Phase: This phase uses the official Strimzi Kafka image as a base and copies the built connector files from the builder phase into the correct plugins folder (/opt/kafka/plugins/).

This multi-stage build process keeps the final image small and clean. You can include multiple connectors in a single image in the same way: build each connector in its own builder phase and copy all of them into the plugins folder of the final image.
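
If you would rather build the image yourself, a plain docker build and push is all you need; the registry and tag below are placeholders for your own environment. Afterwards, point the image field of the KafkaConnect resource to your tag:

# Build the image from the Dockerfile above and push it to your registry
docker build -t registry.example.com/example/opensearch-connector:latest .
docker push registry.example.com/example/opensearch-connector:latest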

4. Configure Kafka Connector

Now we define the actual connector that transfers the data from the Kafka topic db-transactions to OpenSearch:

opensearch-sink-connector.yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: opensearch-sink-connector
  labels:
    strimzi.io/cluster: connect-cluster
spec:
  class: io.aiven.kafka.connect.opensearch.OpensearchSinkConnector
  tasksMax: 1
  config:
    topics: "db-transactions"
    key.ignore: true
    connection.url: "http://opensearch-service.opensearch.svc.cluster.local:9200"

Save this definition in opensearch-sink-connector.yaml and apply it:

kubectl apply -f opensearch-sink-connector.yaml
kubectl describe kafkaconnector opensearch-sink-connector
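
If the connector does not become ready, the Kafka Connect REST API is a good place to look. One way to query it is to exec into the Connect pod (assuming curl is available in the Connect image):

# Find the Connect pod via its Strimzi label and ask the REST API (port 8083) for the connector status
CONNECT_POD=$(kubectl get pods -l strimzi.io/cluster=connect-cluster -o jsonpath='{.items[0].metadata.name}')
kubectl exec $CONNECT_POD -- curl -s http://localhost:8083/connectors/opensearch-sink-connector/status | jq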

5. Test the Setup

Now we can test if our connection works by writing some data to the Kafka topic.

Important: Because we set schemas.enable: true in the Connect configuration, we have to transmit the schema of the data along with the actual values. This makes the messages considerably more complex.

The schema of our transaction data looks like this:

{
  "type": "struct",
  "fields": [
    {
      "field": "id",
      "type": "int32"
    },
    {
      "field": "amount",
      "type": "int32"
    },
    {
      "field": "name",
      "type": "string"
    }
  ]
}

The actual payloads are simple JSON objects:

{"id": 1, "amount": 100, "name": "John Doe"}
{"id": 2, "amount": 200, "name": "Jane Doe"}
{"id": 3, "amount": 300, "name": "John Smith"}

But with the schema, we have to send them in this complex format:

{"schema": {"type": "struct", "fields": [{"field": "id", "type": "int32"}, {"field": "amount", "type": "int32"}, {"field": "name", "type": "string"}]}, "payload": {"id": 1, "amount": 100, "name": "John Doe"}}

Tip: Manually inserting schemas makes messages confusing and error-prone. In a production environment, you should definitely use a Schema Registry that manages this complexity for you and ensures schema compatibility.

Connect to a Kafka pod and start the producer:

kubectl exec -it -n kafka kafka-kafka-0 -- \
  bin/kafka-console-producer.sh \
  --bootstrap-server kafka-kafka-bootstrap.kafka.svc.cluster.local:9092 \
  --topic db-transactions

After starting the producer, enter the following messages:

{"schema": {"type": "struct", "fields": [{"field": "id", "type": "int32"}, {"field": "amount", "type": "int32"}, {"field": "name", "type": "string"}]}, "payload": {"id": 1, "amount": 100, "name": "John Doe"}}
{"schema": {"type": "struct", "fields": [{"field": "id", "type": "int32"}, {"field": "amount", "type": "int32"}, {"field": "name", "type": "string"}]}, "payload": {"id": 2, "amount": 200, "name": "Jane Doe"}}
{"schema": {"type": "struct", "fields": [{"field": "id", "type": "int32"}, {"field": "amount", "type": "int32"}, {"field": "name", "type": "string"}]}, "payload": {"id": 3, "amount": 300, "name": "John Smith"}}

Check if the data has arrived in OpenSearch:

curl -X GET "http://localhost:9200/db-transactions/_search?q=id:1" | jq
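
To see all three documents at once, search the index without a query:

curl -X GET "http://localhost:9200/db-transactions/_search" | jq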

Conclusion

We have successfully set up a data pipeline from Kafka to OpenSearch using Kafka Connect and Strimzi. This approach is ideal for:

  • Real-time data processing and analysis

  • Scalable data pipelines in Kubernetes

  • Automatic synchronization between Kafka and search systems

With this configuration, data sent to the Kafka topic db-transactions is automatically indexed in OpenSearch and available there for search and analysis.

Anatoly Zelenin teaches Apache Kafka to hundreds of participants in interactive training sessions. His clients from the DAX environment and German mid-sized companies have valued his expertise and inspiring approach for over a decade. In addition to being an IT consultant and trainer, he also explores our planet as an adventurer.