Connecting RDBs and Search Engines — Chapter 3

Chapter 3: Verifying the Flow from PostgreSQL to Debezium to Kafka

In this chapter, we will test the end-to-end process of capturing change data from PostgreSQL with Debezium and delivering it to Kafka.


1. Starting the Environment

Before beginning this chapter, ensure that all necessary services are running via Docker Compose:

docker compose up -d

Verify that the following components are running:

  • PostgreSQL
  • Apache Kafka
  • Debezium Connect (Kafka Connect)
  • ZooKeeper

See docker-compose.yaml in the repository for service configuration details.
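
To confirm that all four services are up before moving on, you can list the running containers and their status:

docker compose ps

Each service should report an Up/running status; if any container is restarting or exited, check its logs before continuing.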


2. PostgreSQL Setup

First, we create the table that Debezium will monitor. Add the following to postgres/00-init.sql:

-- Create Debezium user
CREATE ROLE debezium WITH LOGIN PASSWORD 'dbz' REPLICATION;

-- Create target table
CREATE TABLE testtable (
  id INTEGER PRIMARY KEY,
  message VARCHAR(255)
);
INSERT INTO testtable (id, message) VALUES (1, 'CDC test row');

-- Grant SELECT permission
GRANT SELECT ON testtable TO debezium;

-- Create publication (for pgoutput plugin)
CREATE PUBLICATION debezium_pub FOR TABLE testtable;

This script runs only once, when the PostgreSQL data volume is first created.
To apply changes to the script, you must delete the persistent volumes and restart:

docker compose down --volumes
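
After bringing the stack back up with docker compose up -d, you can confirm that the init script actually ran. A quick check, using the same postgres superuser that psql defaults to later in this chapter:

docker compose exec postgres psql -U postgres -c "SELECT * FROM testtable;"
docker compose exec postgres psql -U postgres -c "SELECT * FROM pg_publication;"

The first query should return the seed row (1, 'CDC test row'), and the second should list the debezium_pub publication.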

3. PostgreSQL Container Configuration

To enable CDC via the WAL (Write-Ahead Log), add the following to the postgres service in docker-compose.yaml:

command: >
  postgres
  -c wal_level=logical
  -c max_replication_slots=4
  -c max_wal_senders=4

To restart PostgreSQL without losing data:

docker compose restart postgres
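
After the restart, you can verify that the WAL settings took effect (these are standard PostgreSQL runtime parameters):

docker compose exec postgres psql -U postgres -c "SHOW wal_level;"
docker compose exec postgres psql -U postgres -c "SHOW max_replication_slots;"

wal_level should report logical; otherwise the Debezium connector will not be able to create its replication slot.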

To rerun initialization SQL, you’ll need to remove volumes:

docker compose down --volumes

4. Registering the Debezium Connector

Once Debezium Connect is running, register the connector with the following command (Debezium 1.9 format):

curl -X POST "localhost:8083/connectors" \
  -H "Content-Type: application/json" \
  -d '{
    "name": "postgres-connector",
    "config": {
      "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
      "database.hostname": "postgres",
      "database.port": "5432",
      "database.user": "debezium",
      "database.password": "dbz",
      "database.dbname": "postgres",
      "database.server.name": "dbserver1",
      "topic.prefix": "dbserver1",
      "plugin.name": "pgoutput",
      "publication.name": "debezium_pub",
      "slot.name": "debezium_slot",
      "slot.drop.on.stop": "true",
      "table.include.list": "public.testtable",
      "snapshot.mode": "initial",
      "tombstones.on.delete": "false",
      "key.converter": "org.apache.kafka.connect.json.JsonConverter",
      "value.converter": "org.apache.kafka.connect.json.JsonConverter",
      "key.converter.schemas.enable": "false",
      "value.converter.schemas.enable": "false"
    }
  }'
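
If you need to adjust the configuration later, the simplest approach is to delete the connector and register it again; the DELETE endpoint is part of the standard Kafka Connect REST API:

curl -X DELETE http://localhost:8083/connectors/postgres-connector

Since slot.drop.on.stop is set to true, the replication slot in PostgreSQL should be dropped when the connector shuts down cleanly, so a re-registered connector starts from a fresh slot.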

About snapshot.mode

The snapshot.mode option controls whether the connector loads the existing contents of the captured tables when it starts.

  • initial: Load all existing data once, then capture changes (default)
  • never: Skip initial load; capture only subsequent changes

In this guide, we use initial, so existing rows are sent to Kafka with op: r (read).
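
Once the connector is registered, you can also confirm on the PostgreSQL side that it created its replication slot (pg_replication_slots is a standard system view):

docker compose exec postgres psql -U postgres -c "SELECT slot_name, plugin, active FROM pg_replication_slots;"

You should see debezium_slot using the pgoutput plugin, with active set to t while the connector is running.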


5. Checking Connector Status

Check the connector status:

curl http://localhost:8083/connectors/postgres-connector/status

Expected output:

{
  "name": "postgres-connector",
  "connector": {
    "state": "RUNNING",
    "worker_id": "10.4.1.81:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "RUNNING",
      "worker_id": "10.4.1.81:8083"
    }
  ],
  "type": "source"
}
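
You can also list every connector registered on this Kafka Connect instance, which is handy after a failed registration attempt:

curl http://localhost:8083/connectors

The response is a JSON array of connector names, e.g. ["postgres-connector"].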

6. Checking Kafka Topics

List Kafka topics:

# Enter Kafka container
docker compose exec kafka bash

# List topics
kafka-topics --bootstrap-server localhost:9092 --list

Expected output:

dbserver1.public.testtable
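
Still inside the Kafka container, you can inspect the topic's partition and replication settings with the describe option:

kafka-topics --bootstrap-server localhost:9092 --describe \
  --topic dbserver1.public.testtable

In this single-broker setup the topic typically has one partition and a replication factor of 1.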

7. Viewing CDC Events from Kafka

Use the following to consume events from the topic:

kafka-console-consumer --bootstrap-server localhost:9092 \
  --topic dbserver1.public.testtable \
  --from-beginning

Sample snapshot output:

{
  "before": null,
  "after": {
    "id": 1,
    "message": "CDC test row"
  },
  "op": "r"
}
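
The console consumer prints only the message value by default. To also see the message key, which carries the primary key of the changed row, you can enable key printing (a standard console-consumer property):

kafka-console-consumer --bootstrap-server localhost:9092 \
  --topic dbserver1.public.testtable \
  --from-beginning \
  --property print.key=true

With the JSON converter and schemas disabled, the key for the snapshot row should appear as {"id":1}.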

8. Confirming Live Data Changes

Add a new row in PostgreSQL:

# Log into PostgreSQL
docker compose exec postgres psql -U postgres

-- At the psql prompt, insert a new row
INSERT INTO testtable (id, message) VALUES (2, 'inserted row');

Expected Kafka output:

{
  "before": null,
  "after": {
    "id": 2,
    "message": "inserted row"
  },
  "op": "c"
}
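
To see the remaining operation types, you can also run an update and a delete against the same row (using psql -c as a shorthand for the interactive session above):

docker compose exec postgres psql -U postgres \
  -c "UPDATE testtable SET message = 'updated row' WHERE id = 2;"
docker compose exec postgres psql -U postgres \
  -c "DELETE FROM testtable WHERE id = 2;"

These produce events with op set to u and d respectively. How much of the old row appears in the before field depends on the table's REPLICA IDENTITY setting; with the default, only the primary key columns are guaranteed to be present.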

Debezium JSON Event Format

Debezium uses the following format for CDC messages:

op value   Meaning
--------   ----------------------------
r          Snapshot read (initial data)
c          Insert (create)
u          Update
d          Delete

For example, the following JSON means a new row was inserted:

{
  "before": null,
  "after": {
    "id": 2,
    "message": "inserted row"
  },
  "op": "c"
}

Troubleshooting

Topic Not Created in Kafka

If dbserver1.public.testtable doesn’t appear, check:

  • That debezium_pub exists in PostgreSQL:

    SELECT * FROM pg_publication;

  • That publication.name and slot.name are correctly set in the connector config

  • That the 00-init.sql script ran (reset if needed):

    docker compose down --volumes
    docker compose up -d

  • That Kafka Connect logs are error-free:

    docker compose logs connect

No Events Displayed from Kafka

If CDC events aren’t visible in Kafka:

  • Ensure the connector is RUNNING (see step 5)
  • Ensure the inserted row has a unique id (a duplicate primary key makes the INSERT fail, so no event is produced)
  • Make sure you’re consuming from the beginning:

    kafka-console-consumer --bootstrap-server localhost:9092 \
      --topic dbserver1.public.testtable \
      --from-beginning

In this chapter, we confirmed the flow of change data from PostgreSQL to Kafka using Debezium. In the next chapter, we'll use Flink to process this data.

(Coming soon: Chapter 4 Part 1 — Outputting Kafka CDC Data to Console with Flink)
