Kafka Connect Configuration
This document outlines the key configuration concepts for Kafka Connect in the MSR (Multi-Session Replay) module.
1. Prerequisites
PostgreSQL WAL Configuration
The source PostgreSQL database MUST have Write-Ahead Logging (WAL) level set to logical. Without this, Debezium cannot capture changes.
-- Check current WAL level
SHOW wal_level;
-- Must return: 'logical'
If not set to logical, configure it:
# Option 1: Server startup command
postgres -c wal_level=logical
# Option 2: In postgresql.conf
wal_level = logical
# Note: Requires database restart
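As an alternative (a sketch assuming superuser access and the ability to restart the server), the same change can be made from psql with ALTER SYSTEM:
-- Option 3: Persist the setting via ALTER SYSTEM, then restart the server
ALTER SYSTEM SET wal_level = 'logical';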
Table REPLICA IDENTITY
Before configuring connectors, set REPLICA IDENTITY FULL for tables you want to replicate:
ALTER TABLE <table_name> REPLICA IDENTITY FULL;
Example for table geo_entity in schema gis:
ALTER TABLE gis.geo_entity REPLICA IDENTITY FULL;
- DEFAULT: Only captures primary key in UPDATE/DELETE events
- FULL: Captures complete row data for all operations
- MSR Requirement: FULL is required to properly reconstruct entity states
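To confirm the setting took effect, you can check relreplident in pg_class ('f' = FULL, 'd' = DEFAULT, 'n' = NOTHING, 'i' = INDEX), using the example table from above:
-- Verify REPLICA IDENTITY for gis.geo_entity
SELECT relreplident FROM pg_class WHERE oid = 'gis.geo_entity'::regclass;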
Database User Permissions
The database user that Debezium connects with needs the REPLICATION attribute and SELECT access on the replicated tables:
-- Grant replication privilege
ALTER USER debezium_user REPLICATION;
-- Grant SELECT on tables to replicate
GRANT SELECT ON ALL TABLES IN SCHEMA target_schema TO debezium_user;
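If the Debezium user does not exist yet, a minimal setup might look like this (the password is a placeholder; GRANT USAGE on the schema is also needed so the SELECT grants above are reachable):
-- Create the Debezium user with replication privilege (placeholder password)
CREATE USER debezium_user WITH REPLICATION PASSWORD 'change_me';
-- Allow the user to resolve objects in the replicated schema
GRANT USAGE ON SCHEMA target_schema TO debezium_user;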
2. Source Connector Configuration
Source connectors capture database changes and stream them to Kafka topics. Here are the key configuration areas:
Database Connection
Establishes connection to the PostgreSQL source database. The connector uses these credentials to connect to the database and access the transaction log for change data capture.
- connector.class: Specifies the Debezium PostgreSQL connector
- database.hostname / database.port: Database server location
- database.user / database.password: Authentication credentials
- database.dbname: Source database name
Kafka Topics
Controls how Kafka topics are named and which database tables are monitored for changes.
- database.server.name: Logical name for the database server (used in topic naming)
- topic.prefix: Prefix for all generated Kafka topics
- table.include.list: Specific tables to monitor for changes
Topics are automatically created with the format {topic.prefix}.{schema}.{table} (for example, gis_server.gis.geo_entity).
Replication Slot
PostgreSQL replication slots ensure reliable change data capture: the server retains the WAL a slot needs until the connector has consumed it, so changes are not lost while the connector is down.
- plugin.name: PostgreSQL logical decoding plugin (typically pgoutput)
- slot.name: Unique identifier for this replication slot (must be unique across the entire Kafka Connect cluster)
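To see which slots exist and whether they are active (useful when a connector was deleted but its slot was left behind, which causes WAL to accumulate), you can query pg_replication_slots; dropping a slot is destructive and should only be done once no connector uses it:
-- List existing replication slots and their status
SELECT slot_name, plugin, active, restart_lsn FROM pg_replication_slots;
-- Drop an orphaned slot (only when no connector uses it anymore)
SELECT pg_drop_replication_slot('debezium_gis_geo_entity_slot');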
Data Transformations
Transforms modify the raw change events into the format required by MSR:
- unwrap: Extracts the actual data from Debezium's change event envelope
- convert: Converts timestamps to the proper format
- rename: Renames fields to match MSR expectations
- drops: Excludes the internal __deleted marker field so it does not reach MSR
- restructure: Uses JSLT to transform the data structure into MSR's CDC event format
The table_name field is automatically populated in the format schema.table_name (e.g., gis.geo_entity) by the JSLT transformation. This field is crucial for MSR to identify the source table of each CDC event, especially when multiple tables are being replicated to the same MSR instance.
The transformation configuration shown here is specific to the gis.geo_entity table structure. Each table may require different transformation logic, particularly in the restructure transform's JSLT expression, to properly map the source table fields to the MSR CDC event format. You'll need to customize the JSLT transformation based on your specific table schema and field requirements.
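For readability, here is the restructure transform's JSLT expression from the gis.geo_entity source connector example below, spread over multiple lines (in the connector configuration it must be supplied as a single escaped JSON string):
{
  "entity_id": .payload.entity_id,
  "op": .payload.op,
  "event_timestamp": .payload.event_timestamp,
  // table_name is built from the metadata fields added by the unwrap transform
  "table_name": .payload.__source_schema + "." + .payload.__source_table,
  "data": to-json({
    "id": .payload.id,
    "created_at": .payload.created_at,
    "updated_at": .payload.updated_at,
    "created_by": .payload.created_by,
    "updated_by": .payload.updated_by,
    "tenant_id": .payload.tenant_id,
    "occ_lock": .payload.occ_lock,
    "entity_type": .payload.entity_type,
    "geojson": .payload.geojson
  })
}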
Expected Kafka Message Format
After all transformations are applied, the messages in Kafka/Redpanda will have the following structure:
{
"schema": {
"type": "struct",
"fields": [
{
"type": "string",
"optional": true,
"field": "entity_id"
},
{
"type": "string",
"optional": true,
"field": "op"
},
{
"type": "int64",
"optional": true,
"field": "event_timestamp"
},
{
"type": "string",
"optional": true,
"field": "table_name"
},
{
"type": "string",
"optional": true,
"field": "data"
}
],
"optional": false
},
"payload": {
"entity_id": "38138e8e",
"op": "u",
"event_timestamp": 1756095710699,
"table_name": "gis.geo_entity",
"data": "{\"id\":\"45de16a6-fef6-45f0-a552-f460be945272\",\"created_at\":\"2025-08-25T04:19:05.799000Z\",\"updated_at\":\"2025-08-25T04:21:50.694000Z\",\"created_by\":\"f67cb8f5-0645-444d-a4bd-c61aaf1b2db0\",\"updated_by\":\"f67cb8f5-0645-444d-a4bd-c61aaf1b2db0\",\"tenant_id\":\"6c17fb87-d715-439f-943f-69f01318adac\",\"occ_lock\":21,\"entity_type\":\"track\",\"geojson\":\"{\\\"type\\\": \\\"Feature\\\", \\\"geometry\\\": {\\\"type\\\": \\\"Point\\\", \\\"coordinates\\\": [103.715, 1.354]}, \\\"properties\\\": {\\\"alt\\\": \\\"0\\\", \\\"eta\\\": \\\"\\\", \\\"hex\\\": \\\"76E9F2\\\", \\\"lat\\\": 1.354, \\\"lon\\\": 103.715, \\\"reg\\\": \\\"\\\", \\\"kind\\\": \\\"aircraft\\\", \\\"type\\\": \\\"GRND\\\", \\\"track\\\": \\\"90\\\", \\\"flight\\\": \\\"2\\\", \\\"gspeed\\\": \\\"8\\\", \\\"source\\\": \\\"ADSB\\\", \\\"squawk\\\": \\\"0\\\", \\\"vspeed\\\": \\\"0\\\", \\\"fr24_id\\\": \\\"38138e8e\\\", \\\"callsign\\\": \\\"ATT06\\\", \\\"dest_iata\\\": \\\"\\\", \\\"dest_icao\\\": \\\"\\\", \\\"orig_iata\\\": \\\"\\\", \\\"orig_icao\\\": \\\"\\\", \\\"timestamp\\\": \\\"2024-11-22T13:25:33Z\\\", \\\"created_at\\\": \\\"1732281939052\\\", \\\"painted_as\\\": \\\"\\\", \\\"operating_as\\\": \\\"\\\"}}\"}"
}
}
Payload Fields:
- entity_id: Unique identifier for the entity (extracted from the original record)
- op: Operation type (c = create, u = update, d = delete, r = read/initial snapshot)
- event_timestamp: Unix timestamp in milliseconds when the change occurred
- table_name: Source table in the format schema.table_name (e.g., gis.geo_entity)
- data: JSON string containing the complete record data after the change
The table_name field was added to support multi-table CDC tracking. This field:
- Identifies the source table for each CDC event
- Enables filtering and grouping of events by source table
- Is automatically populated by the JSLT transformation
- Must be included in all source connector configurations
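To spot-check that messages on the topic match this structure, you can read a single record with the standard Kafka console consumer (the bootstrap address below is an assumption; adjust it to your broker, or use rpk if you are running Redpanda):
# Read one message from the CDC topic and print it (adjust --bootstrap-server to your environment)
kafka-console-consumer --bootstrap-server localhost:9092 \
  --topic gis_server.gis.geo_entity \
  --from-beginning \
  --max-messages 1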
3. Sink Connector Configuration
The sink connector consumes processed change events from Kafka topics and stores them in the MSR database:
Database Connection
Establishes connection to the target MSR database where change events will be stored.
- connector.class: JDBC sink connector for PostgreSQL
- connection.url: JDBC connection string to the MSR database
- connection.user / connection.password: Database authentication
Topic and Table Mapping
Defines which Kafka topics to consume and where to store the data.
- topics: Comma-separated list of Kafka topics to consume
- table.name.format: Target table name in the MSR database
- insert.mode: How data is inserted (typically "insert" for append-only)
Data Transforms
Final transformations before storing in MSR database:
- RenameField: Renames the data field to entity_state to match the MSR schema
- ConvertTimestamp: Ensures timestamp fields are in the correct database format
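Once the sink connector is running, a quick sanity check on the MSR side might look like the following sketch, assuming msr.cdc_event exposes the payload fields described earlier (with data stored as entity_state after the RenameField transform):
-- Inspect the most recent CDC events written by the sink connector
SELECT entity_id, op, event_timestamp, table_name
FROM msr.cdc_event
ORDER BY event_timestamp DESC
LIMIT 10;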
Complete Configuration Example
Here's a complete example for setting up both source and sink connectors for the gis.geo_entity table:
- Source Connector
- Sink Connector
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{
"name": "gis-geo-entity-source-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "gis-db",
"database.port": "5432",
"database.user": "${GIS_DB_USER}",
"database.password": "${GIS_DB_PASSWORD}",
"database.dbname": "gis",
"database.server.name": "gis_server",
"topic.prefix": "gis_server",
"table.include.list": "gis.geo_entity",
"plugin.name": "pgoutput",
"slot.name": "debezium_gis_geo_entity_slot",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "false",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "true",
"transforms": "unwrap,convert,rename,drops,restructure",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.add.fields": "op,source.ts_ms,source.schema,source.table",
"transforms.convert.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.convert.target.type": "Timestamp",
"transforms.convert.field": "__source_ts_ms",
"transforms.rename.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.rename.renames": "__source_ts_ms:event_timestamp,__op:op",
"transforms.drops.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.drops.exclude": "__deleted",
"transforms.restructure.type": "at.willhaben.kafka.connect.transforms.jslt.JsltTransform$Value",
"transforms.restructure.jslt": "{ \"entity_id\":.payload.entity_id, \"op\":.payload.op, \"event_timestamp\":.payload.event_timestamp, \"table_name\": .payload.__source_schema + \".\" + .payload.__source_table, \"data\": to-json( { \"id\":.payload.id, \"created_at\":.payload.created_at, \"updated_at\":.payload.updated_at, \"created_by\":.payload.created_by, \"updated_by\":.payload.updated_by, \"tenant_id\":.payload.tenant_id, \"occ_lock\":.payload.occ_lock, \"entity_type\":.payload.entity_type, \"geojson\":.payload.geojson }) }"
}
}'
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{
"name": "cdc-sink-connector",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"topics": "gis_server.gis.geo_entity",
"connection.url": "jdbc:postgresql://msr-database:5432/msr",
"connection.user": "${MSR_DB_USER}",
"connection.password": "${MSR_DB_PASSWORD}",
"table.name.format": "msr.cdc_event",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "false",
"value.converter.schemas.enable": "true",
"transforms": "RenameField,ConvertTimestamp",
"transforms.RenameField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.RenameField.renames": "data:entity_state",
"transforms.ConvertTimestamp.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.ConvertTimestamp.field": "event_timestamp",
"transforms.ConvertTimestamp.target.type": "Timestamp"
}
}'
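After submitting both configurations, the Kafka Connect REST API (on the same localhost:8083 endpoint used above) can confirm that the connectors and their tasks are running:
# List registered connectors
curl -s localhost:8083/connectors
# Check the status of the source and sink connectors defined above
curl -s localhost:8083/connectors/gis-geo-entity-source-connector/status
curl -s localhost:8083/connectors/cdc-sink-connector/status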
Key Configuration Parameters
Source Connector Parameters
| Parameter | Description | Example Value |
|---|---|---|
| connector.class | Debezium PostgreSQL connector class | io.debezium.connector.postgresql.PostgresConnector |
| database.hostname | Database server hostname | gis-db |
| database.port | Database server port | 5432 |
| database.user | Database username | admin |
| database.password | Database password | ${GIS_DB_PASSWORD} |
| database.dbname | Database name | gis |
| database.server.name | Logical server name | gis_server |
| topic.prefix | Kafka topic prefix | gis_server |
| table.include.list | Tables to replicate | gis.geo_entity |
| plugin.name | PostgreSQL logical decoding plugin | pgoutput |
| slot.name | Replication slot name (must be unique across the entire Kafka Connect cluster) | debezium_gis_geo_entity_slot |
| transforms | Data transformation pipeline | unwrap,convert,rename,restructure |
Sink Connector Parameters
| Parameter | Description | Example Value |
|---|---|---|
| connector.class | JDBC sink connector class | io.confluent.connect.jdbc.JdbcSinkConnector |
| connection.url | JDBC connection URL | jdbc:postgresql://msr-database:5432/msr |
| connection.user | Database username | admin |
| connection.password | Database password | ${MSR_DB_PASSWORD} |
| topics | Kafka topics to consume from | gis_server.gis.geo_entity,gis_server.gis.bookmark |
| table.name.format | Target table name format | msr.cdc_event |
| insert.mode | Data insertion mode | insert |
| batch.size | Number of records per batch | 100 |
| transforms | Data transformation pipeline | RenameField,ConvertTimestamp |