Apache Kafka DR Using MirrorMaker 2 Connectors
Overview
MirrorMaker 2 (hereafter MM2) is an application built on the Kafka Connect framework that replicates topics between two Apache Kafka clusters. It can be used in two ways: by running the MirrorMaker driver that ships with Apache Kafka (hereafter Kafka), or by configuring MM2's connectors manually. The built-in MirrorMaker driver does not fully support distributed mode, at least in older Kafka releases. To build a more reliable and scalable solution, this document therefore runs MM2 on a Kafka Connect cluster in distributed mode and configures the three connectors used by MM2 manually.
MM2 features
- Data replication between clusters
- Detection of new topics and partitions
- Synchronization of topic configurations and consumer group offsets
Kafka DR configuration using MM2
This setup is based on an architecture that replicates data from the source Kafka cluster to the target Kafka cluster. If the source system fails, the target Kafka cluster serves as a backup data source from which data can be recovered. (This document does not cover network configuration.)
MM2 runs on a cluster built with Kafka Connect, in which each worker is an independent node that runs one or more connector instances. Each connector can have one or more tasks, and the tasks do the actual work of replicating and processing data.
MM2 Connectors
Connectors
| Connector Name | Description |
|---|---|
| MirrorSourceConnector | Replicates Kafka records from the source cluster to the target cluster. |
| MirrorCheckpointConnector | Replicates consumer group offsets from the source Kafka cluster to the target Kafka cluster. |
| MirrorHeartbeatConnector | Periodically emits heartbeat records so you can monitor whether the remote Kafka cluster is operating normally and connected. |
Connector Main Settings
| Configuration | Description | SourceConnector | CheckpointConnector | HeartbeatConnector |
|---|---|---|---|---|
| admin.timeout.ms | Timeout for administrative tasks such as detecting new topics | ✓ | ✓ | ✓ |
| replication.policy.class | Policy that defines the naming convention for remote topics | ✓ | ✓ | ✓ |
| replication.policy.separator | Separator used in remote topic names on the target cluster | ✓ | ✓ | ✓ |
| consumer.poll.timeout.ms | Timeout when polling the source cluster | ✓ | ✓ | |
| offset-syncs.topic.location | Cluster on which the offset-syncs topic is stored (source or target) | ✓ | ✓ | |
| topic.filter.class | Topic filter that selects the topics to replicate | ✓ | ✓ | |
| config.property.filter.class | Filter that selects the topic config properties to replicate | ✓ | | |
| config.properties.exclude | Topic config properties that must not be replicated. Accepts comma-separated property names and regular expressions. | ✓ | | |
| offset.lag.max | Maximum allowed (out-of-sync) offset lag before a remote partition is resynchronized | ✓ | | |
| offset-syncs.topic.replication.factor | Replication factor of the internal offset-syncs topic | ✓ | | |
| refresh.topics.enabled | Enable checks for new topics and partitions | ✓ | | |
| refresh.topics.interval.seconds | Topic refresh frequency | ✓ | | |
| replication.factor | Replication factor of newly created remote topics | ✓ | | |
| sync.topic.acls.enabled | Enable synchronization of ACLs from the source cluster. Not compatible with the Strimzi User Operator. | ✓ | | |
| sync.topic.acls.interval.seconds | ACL synchronization frequency | ✓ | | |
| sync.topic.configs.enabled | Enable synchronization of topic configs from the source cluster | ✓ | | |
| sync.topic.configs.interval.seconds | Topic config synchronization frequency | ✓ | | |
| checkpoints.topic.replication.factor | Replication factor of the internal checkpoints topic | | ✓ | |
| emit.checkpoints.enabled | Enable synchronization of consumer offsets to the target cluster | | ✓ | |
| emit.checkpoints.interval.seconds | Consumer offset synchronization frequency | | ✓ | |
| group.filter.class | Group filter that selects the consumer groups to replicate | | ✓ | |
| refresh.groups.enabled | Enable checks for new consumer groups | | ✓ | |
| refresh.groups.interval.seconds | Consumer group refresh frequency | | ✓ | |
| sync.group.offsets.enabled | Enable synchronization of consumer group offsets to the __consumer_offsets topic on the target cluster | | ✓ | |
| sync.group.offsets.interval.seconds | Consumer group offset synchronization frequency | | ✓ | |
| emit.heartbeats.enabled | Enable connectivity checks on the target cluster | | | ✓ |
| emit.heartbeats.interval.seconds | Connectivity check frequency | | | ✓ |
| heartbeats.topic.replication.factor | Replication factor of the internal heartbeats topic | | | ✓ |
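The replication.policy.class and replication.policy.separator settings decide what a replicated topic is called on the target cluster. The Python sketch below models the two built-in policies for ordinary topics, as we understand them: DefaultReplicationPolicy prefixes the topic with the source cluster alias and the separator ("." by default), while IdentityReplicationPolicy, which the MirrorSourceConnector example in this document uses, keeps the name unchanged. The function name and the short policy strings are hypothetical, and special cases such as the heartbeats topic are deliberately ignored.

```python
# Sketch: how MM2 names a replicated ("remote") topic on the target cluster.
# Assumption: models ordinary topics only; special cases such as the heartbeats
# topic are ignored. The function and the policy strings are hypothetical.

DEFAULT_SEPARATOR = "."  # default value of replication.policy.separator


def remote_topic_name(policy: str, source_alias: str, topic: str,
                      separator: str = DEFAULT_SEPARATOR) -> str:
    """Return the target-side name of `topic` replicated from `source_alias`."""
    if policy == "DefaultReplicationPolicy":
        # Prefix with the source cluster alias, e.g. "sourcemsc.orders".
        return f"{source_alias}{separator}{topic}"
    if policy == "IdentityReplicationPolicy":
        # Keep the original topic name on the target cluster.
        return topic
    raise ValueError(f"policy not modelled in this sketch: {policy}")


print(remote_topic_name("DefaultReplicationPolicy", "sourcemsc", "orders"))   # sourcemsc.orders
print(remote_topic_name("IdentityReplicationPolicy", "sourcemsc", "orders"))  # orders
```

Keeping topic names identical lets consumers switch to the target cluster without changing topic names, which suits an active/passive DR setup like this one. As far as we know, the identity policy gives MM2 no way to detect replication cycles, so it is generally not recommended for bidirectional replication.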
MM2 Connector Setup
This section shows how to create the three MM2 connectors through the Kafka Connect REST API and how to check their status afterward.
MirrorSourceConnector
```bash
curl --request POST '{ kafka_connect_ip }:8083/connectors' \
  --header 'Content-Type: application/json' \
  --data-raw '{
  "name": "mm2-msc",
  "config": {
    "name": "mm2-msc",
    "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
    "clusters": "sourcemsc, targetmsc",
    "source.cluster.alias": "sourcemsc",
    "source.cluster.bootstrap.servers": "{ source_kafka_ip1 }:{ source_kafka_port1 },{ source_kafka_ip2 }:{ source_kafka_port2 }, …",
    "source.cluster.security.protocol": "SASL_PLAINTEXT",
    "source.cluster.sasl.mechanism": "SCRAM-SHA-256",
    "source.cluster.sasl.jaas.config": "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"{ source_kafka_username }\" password=\"{ source_kafka_password }\";",
    "target.cluster.alias": "targetmsc",
    "target.cluster.bootstrap.servers": "{ target_kafka_ip1 }:{ target_kafka_port1 },{ target_kafka_ip2 }:{ target_kafka_port2 }, …",
    "target.cluster.security.protocol": "SASL_PLAINTEXT",
    "target.cluster.sasl.mechanism": "SCRAM-SHA-256",
    "target.cluster.sasl.jaas.config": "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"{ target_kafka_username }\" password=\"{ target_kafka_password }\";",
    "topics": ".*",
    "groups": ".*",
    "topics.blacklist": ".*[\\-\\.]internal, .*\\.replica, __consumer_offsets",
    "groups.blacklist": "console-consumer-.*, connect-.*, __.*",
    "tasks.max": "3",
    "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "offset-syncs.topic.location": "target",
    "offset-syncs.topic.replication.factor": "3",
    "sync.topic.configs.interval.seconds": "20",
    "refresh.topics.interval.seconds": "20",
    "producer.enable.idempotence": "true",
    "sync.topic.acls.enabled": "true",
    "replication.policy.class": "org.apache.kafka.connect.mirror.IdentityReplicationPolicy",
    "producer.override.bootstrap.servers": "{ target_kafka_ip1 }:{ target_kafka_port1 },{ target_kafka_ip2 }:{ target_kafka_port2 }, …",
    "producer.override.security.protocol": "SASL_PLAINTEXT",
    "producer.override.sasl.mechanism": "SCRAM-SHA-256",
    "producer.override.sasl.jaas.config": "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"{ target_kafka_username }\" password=\"{ target_kafka_password }\";"
  }
}'
```
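The topics and topics.blacklist values above are comma-separated lists of regular expressions. The Python sketch below shows how they select topics, assuming MM2's default topic filter behaves as we understand it: whitespace around each entry is trimmed, each regex must match the whole topic name, and exclusion wins over inclusion. parse_pattern_list and should_replicate are hypothetical helpers, not part of Kafka.

```python
import re
from typing import List


def parse_pattern_list(value: str) -> List[str]:
    """Split a comma-separated list value and trim whitespace around each entry."""
    return [item.strip() for item in value.split(",") if item.strip()]


def should_replicate(topic: str, include: str, exclude: str) -> bool:
    """True if `topic` fully matches an include regex and no exclude regex."""
    included = any(re.fullmatch(p, topic) for p in parse_pattern_list(include))
    excluded = any(re.fullmatch(p, topic) for p in parse_pattern_list(exclude))
    return included and not excluded


# Values from the MirrorSourceConnector example. In JSON, "\\" encodes a single
# backslash, so the regexes themselves contain "\-" and "\.".
TOPICS = ".*"
TOPICS_EXCLUDE = r".*[\-\.]internal, .*\.replica, __consumer_offsets"

for name in ["orders", "mm2-offset-syncs.sourcemsc.internal", "__consumer_offsets"]:
    print(name, should_replicate(name, TOPICS, TOPICS_EXCLUDE))
```

In recent Kafka releases the preferred property names are topics.exclude and groups.exclude; to our knowledge the *.blacklist names are still accepted as deprecated aliases. As far as we know, the consumer group filter (groups, groups.blacklist) is applied by MirrorCheckpointConnector rather than by MirrorSourceConnector.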
MirrorCheckpointConnector
The checkpoint connector uses the same cluster aliases, replication policy, and offset-syncs.topic.location as the MirrorSourceConnector, so that it reads the offset-syncs topic that connector writes and maps consumer offsets onto the identically named topics on the target cluster.
```bash
curl --request POST '{ kafka_connect_ip }:8083/connectors' \
  --header 'Content-Type: application/json' \
  --data-raw '{
  "name": "mm2-cpc",
  "config": {
    "name": "mm2-cpc",
    "connector.class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
    "clusters": "sourcemsc, targetmsc",
    "source.cluster.alias": "sourcemsc",
    "source.cluster.bootstrap.servers": "{ source_kafka_ip1 }:{ source_kafka_port1 },{ source_kafka_ip2 }:{ source_kafka_port2 }, …",
    "source.cluster.security.protocol": "SASL_PLAINTEXT",
    "source.cluster.sasl.mechanism": "SCRAM-SHA-256",
    "source.cluster.sasl.jaas.config": "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"{ source_kafka_username }\" password=\"{ source_kafka_password }\";",
    "target.cluster.alias": "targetmsc",
    "target.cluster.bootstrap.servers": "{ target_kafka_ip1 }:{ target_kafka_port1 },{ target_kafka_ip2 }:{ target_kafka_port2 }, …",
    "target.cluster.security.protocol": "SASL_PLAINTEXT",
    "target.cluster.sasl.mechanism": "SCRAM-SHA-256",
    "target.cluster.sasl.jaas.config": "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"{ target_kafka_username }\" password=\"{ target_kafka_password }\";",
    "tasks.max": "3",
    "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "replication.policy.class": "org.apache.kafka.connect.mirror.IdentityReplicationPolicy",
    "offset-syncs.topic.location": "target",
    "checkpoints.topic.replication.factor": "3",
    "emit.checkpoints.enabled": "true",
    "emit.checkpoints.interval.seconds": "20",
    "sync.group.offsets.enabled": "true",
    "sync.group.offsets.interval.seconds": "20",
    "refresh.groups.interval.seconds": "20",
    "producer.override.bootstrap.servers": "{ target_kafka_ip1 }:{ target_kafka_port1 },{ target_kafka_ip2 }:{ target_kafka_port2 }, …",
    "producer.override.security.protocol": "SASL_PLAINTEXT",
    "producer.override.sasl.mechanism": "SCRAM-SHA-256",
    "producer.override.sasl.jaas.config": "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"{ target_kafka_username }\" password=\"{ target_kafka_password }\";"
  }
}'
```
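The checkpoint connector turns a consumer group's committed offset on the source cluster into an offset on the target cluster by looking up the offset-syncs records. The Python sketch below is a simplified model of that translation, assuming the conservative behaviour of recent Kafka releases as we understand it: use the latest sync at or before the committed offset and move at most one record past it (older releases instead added the full offset difference). The tuple layout and the function name are hypothetical.

```python
from bisect import bisect_right
from typing import List, Optional, Tuple

# (upstream_offset, downstream_offset) for one topic partition, as recorded in
# the offset-syncs topic. This tuple layout is a hypothetical simplification.
OffsetSync = Tuple[int, int]


def translate_offset(syncs: List[OffsetSync], committed_upstream: int) -> Optional[int]:
    """Translate a committed source offset into a target offset (simplified model).

    Returns None when no sync precedes the committed offset, i.e. when this
    model has nothing safe to translate from.
    """
    ordered = sorted(syncs)
    upstream_offsets = [upstream for upstream, _ in ordered]
    # Index of the latest sync whose upstream offset is <= the committed offset.
    idx = bisect_right(upstream_offsets, committed_upstream) - 1
    if idx < 0:
        return None
    sync_upstream, sync_downstream = ordered[idx]
    if committed_upstream == sync_upstream:
        return sync_downstream
    # The consumer is past the sync point: step just one record beyond it, so the
    # translated offset never skips records that may not have been replicated.
    return sync_downstream + 1


SYNCS = [(0, 0), (100, 95), (200, 190)]
print(translate_offset(SYNCS, 100))  # 95
print(translate_offset(SYNCS, 150))  # 96
```

In this model the translated offset can lag behind the consumer's true position, so after a failover consumers that resume on the target cluster may re-read some records but should not skip any; downstream processing should therefore tolerate duplicates.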
MirrorHeartbeatConnector
```bash
curl --request POST '{ kafka_connect_ip }:8083/connectors' \
  --header 'Content-Type: application/json' \
  --data-raw '{
  "name": "mm2-hbc",
  "config": {
    "name": "mm2-hbc",
    "connector.class": "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector",
    "clusters": "sourcehbc, targethbc",
    "source.cluster.alias": "sourcehbc",
    "source.cluster.bootstrap.servers": "{ source_kafka_ip1 }:{ source_kafka_port1 },{ source_kafka_ip2 }:{ source_kafka_port2 }, …",
    "source.cluster.security.protocol": "SASL_PLAINTEXT",
    "source.cluster.sasl.mechanism": "SCRAM-SHA-256",
    "source.cluster.sasl.jaas.config": "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"{ source_kafka_username }\" password=\"{ source_kafka_password }\";",
    "target.cluster.alias": "targethbc",
    "target.cluster.bootstrap.servers": "{ target_kafka_ip1 }:{ target_kafka_port1 },{ target_kafka_ip2 }:{ target_kafka_port2 }, …",
    "target.cluster.security.protocol": "SASL_PLAINTEXT",
    "target.cluster.sasl.mechanism": "SCRAM-SHA-256",
    "target.cluster.sasl.jaas.config": "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"{ target_kafka_username }\" password=\"{ target_kafka_password }\";",
    "tasks.max": "3",
    "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "heartbeats.topic.replication.factor": "3",
    "emit.heartbeats.enabled": "true",
    "emit.heartbeats.interval.seconds": "1",
    "producer.override.bootstrap.servers": "{ target_kafka_ip1 }:{ target_kafka_port1 },{ target_kafka_ip2 }:{ target_kafka_port2 }, …",
    "producer.override.security.protocol": "SASL_PLAINTEXT",
    "producer.override.sasl.mechanism": "SCRAM-SHA-256",
    "producer.override.sasl.jaas.config": "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"{ target_kafka_username }\" password=\"{ target_kafka_password }\";"
  }
}'
```
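The three requests above repeat the same connection and naming settings, and as we understand MM2, the checkpoint connector only finds the offset-syncs topic written by the source connector when both use the same cluster aliases, replication policy, and offset-syncs.topic.location. The Python sketch below keeps those shared values in one place and builds the request bodies from them. The aliases "source" and "target", the URL, and the helper names are hypothetical; bootstrap and SASL settings are omitted for brevity and would be added to SHARED in the same way.

```python
import json
import urllib.request
from typing import Dict

MIRROR_PKG = "org.apache.kafka.connect.mirror"

# Settings every MM2 connector in this setup shares (hypothetical alias values).
# Bootstrap servers and SASL settings are omitted; add them here the same way.
SHARED: Dict[str, str] = {
    "source.cluster.alias": "source",
    "target.cluster.alias": "target",
    "replication.policy.class": f"{MIRROR_PKG}.IdentityReplicationPolicy",
    "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "tasks.max": "3",
}

# Must be identical for the source and checkpoint connectors.
OFFSET_SYNCS: Dict[str, str] = {"offset-syncs.topic.location": "target"}


def connector_payload(name: str, connector_class: str, extra: Dict[str, str]) -> Dict:
    """Build the JSON body for POST /connectors from the shared settings."""
    config = {"name": name, "connector.class": f"{MIRROR_PKG}.{connector_class}"}
    config.update(SHARED)
    config.update(extra)  # connector-specific settings override shared ones
    return {"name": name, "config": config}


def create_request(connect_url: str, payload: Dict) -> urllib.request.Request:
    """Prepare (but do not send) the POST request that creates a connector."""
    return urllib.request.Request(
        f"{connect_url}/connectors",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


checkpoint = connector_payload(
    "mm2-cpc",
    "MirrorCheckpointConnector",
    {**OFFSET_SYNCS, "emit.checkpoints.enabled": "true", "sync.group.offsets.enabled": "true"},
)
request = create_request("http://localhost:8083", checkpoint)  # hypothetical Connect URL
```

Sending the prepared request with urllib.request.urlopen(request) would create the connector, just like the curl commands above.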
Connector Producer and Consumer Configuration
| Connector | Client | Description | Config prefix |
|---|---|---|---|
| Source Connector | Producer | Sends topic messages to the target Kafka cluster. Consider tuning this producer's configuration when it handles large volumes of data. | producer.override.* |
| Source Connector | Producer | Writes to the offset-syncs topic, which maps source and target offsets for replicated topic partitions. | producer.* |
| Source Connector | Consumer | Retrieves topic messages from the source Kafka cluster. | consumer.* |
| Checkpoint Connector | Producer | Emits consumer offset checkpoints. | producer.override.* |
| Checkpoint Connector | Consumer | Loads the offset-syncs topic. | consumer.* |
| Heartbeat Connector | Producer | Emits heartbeats. | producer.override.* |
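Because producer.override. itself begins with producer., it is easy to put a setting under the wrong prefix. The hypothetical helper below groups a connector config by the prefixes in the table, checking the longer producer.override. prefix first, and strips the prefix to show which client property each key sets. It is a sketch for inspecting configs, not how Kafka Connect parses them internally.

```python
from typing import Dict

# Prefixes from the table, longest first: "producer.override." also starts
# with "producer.", so it must be matched before the shorter prefix.
PREFIXES = ("producer.override.", "producer.", "consumer.")


def group_client_settings(config: Dict[str, str]) -> Dict[str, Dict[str, str]]:
    """Group client settings by prefix, with the prefix stripped from each key."""
    grouped: Dict[str, Dict[str, str]] = {prefix: {} for prefix in PREFIXES}
    for key, value in config.items():
        for prefix in PREFIXES:
            if key.startswith(prefix):
                grouped[prefix][key[len(prefix):]] = value
                break  # each key belongs to at most one prefix
    return grouped


example = {
    "producer.override.compression.type": "lz4",
    "producer.enable.idempotence": "true",
    "consumer.max.poll.records": "500",
    "tasks.max": "3",
}
for prefix, settings in group_client_settings(example).items():
    print(prefix, settings)
```

Read against the table, producer.enable.idempotence in the MirrorSourceConnector example would configure the offset-syncs producer; idempotence for the replicated records themselves would presumably be set with producer.override.enable.idempotence. Note also that the Connect worker's connector.client.config.override.policy must permit producer.override.* settings, and its default differs between Kafka versions.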
Connector Check
List all connectors together with their connector and task status:
```bash
curl -X GET '{ kafka_connect_ip }:8083/connectors?expand=status' | python -m json.tool
```
Example output:
```json
{
  "mm2-cpc": {
    "status": {
      "connector": {
        "state": "RUNNING",
        "worker_id": "{ kafka_connect_ip }:8083"
      },
      "name": "mm2-cpc",
      "tasks": [],
      "type": "source"
    }
  },
  "mm2-hbc": {
    "status": {
      "connector": {
        "state": "RUNNING",
        "worker_id": "{ kafka_connect_ip }:8083"
      },
      "name": "mm2-hbc",
      "tasks": [
        {
          "id": 0,
          "state": "RUNNING",
          "worker_id": "{ kafka_connect_ip }:8083"
        }
      ],
      "type": "source"
    }
  },
  "mm2-msc": {
    "status": {
      "connector": {
        "state": "RUNNING",
        "worker_id": "{ kafka_connect_ip }:8083"
      },
      "name": "mm2-msc",
      "tasks": [
        {
          "id": 0,
          "state": "RUNNING",
          "worker_id": "{ kafka_connect_ip }:8083"
        },
        {
          "id": 1,
          "state": "RUNNING",
          "worker_id": "{ kafka_connect_ip }:8083"
        }
      ],
      "type": "source"
    }
  }
}
```
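The expanded status response lends itself to an automated health check. This Python sketch (find_problems is a hypothetical name) flags connectors or tasks that are not in the RUNNING state, as well as connectors without any tasks. An empty task list, as for mm2-cpc in the output above, is not necessarily an error: as we understand it, MirrorCheckpointConnector only creates tasks once it finds consumer groups that match its group filter.

```python
from typing import Dict, List


def find_problems(statuses: Dict[str, Dict]) -> List[str]:
    """List connectors or tasks that are not RUNNING, and connectors with no tasks."""
    problems: List[str] = []
    for name, entry in sorted(statuses.items()):
        status = entry["status"]
        state = status["connector"]["state"]
        if state != "RUNNING":
            problems.append(f"{name}: connector is {state}")
        tasks = status.get("tasks", [])
        if not tasks:
            problems.append(f"{name}: no tasks assigned")
        for task in tasks:
            if task["state"] != "RUNNING":
                problems.append(f"{name}: task {task['id']} is {task['state']}")
    return problems


# Abbreviated copy of the example response (worker_id and other fields omitted).
SAMPLE = {
    "mm2-cpc": {"status": {"connector": {"state": "RUNNING"}, "tasks": []}},
    "mm2-hbc": {"status": {"connector": {"state": "RUNNING"},
                           "tasks": [{"id": 0, "state": "RUNNING"}]}},
}
print(find_problems(SAMPLE))  # ['mm2-cpc: no tasks assigned']
```

To check a live cluster, parse the body returned by GET /connectors?expand=status with json.loads and pass the resulting dict to find_problems.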
MM2 Internal Topics
MM2 creates the following topics automatically for its internal operation. They are essential for managing and monitoring the replication process.
| Topic Name | Description |
|---|---|
| heartbeats | Used to monitor the cluster's status and to verify that the MirrorMaker 2 instance is operating correctly. |
| mm2-offset-syncs.source.internal | Offset mappings between the source and target Kafka clusters for replicated topic partitions. |
| source.checkpoints.internal | Checkpoints of source-cluster consumer group offsets, translated into the corresponding offsets on the target cluster. |
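The names in this table follow from the cluster aliases. The hypothetical helper below derives them, assuming the default naming scheme with the "." separator and our understanding that the offset-syncs topic is named after the cluster it does not reside on. With a source alias of source and offset-syncs.topic.location set to target, it reproduces the names in the table.

```python
from typing import Dict


def internal_topic_names(source_alias: str, target_alias: str,
                         offset_syncs_location: str = "source") -> Dict[str, str]:
    """Assumed default names of MM2's internal topics for one source->target flow."""
    if offset_syncs_location == "target":
        # Stored on the target cluster and named after the source alias.
        offset_syncs = f"mm2-offset-syncs.{source_alias}.internal"
    elif offset_syncs_location == "source":
        # Stored on the source cluster and named after the target alias.
        offset_syncs = f"mm2-offset-syncs.{target_alias}.internal"
    else:
        raise ValueError("offset-syncs.topic.location must be 'source' or 'target'")
    return {
        "heartbeats": "heartbeats",
        "offset-syncs": offset_syncs,
        "checkpoints": f"{source_alias}.checkpoints.internal",  # on the target cluster
    }


print(internal_topic_names("source", "target", offset_syncs_location="target"))
```

When aliases differ from the table's generic source and target, the internal topic names change accordingly, which is a quick way to confirm which aliases a running connector actually uses.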

