Kafka Connect is a framework for reliable and scalable data transfer between Apache Kafka and other systems. In our case, we want to transfer transaction data from a Kafka topic to an OpenSearch index. The architecture looks like this:
Kafka Broker - contains our transaction data
Kafka Connect - handles the transfer
OpenSearch - stores the data for search and analysis
We use Strimzi, a Kubernetes operator for Apache Kafka, to manage these components.
Prerequisites:
A running Kubernetes cluster
kubectl installed and configured
The Strimzi operator already installed
A Kafka cluster in the namespace kafka with a topic db-transactions (if the topic does not exist yet, see the sketch below)
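If the topic does not exist yet, it can be created declaratively with a Strimzi KafkaTopic resource. The following is only a sketch: it assumes the Kafka cluster is Strimzi-managed, is named kafka, and runs in the namespace kafka (which matches the bootstrap address used later in this post).
db-transactions-topic.yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: db-transactions
  namespace: kafka
  labels:
    strimzi.io/cluster: kafka
spec:
  partitions: 1
  replicas: 1
Apply it with kubectl apply -f db-transactions-topic.yaml.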
First, we create a separate namespace for OpenSearch and set it as the default context:
kubectl create namespace opensearch
kubectl config set-context --current --namespace=opensearch
Next, we define the OpenSearch deployment and the associated service:
opensearch.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: opensearch
  namespace: opensearch
spec:
  replicas: 1
  selector:
    matchLabels:
      app: opensearch
  template:
    metadata:
      labels:
        app: opensearch
    spec:
      containers:
        - name: opensearch
          image: opensearchproject/opensearch:latest
          ports:
            - containerPort: 9200
          env:
            - name: discovery.type
              value: "single-node"
            - name: plugins.security.disabled
              value: "true"
---
apiVersion: v1
kind: Service
metadata:
  name: opensearch-service
  namespace: opensearch
spec:
  selector:
    app: opensearch
  ports:
    - port: 9200
      targetPort: 9200
Do not use this configuration in production! This is a proof of concept and not a production-ready solution.
Save the above definition in a file opensearch.yaml and apply it:
kubectl apply -f opensearch.yaml
Wait until the OpenSearch pod is running:
kubectl get pods -w
Once the pod is running, we set up port forwarding to access it from our local system. Run this command in a separate terminal:
kubectl port-forward svc/opensearch-service 9200:9200
Test with a simple call to check if OpenSearch is working:
curl http://localhost:9200/ | jq
Now we create an index for our transaction data:
curl -X PUT "http://localhost:9200/db-transactions" | jq
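By default, OpenSearch derives a dynamic mapping from the incoming documents. If you prefer explicit control, you could instead create the index with mappings for the transaction fields. This is an optional sketch that replaces the plain PUT above; the field types match the schema we use later:
curl -X PUT "http://localhost:9200/db-transactions" \
  -H 'Content-Type: application/json' \
  -d '{
    "mappings": {
      "properties": {
        "id":     { "type": "integer" },
        "amount": { "type": "integer" },
        "name":   { "type": "text" }
      }
    }
  }' | jq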
Now we define our Kafka Connect cluster that will establish the connection between Kafka and OpenSearch:
kafka-connect.yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: connect-cluster
  namespace: opensearch
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  version: 3.7.0
  replicas: 1
  bootstrapServers: kafka-kafka-bootstrap.kafka.svc.cluster.local:9092
  config:
    group.id: connect-cluster
    # We need the JSON converter with schema support
    key.converter: org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable: true
    value.converter: org.apache.kafka.connect.json.JsonConverter
    value.converter.schemas.enable: true
  # My image with the OpenSearch connector already installed
  # Instructions for building this image yourself can be found below
  image: ghcr.io/kafka-trainings/example-project/opensearch-connector:latest
Note the image setting, which points to a Docker image with the OpenSearch connector installed. We're using a pre-built image here that I've provided for this tutorial.
Save the definition in kafka-connect.yaml, apply it, and wait until the pod is started:
kubectl apply -f kafka-connect.yaml
# It takes up to 2 minutes until the pod is ready
kubectl get pods -w
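Once the Connect pod is ready, you can optionally verify that the OpenSearch connector plugin is actually present in the image via the Kafka Connect REST API. Strimzi exposes the API through a service, typically named connect-cluster-connect-api; this is just a quick check, run in a separate terminal:
kubectl port-forward svc/connect-cluster-connect-api 8083:8083
curl -s http://localhost:8083/connector-plugins | jq
The output should list io.aiven.kafka.connect.opensearch.OpensearchSinkConnector among the available plugins.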
An important note: the standard Kafka Connect images don't include the OpenSearch connector. There are two ways to add it: build a custom image with the connector pre-installed, or let Strimzi build such an image for you via the KafkaConnect build configuration. Personally, I find the first option much cleaner and more flexible. For this example, we're using a pre-built image that I created for this article and that includes the OpenSearch connector.
Here’s the Dockerfile for the image. You can use it to create your own image with the OpenSearch connector:
Dockerfile
# Builder image
FROM gradle:7.6 as builder
# We're using an older version so the blog post doesn't become outdated too quickly
RUN wget https://github.com/Aiven-Open/opensearch-connector-for-apache-kafka/archive/refs/tags/v3.1.1.tar.gz
RUN tar -xzf v3.1.1.tar.gz && \
    cd opensearch-connector-for-apache-kafka-3.1.1 && \
    ./gradlew installDist
# Image for Kafka Connect
# We're using Kafka 3.7.0 here. Please use the latest version!
FROM quay.io/strimzi/kafka:latest-kafka-3.7.0
USER root:root
# Copy connector
COPY --from=builder /home/gradle/opensearch-connector-for-apache-kafka-3.1.1/build/install/opensearch-connector-for-apache-kafka/ /opt/kafka/plugins/
USER 1001
The Dockerfile has two important phases:
Builder phase: Since the Aiven OpenSearch connector is only available as source code on GitHub, we first need to build it with Gradle. This phase uses the gradle:7.6 image, downloads the source code, unpacks it, and builds the connector.
Final image phase: This phase uses the official Strimzi Kafka image as a base and copies the built connector files from the builder phase into the correct plugins folder (/opt/kafka/plugins/).
This multi-stage build process keeps the final image small and clean. You can similarly include multiple connectors in a single image - just build each connector in separate builder phases and then copy all of them into the plugins folder of the final image.
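If you want to build your own image from this Dockerfile, the workflow is the usual docker build and push. The registry and tag below are placeholders you would replace with your own; the image field in kafka-connect.yaml then has to point at the pushed image:
# Registry and tag are placeholders; push to a registry your cluster can pull from
docker build -t <your-registry>/opensearch-connector:latest .
docker push <your-registry>/opensearch-connector:latest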
Now we define the actual connector that transfers the data from the Kafka topic db-transactions to OpenSearch:
opensearch-sink-connector.yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: opensearch-sink-connector
  labels:
    strimzi.io/cluster: connect-cluster
spec:
  class: io.aiven.kafka.connect.opensearch.OpensearchSinkConnector
  tasksMax: 1
  config:
    topics: "db-transactions"
    key.ignore: true
    connection.url: "http://opensearch-service.opensearch.svc.cluster.local:9200"
Save this definition in opensearch-sink-connector.yaml and apply it:
kubectl apply -f opensearch-sink-connector.yaml
kubectl describe kafkaconnector opensearch-sink-connector
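Alternatively, you can wait for Strimzi to report the connector as ready; the Ready condition is set on the KafkaConnector status once the connector and its task are running:
kubectl wait kafkaconnector/opensearch-sink-connector --for=condition=Ready --timeout=120s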
Now we can test if our connection works by writing some data to the Kafka topic.
Important: Due to our configuration (schemas.enable: true in the Connect configuration), we need to transmit the schema of the data along with the actual values. This makes the messages considerably more complex:
The schema of our transaction data looks like this:
{
  "type": "struct",
  "fields": [
    { "field": "id", "type": "int32" },
    { "field": "amount", "type": "int32" },
    { "field": "name", "type": "string" }
  ]
}
Our actual payload data are simple JSON objects:
{"id": 1, "amount": 100, "name": "John Doe"}
{"id": 2, "amount": 200, "name": "Jane Doe"}
{"id": 3, "amount": 300, "name": "John Smith"}
But with the schema, we have to send them in this complex format:
{"schema": {"type": "struct", "fields": [{"field": "id", "type": "int32"}, {"field": "amount", "type": "int32"}, {"field": "name", "type": "string"}]}, "payload": {"id": 1, "amount": 100, "name": "John Doe"}}
Tip: Manually inserting schemas makes messages confusing and error-prone. In a production environment, you should definitely use a Schema Registry that manages this complexity for you and ensures schema compatibility.
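If you want to avoid typing the envelope by hand, a small jq one-liner can wrap a plain payload in the schema. This is purely a convenience sketch, run on your local machine:
# Convenience sketch: wrap a plain JSON payload in the schema envelope
SCHEMA='{"type":"struct","fields":[{"field":"id","type":"int32"},{"field":"amount","type":"int32"},{"field":"name","type":"string"}]}'
echo '{"id": 1, "amount": 100, "name": "John Doe"}' | jq -c --argjson schema "$SCHEMA" '{schema: $schema, payload: .}'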
Connect to a Kafka broker pod (it runs in the kafka namespace, while our current context points to opensearch) and start the console producer:
kubectl exec -n kafka -it kafka-kafka-0 -- \
  bin/kafka-console-producer.sh \
  --bootstrap-server kafka-kafka-bootstrap.kafka.svc.cluster.local:9092 \
  --topic db-transactions
After starting the producer, enter the following messages:
{"schema": {"type": "struct", "fields": [{"field": "id", "type": "int32"}, {"field": "amount", "type": "int32"}, {"field": "name", "type": "string"}]}, "payload": {"id": 1, "amount": 100, "name": "John Doe"}}
{"schema": {"type": "struct", "fields": [{"field": "id", "type": "int32"}, {"field": "amount", "type": "int32"}, {"field": "name", "type": "string"}]}, "payload": {"id": 2, "amount": 200, "name": "Jane Doe"}}
{"schema": {"type": "struct", "fields": [{"field": "id", "type": "int32"}, {"field": "amount", "type": "int32"}, {"field": "name", "type": "string"}]}, "payload": {"id": 3, "amount": 300, "name": "John Smith"}}
Check if the data has arrived in OpenSearch:
curl -X GET "http://localhost:9200/db-transactions/_search?q=id:1" | jq
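To see all indexed documents at once, you can also query the index without a filter (the default response returns up to ten hits):
curl -s "http://localhost:9200/db-transactions/_search" | jq '.hits.hits'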
We have successfully set up a data pipeline from Kafka to OpenSearch using Strimzi Kafka Connect. This method is ideal for:
Real-time data processing and analysis
Scalable data pipelines in Kubernetes
Automatic synchronization between Kafka and search systems
With this configuration, data sent to the Kafka topic db-transactions is automatically indexed in OpenSearch and available there for search and analysis.