Connecting RDBs and Search Engines — Chapter 4 Part 1
sisiodos

sisiodos @sisiodos

About: Writing about stream processing, data pipelines, and modern data architecture using Flink, Kafka, OpenSearch, and lakehouse technologies.

Joined:
May 10, 2025

Connecting RDBs and Search Engines — Chapter 4 Part 1

Publish Date: May 10
0 0

Chapter 4 (Part 1): Outputting Kafka CDC Data to Console with Flink

In this chapter, we will process the CDC data delivered to Kafka using Flink SQL and print the output to the console. Before persisting to OpenSearch, we visually verify that Flink is correctly consuming and processing the data from Kafka.


1. Prerequisites

Ensure the following components are already running:

  • PostgreSQL
  • Apache Kafka
  • Kafka Connect
  • ZooKeeper
  • Flink

Refer to Chapter 3 for details on setting up the Debezium → Kafka pipeline.


2. Architecture Overview

graph TD
  subgraph source
    PG[PostgreSQL]
  end
  subgraph Change Data Capture
    DBZ[Debezium Connect]
  end
  subgraph stream platform
    TOPIC[Kafka Topic]
  end
  subgraph stream processing
    FLINK[Flink SQL Kafka Source]
    PRINT[Flink Print Sink Console]
  end

  PG --> DBZ --> TOPIC --> FLINK --> PRINT
Enter fullscreen mode Exit fullscreen mode

We verify that CDC events flow from Kafka to Flink and appear in the standard output in a format like +I[...].


3. Add Kafka Connector to Flink

Add the Kafka SQL connector JAR to Flink:

  • flink-sql-connector-kafka-3.3.0-1.19.jar

⚠️ To avoid interference with Flink core libraries, place the JAR in flink/lib/ext/ and copy it into /opt/flink/lib/ at startup.

docker-compose.yaml Excerpt

flink-jobmanager:
  image: flink:1.19
  command: ["/bin/bash", "/jobmanager-entrypoint.sh"]
  volumes:
    - ./flink/sql:/opt/flink/sql
    - ./flink/lib/ext:/opt/flink/lib/ext
    - ./flink/jobmanager-entrypoint.sh:/jobmanager-entrypoint.sh
Enter fullscreen mode Exit fullscreen mode

jobmanager-entrypoint.sh Example

#!/bin/bash
set -e
cp /opt/flink/lib/ext/*.jar /opt/flink/lib/
exec /docker-entrypoint.sh jobmanager
Enter fullscreen mode Exit fullscreen mode

Apply a similar setup to TaskManager to copy the JAR.


4. Write Flink SQL Script

Save the following SQL in flink/sql/cdc_to_console.sql:

CREATE TABLE cdc_source (
  id INT,
  message STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'kafka',
  'topic' = 'dbserver1.public.testtable',
  'properties.bootstrap.servers' = 'kafka:9092',
  'format' = 'debezium-json',
  'scan.startup.mode' = 'earliest-offset'
);

CREATE TABLE print_sink (
  id INT,
  message STRING
) WITH (
  'connector' = 'print'
);

INSERT INTO print_sink
SELECT * FROM cdc_source;
Enter fullscreen mode Exit fullscreen mode

Explanation of Flink SQL Tables

Kafka Source Table: cdc_source

Property Description
connector = 'kafka' Reads data from a Kafka topic
format = 'debezium-json' Handles JSON messages in Debezium format
scan.startup.mode = 'earliest-offset' Reads from the earliest offset
PRIMARY KEY (...) NOT ENFORCED Defines a primary key without enforcement

Console Sink Table: print_sink

  • Uses Flink's internal print connector to write output to stdout.

5. Run the Flink SQL Job

docker compose exec flink-jobmanager bash
sql-client.sh -f /opt/flink/sql/cdc_to_console.sql
Enter fullscreen mode Exit fullscreen mode

Verify Running Jobs

docker compose exec flink-jobmanager bash
flink list
Enter fullscreen mode Exit fullscreen mode

Expected output:

------------------ Running/Restarting Jobs -------------------
<job-id> : insert-into_default_catalog.default_database.print_sink (RUNNING)
Enter fullscreen mode Exit fullscreen mode

6. Check the Output

docker compose logs flink-taskmanager
Enter fullscreen mode Exit fullscreen mode

Expected output:

flink-taskmanager-1 | +I[1, CDC test row]
Enter fullscreen mode Exit fullscreen mode

Why TaskManager?

  • The print sink outputs to the logs of the TaskManager running the job.
  • Use docker compose logs flink-taskmanager to view the output.

7. Troubleshooting

SQL Client Doesn’t Start

  • Check if JobManager is running:
docker compose logs flink-jobmanager
Enter fullscreen mode Exit fullscreen mode

No CDC Output Appears

  • Is the Kafka topic name correct?
  • Is there data in the Kafka topic?
  • Is scan.startup.mode set to earliest-offset?

Check topic content:

kafka-console-consumer --bootstrap-server localhost:9092 \
  --topic dbserver1.public.testtable \
  --from-beginning
Enter fullscreen mode Exit fullscreen mode

Bonus: Observing Parallelism

If you scale Flink to use multiple TaskManagers, you’ll see output distributed across their logs.

This allows you to observe parallel execution, slot allocation, and subtask distribution.


In this chapter, we confirmed the flow from Kafka → Flink → console output. Next, we will write the results to OpenSearch for persistence.

(Coming soon: Chapter 4 Part 2 — Integrating Kafka CDC Data with OpenSearch Using Flink)

Comments 0 total

    Add comment