Connecting RDBs and Search Engines — Chapter 4 Part 2

Chapter 4 (Part 2): Integrating Kafka CDC Data with OpenSearch Using Flink

In this chapter, we use Flink SQL to process CDC data in Debezium JSON format from Kafka and write it to OpenSearch.


1. Architecture Overview

graph TD
  subgraph Source
    PG[PostgreSQL]
  end
  subgraph CDC
    DBZ[Debezium Connect]
  end
  subgraph Stream
    TOPIC[Kafka Topic]
  end
  subgraph Flink
    FLINK[Flink SQL Kafka Source]
    OS_SINK[Flink OpenSearch Sink]
  end
  subgraph Search
    OS[OpenSearch]
  end

  PG --> DBZ --> TOPIC --> FLINK --> OS_SINK --> OS

Flink consumes CDC events from Kafka, transforms them, and upserts the data into OpenSearch.
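Each record on the Kafka topic is a Debezium change-event envelope; Flink's debezium-json format reads its before, after, and op fields and turns them into insert, update, and delete changelog rows. A trimmed, illustrative event for the table used later in this chapter (real events carry more source metadata) might look like:

{
  "before": null,
  "after": { "id": 1, "message": "CDC test row" },
  "source": { "connector": "postgresql", "db": "postgres", "table": "testtable" },
  "op": "c",
  "ts_ms": 1715300000000
}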


2. Prerequisites

Ensure the following OSS components are running (e.g., via Docker Compose):

  • PostgreSQL
  • Apache Kafka
  • Kafka Connect (Debezium)
  • ZooKeeper
  • Flink (1.19)
  • OpenSearch (2.13)

💡 See docker-compose.yaml for details.
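The pipeline also assumes a Debezium connector has been registered with Kafka Connect. A minimal registration sketch via the Connect REST API; the connector name, credentials, and database names here are assumptions (adjust to your docker-compose.yaml), while the topic prefix and table match the topic consumed later (dbserver1.public.testtable):

curl -X POST "http://localhost:8083/connectors" \
  -H "Content-Type: application/json" \
  -d '{
    "name": "pg-connector",
    "config": {
      "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
      "database.hostname": "postgres",
      "database.port": "5432",
      "database.user": "postgres",
      "database.password": "postgres",
      "database.dbname": "postgres",
      "topic.prefix": "dbserver1",
      "table.include.list": "public.testtable",
      "plugin.name": "pgoutput"
    }
  }'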


3. Flink JAR Setup

Place the following JARs in the flink/plugins/kafka/ and flink/plugins/opensearch/ directories (a download sketch follows the list below).

Required Connector JARs

  • flink-sql-connector-kafka-3.3.0-1.19.jar
  • flink-sql-connector-opensearch2-2.0.0-1.19.jar
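If you need to fetch the JARs, here is a download sketch assuming the standard Maven Central coordinates for these artifacts:

# Create the plugin directories, then download each connector JAR
mkdir -p flink/plugins/kafka flink/plugins/opensearch

# Kafka SQL connector
curl -fL -o flink/plugins/kafka/flink-sql-connector-kafka-3.3.0-1.19.jar \
  "https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka/3.3.0-1.19/flink-sql-connector-kafka-3.3.0-1.19.jar"

# OpenSearch 2.x SQL connector
curl -fL -o flink/plugins/opensearch/flink-sql-connector-opensearch2-2.0.0-1.19.jar \
  "https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-opensearch2/2.0.0-1.19/flink-sql-connector-opensearch2-2.0.0-1.19.jar"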

💡 Note: Why use the plugins directory?

Flink's SQL Client loads connectors via a plugin classloader. Placing JARs directly into flink/lib can lead to class conflicts when used alongside the DataStream API. Placing connector JARs under plugins/{connector} keeps each connector isolated and is the safer approach for avoiding classloading issues.


4. Create OpenSearch Index

Create opensearch/test-index-mapping.json:

{
  "mappings": {
    "properties": {
      "doc_id": { "type": "keyword" },
      "id": { "type": "integer" },
      "message": { "type": "text" }
    }
  }
}

Run:

curl -X PUT "http://localhost:9200/test-index" \
  -H "Content-Type: application/json" \
  -d @opensearch/test-index-mapping.json

✅ Defining the mapping up front prevents OpenSearch's dynamic mapping from auto-generating keyword sub-fields for string values.
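To confirm the index uses the mapping you defined rather than a dynamically generated one:

curl -s "http://localhost:9200/test-index/_mapping?pretty"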


5. Create Flink SQL Script

Save the following as flink/sql/cdc_to_opensearch.sql:

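-- Source: decodes Debezium change events from Kafka into changelog rows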
CREATE TABLE cdc_source (
  id INT,
  message STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'kafka',
  'topic' = 'dbserver1.public.testtable',
  'properties.bootstrap.servers' = 'kafka:9092',
  'format' = 'debezium-json',
  'scan.startup.mode' = 'earliest-offset'
);

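-- Sink: upserts documents into the test-index index in OpenSearch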
CREATE TABLE opensearch_sink (
  doc_id STRING,
  id INT,
  message STRING,
  PRIMARY KEY (doc_id) NOT ENFORCED
) WITH (
  'connector' = 'opensearch-2',
  'hosts' = 'http://opensearch:9200',
  'allow-insecure' = 'true',
  'index' = 'test-index',
  'document-id.key-delimiter' = '$',
  'sink.bulk-flush.max-size' = '1mb'
);

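-- Continuous query: derive a stable document id from the key, then upsert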
INSERT INTO opensearch_sink
SELECT
  SHA256(CAST(id AS STRING)) AS doc_id,
  id,
  message
FROM cdc_source;
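💡 Before submitting the job, it can help to confirm that Debezium events are actually reaching the topic. A sketch using Kafka's bundled console consumer (the script name and path vary by Kafka image):

docker compose exec kafka \
  kafka-console-consumer.sh --bootstrap-server kafka:9092 \
  --topic dbserver1.public.testtable --from-beginning --max-messages 1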

6. Run the Flink Job

docker compose exec flink-jobmanager bash
sql-client.sh -f /opt/flink/sql/cdc_to_opensearch.sql \
  -l /opt/flink/plugins/kafka -l /opt/flink/plugins/opensearch
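The INSERT statement is submitted as a continuous streaming job. To confirm it is running, list the cluster's jobs (the Flink Web UI, typically on port 8081, shows the same information):

docker compose exec flink-jobmanager flink list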

7. Verify OpenSearch Output

curl -X GET "http://localhost:9200/test-index/_search?pretty"

Sample output:

"hits" : [
  {
    "_index" : "test-index",
    "_id" : "c4ca4238a0b923820dcc509a6f75849b",
    "_source" : {
      "doc_id" : "c4ca4238a0b923820dcc509a6f75849b",
      "id" : 1,
      "message" : "CDC test row"
    }
  }
]
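To see the upsert behavior end to end, change a row in PostgreSQL and run the search again; the document with the same _id should carry the new value. The service, user, and database names below are assumptions derived from the topic dbserver1.public.testtable; adjust them to your setup:

docker compose exec postgres \
  psql -U postgres -d postgres \
  -c "UPDATE testtable SET message = 'CDC updated row' WHERE id = 1;"

curl -s "http://localhost:9200/test-index/_search?pretty"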

In this chapter, we built and verified a pipeline using Flink SQL to upsert CDC data into OpenSearch.

(Coming soon: Chapter 5 — Implementing a CDC Join Pipeline with Flink SQL)
