Connecting RDBs and Search Engines — Chapter 5

Chapter 5: Implementing a CDC Join Pipeline with Flink SQL

In this chapter, we build a full pipeline that captures CDC streams from PostgreSQL via Debezium, joins the data using Flink SQL, and stores the results in OpenSearch.

Architecture Overview

PostgreSQL
   ↓ (CDC)
Debezium (Kafka Connect)
   ↓ (debezium-json)
Kafka Topic (orders, products)
   ↓
Flink SQL
   ↓
OpenSearch (orders_with_products)

PostgreSQL Table Setup

Add the following to postgres/01-init.sql:

-- Products table
CREATE TABLE products (
  product_id VARCHAR(32) PRIMARY KEY,
  product_name VARCHAR(255),
  category_id VARCHAR(32)
);
ALTER TABLE products REPLICA IDENTITY FULL;

-- Initial product data
INSERT INTO products (product_id, product_name, category_id) VALUES
('P001', 'Sneaker X', 'C001'),
('P002', 'Jacket Y', 'C002');

-- Orders table
CREATE TABLE orders (
  order_id VARCHAR(32) PRIMARY KEY,
  order_time TIMESTAMP,
  customer_id VARCHAR(32),
  product_id VARCHAR(32),
  quantity INT,
  price NUMERIC(10,2)
);
ALTER TABLE orders REPLICA IDENTITY FULL;

-- Initial order data
INSERT INTO orders (order_id, order_time, customer_id, product_id, quantity, price) VALUES
('O1001', '2025-04-27 10:00:00', 'CUST01', 'P001', 1, 9800.00),
('O1002', '2025-04-27 10:05:00', 'CUST02', 'P002', 2, 15800.00);

-- Grant permissions to Debezium user
GRANT SELECT ON products, orders TO debezium;

-- Add tables to existing publication
ALTER PUBLICATION debezium_pub ADD TABLE products, orders;
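If your PostgreSQL container is already running, the init script will not be re-executed; in that case you can paste the new statements into an interactive psql session. A sketch, assuming the compose service, user, and database are all named postgres (adjust to your setup):

docker compose exec postgres psql -U postgres -d postgres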

Register Debezium Connector

After Kafka Connect is running, register the connector using a script such as scripts/02-debezium-table-table-join.sh. Make sure the connector config's table.include.list covers the new tables:

"table.include.list": "public.testtable,public.products,public.orders"
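For reference, the registration itself is a POST to the Kafka Connect REST API. A minimal sketch follows; the connector name, host names, credentials, and database name are assumptions here (adapt them to your compose setup), while the topic prefix dbserver1 and publication debezium_pub match the topic names and publication used elsewhere in this chapter. Note the decimal.handling.mode setting: without it, Debezium serializes NUMERIC columns such as price as base64-encoded bytes, which the Flink table definitions below could not read as DOUBLE.

curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "postgres-cdc",
    "config": {
      "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
      "database.hostname": "postgres",
      "database.port": "5432",
      "database.user": "debezium",
      "database.password": "debezium",
      "database.dbname": "postgres",
      "topic.prefix": "dbserver1",
      "plugin.name": "pgoutput",
      "publication.name": "debezium_pub",
      "table.include.list": "public.testtable,public.products,public.orders",
      "decimal.handling.mode": "double"
    }
  }'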

Create OpenSearch Index

Prepare opensearch/orders_with_products-mapping.json:

{
  "mappings": {
    "properties": {
      "order_id": { "type": "keyword" },
      "product_id": { "type": "keyword" },
      "product_name": {
        "type": "text",
        "fields": { "raw": { "type": "keyword" } }
      },
      "category_id": { "type": "keyword" },
      "quantity": { "type": "integer" },
      "price": { "type": "double" }
    }
  }
}

Create the index:

curl -X PUT "http://localhost:9200/orders_with_products" \
  -H "Content-Type: application/json" \
  -d @opensearch/orders_with_products-mapping.json

The product_name field uses a multi-field mapping (text plus a keyword sub-field named raw), which supports both full-text search and exact matching.
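Once data is flowing, you can see the difference by comparing a full-text match on product_name with an exact term lookup on the raw sub-field (standard OpenSearch query DSL against the index created above):

curl -s "http://localhost:9200/orders_with_products/_search?pretty" \
  -H 'Content-Type: application/json' \
  -d '{ "query": { "match": { "product_name": "sneaker" } } }'

curl -s "http://localhost:9200/orders_with_products/_search?pretty" \
  -H 'Content-Type: application/json' \
  -d '{ "query": { "term": { "product_name.raw": "Sneaker X" } } }'

The first query matches any document whose analyzed product_name contains the token sneaker; the second matches only the exact string Sneaker X.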

Example Flink SQL

Create flink/sql/table-table-join.sql:

-- Products table
CREATE TABLE products (
  product_id STRING,
  product_name STRING,
  category_id STRING,
  PRIMARY KEY (product_id) NOT ENFORCED
) WITH (
  'connector' = 'kafka',
  'topic' = 'dbserver1.public.products',
  'properties.bootstrap.servers' = 'kafka:9092',
  'format' = 'debezium-json',
  'scan.startup.mode' = 'earliest-offset'
);

-- Orders table
CREATE TABLE orders (
  order_id STRING,
  order_time STRING,
  customer_id STRING,
  product_id STRING,
  quantity INT,
  price DOUBLE
) WITH (
  'connector' = 'kafka',
  'topic' = 'dbserver1.public.orders',
  'properties.bootstrap.servers' = 'kafka:9092',
  'format' = 'debezium-json',
  'scan.startup.mode' = 'earliest-offset'
);

-- OpenSearch Sink
CREATE TABLE orders_with_products (
  order_id STRING,
  product_id STRING,
  product_name STRING,
  category_id STRING,
  quantity INT,
  price DOUBLE,
  PRIMARY KEY (order_id, product_id) NOT ENFORCED
) WITH (
  'connector' = 'opensearch-2',
  'hosts' = 'http://opensearch:9200',
  'allow-insecure' = 'true',
  'index' = 'orders_with_products',
  'document-id.key-delimiter' = '$',
  'sink.bulk-flush.max-size' = '1mb'
);

-- Join View
CREATE VIEW orders_with_products_view AS
SELECT
  o.order_id,
  o.product_id,
  p.product_name,
  p.category_id,
  o.quantity,
  o.price
FROM orders o
INNER JOIN products p ON o.product_id = p.product_id;

-- Insert into OpenSearch
INSERT INTO orders_with_products
SELECT * FROM orders_with_products_view;
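One caveat: this is a regular (non-windowed) join, so Flink keeps all rows from both sides in state indefinitely. For a long-running job you can bound that state with a TTL, set at the top of the SQL file before the INSERT. The value below is only an assumption; rows whose state has expired can no longer be joined against or retracted, so choose it based on how late updates may arrive:

SET 'table.exec.state.ttl' = '24 h';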

Run the Flink Job

Execute from inside the Flink container:

sql-client.sh -f /opt/flink/sql/table-table-join.sql

Or via Docker:

docker compose exec flink-jobmanager sql-client.sh -f /opt/flink/sql/table-table-join.sql
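Either way, you can confirm the job was submitted and is RUNNING in the Flink Web UI (http://localhost:8081 by default) or through the REST API, assuming the JobManager port is published on the host:

curl -s http://localhost:8081/jobs/overview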

Validation Steps

  • Verify CDC data in Kafka topics:
docker compose exec kafka kafka-console-consumer --bootstrap-server localhost:9092 --topic dbserver1.public.orders --from-beginning
docker compose exec kafka kafka-console-consumer --bootstrap-server localhost:9092 --topic dbserver1.public.products --from-beginning
  • Verify data in OpenSearch:
curl -X GET "http://localhost:9200/orders_with_products/_search?pretty"

With custom query:

curl -s -X GET "http://localhost:9200/orders_with_products/_search?pretty" \
  -H 'Content-Type: application/json' \
  -d '{ "query": { "match_all": {} } }'
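A quick sanity check is the document count, which should be 2 after the initial inserts (one per joined order):

curl -s "http://localhost:9200/orders_with_products/_count?pretty"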

For convenience, use a script like scripts/opensearch-query.sh:

#!/bin/bash

osq() {
  local index="${1:-_all}"
  local size="${2:-10}"

  curl -s -X GET "http://localhost:9200/${index}/_search?pretty" \
    -H 'Content-Type: application/json' \
    -d "{\"query\": { \"match_all\": {} }, \"size\": ${size} }"
}

Example usage:

source scripts/opensearch-query.sh
osq orders_with_products
osq orders_with_products 20
osq orders_with_products 100 | jq '.hits.hits[]._id'

In Chapter 6 and beyond, we will explore topics such as deduplication, batch processing, DLQ (Dead Letter Queue), and OpenSearch index partitioning strategies for production-grade pipelines.
