
Kafka Connect Configuration

This document outlines the key configuration concepts for Kafka Connect in the MSR (Multi-Session Replay) module.

1. Prerequisites

Before configuring connectors, set REPLICA IDENTITY FULL for each table you want to replicate, so that update and delete events carry the full row image:

ALTER TABLE <table_name> REPLICA IDENTITY FULL;

Example for table geo_entity in schema gis:

ALTER TABLE gis.geo_entity REPLICA IDENTITY FULL;
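To confirm the setting took effect, query the PostgreSQL catalog; relreplident returns f when the replica identity is FULL:

SELECT relreplident FROM pg_class WHERE oid = 'gis.geo_entity'::regclass;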

2. Source Connector Configuration

Source connectors capture database changes and stream them to Kafka topics. Here are the key configuration areas:

Database Connection

Establishes the connection to the PostgreSQL source database. The connector uses these credentials to connect and to read the write-ahead log (WAL) for change data capture. The relevant properties are shown in the fragment after this list.

  • connector.class: Specifies the Debezium PostgreSQL connector
  • database.hostname/port: Database server location
  • database.user/password: Authentication credentials
  • database.dbname: Source database name
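
As a configuration fragment (the values match the complete example later in this document):

"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "gis-db",
"database.port": "5432",
"database.user": "${GIS_DB_USER}",
"database.password": "${GIS_DB_PASSWORD}",
"database.dbname": "gis"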

Kafka Topics

Controls how Kafka topics are named and which database tables are monitored for changes.

  • database.server.name: Logical name for the database server (used in topic naming)
  • topic.prefix: Prefix for all generated Kafka topics
  • table.include.list: Specific tables to monitor for changes

Topics are automatically created with format: {topic.prefix}.{schema}.{table}
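
For example, with topic.prefix set to gis_server, changes to gis.geo_entity are published to the topic gis_server.gis.geo_entity.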

Replication Slot

PostgreSQL replication slots make change data capture reliable: the server retains write-ahead log (WAL) segments until the connector confirms it has consumed them, so no changes are lost across connector restarts.

  • plugin.name: PostgreSQL logical decoding plugin (typically pgoutput)
  • slot.name: Unique identifier for this replication slot (must be unique across entire Kafka Connect cluster)
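
You can list existing slots to check for name collisions, and drop an orphaned slot after removing a connector (never drop a slot that an active connector is still using):

SELECT slot_name, plugin, active FROM pg_replication_slots;
SELECT pg_drop_replication_slot('debezium_gis_geo_entity_slot');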

Data Transformations

Transforms modify the raw change events into the format required by MSR:

  • unwrap: Extracts the actual data from Debezium's change event envelope
  • convert: Converts timestamps to the proper format
  • rename: Renames fields to match MSR expectations
  • drops: Removes Debezium bookkeeping fields (such as __deleted) that MSR does not need
  • restructure: Uses JSLT to transform the data structure into MSR's CDC event format
Note: The transformation configuration shown here is specific to the gis.geo_entity table structure. Each table may require different transformation logic, particularly in the restructure transform's JSLT expression, to properly map the source table fields to the MSR CDC event format. You'll need to customize the JSLT transformation based on your specific table schema and field requirements; a sketch for a second table follows.
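
For instance, for a gis.bookmark table (which appears in the sink topics list later in this document), the restructure expression might look like the following. The bookmark columns id, created_at, updated_at, tenant_id, and name are illustrative assumptions, not the actual bookmark schema; substitute your table's real fields:

"transforms.restructure.jslt": "{ \"entity_id\":.payload.entity_id, \"op\":.payload.op, \"event_timestamp\":.payload.event_timestamp, \"table_name\": .payload.__source_schema + \".\" + .payload.__source_table, \"data\": to-json( { \"id\":.payload.id, \"created_at\":.payload.created_at, \"updated_at\":.payload.updated_at, \"tenant_id\":.payload.tenant_id, \"name\":.payload.name }) }"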

Expected Kafka Message Format

After all transformations are applied, the messages in Kafka/Redpanda will have the following structure:

{
  "schema": {
    "type": "struct",
    "fields": [
      { "type": "string", "optional": true, "field": "entity_id" },
      { "type": "string", "optional": true, "field": "op" },
      { "type": "int64", "optional": true, "field": "event_timestamp" },
      { "type": "string", "optional": true, "field": "table_name" },
      { "type": "string", "optional": true, "field": "data" }
    ],
    "optional": false
  },
  "payload": {
    "entity_id": "38138e8e",
    "op": "u",
    "event_timestamp": 1756095710699,
    "table_name": "gis.geo_entity",
    "data": "{\"id\":\"45de16a6-fef6-45f0-a552-f460be945272\",\"created_at\":\"2025-08-25T04:19:05.799000Z\",\"updated_at\":\"2025-08-25T04:21:50.694000Z\",\"created_by\":\"f67cb8f5-0645-444d-a4bd-c61aaf1b2db0\",\"updated_by\":\"f67cb8f5-0645-444d-a4bd-c61aaf1b2db0\",\"tenant_id\":\"6c17fb87-d715-439f-943f-69f01318adac\",\"occ_lock\":21,\"entity_type\":\"track\",\"geojson\":\"{\\\"type\\\": \\\"Feature\\\", \\\"geometry\\\": {\\\"type\\\": \\\"Point\\\", \\\"coordinates\\\": [103.715, 1.354]}, \\\"properties\\\": {\\\"alt\\\": \\\"0\\\", \\\"eta\\\": \\\"\\\", \\\"hex\\\": \\\"76E9F2\\\", \\\"lat\\\": 1.354, \\\"lon\\\": 103.715, \\\"reg\\\": \\\"\\\", \\\"kind\\\": \\\"aircraft\\\", \\\"type\\\": \\\"GRND\\\", \\\"track\\\": \\\"90\\\", \\\"flight\\\": \\\"2\\\", \\\"gspeed\\\": \\\"8\\\", \\\"source\\\": \\\"ADSB\\\", \\\"squawk\\\": \\\"0\\\", \\\"vspeed\\\": \\\"0\\\", \\\"fr24_id\\\": \\\"38138e8e\\\", \\\"callsign\\\": \\\"ATT06\\\", \\\"dest_iata\\\": \\\"\\\", \\\"dest_icao\\\": \\\"\\\", \\\"orig_iata\\\": \\\"\\\", \\\"orig_icao\\\": \\\"\\\", \\\"timestamp\\\": \\\"2024-11-22T13:25:33Z\\\", \\\"created_at\\\": \\\"1732281939052\\\", \\\"painted_as\\\": \\\"\\\", \\\"operating_as\\\": \\\"\\\"}}\"}"
  }
}

Payload Fields:

  • entity_id: Unique identifier for the entity (extracted from the original record)
  • op: Operation type (c=create, u=update, d=delete)
  • event_timestamp: Unix timestamp in milliseconds when the change occurred
  • table_name: Source table in format schema.table_name
  • data: JSON string containing the complete record data after the change
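
To verify the format, consume a message directly from the topic, for example with Redpanda's rpk CLI (assuming rpk is available in your environment):

rpk topic consume gis_server.gis.geo_entity --num 1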

3. Sink Connector Configuration

The sink connector consumes processed change events from Kafka topics and stores them in the MSR database:

Database Connection

Establishes connection to the target MSR database where change events will be stored.

  • connector.class: JDBC sink connector for PostgreSQL
  • connection.url: JDBC connection string to MSR database
  • connection.user/password: Database authentication

Topic and Table Mapping

Defines which Kafka topics to consume and where to store the data.

  • topics: Comma-separated list of Kafka topics to consume
  • table.name.format: Target table name in MSR database
  • insert.mode: How data is inserted (typically "insert" for append-only)
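
For reference, the following is a plausible shape for the msr.cdc_event target table, inferred from the payload fields above plus the sink's RenameField transform; the authoritative DDL ships with MSR and may differ:

CREATE TABLE msr.cdc_event (
    entity_id       text,
    op              text,
    event_timestamp timestamp,
    table_name      text,
    entity_state    text
);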

Data Transforms

Final transformations applied before events are stored in the MSR database (shown concretely in the sink sketch under Complete Configuration Example below):

  • RenameField: Renames the data field to entity_state to match the MSR schema
  • ConvertTimestamp: Ensures timestamp fields are in the correct database format

Complete Configuration Example

Here's a complete example for setting up the source connector for the gis.geo_entity table, followed by a sketch of a matching sink connector:

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{
  "name": "gis-geo-entity-source-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "gis-db",
    "database.port": "5432",
    "database.user": "${GIS_DB_USER}",
    "database.password": "${GIS_DB_PASSWORD}",
    "database.dbname": "gis",
    "database.server.name": "gis_server",
    "topic.prefix": "gis_server",
    "table.include.list": "gis.geo_entity",
    "plugin.name": "pgoutput",
    "slot.name": "debezium_gis_geo_entity_slot",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "true",
    "transforms": "unwrap,convert,rename,drops,restructure",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.add.fields": "op,source.ts_ms,source.schema,source.table",
    "transforms.convert.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
    "transforms.convert.target.type": "Timestamp",
    "transforms.convert.field": "__source_ts_ms",
    "transforms.rename.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
    "transforms.rename.renames": "__source_ts_ms:event_timestamp,__op:op",
    "transforms.drops.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
    "transforms.drops.exclude": "__deleted",
    "transforms.restructure.type": "at.willhaben.kafka.connect.transforms.jslt.JsltTransform$Value",
    "transforms.restructure.jslt": "{ \"entity_id\":.payload.entity_id, \"op\":.payload.op, \"event_timestamp\":.payload.event_timestamp, \"table_name\": .payload.__source_schema + \".\" + .payload.__source_table, \"data\": to-json( { \"id\":.payload.id, \"created_at\":.payload.created_at, \"updated_at\":.payload.updated_at, \"created_by\":.payload.created_by, \"updated_by\":.payload.updated_by, \"tenant_id\":.payload.tenant_id, \"occ_lock\":.payload.occ_lock, \"entity_type\":.payload.entity_type, \"geojson\":.payload.geojson }) }"
  }
}'
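
A matching sink connector can be registered the same way. The following is a sketch assembled from the sink parameters below; the connector name and the ${MSR_DB_USER} placeholder are illustrative, and the converter and transform settings mirror the source example and the Data Transforms section above, so adjust them for your MSR deployment:

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{
  "name": "msr-cdc-event-sink-connector",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "connection.url": "jdbc:postgresql://msr-database:5432/msr",
    "connection.user": "${MSR_DB_USER}",
    "connection.password": "${MSR_DB_PASSWORD}",
    "topics": "gis_server.gis.geo_entity",
    "table.name.format": "msr.cdc_event",
    "insert.mode": "insert",
    "batch.size": "100",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "true",
    "transforms": "RenameField,ConvertTimestamp",
    "transforms.RenameField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
    "transforms.RenameField.renames": "data:entity_state",
    "transforms.ConvertTimestamp.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
    "transforms.ConvertTimestamp.field": "event_timestamp",
    "transforms.ConvertTimestamp.target.type": "Timestamp"
  }
}'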

Key Configuration Parameters

Source Connector Parameters

| Parameter | Description | Example Value |
|---|---|---|
| connector.class | Debezium PostgreSQL connector class | io.debezium.connector.postgresql.PostgresConnector |
| database.hostname | Database server hostname | gis-db |
| database.port | Database server port | 5432 |
| database.user | Database username | admin |
| database.password | Database password | ${GIS_DB_PASSWORD} |
| database.dbname | Database name | gis |
| database.server.name | Logical server name | gis_server |
| topic.prefix | Kafka topic prefix | gis_server |
| table.include.list | Tables to replicate | gis.geo_entity |
| plugin.name | PostgreSQL logical decoding plugin | pgoutput |
| slot.name | Replication slot name (must be unique across the entire Kafka Connect cluster) | debezium_gis_geo_entity_slot |
| transforms | Data transformation pipeline | unwrap,convert,rename,drops,restructure |

Sink Connector Parameters

| Parameter | Description | Example Value |
|---|---|---|
| connector.class | JDBC sink connector class | io.confluent.connect.jdbc.JdbcSinkConnector |
| connection.url | JDBC connection URL | jdbc:postgresql://msr-database:5432/msr |
| connection.user | Database username | admin |
| connection.password | Database password | ${MSR_DB_PASSWORD} |
| topics | Kafka topics to consume from | gis_server.gis.geo_entity,gis_server.gis.bookmark |
| table.name.format | Target table name format | msr.cdc_event |
| insert.mode | Data insertion mode | insert |
| batch.size | Number of records per batch | 100 |
| transforms | Data transformation pipeline | RenameField,ConvertTimestamp |
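
After registering the connectors, you can verify them through the Kafka Connect REST API:

curl localhost:8083/connectors
curl localhost:8083/connectors/gis-geo-entity-source-connector/status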