Praxisguide: Streame Daten von Kafka nach OpenSearch in Kubernetes

In diesem Artikel zeige ich, wie du Daten aus einem Kafka Topic automatisch nach OpenSearch senden kannst. Wir nutzen dafür Kafka Connect und Strimzi in Kubernetes.

Von Anatoly Zelenin

Kafka Connect ist ein Framework zur verlässlichen und skalierbaren Datenübertragung zwischen Apache Kafka und anderen Systemen. In unserem Fall wollen wir Transaktionsdaten aus einem Kafka Topic in einen OpenSearch Index übertragen. Die Architektur sieht so aus:

  1. Kafka Broker - enthält unsere Transaktionsdaten

  2. Kafka Connect - übernimmt die Übertragung

  3. OpenSearch - speichert die Daten für Suche und Analyse

Wir verwenden Strimzi, einen Kubernetes Operator für Apache Kafka, um diese Komponenten zu verwalten.

Voraussetzungen

  • Ein laufender Kubernetes Cluster

  • Kubectl installiert und konfiguriert

  • Strimzi Operator bereits installiert

  • Ein Kafka Cluster im Namespace kafka mit einem Topic db-transactions.

1. OpenSearch in Kubernetes installieren

Zuerst erstellen wir einen eigenen Namespace für OpenSearch und setzen ihn als Standardkontext:

kubectl create namespace opensearch
kubectl config set-context --current --namespace=opensearch

Als nächstes definieren wir die OpenSearch-Deployment und den zugehörigen Service:

opensearch.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: opensearch
  namespace: opensearch
spec:
  replicas: 1
  selector:
    matchLabels:
      app: opensearch
  template:
    metadata:
      labels:
        app: opensearch
    spec:
      containers:
      - name: opensearch
        image: opensearchproject/opensearch:latest
        ports:
        - containerPort: 9200
        env:
        - name: discovery.type
          value: "single-node"
        - name: plugins.security.disabled
          value: "true"
---
apiVersion: v1
kind: Service
metadata:
  name: opensearch-service
  namespace: opensearch
spec:
  selector:
    app: opensearch
  ports:
  - port: 9200
    targetPort: 9200

Diese Konfiguration ist nur für Testzwecke gedacht und darf in einer Produktionsumgebung nicht verwendet werden!

Speichere die obige Definition in einer Datei opensearch.yaml und wende sie an:

kubectl apply -f opensearch.yaml

Warte, bis der OpenSearch-Pod läuft:

kubectl get pods -w

Sobald der Pod läuft, richten wir Port-Forwarding ein, um von unserem lokalen System darauf zugreifen zu können. Lass dieses Kommando in einem separaten Terminal laufen:

kubectl port-forward svc/opensearch-service 9200:9200

Teste mit einem einfachen Aufruf, ob OpenSearch funktioniert:

Nun erstellen wir einen Index für unsere Transaktionsdaten:

curl -X PUT "http://localhost:9200/db-transactions" | jq

2. Kafka Connect Cluster einrichten

Jetzt definieren wir unseren Kafka Connect Cluster, der die Verbindung zwischen Kafka und OpenSearch herstellen wird:

kafka-connect.yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: connect-cluster
  namespace: opensearch
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  version: 3.7.0
  replicas: 1
  bootstrapServers: kafka-kafka-bootstrap.kafka.svc.cluster.local:9092
  config:
    group.id: connect-cluster
    # Wir benötigen den JSON Converter mit Schema-Support
    key.converter: org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable: true
    value.converter: org.apache.kafka.connect.json.JsonConverter
    value.converter.schemas.enable: true
  # Mein Image mit integriertem OpenSearch-Connector
  # Anleitung, wie du dieses Image selbst erstellen kannst, findest du weiter unten
  image: ghcr.io/kafka-trainings/example-project/opensearch-connector:latest

Beachte die image-Einstellung, die auf ein Docker-Image mit installiertem OpenSearch-Connector verweist. Wir verwenden hier ein bereits vorgefertigtes Image, das ich für dieses Tutorial bereitgestellt habe.

Speichere die Definition in kafka-connect.yaml, wende sie an und warte, bis der Pod gestartet ist:

kubectl apply -f kafka-connect.yaml
# Es dauert bis zu 2 Minuten
kubectl get pods -w

3. OpenSearch Connector erstellen

Ein wichtiger Hinweis: Die Standard-Kafka-Connect-Images enthalten nicht automatisch den OpenSearch-Connector. Es gibt zwei Möglichkeiten, diesen hinzuzufügen:

Ich persönlich finde Option 1 viel sauberer und flexibler. Für dieses Beispiel verwenden wir ein von mir vorgefertigtes Image, das den OpenSearch-Connector enthält und das ich für diesen Artikel erstellt habe.

Hier ist das Dockerfile für das Image. Du kannst es verwenden, um ein eigenes Image mit dem OpenSearch-Connector zu erstellen:

Dockerfile
# Builder image
FROM gradle:7.6 as builder
# Wir nutzen eine ältere Version, damit der Blogpost nicht so schnell veraltet
RUN wget https://github.com/Aiven-Open/opensearch-connector-for-apache-kafka/archive/refs/tags/v3.1.1.tar.gz
RUN tar -xzf v3.1.1.tar.gz && \
    cd opensearch-connector-for-apache-kafka-3.1.1 && \
    ./gradlew installDist

# Image für Kafka Connect
# Wir verwenden hier Kafka 3.7.0. Nutze bitte die neueste Version!
FROM quay.io/strimzi/kafka:latest-kafka-3.7.0
USER root:root
# Connector kopieren
COPY --from=builder /home/gradle/opensearch-connector-for-apache-kafka-3.1.1/build/install/opensearch-connector-for-apache-kafka/ /opt/kafka/plugins/
USER 1001

