The page has been translated by Gen AI.

Apache Kafka DR Using MirrorMaker 2 Connectors

Overview

MirrorMaker 2 (hereafter MM2) is an application, built on the Kafka Connect framework, that replicates topics between two Apache Kafka clusters. MM2 can be run in two ways: with the MirrorMaker launcher bundled with Apache Kafka (hereafter Kafka), or by configuring MM2's connectors manually. The built-in MirrorMaker launcher provided by Kafka does not support Kafka Connect distributed mode. To build a more reliable and scalable solution, this document therefore runs a Kafka Connect cluster in distributed mode and manually configures the three connectors that MM2 uses.

MM2 features

  • Data replication between clusters
  • Detection of new topics and partitions
  • Topic configuration and consumer group offset synchronization

Kafka DR configuration using MM2

This DR setup replicates data from the source Kafka cluster to the target Kafka cluster. If the source system fails, the target Kafka cluster serves as a backup data source from which data can be recovered. (This document does not cover network configuration.)

Figure. Example of Kafka DR configuration using MM2

MM2 operates within a cluster built using Kafka Connect, where each Worker functions as an independent node in the cluster and runs one or more Connector instances. Each Connector can have one or more Tasks, and these Tasks are responsible for the actual work of replicating and processing data.

Figure. Kafka Connect Cluster Details
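How a connector's work is split into tasks depends on the connector. As an illustration for MirrorSourceConnector, the sketch below models the split under a simplifying assumption; it is not the connector's actual code. The number of tasks is the smaller of tasks.max and the number of source topic-partitions, partitions are dealt out round-robin, and no task is created when there is nothing to replicate. Kafka Connect then distributes the resulting tasks across the workers. assign_partitions is a hypothetical helper.

```python
from typing import Dict, List, Tuple

# (topic, partition) pair, standing in for Kafka's TopicPartition
TopicPartition = Tuple[str, int]


def assign_partitions(partitions: List[TopicPartition],
                      tasks_max: int) -> Dict[int, List[TopicPartition]]:
    """Spread source topic-partitions over MirrorSourceConnector tasks.

    Simplified model: the task count is min(tasks.max, number of
    partitions), partitions are dealt out round-robin, and no task is
    created when there is nothing to replicate.
    """
    if not partitions or tasks_max < 1:
        return {}
    num_tasks = min(tasks_max, len(partitions))
    assignment: Dict[int, List[TopicPartition]] = {i: [] for i in range(num_tasks)}
    for index, tp in enumerate(partitions):
        assignment[index % num_tasks].append(tp)
    return assignment


if __name__ == "__main__":
    # Hypothetical topic "orders" with two partitions and tasks.max = 3
    print(assign_partitions([("orders", 0), ("orders", 1)], tasks_max=3))
```

With a hypothetical two-partition topic and tasks.max set to 3, only two tasks are created, which is one possible reason a connector reports fewer tasks than tasks.max.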

MM2 Connectors

Connectors

  • MirrorSourceConnector: Replicates Kafka records from the source cluster to the target cluster.
  • MirrorCheckpointConnector: Replicates consumer group offsets from the source Kafka cluster to the target Kafka cluster.
  • MirrorHeartbeatConnector: Emits heartbeat records so that it can be checked periodically whether the remote Kafka cluster is operating normally and reachable.

Main Connector Settings

Used by SourceConnector, CheckpointConnector, and HeartbeatConnector:
  • admin.timeout.ms: Timeout for administrative tasks such as detecting new topics.
  • replication.policy.class: Policy that defines the naming convention for remote topics.
  • replication.policy.separator: Separator used in remote topic names on the target cluster.

Used by SourceConnector and CheckpointConnector:
  • consumer.poll.timeout.ms: Timeout when polling the source cluster.
  • offset-syncs.topic.location: Cluster (source or target) on which the offset-syncs topic is stored.
  • topic.filter.class: Filter that selects the topics to replicate.

Used by SourceConnector:
  • config.property.filter.class: Filter that selects the topic configuration properties to replicate.
  • config.properties.exclude: Topic configuration properties that must not be replicated. Accepts comma-separated property names and regular expressions.
  • offset.lag.max: Maximum offset lag allowed on a remote partition before it is resynchronized.
  • offset-syncs.topic.replication.factor: Replication factor of the internal offset-syncs topic.
  • refresh.topics.enabled: Enables checks for new topics and partitions.
  • refresh.topics.interval.seconds: Topic refresh interval.
  • replication.factor: Replication factor of newly created remote topics.
  • sync.topic.acls.enabled: Enables synchronization of topic ACLs from the source cluster. Not compatible with the User Operator.
  • sync.topic.acls.interval.seconds: ACL synchronization interval.
  • sync.topic.configs.enabled: Enables synchronization of topic configurations from the source cluster.
  • sync.topic.configs.interval.seconds: Topic configuration synchronization interval.

Used by CheckpointConnector:
  • checkpoints.topic.replication.factor: Replication factor of the internal checkpoints topic.
  • emit.checkpoints.enabled: Enables emitting consumer offset checkpoints to the target cluster.
  • emit.checkpoints.interval.seconds: Checkpoint emission interval.
  • group.filter.class: Filter that selects the consumer groups to replicate.
  • refresh.groups.enabled: Enables checks for new consumer groups.
  • refresh.groups.interval.seconds: Consumer group refresh interval.
  • sync.group.offsets.enabled: Enables writing translated consumer group offsets to the __consumer_offsets topic of the target cluster.
  • sync.group.offsets.interval.seconds: Consumer group offset synchronization interval.

Used by HeartbeatConnector:
  • emit.heartbeats.enabled: Enables emitting heartbeats to the target cluster (connection check).
  • emit.heartbeats.interval.seconds: Heartbeat emission interval.
  • heartbeats.topic.replication.factor: Replication factor of the internal heartbeats topic.

MM2 Connector Setup

This section shows how to create the three MM2 connectors with the Kafka Connect REST API and then how to check the status of the created connectors.
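The curl calls can also be issued from a script. A minimal sketch using only the Python standard library; the helper names and the worker address http://localhost:8083 are hypothetical. It sends the same POST /connectors request body as the curl examples: a JSON object with name and config.

```python
import json
import urllib.request
from typing import Dict


def build_create_request(connect_url: str, name: str,
                         config: Dict[str, str]) -> urllib.request.Request:
    """Build a POST /connectors request for the Kafka Connect REST API."""
    body = {"name": name, "config": {**config, "name": name}}
    return urllib.request.Request(
        url=connect_url.rstrip("/") + "/connectors",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def create_connector(connect_url: str, name: str, config: Dict[str, str],
                     timeout: float = 10.0) -> dict:
    """Send the request and return the JSON body returned by Kafka Connect.

    urlopen raises urllib.error.HTTPError for non-2xx responses.
    """
    request = build_create_request(connect_url, name, config)
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return json.loads(response.read().decode("utf-8"))


if __name__ == "__main__":
    # Hypothetical worker address and a deliberately incomplete config;
    # only the request is built here, nothing is sent.
    req = build_create_request("http://localhost:8083", "mm2-hbc", {
        "connector.class": "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector",
    })
    print(req.get_method(), req.full_url)
```

A POST for a connector name that already exists is rejected, so for scripts that are re-run, PUT /connectors/{name}/config, which creates or updates the connector, may be the better fit.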

MirrorSourceConnector

curl --request POST '{ kafka_connect_ip }:8083/connectors' \
--header 'Content-Type: application/json' \
--data-raw '{
    "name": "mm2-msc",
    "config":
    {
        "name": "mm2-msc",
        "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
        "clusters": "source, target",
        "source.cluster.alias": "source",
        "source.cluster.bootstrap.servers": "{ source_kafka_ip1 }:{ source_kafka_port1 },{ source_kafka_ip2 }:{ source_kafka_port2 }, …",
        "source.cluster.security.protocol": "SASL_PLAINTEXT",
        "source.cluster.sasl.mechanism": "SCRAM-SHA-256",
        "source.cluster.sasl.jaas.config": "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"{ source_kafka_username }\" password=\"{ source_kafka_password }\";",
        "target.cluster.alias": "target",
        "target.cluster.bootstrap.servers": "{ target_kafka_ip1 }:{ target_kafka_port1 },{ target_kafka_ip2 }:{ target_kafka_port2 }, …",
        "target.cluster.security.protocol": "SASL_PLAINTEXT",
        "target.cluster.sasl.mechanism": "SCRAM-SHA-256",
        "target.cluster.sasl.jaas.config": "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"{ target_kafka_username }\" password=\"{ target_kafka_password }\";",
        "topics": ".*",
        "groups": ".*",
        "topics.exclude": ".*[\\-\\.]internal, .*\\.replica, __consumer_offsets",
        "groups.exclude": "console-consumer-.*, connect-.*, __.*",
        "tasks.max": "3",
        "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
        "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
        "offset-syncs.topic.location": "target",
        "offset-syncs.topic.replication.factor": "3",
        "sync.topic.configs.interval.seconds": "20",
        "refresh.topics.interval.seconds": "20",
        "producer.enable.idempotence": "true",
        "sync.topic.acls.enabled": "true",
        "replication.policy.class": "org.apache.kafka.connect.mirror.IdentityReplicationPolicy",
        "producer.override.bootstrap.servers": "{ target_kafka_ip1 }:{ target_kafka_port1 },{ target_kafka_ip2 }:{ target_kafka_port2 }, …",
        "producer.override.security.protocol": "SASL_PLAINTEXT",
        "producer.override.sasl.mechanism": "SCRAM-SHA-256",
        "producer.override.sasl.jaas.config": "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"{ target_kafka_username }\" password=\"{ target_kafka_password }\";"
    }
}'
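The topics and topics.exclude settings decide which source topics are mirrored (topics.blacklist is the deprecated name of topics.exclude). The sketch below approximates that check as an illustration, assuming the default topic filter semantics: comma-separated regular expressions, each matched against the whole topic name, with exclusion taking priority. MM2 additionally skips its own internal topics, which this sketch does not model; should_replicate and split_patterns are hypothetical helper names.

```python
import re
from typing import List

# Same patterns as the exclusion list in the request above
DEFAULT_EXCLUDE = r".*[\-\.]internal, .*\.replica, __consumer_offsets"


def split_patterns(value: str) -> List[str]:
    """Split a comma-separated pattern list, trimming surrounding whitespace."""
    return [p.strip() for p in value.split(",") if p.strip()]


def should_replicate(topic: str, topics: str = ".*",
                     topics_exclude: str = DEFAULT_EXCLUDE) -> bool:
    """True if the topic matches an include pattern and no exclude pattern."""
    included = any(re.fullmatch(p, topic) for p in split_patterns(topics))
    excluded = any(re.fullmatch(p, topic) for p in split_patterns(topics_exclude))
    return included and not excluded
```

Whole-name matching means a pattern such as payments.* does not match a topic that merely contains "payments" in the middle of its name.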

MirrorCheckpointConnector

curl --request POST '{ kafka_connect_ip }:8083/connectors' \
--header 'Content-Type: application/json' \
--data-raw '{
    "name": "mm2-cpc",
    "config":
    {
        "name": "mm2-cpc",
        "connector.class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
        "clusters": "source, target",
        "source.cluster.alias": "source",
        "source.cluster.bootstrap.servers": "{ source_kafka_ip1 }:{ source_kafka_port1 },{ source_kafka_ip2 }:{ source_kafka_port2 }, …",
        "source.cluster.security.protocol": "SASL_PLAINTEXT",
        "source.cluster.sasl.mechanism": "SCRAM-SHA-256",
        "source.cluster.sasl.jaas.config": "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"{ source_kafka_username }\" password=\"{ source_kafka_password }\";",
        "target.cluster.alias": "target",
        "target.cluster.bootstrap.servers": "{ target_kafka_ip1 }:{ target_kafka_port1 },{ target_kafka_ip2 }:{ target_kafka_port2 }, …",
        "target.cluster.security.protocol": "SASL_PLAINTEXT",
        "target.cluster.sasl.mechanism": "SCRAM-SHA-256",
        "target.cluster.sasl.jaas.config": "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"{ target_kafka_username }\" password=\"{ target_kafka_password }\";",
        "tasks.max": "3",
        "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
        "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
        "offset-syncs.topic.location": "target",
        "replication.policy.class": "org.apache.kafka.connect.mirror.IdentityReplicationPolicy",
        "checkpoints.topic.replication.factor": "3",
        "emit.checkpoints.enabled": "true",
        "emit.checkpoints.interval.seconds": "20",
        "sync.group.offsets.enabled": "true",
        "sync.group.offsets.interval.seconds": "20",
        "refresh.groups.interval.seconds": "20",
        "producer.override.bootstrap.servers": "{ target_kafka_ip1 }:{ target_kafka_port1 },{ target_kafka_ip2 }:{ target_kafka_port2 }, …",
        "producer.override.security.protocol": "SASL_PLAINTEXT",
        "producer.override.sasl.mechanism": "SCRAM-SHA-256",
        "producer.override.sasl.jaas.config": "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"{ target_kafka_username }\" password=\"{ target_kafka_password }\";"
    }
}'
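The checkpoint connector relies on the offset-syncs records written by the source connector to translate each consumer group's committed source offset into a target offset. A simplified sketch of that lookup, modeled on the conservative rule in recent Kafka releases (earlier releases extrapolated from the nearest sync instead): take the latest sync at or below the committed offset; on an exact match use its downstream offset, otherwise step just one offset past it, accepting some re-reading rather than risking skipped records. The data structures and the translate_offset name are hypothetical.

```python
from bisect import bisect_right
from typing import List, Optional, Tuple

# One offset-sync record: (upstream offset on source, downstream offset on target)
OffsetSync = Tuple[int, int]


def translate_offset(syncs: List[OffsetSync], upstream: int) -> Optional[int]:
    """Translate a committed source offset into a target offset."""
    ordered = sorted(syncs)
    # Index of the last sync whose upstream offset is <= the committed offset
    idx = bisect_right(ordered, (upstream, float("inf"))) - 1
    if idx < 0:
        return None  # no sync old enough: this offset cannot be translated
    sync_up, sync_down = ordered[idx]
    # Exact match maps directly; otherwise move only one offset past the sync
    return sync_down if upstream == sync_up else sync_down + 1
```

Roughly, a smaller offset.lag.max on the source connector produces more frequent offset syncs and therefore a translated offset closer to the true position.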

MirrorHeartbeatConnector

curl --request POST '{ kafka_connect_ip }:8083/connectors' \
--header 'Content-Type: application/json' \
--data-raw '{
    "name": "mm2-hbc",
    "config":
    {
        "name": "mm2-hbc",
        "connector.class": "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector",
        "clusters": "source, target",
        "source.cluster.alias": "source",
        "source.cluster.bootstrap.servers": "{ source_kafka_ip1 }:{ source_kafka_port1 },{ source_kafka_ip2 }:{ source_kafka_port2 }, …",
        "source.cluster.security.protocol": "SASL_PLAINTEXT",
        "source.cluster.sasl.mechanism": "SCRAM-SHA-256",
        "source.cluster.sasl.jaas.config": "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"{ source_kafka_username }\" password=\"{ source_kafka_password }\";",
        "target.cluster.alias": "target",
        "target.cluster.bootstrap.servers": "{ target_kafka_ip1 }:{ target_kafka_port1 },{ target_kafka_ip2 }:{ target_kafka_port2 }, …",
        "target.cluster.security.protocol": "SASL_PLAINTEXT",
        "target.cluster.sasl.mechanism": "SCRAM-SHA-256",
        "target.cluster.sasl.jaas.config": "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"{ target_kafka_username }\" password=\"{ target_kafka_password }\";",
        "tasks.max": "3",
        "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
        "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
        "replication.policy.class": "org.apache.kafka.connect.mirror.IdentityReplicationPolicy",
        "heartbeats.topic.replication.factor": "3",
        "emit.heartbeats.enabled": "true",
        "emit.heartbeats.interval.seconds": "1",
        "producer.override.bootstrap.servers": "{ target_kafka_ip1 }:{ target_kafka_port1 },{ target_kafka_ip2 }:{ target_kafka_port2 }, …",
        "producer.override.security.protocol": "SASL_PLAINTEXT",
        "producer.override.sasl.mechanism": "SCRAM-SHA-256",
        "producer.override.sasl.jaas.config": "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"{ target_kafka_username }\" password=\"{ target_kafka_password }\";"
    }
}'
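All three requests repeat the same connection block, and some values must agree across connectors: the checkpoint connector locates the offset-syncs topic through the source cluster alias and offset-syncs.topic.location, and maps topic names through replication.policy.class. One way to keep these values from drifting is to generate the three configs from a single shared block. A minimal sketch; the function names are hypothetical and only a subset of the settings above is included.

```python
from typing import Dict

Config = Dict[str, str]

MIRROR = "org.apache.kafka.connect.mirror."
BYTE_ARRAY = "org.apache.kafka.connect.converters.ByteArrayConverter"
SCRAM = "org.apache.kafka.common.security.scram.ScramLoginModule"


def cluster_settings(role: str, alias: str, bootstrap: str,
                     user: str, password: str) -> Config:
    """SASL/SCRAM connection settings for one cluster; role is "source" or "target"."""
    return {
        f"{role}.cluster.alias": alias,
        f"{role}.cluster.bootstrap.servers": bootstrap,
        f"{role}.cluster.security.protocol": "SASL_PLAINTEXT",
        f"{role}.cluster.sasl.mechanism": "SCRAM-SHA-256",
        f"{role}.cluster.sasl.jaas.config":
            f'{SCRAM} required username="{user}" password="{password}";',
    }


def producer_overrides(target: Config) -> Config:
    """Re-key target.cluster.* connection settings as producer.override.*."""
    prefix = "target.cluster."
    return {
        "producer.override." + key[len(prefix):]: value
        for key, value in target.items()
        if key.startswith(prefix) and key != prefix + "alias"
    }


def build_mm2_configs(source: Config, target: Config) -> Dict[str, Config]:
    """Build the three connector configs from one shared block."""
    shared: Config = {
        "clusters": f'{source["source.cluster.alias"]}, {target["target.cluster.alias"]}',
        **source,
        **target,
        **producer_overrides(target),
        "tasks.max": "3",
        "key.converter": BYTE_ARRAY,
        "value.converter": BYTE_ARRAY,
        "replication.policy.class": MIRROR + "IdentityReplicationPolicy",
    }
    specific: Dict[str, Config] = {
        "mm2-msc": {"connector.class": MIRROR + "MirrorSourceConnector",
                    "offset-syncs.topic.location": "target"},
        "mm2-cpc": {"connector.class": MIRROR + "MirrorCheckpointConnector",
                    "offset-syncs.topic.location": "target",
                    "sync.group.offsets.enabled": "true"},
        "mm2-hbc": {"connector.class": MIRROR + "MirrorHeartbeatConnector",
                    "emit.heartbeats.enabled": "true"},
    }
    # Connector-specific keys are merged last so they can override shared ones
    return {name: {"name": name, **shared, **extra} for name, extra in specific.items()}
```

Each resulting dictionary can then be sent as the config object of a POST /connectors request.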

Connector Producer and Consumer Configuration

  • Source Connector, producer (mirrors.sourceConnector.config: producer.override.*): Sends topic messages to the target Kafka cluster. When processing large volumes of data, consider tuning this producer's settings.
  • Source Connector, producer (mirrors.sourceConnector.config: producer.*): Writes the offset-syncs records that map source offsets to target offsets for each replicated topic partition.
  • Source Connector, consumer (mirrors.sourceConnector.config: consumer.*): Fetches topic messages from the source Kafka cluster.
  • Checkpoint Connector, producer (mirrors.checkpointConnector.config: producer.override.*): Emits consumer offset checkpoints.
  • Checkpoint Connector, consumer (mirrors.checkpointConnector.config: consumer.*): Reads the offset-syncs topic.
  • Heartbeat Connector, producer (mirrors.heartbeatConnector.config: producer.override.*): Emits heartbeats.
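The prefixes above decide which client a setting reaches. Because every producer.override.* key also starts with producer., the longer prefix has to be checked first. A small sketch that groups a flat connector config by prefix, for example to review what will be passed to each client; split_client_settings is a hypothetical helper.

```python
from typing import Dict

Config = Dict[str, str]

# Checked in order: "producer.override." must win over the shorter "producer."
CLIENT_PREFIXES = ("producer.override.", "producer.", "consumer.")


def split_client_settings(config: Config) -> Dict[str, Config]:
    """Group settings by client prefix; keys without a prefix go to "connector"."""
    groups: Dict[str, Config] = {prefix: {} for prefix in CLIENT_PREFIXES}
    groups["connector"] = {}
    for key, value in config.items():
        prefix = next((p for p in CLIENT_PREFIXES if key.startswith(p)), None)
        if prefix is None:
            groups["connector"][key] = value
        else:
            # Store the bare client property name, e.g. "linger.ms"
            groups[prefix][key[len(prefix):]] = value
    return groups
```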

Connector Check

curl -s -X GET '{ kafka_connect_ip }:8083/connectors?expand=status' | python -m json.tool

{
    "mm2-cpc": {
        "status": {
            "connector": {
                "state": "RUNNING",
                "worker_id": "{ kafka_connect_ip }:8083"
            },
            "name": "mm2-cpc",
            "tasks": [],
            "type": "source"
        }
    },
    "mm2-hbc": {
        "status": {
            "connector": {
                "state": "RUNNING",
                "worker_id": "{ kafka_connect_ip }:8083"
            },
            "name": "mm2-hbc",
            "tasks": [
                {
                    "id": 0,
                    "state": "RUNNING",
                    "worker_id": "{ kafka_connect_ip }:8083"
                }
            ],
            "type": "source"
        }
    },
    "mm2-msc": {
        "status": {
            "connector": {
                "state": "RUNNING",
                "worker_id": "{ kafka_connect_ip }:8083"
            },
            "name": "mm2-msc",
            "tasks": [
                {
                    "id": 0,
                    "state": "RUNNING",
                    "worker_id": "{ kafka_connect_ip }:8083"
                },
                {
                    "id": 1,
                    "state": "RUNNING",
                    "worker_id": "{ kafka_connect_ip }:8083"
                }
            ],
            "type": "source"
        }
    }
}
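In the sample output, mm2-cpc reports no tasks. This is most likely expected: MirrorCheckpointConnector typically creates no tasks while no consumer group on the source cluster matches its group filter. mm2-msc runs two tasks although tasks.max is 3, which would be consistent with only two source topic-partitions being known at that point. The sketch below scans the parsed response (for example json.load(sys.stdin) in a script fed by the curl command) and lists connectors or tasks that are not RUNNING, plus connectors without tasks, which deserve a second look rather than being errors by definition. summarize_status is a hypothetical helper.

```python
from typing import Any, Dict, List


def summarize_status(status: Dict[str, Any]) -> List[str]:
    """Report non-RUNNING connectors/tasks and connectors that have no tasks."""
    findings: List[str] = []
    for name, entry in sorted(status.items()):
        info = entry["status"]
        state = info["connector"]["state"]
        if state != "RUNNING":
            findings.append(f"{name}: connector is {state}")
        tasks = info.get("tasks", [])
        if not tasks:
            # Not necessarily an error: a checkpoint connector typically has
            # no tasks while no consumer group matches its group filter.
            findings.append(f"{name}: no tasks assigned")
        for task in tasks:
            if task["state"] != "RUNNING":
                findings.append(f"{name}: task {task['id']} is {task['state']}")
    return findings
```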

MM2 Internal Topics

MM2 creates these topics automatically for its internal operation; they play an essential role in managing and monitoring the replication process. In the names below, source stands for the source cluster alias (with offset-syncs.topic.location set to target, as in the connector settings of this document).

  • heartbeats: Used to monitor cluster status and to verify that the MM2 instance is operating correctly.
  • mm2-offset-syncs.source.internal: Offset mapping between the source and target Kafka clusters for replicated topic partitions.
  • source.checkpoints.internal: Records consumer group offset checkpoints, that is, each group's progress per partition of the source cluster translated to target offsets.
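The internal topic names follow a pattern that depends on the cluster aliases and on offset-syncs.topic.location, so they can be predicted before the connectors start. A sketch under the assumption that the replication policy keeps the default internal topic naming (DefaultReplicationPolicy does, and IdentityReplicationPolicy inherits it; a custom policy may differ); internal_topic_names is a hypothetical helper.

```python
from typing import Dict


def internal_topic_names(source_alias: str, target_alias: str,
                         offset_syncs_location: str = "source") -> Dict[str, str]:
    """Expected MM2 internal topic names under the default naming rules."""
    # The offset-syncs topic carries the alias of the other cluster:
    # stored on the source it is named after the target, and vice versa.
    other = target_alias if offset_syncs_location == "source" else source_alias
    return {
        "heartbeats": "heartbeats",
        "offset-syncs": f"mm2-offset-syncs.{other}.internal",
        "checkpoints": f"{source_alias}.checkpoints.internal",
    }
```

With aliases source and target and offset-syncs.topic.location set to target, it yields the three names listed above.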

Test

Figure. Test configuration

View Connector List

Figure. Connector list view

Check replication of existing topics

Figure. Check existing Topic replication

Verify replication of newly created topics

Figure. Newly Created Topic Replication Check

Check message replication in topics

Figure. Message replication check in Topic
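The topic-level replication checks shown in the figures can also be scripted. A minimal sketch, assuming IdentityReplicationPolicy (so a mirrored topic keeps its source name) and topic lists obtained separately from each cluster, for example with kafka-topics.sh --list run against each one; missing_on_target is a hypothetical helper. Topics that the topic filter excludes on purpose will also be reported, so the result needs to be read against the connector's topic exclusion settings.

```python
from typing import Iterable, List, Tuple


def missing_on_target(source_topics: Iterable[str], target_topics: Iterable[str],
                      ignore_prefixes: Tuple[str, ...] = ("__",)) -> List[str]:
    """Source topics with no same-named topic on the target cluster.

    Topics whose names start with one of ignore_prefixes (by default
    Kafka-internal names such as __consumer_offsets) are skipped.
    """
    target = set(target_topics)
    return sorted(
        topic for topic in source_topics
        if not topic.startswith(ignore_prefixes) and topic not in target
    )
```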