
Kafka Connect Configuration

This document outlines the key configuration concepts for Kafka Connect in the MSR (Multi-Session Replay) module.

1. Prerequisites

PostgreSQL WAL Configuration

Critical Requirement

The source PostgreSQL database MUST have Write-Ahead Logging (WAL) level set to logical. Without this, Debezium cannot capture changes.

-- Check current WAL level
SHOW wal_level;
-- Must return: 'logical'

If not set to logical, configure it:

# Option 1: Server startup command
postgres -c wal_level=logical

# Option 2: In postgresql.conf
wal_level = logical
# Note: Requires database restart

Table REPLICA IDENTITY

Before configuring connectors, set REPLICA IDENTITY FULL for tables you want to replicate:

ALTER TABLE <table_name> REPLICA IDENTITY FULL;

Example for table geo_entity in schema gis:

ALTER TABLE gis.geo_entity REPLICA IDENTITY FULL;

Why REPLICA IDENTITY FULL?
  • DEFAULT: Only captures primary key in UPDATE/DELETE events
  • FULL: Captures complete row data for all operations
  • MSR Requirement: FULL is required to properly reconstruct entity states

Database User Permissions

The database user used by Debezium must have the following privileges:

-- Grant replication privilege
ALTER USER debezium_user REPLICATION;

-- Grant SELECT on tables to replicate
GRANT SELECT ON ALL TABLES IN SCHEMA target_schema TO debezium_user;

2. Source Connector Configuration

Source connectors capture database changes and stream them to Kafka topics. Here are the key configuration areas:

Database Connection

Establishes connection to the PostgreSQL source database. The connector uses these credentials to connect to the database and access the transaction log for change data capture.

  • connector.class: Specifies the Debezium PostgreSQL connector
  • database.hostname/port: Database server location
  • database.user/password: Authentication credentials
  • database.dbname: Source database name
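
For reference, the connection-related portion of the source connector configuration (matching the complete example later in this document) looks like this:

"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "gis-db",
"database.port": "5432",
"database.user": "${GIS_DB_USER}",
"database.password": "${GIS_DB_PASSWORD}",
"database.dbname": "gis"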

Kafka Topics

Controls how Kafka topics are named and which database tables are monitored for changes.

  • database.server.name: Logical name for the database server (used in topic naming)
  • topic.prefix: Prefix for all generated Kafka topics
  • table.include.list: Specific tables to monitor for changes

Topics are automatically created with format: {topic.prefix}.{schema}.{table}
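
For example, with the values used in the complete example later in this document:

# topic.prefix = gis_server, schema = gis, table = geo_entity
# Change events are written to the topic:
gis_server.gis.geo_entity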

Replication Slot

PostgreSQL replication slots ensure reliable change data capture by making the server retain WAL changes until the connector has consumed them, so no changes are lost across connector restarts.

  • plugin.name: PostgreSQL logical decoding plugin (typically pgoutput)
  • slot.name: Unique identifier for this replication slot (must be unique across entire Kafka Connect cluster)
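
For reference, these two properties appear in the complete example later in this document as:

"plugin.name": "pgoutput",
"slot.name": "debezium_gis_geo_entity_slot"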

Data Transformations

Transforms modify the raw change events into the format required by MSR:

  • unwrap: Extracts the actual data from Debezium's change event envelope
  • convert: Converts timestamps to proper format
  • rename: Renames fields to match MSR expectations
  • drops: Drops the __deleted field from the change event
  • restructure: Uses JSLT to transform data structure into MSR's CDC event format

Table Name Tracking

The table_name field is automatically populated in the format schema.table_name (e.g., gis.geo_entity) by the JSLT transformation. This field is crucial for MSR to identify the source table of each CDC event, especially when multiple tables are being replicated to the same MSR instance.

note

The transformation configuration shown here is specific to the gis.geo_entity table structure. Each table may require different transformation logic, particularly in the restructure transform's JSLT expression, to properly map the source table fields to the MSR CDC event format. You'll need to customize the JSLT transformation based on your specific table schema and field requirements.

Expected Kafka Message Format

After all transformations are applied, the messages in Kafka/Redpanda will have the following structure:

{
"schema": {
"type": "struct",
"fields": [
{
"type": "string",
"optional": true,
"field": "entity_id"
},
{
"type": "string",
"optional": true,
"field": "op"
},
{
"type": "int64",
"optional": true,
"field": "event_timestamp"
},
{
"type": "string",
"optional": true,
"field": "table_name"
},
{
"type": "string",
"optional": true,
"field": "data"
}
],
"optional": false
},
"payload": {
"entity_id": "38138e8e",
"op": "u",
"event_timestamp": 1756095710699,
"table_name": "gis.geo_entity",
"data": "{\"id\":\"45de16a6-fef6-45f0-a552-f460be945272\",\"created_at\":\"2025-08-25T04:19:05.799000Z\",\"updated_at\":\"2025-08-25T04:21:50.694000Z\",\"created_by\":\"f67cb8f5-0645-444d-a4bd-c61aaf1b2db0\",\"updated_by\":\"f67cb8f5-0645-444d-a4bd-c61aaf1b2db0\",\"tenant_id\":\"6c17fb87-d715-439f-943f-69f01318adac\",\"occ_lock\":21,\"entity_type\":\"track\",\"geojson\":\"{\\\"type\\\": \\\"Feature\\\", \\\"geometry\\\": {\\\"type\\\": \\\"Point\\\", \\\"coordinates\\\": [103.715, 1.354]}, \\\"properties\\\": {\\\"alt\\\": \\\"0\\\", \\\"eta\\\": \\\"\\\", \\\"hex\\\": \\\"76E9F2\\\", \\\"lat\\\": 1.354, \\\"lon\\\": 103.715, \\\"reg\\\": \\\"\\\", \\\"kind\\\": \\\"aircraft\\\", \\\"type\\\": \\\"GRND\\\", \\\"track\\\": \\\"90\\\", \\\"flight\\\": \\\"2\\\", \\\"gspeed\\\": \\\"8\\\", \\\"source\\\": \\\"ADSB\\\", \\\"squawk\\\": \\\"0\\\", \\\"vspeed\\\": \\\"0\\\", \\\"fr24_id\\\": \\\"38138e8e\\\", \\\"callsign\\\": \\\"ATT06\\\", \\\"dest_iata\\\": \\\"\\\", \\\"dest_icao\\\": \\\"\\\", \\\"orig_iata\\\": \\\"\\\", \\\"orig_icao\\\": \\\"\\\", \\\"timestamp\\\": \\\"2024-11-22T13:25:33Z\\\", \\\"created_at\\\": \\\"1732281939052\\\", \\\"painted_as\\\": \\\"\\\", \\\"operating_as\\\": \\\"\\\"}}\"}"
}
}

Payload Fields:

  • entity_id: Unique identifier for the entity (extracted from the original record)
  • op: Operation type (c=create, u=update, d=delete, r=read/initial snapshot)
  • event_timestamp: Unix timestamp in milliseconds when the change occurred
  • table_name: Source table in format schema.table_name (e.g., gis.geo_entity)
  • data: JSON string containing the complete record data after the change

Table Name Field

The table_name field was added to support multi-table CDC tracking. This field:

  • Identifies the source table for each CDC event
  • Enables filtering and grouping of events by source table
  • Is automatically populated by the JSLT transformation
  • Must be included in all source connector configurations
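
In the complete configuration example below, the fragment of the JSLT expression that populates this field is:

"table_name": .payload.__source_schema + "." + .payload.__source_table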

3. Sink Connector Configuration

The sink connector consumes processed change events from Kafka topics and stores them in the MSR database:

Database Connection

Establishes connection to the target MSR database where change events will be stored.

  • connector.class: JDBC sink connector for PostgreSQL
  • connection.url: JDBC connection string to MSR database
  • connection.user/password: Database authentication

Topic and Table Mapping

Defines which Kafka topics to consume and where to store the data.

  • topics: Comma-separated list of Kafka topics to consume
  • table.name.format: Target table name in MSR database
  • insert.mode: How data is inserted (typically "insert" for append-only)

Data Transforms

Final transformations before storing in MSR database:

  • RenameField: Renames data field to entity_state to match MSR schema
  • ConvertTimestamp: Ensures timestamp fields are in correct database format
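
As a sketch, these two transforms can be expressed with the standard Kafka Connect SMTs ReplaceField and TimestampConverter; the transform aliases and the timestamp field name (event_timestamp here) are assumptions to verify against your MSR deployment:

"transforms": "RenameField,ConvertTimestamp",
"transforms.RenameField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.RenameField.renames": "data:entity_state",
"transforms.ConvertTimestamp.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.ConvertTimestamp.field": "event_timestamp",
"transforms.ConvertTimestamp.target.type": "Timestamp"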

Complete Configuration Example

Here's a complete example for setting up both source and sink connectors for the gis.geo_entity table:

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{
"name": "gis-geo-entity-source-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "gis-db",
"database.port": "5432",
"database.user": "${GIS_DB_USER}",
"database.password": "${GIS_DB_PASSWORD}",
"database.dbname": "gis",
"database.server.name": "gis_server",
"topic.prefix": "gis_server",
"table.include.list": "gis.geo_entity",
"plugin.name": "pgoutput",
"slot.name": "debezium_gis_geo_entity_slot",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "false",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "true",
"transforms": "unwrap,convert,rename,drops,restructure",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.add.fields": "op,source.ts_ms,source.schema,source.table",
"transforms.convert.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.convert.target.type": "Timestamp",
"transforms.convert.field": "__source_ts_ms",
"transforms.rename.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.rename.renames": "__source_ts_ms:event_timestamp,__op:op",
"transforms.drops.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.drops.exclude": "__deleted",
"transforms.restructure.type": "at.willhaben.kafka.connect.transforms.jslt.JsltTransform$Value",
"transforms.restructure.jslt": "{ \"entity_id\":.payload.entity_id, \"op\":.payload.op, \"event_timestamp\":.payload.event_timestamp, \"table_name\": .payload.__source_schema + \".\" + .payload.__source_table, \"data\": to-json( { \"id\":.payload.id, \"created_at\":.payload.created_at, \"updated_at\":.payload.updated_at, \"created_by\":.payload.created_by, \"updated_by\":.payload.updated_by, \"tenant_id\":.payload.tenant_id, \"occ_lock\":.payload.occ_lock, \"entity_type\":.payload.entity_type, \"geojson\":.payload.geojson }) }"
}
}'
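
A matching sink connector registration is sketched below, assembled from the sink parameters listed in the next section. The connector name, the ${MSR_DB_USER} placeholder, the converter settings (mirroring the source example), and the transform details (field names in particular) are illustrative assumptions and should be adapted to your MSR schema:

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{
"name": "gis-geo-entity-sink-connector",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url": "jdbc:postgresql://msr-database:5432/msr",
"connection.user": "${MSR_DB_USER}",
"connection.password": "${MSR_DB_PASSWORD}",
"topics": "gis_server.gis.geo_entity",
"table.name.format": "msr.cdc_event",
"insert.mode": "insert",
"batch.size": "100",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "false",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "true",
"transforms": "RenameField,ConvertTimestamp",
"transforms.RenameField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.RenameField.renames": "data:entity_state",
"transforms.ConvertTimestamp.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.ConvertTimestamp.field": "event_timestamp",
"transforms.ConvertTimestamp.target.type": "Timestamp"
}
}'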

Key Configuration Parameters

Source Connector Parameters

Parameter | Description | Example Value
connector.class | Debezium PostgreSQL connector class | io.debezium.connector.postgresql.PostgresConnector
database.hostname | Database server hostname | gis-db
database.port | Database server port | 5432
database.user | Database username | admin
database.password | Database password | ${GIS_DB_PASSWORD}
database.dbname | Database name | gis
database.server.name | Logical server name | gis_server
topic.prefix | Kafka topic prefix | gis_server
table.include.list | Tables to replicate | gis.geo_entity
plugin.name | PostgreSQL logical decoding plugin | pgoutput
slot.name | Replication slot name (must be unique across entire Kafka Connect cluster) | debezium_gis_geo_entity_slot
transforms | Data transformation pipeline | unwrap,convert,rename,drops,restructure

Sink Connector Parameters

Parameter | Description | Example Value
connector.class | JDBC sink connector class | io.confluent.connect.jdbc.JdbcSinkConnector
connection.url | JDBC connection URL | jdbc:postgresql://msr-database:5432/msr
connection.user | Database username | admin
connection.password | Database password | ${MSR_DB_PASSWORD}
topics | Kafka topics to consume from | gis_server.gis.geo_entity,gis_server.gis.bookmark
table.name.format | Target table name format | msr.cdc_event
insert.mode | Data insertion mode | insert
batch.size | Number of records per batch | 100
transforms | Data transformation pipeline | RenameField,ConvertTimestamp