Das Dockerfile hat zwei wichtige Phasen:

  1. Builder-Phase: Da der Aiven OpenSearch Connector nur als Quellcode auf GitHub verfügbar ist, müssen wir ihn zunächst mit Gradle bauen. Diese Phase verwendet das gradle:7.6 Image, lädt den Quellcode herunter, entpackt ihn und baut den Connector.

  2. Final-Image-Phase: Diese Phase verwendet das offizielle Strimzi Kafka-Image als Basis und kopiert die gebauten Connector-Dateien aus der Builder-Phase in den richtigen Plugins-Ordner (/opt/kafka/plugins/).

Dieses mehrstufige Build-Verfahren (Multi-stage build) hält das finale Image klein und sauber. Du kannst auf ähnliche Weise auch mehrere Connectoren in ein einziges Image einbinden - baue einfach jeden Connector in separaten Builder-Phasen und kopiere dann alle in den Plugins-Ordner des finalen Images.

4. Kafka Connector konfigurieren

Jetzt definieren wir den eigentlichen Connector, der die Daten aus dem Kafka-Topic db-transactions in OpenSearch überträgt:

opensearch-sink-connector.yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: opensearch-sink-connector
  labels:
    strimzi.io/cluster: connect-cluster
spec:
  class: io.aiven.kafka.connect.opensearch.OpensearchSinkConnector
  tasksMax: 1
  config:
    topics: "db-transactions"
    key.ignore: true
    connection.url: "http://opensearch-service.opensearch.svc.cluster.local:9200"

Speichere diese Definition in opensearch-sink-connector.yaml und wende sie an:

kubectl apply -f opensearch-sink-connector.yaml
kubectl describe kafkaconnector opensearch-sink-connector

5. Testen des Setups

Jetzt können wir testen, ob unsere Verbindung funktioniert, indem wir einige Daten ins Kafka-Topic schreiben.

Wichtig: Aufgrund unserer Konfiguration (schemas.enable: true in der Connect-Konfiguration) müssen wir das Schema der Daten zusammen mit den eigentlichen Werten übermitteln. Das macht die Nachrichten deutlich komplexer:

Das Schema unserer Transaktionsdaten sieht so aus:

{
"type": "struct",
"fields": [
    {
    "field": "id",
    "type": "int32"
    },
    {
    "field": "amount",
    "type": "int32"
    },
    {
    "field": "name",
    "type": "string"
    }
]
}

Unsere eigentlichen Nutzdaten sind einfache JSON-Objekte:

{"id": 1, "amount": 100, "name": "John Doe"}
{"id": 2, "amount": 200, "name": "Jane Doe"}
{"id": 3, "amount": 300, "name": "John Smith"}

Aber mit Schema müssen wir sie in diesem komplexen Format senden:

{"schema": {"type": "struct", "fields": [{"field": "id", "type": "int32"}, {"field": "amount", "type": "int32"}, {"field": "name", "type": "string"}]}, "payload": {"id": 1, "amount": 100, "name": "John Doe"}}

Tipp: Das manuelle Einfügen von Schemas macht die Nachrichten unübersichtlich und fehleranfällig. In einer Produktionsumgebung solltest du unbedingt eine Schema Registry verwenden, die diese Komplexität für dich verwaltet und sicherstellt, dass Schema-Kompatibilität gewährleistet ist.

Verbinde dich mit einem Kafka-Pod und starte den Producer:

kubectl exec -it kafka-kafka-0 -- \
bin/kafka-console-producer.sh \
--bootstrap-server kafka-kafka-bootstrap.kafka.svc.cluster.local:9092 \
--topic db-transactions

Gib nach dem Starten des Producers folgende Nachrichten ein:

{"schema": {"type": "struct", "fields": [{"field": "id", "type": "int32"}, {"field": "amount", "type": "int32"}, {"field": "name", "type": "string"}]}, "payload": {"id": 1, "amount": 100, "name": "John Doe"}}
{"schema": {"type": "struct", "fields": [{"field": "id", "type": "int32"}, {"field": "amount", "type": "int32"}, {"field": "name", "type": "string"}]}, "payload": {"id": 2, "amount": 200, "name": "Jane Doe"}}
{"schema": {"type": "struct", "fields": [{"field": "id", "type": "int32"}, {"field": "amount", "type": "int32"}, {"field": "name", "type": "string"}]}, "payload": {"id": 3, "amount": 300, "name": "John Smith"}}

Überprüfe, ob die Daten in OpenSearch angekommen sind:

curl -X GET "http://localhost:9200/db-transactions/_search?q=id:1" | jq

Fazit

Wir haben erfolgreich einen Datenpipeline von Kafka nach OpenSearch unter Verwendung von Strimzi Kafka Connect eingerichtet. Diese Methode ist ideal für:

  • Echtzeit-Datenverarbeitung und -analyse

  • Skalierbare Datenpipelines in Kubernetes

  • Automatische Synchronisation zwischen Kafka und Suchsystemen

Mit dieser Konfiguration werden Daten, die in das Kafka-Topic db-transactions gesendet werden, automatisch in OpenSearch indexiert und stehen dort für Suche und Analyse zur Verfügung.

Anatoly Zelenin vermittelt als IT-Trainer hunderten Teilnehmern Apache Kafka in interaktiven Schulungen. Seine Kunden aus dem DAX-Umfeld und dem deutschen Mittelstand schätzen seit über einem Jahrzehnt seine Expertise und seine begeisternde Art. Darüber hinaus ist er nicht nur IT-Berater und -Trainer, sondern erkundet auch als Abenteurer unseren Planeten.