Kafka Connect Configuration
This document outlines the key configuration concepts for Kafka Connect in the MSR (Multi-Session Replay) module.
1. Prerequisites
Before configuring connectors, set REPLICA IDENTITY FULL for the tables you want to replicate:

ALTER TABLE <table_name> REPLICA IDENTITY FULL;

Example for the table geo_entity in the schema gis:

ALTER TABLE gis.geo_entity REPLICA IDENTITY FULL;
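To confirm the setting took effect, you can query pg_class directly. A minimal check, assuming psql access to the source database using the same host and credentials as the source connector below:

# relreplident: 'f' = FULL, 'd' = DEFAULT, 'n' = NOTHING, 'i' = USING INDEX
psql -h gis-db -p 5432 -U "$GIS_DB_USER" -d gis -c \
  "SELECT c.relname, c.relreplident
     FROM pg_class c
     JOIN pg_namespace n ON n.oid = c.relnamespace
    WHERE n.nspname = 'gis' AND c.relname = 'geo_entity';"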
2. Source Connector Configuration
Source connectors capture database changes and stream them to Kafka topics. Here are the key configuration areas:
Database Connection
Establishes connection to the PostgreSQL source database. The connector uses these credentials to connect to the database and access the transaction log for change data capture.
- connector.class: Specifies the Debezium PostgreSQL connector
- database.hostname/port: Database server location
- database.user/password: Authentication credentials
- database.dbname: Source database name
Kafka Topics
Controls how Kafka topics are named and which database tables are monitored for changes.
- database.server.name: Logical name for the database server (used in topic naming)
- topic.prefix: Prefix for all generated Kafka topics
- table.include.list: Specific tables to monitor for changes

Topics are automatically created with the format: {topic.prefix}.{schema}.{table}
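Once the source connector (see the complete example below) is running, you can verify that the topic was created. A quick check, assuming a Redpanda broker with rpk on your path; the kafka-topics.sh variant assumes a broker listening on localhost:9092:

# Redpanda CLI: list topics created by the source connector
rpk topic list | grep gis_server

# Stock Kafka tooling equivalent
kafka-topics.sh --bootstrap-server localhost:9092 --list | grep gis_server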
Replication Slot
PostgreSQL replication slots ensure reliable change data capture: the server retains the WAL segments the connector has not yet consumed, so no changes are lost if the connector is restarted.
- plugin.name: PostgreSQL logical decoding plugin (typically pgoutput)
- slot.name: Unique identifier for this replication slot (must be unique across the entire Kafka Connect cluster)
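A slot that is no longer consumed keeps WAL from being recycled on the source database, so it is worth checking what exists there. A sketch using the same psql access as in the prerequisites; drop a slot only if no connector is still using it:

# List logical replication slots and whether a consumer is currently attached
psql -h gis-db -p 5432 -U "$GIS_DB_USER" -d gis -c \
  "SELECT slot_name, plugin, slot_type, active FROM pg_replication_slots;"

# Remove a stale slot left behind by a deleted connector
psql -h gis-db -p 5432 -U "$GIS_DB_USER" -d gis -c \
  "SELECT pg_drop_replication_slot('debezium_gis_geo_entity_slot');"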
Data Transformations
Transforms modify the raw change events into the format required by MSR:
- unwrap: Extracts the actual data from Debezium's change event envelope
- convert: Converts timestamps to the proper format
- rename: Renames fields to match MSR expectations
- drops: Removes the __deleted field left over from the unwrap step
- restructure: Uses JSLT to transform the data structure into MSR's CDC event format
The transformation configuration shown here is specific to the gis.geo_entity table structure. Each table may require different transformation logic, particularly in the restructure transform's JSLT expression, to properly map the source table fields to the MSR CDC event format. You'll need to customize the JSLT transformation based on your specific table schema and field requirements.
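After adapting the configuration for a new table, you can run it through Kafka Connect's config validation endpoint before creating the connector. A sketch, assuming config.json is a hypothetical file containing only the flat "config" map (including connector.class) and that jq is installed for readability:

# Validate the connector config without creating the connector;
# a non-zero error_count means one or more settings were rejected
curl -s -X PUT -H "Content-Type: application/json" \
  localhost:8083/connector-plugins/io.debezium.connector.postgresql.PostgresConnector/config/validate \
  -d @config.json | jq '.error_count'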
Expected Kafka Message Format
After all transformations are applied, the messages in Kafka/Redpanda will have the following structure:
{
"schema": {
"type": "struct",
"fields": [
{
"type": "string",
"optional": true,
"field": "entity_id"
},
{
"type": "string",
"optional": true,
"field": "op"
},
{
"type": "int64",
"optional": true,
"field": "event_timestamp"
},
{
"type": "string",
"optional": true,
"field": "table_name"
},
{
"type": "string",
"optional": true,
"field": "data"
}
],
"optional": false
},
"payload": {
"entity_id": "38138e8e",
"op": "u",
"event_timestamp": 1756095710699,
"table_name": "gis.geo_entity",
"data": "{\"id\":\"45de16a6-fef6-45f0-a552-f460be945272\",\"created_at\":\"2025-08-25T04:19:05.799000Z\",\"updated_at\":\"2025-08-25T04:21:50.694000Z\",\"created_by\":\"f67cb8f5-0645-444d-a4bd-c61aaf1b2db0\",\"updated_by\":\"f67cb8f5-0645-444d-a4bd-c61aaf1b2db0\",\"tenant_id\":\"6c17fb87-d715-439f-943f-69f01318adac\",\"occ_lock\":21,\"entity_type\":\"track\",\"geojson\":\"{\\\"type\\\": \\\"Feature\\\", \\\"geometry\\\": {\\\"type\\\": \\\"Point\\\", \\\"coordinates\\\": [103.715, 1.354]}, \\\"properties\\\": {\\\"alt\\\": \\\"0\\\", \\\"eta\\\": \\\"\\\", \\\"hex\\\": \\\"76E9F2\\\", \\\"lat\\\": 1.354, \\\"lon\\\": 103.715, \\\"reg\\\": \\\"\\\", \\\"kind\\\": \\\"aircraft\\\", \\\"type\\\": \\\"GRND\\\", \\\"track\\\": \\\"90\\\", \\\"flight\\\": \\\"2\\\", \\\"gspeed\\\": \\\"8\\\", \\\"source\\\": \\\"ADSB\\\", \\\"squawk\\\": \\\"0\\\", \\\"vspeed\\\": \\\"0\\\", \\\"fr24_id\\\": \\\"38138e8e\\\", \\\"callsign\\\": \\\"ATT06\\\", \\\"dest_iata\\\": \\\"\\\", \\\"dest_icao\\\": \\\"\\\", \\\"orig_iata\\\": \\\"\\\", \\\"orig_icao\\\": \\\"\\\", \\\"timestamp\\\": \\\"2024-11-22T13:25:33Z\\\", \\\"created_at\\\": \\\"1732281939052\\\", \\\"painted_as\\\": \\\"\\\", \\\"operating_as\\\": \\\"\\\"}}\"}"
}
}
Payload Fields:
- entity_id: Unique identifier for the entity (extracted from the original record)
- op: Operation type (c = create, u = update, d = delete)
- event_timestamp: Unix timestamp in milliseconds when the change occurred
- table_name: Source table in the format schema.table_name
- data: JSON string containing the complete record data after the change
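To confirm that the transformed messages actually have this shape, consume a single record from the topic. Either command below works, assuming rpk or the standard Kafka console tools can reach the broker:

# Redpanda CLI: read one message from the topic
rpk topic consume gis_server.gis.geo_entity -n 1

# Stock Kafka tooling equivalent
kafka-console-consumer.sh --bootstrap-server localhost:9092 \
  --topic gis_server.gis.geo_entity --from-beginning --max-messages 1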
3. Sink Connector Configuration
The sink connector consumes processed change events from Kafka topics and stores them in the MSR database:
Database Connection
Establishes connection to the target MSR database where change events will be stored.
- connector.class: JDBC sink connector for PostgreSQL
- connection.url: JDBC connection string to the MSR database
- connection.user/password: Database authentication credentials
Topic and Table Mapping
Defines which Kafka topics to consume and where to store the data.
- topics: Comma-separated list of Kafka topics to consume
- table.name.format: Target table name in the MSR database
- insert.mode: How data is inserted (typically "insert" for append-only)
Data Transforms
Final transformations before storing in MSR database:
- RenameField: Renames the data field to entity_state to match the MSR schema
- ConvertTimestamp: Ensures timestamp fields are in the correct database format
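The JDBC sink writes these fields into the table named by table.name.format, so that table's columns must line up with the transformed record. Purely as an illustration of that mapping (the authoritative msr.cdc_event definition belongs to the MSR module and may differ), a hypothetical table could look like this:

# Illustrative sketch only -- the real msr.cdc_event schema is owned by the MSR module
psql -h msr-database -p 5432 -U "$MSR_DB_USER" -d msr <<'SQL'
CREATE TABLE IF NOT EXISTS msr.cdc_event (
    entity_id       text,
    op              text,
    event_timestamp timestamp,   -- from the ConvertTimestamp transform
    table_name      text,        -- e.g. 'gis.geo_entity'
    entity_state    text         -- the "data" field after the RenameField transform
);
SQL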
Complete Configuration Example
Here's a complete example for setting up both source and sink connectors for the gis.geo_entity table:
Source Connector:
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{
"name": "gis-geo-entity-source-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "gis-db",
"database.port": "5432",
"database.user": "${GIS_DB_USER}",
"database.password": "${GIS_DB_PASSWORD}",
"database.dbname": "gis",
"database.server.name": "gis_server",
"topic.prefix": "gis_server",
"table.include.list": "gis.geo_entity",
"plugin.name": "pgoutput",
"slot.name": "debezium_gis_geo_entity_slot",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "false",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "true",
"transforms": "unwrap,convert,rename,drops,restructure",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.add.fields": "op,source.ts_ms,source.schema,source.table",
"transforms.convert.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.convert.target.type": "Timestamp",
"transforms.convert.field": "__source_ts_ms",
"transforms.rename.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.rename.renames": "__source_ts_ms:event_timestamp,__op:op",
"transforms.drops.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.drops.exclude": "__deleted",
"transforms.restructure.type": "at.willhaben.kafka.connect.transforms.jslt.JsltTransform$Value",
"transforms.restructure.jslt": "{ \"entity_id\":.payload.entity_id, \"op\":.payload.op, \"event_timestamp\":.payload.event_timestamp, \"table_name\": .payload.__source_schema + \".\" + .payload.__source_table, \"data\": to-json( { \"id\":.payload.id, \"created_at\":.payload.created_at, \"updated_at\":.payload.updated_at, \"created_by\":.payload.created_by, \"updated_by\":.payload.updated_by, \"tenant_id\":.payload.tenant_id, \"occ_lock\":.payload.occ_lock, \"entity_type\":.payload.entity_type, \"geojson\":.payload.geojson }) }"
}
}'
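Once the source connector is registered, its state and any task errors can be inspected through the Connect REST API:

# Connector and task state ("RUNNING", "FAILED", ...) plus any error traces
curl -s localhost:8083/connectors/gis-geo-entity-source-connector/status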
Sink Connector:

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{
"name": "cdc-sink-connector",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"topics": "gis_server.gis.geo_entity",
"connection.url": "jdbc:postgresql://msr-database:5432/msr",
"connection.user": "${MSR_DB_USER}",
"connection.password": "${MSR_DB_PASSWORD}",
"table.name.format": "msr.cdc_event",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "false",
"value.converter.schemas.enable": "true",
"transforms": "RenameField,ConvertTimestamp",
"transforms.RenameField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.RenameField.renames": "data:entity_state",
"transforms.ConvertTimestamp.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.ConvertTimestamp.field": "event_timestamp",
"transforms.ConvertTimestamp.target.type": "Timestamp"
}
}'
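The same REST API can be used to confirm both connectors are registered and to recover from transient failures:

# List all registered connectors
curl -s localhost:8083/connectors

# Check the sink connector's state
curl -s localhost:8083/connectors/cdc-sink-connector/status

# Restart the sink connector after fixing the underlying issue
curl -s -X POST localhost:8083/connectors/cdc-sink-connector/restart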
Key Configuration Parameters
Source Connector Parameters
Parameter | Description | Example Value |
---|---|---|
connector.class | Debezium PostgreSQL connector class | io.debezium.connector.postgresql.PostgresConnector |
database.hostname | Database server hostname | gis-db |
database.port | Database server port | 5432 |
database.user | Database username | admin |
database.password | Database password | ${GIS_DB_PASSWORD} |
database.dbname | Database name | gis |
database.server.name | Logical server name | gis_server |
topic.prefix | Kafka topic prefix | gis_server |
table.include.list | Tables to replicate | gis.geo_entity |
plugin.name | PostgreSQL logical decoding plugin | pgoutput |
slot.name | Replication slot name (must be unique across entire Kafka Connect cluster) | debezium_gis_geo_entity_slot |
transforms | Data transformation pipeline | unwrap,convert,rename,drops,restructure |
Sink Connector Parameters
Parameter | Description | Example Value |
---|---|---|
connector.class | JDBC sink connector class | io.confluent.connect.jdbc.JdbcSinkConnector |
connection.url | JDBC connection URL | jdbc:postgresql://msr-database:5432/msr |
connection.user | Database username | admin |
connection.password | Database password | ${MSR_DB_PASSWORD} |
topics | Kafka topics to consume from | gis_server.gis.geo_entity,gis_server.gis.bookmark |
table.name.format | Target table name format | msr.cdc_event |
insert.mode | Data insertion mode | insert |
batch.size | Number of records per batch | 100 |
transforms | Data transformation pipeline | RenameField,ConvertTimestamp |