Chapter 4 (Part 2): Integrating Kafka CDC Data with OpenSearch Using Flink
In this chapter, we use Flink SQL to process CDC data in Debezium JSON format from Kafka and write it to OpenSearch.
1. Architecture Overview
graph TD
subgraph Source
PG[PostgreSQL]
end
subgraph CDC
DBZ[Debezium Connect]
end
subgraph Stream
TOPIC[Kafka Topic]
end
subgraph Flink
FLINK[Flink SQL Kafka Source]
OS_SINK[Flink OpenSearch Sink]
end
subgraph Search
OS[OpenSearch]
end
PG --> DBZ --> TOPIC --> FLINK --> OS_SINK --> OS
Flink consumes CDC events from Kafka, transforms them, and upserts the data into OpenSearch.
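To see what the debezium-json format actually consumes, you can peek at the raw events on the topic. A minimal sketch, assuming the broker container is named kafka and an Apache Kafka image where the console scripts carry a .sh suffix:

docker compose exec kafka kafka-console-consumer.sh \
  --bootstrap-server kafka:9092 \
  --topic dbserver1.public.testtable \
  --from-beginning --max-messages 1

# Each record is a Debezium envelope with before/after row images and an
# op code ("c" = create, "u" = update, "d" = delete), roughly:
# {"before": null, "after": {"id": 1, "message": "CDC test row"}, "op": "c", ...}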
2. Prerequisites
Ensure the following OSS components are running (e.g., via Docker Compose):
- PostgreSQL
- Apache Kafka
- Kafka Connect (Debezium)
- ZooKeeper
- Flink (1.19)
- OpenSearch (2.13)
💡 See docker-compose.yaml for details.
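Before moving on, it helps to confirm that everything is actually up. A quick sketch; the host ports (9200 for OpenSearch, 8081 for the Flink REST API) are assumptions based on typical docker-compose port mappings:

# All services should show a running state.
docker compose ps

# OpenSearch answers with cluster metadata.
curl -s http://localhost:9200

# Flink's REST API returns a cluster overview (taskmanagers, slots, jobs).
curl -s http://localhost:8081/overview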
3. Flink JAR Setup
Place the following JARs in flink/plugins/kafka/
, flink/plugins/opensearch
directories.
Required Connector JARs
flink-sql-connector-kafka-3.3.0-1.19.jar
flink-sql-connector-opensearch2-2.0.0-1.19.jar
💡 Note: Why use the plugins directory? Flink's SQL Client loads connectors via a plugin classloader. Placing JARs directly into flink/lib can lead to class conflicts when used alongside the DataStream API. Placing connector JARs under the plugins/{connector} directories is a safer approach that prevents classloading issues.
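If you still need to fetch the JARs, both connectors are published to Maven Central. A sketch assuming the standard Maven Central repository layout for these coordinates:

mkdir -p flink/plugins/kafka flink/plugins/opensearch

curl -fLo flink/plugins/kafka/flink-sql-connector-kafka-3.3.0-1.19.jar \
  https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka/3.3.0-1.19/flink-sql-connector-kafka-3.3.0-1.19.jar

curl -fLo flink/plugins/opensearch/flink-sql-connector-opensearch2-2.0.0-1.19.jar \
  https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-opensearch2/2.0.0-1.19/flink-sql-connector-opensearch2-2.0.0-1.19.jar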
4. Create OpenSearch Index
Create opensearch/test-index-mapping.json:
{
  "mappings": {
    "properties": {
      "doc_id": { "type": "keyword" },
      "id": { "type": "integer" },
      "message": { "type": "text" }
    }
  }
}
Run:
curl -X PUT "http://localhost:9200/test-index" \
-H "Content-Type: application/json" \
-d @opensearch/test-index-mapping.json
✅ Defining the mapping up front prevents OpenSearch's dynamic mapping from inferring field types (for example, mapping strings as text with an extra .keyword subfield).
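You can confirm the index was created with the intended field types:

curl -s "http://localhost:9200/test-index/_mapping?pretty"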
5. Create Flink SQL Script
Save the following as flink/sql/cdc_to_opensearch.sql:
-- Source table: reads Debezium change events from the Kafka topic.
CREATE TABLE cdc_source (
  id INT,
  message STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'kafka',
  'topic' = 'dbserver1.public.testtable',
  'properties.bootstrap.servers' = 'kafka:9092',
  'format' = 'debezium-json',
  'scan.startup.mode' = 'earliest-offset'
);
-- Sink table: the primary key becomes the OpenSearch document _id,
-- so the hashed doc_id is used as the key here.
CREATE TABLE opensearch_sink (
  doc_id STRING,
  id INT,
  message STRING,
  PRIMARY KEY (doc_id) NOT ENFORCED
) WITH (
  'connector' = 'opensearch-2',
  'hosts' = 'http://opensearch:9200',
  'allow-insecure' = 'true',
  'index' = 'test-index',
  'document-id.key-delimiter' = '$',
  'sink.bulk-flush.max-size' = '1mb'
);
-- Derive a stable document ID by hashing the primary key, then upsert.
INSERT INTO opensearch_sink
SELECT
  SHA256(CAST(id AS STRING)) AS doc_id,
  id,
  message
FROM cdc_source;
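Before launching the job, it can be worth checking that the script and connector JARs are visible inside the JobManager container at the paths used below (the /opt/flink/... mount points are assumptions based on this setup's docker-compose.yaml):

docker compose exec flink-jobmanager ls /opt/flink/sql/cdc_to_opensearch.sql
docker compose exec flink-jobmanager ls /opt/flink/plugins/kafka /opt/flink/plugins/opensearch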
6. Run the Flink Job
Open a shell in the JobManager container, then launch the SQL Client with -l pointing at the connector plugin directories:

docker compose exec flink-jobmanager bash
sql-client.sh -f /opt/flink/sql/cdc_to_opensearch.sql -l /opt/flink/plugins/kafka -l /opt/flink/plugins/opensearch
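If the source table is still empty, seed a row so the pipeline has something to move. A sketch; the container name, psql user/database, and the testtable schema (id, message) are assumptions inferred from the topic name dbserver1.public.testtable:

docker compose exec postgres \
  psql -U postgres -d postgres \
  -c "INSERT INTO testtable (id, message) VALUES (1, 'CDC test row');"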
7. Verify OpenSearch Output
curl -X GET "http://localhost:9200/test-index/_search?pretty"
Sample output:
"hits" : [
{
"_index" : "test-index",
"_id" : "c4ca4238a0b923820dcc509a6f75849b",
"_source" : {
"doc_id" : "c4ca4238a0b923820dcc509a6f75849b",
"id" : 1,
"message" : "CDC test row"
}
}
]
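Because the sink operates in upsert mode keyed on doc_id, updates and deletes in PostgreSQL should propagate to the index as well. A sketch to exercise that, under the same assumptions as the seed insert above:

# Update: the document with the same _id is overwritten in place.
docker compose exec postgres \
  psql -U postgres -d postgres \
  -c "UPDATE testtable SET message = 'CDC updated row' WHERE id = 1;"

# Delete: the corresponding document is removed from the index.
docker compose exec postgres \
  psql -U postgres -d postgres \
  -c "DELETE FROM testtable WHERE id = 1;"

# Re-run the search to observe the change.
curl -s "http://localhost:9200/test-index/_search?pretty"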
In this chapter, we built and verified a pipeline using Flink SQL to upsert CDC data into OpenSearch.
(Coming soon: Chapter 5 — Implementing a CDC Join Pipeline with Flink SQL)