Chapter 5: Implementing a CDC Join Pipeline with Flink SQL
In this chapter, we build a full pipeline that captures CDC streams from PostgreSQL via Debezium, joins the data using Flink SQL, and stores the results in OpenSearch.
Architecture Overview
PostgreSQL
  ↓ (CDC)
Debezium (Kafka Connect)
  ↓ (debezium-json)
Kafka topics (dbserver1.public.orders, dbserver1.public.products)
  ↓
Flink SQL (table-table join)
  ↓
OpenSearch (orders_with_products index)
PostgreSQL Table Setup
Add the following to postgres/01-init.sql:
-- Products table
CREATE TABLE products (
    product_id   VARCHAR(32) PRIMARY KEY,
    product_name VARCHAR(255),
    category_id  VARCHAR(32)
);
ALTER TABLE products REPLICA IDENTITY FULL;

-- Initial product data
INSERT INTO products (product_id, product_name, category_id) VALUES
    ('P001', 'Sneaker X', 'C001'),
    ('P002', 'Jacket Y', 'C002');

-- Orders table
CREATE TABLE orders (
    order_id    VARCHAR(32) PRIMARY KEY,
    order_time  TIMESTAMP,
    customer_id VARCHAR(32),
    product_id  VARCHAR(32),
    quantity    INT,
    price       NUMERIC(10,2)
);
ALTER TABLE orders REPLICA IDENTITY FULL;

-- Initial order data
INSERT INTO orders (order_id, order_time, customer_id, product_id, quantity, price) VALUES
    ('O1001', '2025-04-27 10:00:00', 'CUST01', 'P001', 1, 9800.00),
    ('O1002', '2025-04-27 10:05:00', 'CUST02', 'P002', 2, 15800.00);

-- Grant permissions to Debezium user
GRANT SELECT ON products, orders TO debezium;

-- Add tables to existing publication
ALTER PUBLICATION debezium_pub ADD TABLE products, orders;
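Before registering the connector, it is worth confirming that the replica identity and publication changes took effect. A quick check, assuming a Compose service named postgres and placeholder user and database names (adjust these to your environment):

# 'f' in relreplident means REPLICA IDENTITY FULL
docker compose exec postgres psql -U postgres -d demo \
    -c "SELECT relname, relreplident FROM pg_class WHERE relname IN ('products', 'orders');"

# Both tables should appear in the publication
docker compose exec postgres psql -U postgres -d demo \
    -c "SELECT * FROM pg_publication_tables WHERE pubname = 'debezium_pub';"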
Register Debezium Connector
After Kafka Connect is running, register the connector using a script such as scripts/02-debezium-table-table-join.sh. Ensure the connector config includes all three tables:
"table.include.list": "public.testtable,public.products,public.orders"
Create OpenSearch Index
Prepare opensearch/orders_with_products-mapping.json:
{
  "mappings": {
    "properties": {
      "order_id": { "type": "keyword" },
      "product_id": { "type": "keyword" },
      "product_name": {
        "type": "text",
        "fields": { "raw": { "type": "keyword" } }
      },
      "category_id": { "type": "keyword" },
      "quantity": { "type": "integer" },
      "price": { "type": "double" }
    }
  }
}
Create the index:
curl -X PUT "http://localhost:9200/orders_with_products" \
    -H "Content-Type: application/json" \
    -d @opensearch/orders_with_products-mapping.json
✅ product_name uses a multi-field (text + keyword), so it supports both full-text search and exact matching.
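To confirm the explicit mapping was applied (rather than a dynamically inferred one), fetch it back:

curl -s "http://localhost:9200/orders_with_products/_mapping?pretty"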
Example Flink SQL
Create flink/sql/table-table-join.sql:
-- Products table
CREATE TABLE products (
    product_id STRING,
    product_name STRING,
    category_id STRING,
    PRIMARY KEY (product_id) NOT ENFORCED
) WITH (
    'connector' = 'kafka',
    'topic' = 'dbserver1.public.products',
    'properties.bootstrap.servers' = 'kafka:9092',
    'format' = 'debezium-json',
    'scan.startup.mode' = 'earliest-offset'
);

-- Orders table
CREATE TABLE orders (
    order_id STRING,
    order_time STRING,
    customer_id STRING,
    product_id STRING,
    quantity INT,
    price DOUBLE
) WITH (
    'connector' = 'kafka',
    'topic' = 'dbserver1.public.orders',
    'properties.bootstrap.servers' = 'kafka:9092',
    'format' = 'debezium-json',
    'scan.startup.mode' = 'earliest-offset'
);

-- OpenSearch Sink
CREATE TABLE orders_with_products (
    order_id STRING,
    product_id STRING,
    product_name STRING,
    category_id STRING,
    quantity INT,
    price DOUBLE,
    PRIMARY KEY (order_id, product_id) NOT ENFORCED
) WITH (
    'connector' = 'opensearch-2',
    'hosts' = 'http://opensearch:9200',
    'allow-insecure' = 'true',
    'index' = 'orders_with_products',
    'document-id.key-delimiter' = '$',
    'sink.bulk-flush.max-size' = '1mb'
);

-- Join View
CREATE VIEW orders_with_products_view AS
SELECT
    o.order_id,
    o.product_id,
    p.product_name,
    p.category_id,
    o.quantity,
    o.price
FROM orders o
INNER JOIN products p ON o.product_id = p.product_id;

-- Insert into OpenSearch
INSERT INTO orders_with_products
SELECT * FROM orders_with_products_view;
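If you run the statements interactively instead of from the file, you can preview the join output before wiring it to the sink. A quick check from inside the SQL client, after the CREATE statements above have run (tableau mode prints changelog rows, including -U/+U updates, directly in the terminal):

SET 'sql-client.execution.result-mode' = 'tableau';
SELECT * FROM orders_with_products_view;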
Run the Flink Job
Execute from inside the Flink container:
sql-client.sh -f /opt/flink/sql/table-table-join.sql
Or from the host, via Docker Compose:
docker compose exec flink-jobmanager sql-client.sh -f /opt/flink/sql/table-table-join.sql
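Once submitted, the INSERT statement runs as a continuous streaming job. You can confirm it is running with the Flink CLI (or in the web UI, which Flink serves on port 8081 by default):

docker compose exec flink-jobmanager flink list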
Validation Steps
- Verify CDC data in Kafka topics:
docker compose exec kafka kafka-console-consumer --bootstrap-server localhost:9092 --topic dbserver1.public.orders --from-beginning
docker compose exec kafka kafka-console-consumer --bootstrap-server localhost:9092 --topic dbserver1.public.products --from-beginning
- Verify data in OpenSearch:
curl -X GET "http://localhost:9200/orders_with_products/_search?pretty"
With custom query:
curl -s -X GET "http://localhost:9200/orders_with_products/_search?pretty" \
    -H 'Content-Type: application/json' \
    -d '{ "query": { "match_all": {} } }'
For convenience, use a script like scripts/opensearch-query.sh:
#!/bin/bash
osq() {
    local index="${1:-_all}"
    local size="${2:-10}"
    curl -s -X GET "http://localhost:9200/${index}/_search?pretty" \
        -H 'Content-Type: application/json' \
        -d "{\"query\": { \"match_all\": {} }, \"size\": ${size} }"
}
Example usage:
source scripts/opensearch-query.sh
osq orders_with_products
osq orders_with_products 20
osq orders_with_products 100 | jq '.hits.hits[]._id'
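- Confirm that updates propagate end to end:
Mutate a source row and check that the change flows through the join into OpenSearch. A sketch, assuming the same placeholder psql connection details as before:

# Update a product name in PostgreSQL...
docker compose exec postgres psql -U postgres -d demo \
    -c "UPDATE products SET product_name = 'Sneaker X v2' WHERE product_id = 'P001';"

# ...then, after a few seconds, the joined document should reflect the new name
osq orders_with_products | jq '.hits.hits[]._source.product_name'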
In Chapter 6 and beyond, we will explore deduplication, batch processing, dead letter queues (DLQs), and OpenSearch index partitioning strategies for production-grade pipelines.