MirrorMaker 2 Connector를 활용한 Apache Kafka DR
MirrorMaker 2 Connector를 활용한 Apache Kafka DR
개요
Mirror Maker 2(이하 MM2)는 Kafka Connect Framework 를 기반으로 하여 서로 다른 두 개의 Apache Kafka 클러스터 간에 토픽을 복제하는 애플리케이션입니다. Apache Kafka(이하 Kafka)에서 자체적으로 제공하는 MirrorMaker를 사용하는 방법과 수동으로 MM2의 Connector를 설정하는 방법이 있습니다. Kafka에서 기본적으로 제공하는 자체 MirrorMaker 구성은 분산 모드를 지원하지 않습니다. 따라서 더 안정적이고 확장 가능한 솔루션을 구축하기 위해 본 문서에서는 Kafka Connect Distributed Mode 를 통해 분산 모드 구성을 하고, MM2 에서 사용하는 3 개의 Connector 를 수동으로 구성하는 방법을 살펴봅니다.
MM2 특징
- Cluster 간 데이터 복제
- 새로운 Topic, Partition 감지
- Topic 및 Consumer group offset 동기화
MM2를 활용한 Kafka DR 구성
Source Kafka Cluster 에서 Target Kafka Cluster 로 데이터를 복제하는 아키텍처를 기반으로 합니다. 시스템에 장애가 발생할 경우, Target Kafka Cluster 는 백업 데이터 소스로서의 역할을 하여 데이터 복구를 가능하게 합니다. (본 문서에서는 Network 구성에 대해서는 다루지 않습니다.)
MM2는 Kafka Connect를 활용하여 구성된 클러스터 내에서 작동하며, 각 Worker는 클러스터 내의 독립적인 노드로서 하나 이상의 Connector 인스턴스를 실행합니다. 각 Connector들은 다시 하나 이상의 Task를 가질 수 있으며, 해당 Task들은 데이터를 복제하고 처리하는 실질적인 작업을 담당합니다.
MM2 Connectors
Connectors
| Connector Name | Description |
|---|---|
| MirrorSourceConnector | Kafka 레코드 복제 (source→target) 수행 |
| MirrorCheckpointConnector | Source Kafka Cluster의 Consumer group offset을 Target Kafka Cluster로 복제 |
| MirrorHeartbeatConnector | Heartbeat 데이터를 기록하며 remote Kafka가 정상 동작하여 연결이 되는지 주기적 모니터링 |
Connector 주요 설정
| Configuration | Description | SourceConnector | CheckpointConnector | HeartbeatConnector |
|---|---|---|---|---|
| admin.timeout.ms | 새 topic 탐지와 같은 관리자 작업에 대한 타임아웃 | ✓ | ✓ | ✓ |
| replication.policy.class | 원격 topic 이름 지정 규칙을 정의하는 정책 | ✓ | ✓ | ✓ |
| replication.policy.separator | target 클러스터에서 이름 지정에 사용되는 구분 기호 | ✓ | ✓ | ✓ |
| consumer.poll.timeout.ms | source 클러스터를 폴링할 때 제한 시간 | ✓ | ✓ | |
| offset-syncs.topic.location | offset-syncs topic의 위치 | ✓ | ✓ | |
| topic.filter.class | 항목을 필터링하여 복제할 topic를 선택 | ✓ | ✓ | |
| config.property.filter.classtopic | 필터를 사용하여 복제할 topic config 속성을 선택 | ✓ | ||
| config.properties.exclude | 복제해서는 안 되는 topic config 속성. 쉼표로 구분된 속성 이름 및 정규식을 지원 | ✓ | ||
| offset.lag.max | 원격 파티션이 동기화되기 전에 최대 허용 가능(동기화되지 않음) 오프셋이 지연 | ✓ | ||
| offset-syncs.topic.replication.factor | 내부 오프셋 동기화 topic에 대한 replication factor | ✓ | ||
| refresh.topics.enabled | 새 topic와 파티션에 대한 검사를 활성화 | ✓ | ||
| refresh.topics.interval.seconds | Topic 새로 고침 빈도 | ✓ | ||
| replication.factor | 새 topic의 replication factor | ✓ | ||
| sync.topic.acls.enabled | Source 클러스터의 ACL 동기화를 활성화. User Operator와 호환되지 않음 | ✓ | ||
| sync.topic.acls.interval.seconds | ACL 동기화 빈도 | ✓ | ||
| sync.topic.configs.enabled | Source 클러스터에서 topic config의 동기화를 활성화 | ✓ | ||
| sync.topic.configs.interval.seconds | topic config 동기화의 빈도 | ✓ | ||
| checkpoints.topic.replication.factor | 내부 체크포인트 topic에 대한 replication factor | ✓ | ||
| emit.checkpoints.enabled | Target 클러스터와 consumer 오프셋의 동기화를 활성화 | ✓ | ||
| emit.checkpoints.interval.seconds | Consumer 오프셋 동기화의 빈도 | ✓ | ||
| group.filter.class | 복제할 consumer group을 선택하는 그룹 필터 | ✓ | ||
| refresh.groups.enabled | 새 consumer group에 대한 검사를 활성화 | ✓ | ||
| refresh.groups.interval.seconds | Consumer group 새로 고침 빈도 | ✓ | ||
| sync.group.offsets.enabled | Target 클러스터 __consumer_offsets topic에 대한 consumer group 오프셋의 동기화를 활성화 | ✓ | ||
| sync.group.offsets.interval.seconds | Consumer group 오프셋 동기화의 빈도 | ✓ | ||
| emit.heartbeats.enabled | Target 클러스터에서 연결 검사를 활성화 | ✓ | ||
| emit.heartbeats.interval.seconds | 연결 확인 빈도 | ✓ | ||
| heartbeats.topic.replication.factor | 내부 heartbeat topic의 replication factor | ✓ |
MM2 Connector 구성 방법
Kafka Connect의 REST API를 통해 MM2에서 사용되는 세 가지 Connector를 생성하는 방법과 이후 생성된 Connector 정보를 조회하는 방법을 설명합니다.
SourceConnector
curl --request POST '{ kafka_connect_ip }:8083/connectors' \
--header 'Content-Type: application/json' \
--data-raw '{
"name": "mm2-msc",
"config":
{
"name": "mm2-msc",
"connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
"clusters": "sourcemsc, targetmsc",
"source.cluster.alias": "sourcemsc",
"source.cluster.bootstrap.servers": "{ source_kafka_ip1 }:{ source_kafka_port1 },{ source_kafka_ip2 }:{ source_kafka_port2 }, …",
"source.cluster.security.protocol": "SASL_PLAINTEXT",
"source.cluster.sasl.mechanism": "SCRAM-SHA-256",
"source.cluster.sasl.jaas.config": "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"{ source_kafka_username }\" password=\"{ source_kafka_password }\";",
"target.cluster.alias": "targetmsc",
"target.cluster.bootstrap.servers": "{ target_kafka_ip1 }:{ target_kafka_port1 },{ target_kafka_ip2 }:{ target_kafka_port2 }, …",
"target.cluster.security.protocol": "SASL_PLAINTEXT",
"target.cluster.sasl.mechanism": "SCRAM-SHA-256",
"target.cluster.sasl.jaas.config": "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"{ target_kafka_username }\" password=\"{ target_kafka_password }\";",
"topics": ".*",
"groups": ".*",
"topics.blacklist": ".*[\\-\\.]internal, .*\\.replica, __consumer_offsets",
"groups.blacklist": "console-consumer-.*, connect-.*, __.*",
"tasks.max": "3",
"key.converter": " org.apache.kafka.connect.converters.ByteArrayConverter",
"value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
"offset-syncs.topic.location": "target",
"offset-syncs.topic.replication.factor": "3",
"sync.topic.configs.interval.seconds": "20",
"refresh.topics.interval.seconds": "20",
"producer.enable.idempotence": "true",
"sync.topic.acls.enabled": "true",
"replication.policy.class": "org.apache.kafka.connect.mirror.IdentityReplicationPolicy",
"producer.override.bootstrap.servers": "{ target_kafka_ip1 }:{ target_kafka_port1 },{ target_kafka_ip2 }:{ target_kafka_port2 }, …",
"producer.override.security.protocol": "SASL_PLAINTEXT",
"producer.override.sasl.mechanism": "SCRAM-SHA-256",
"producer.override.sasl.jaas.config": "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"{ target_kafka_username }\" password=\"{ target_kafka_password }\";"
}
}'
MirrorCheckpointConnector
curl --request POST '{ kafka_connect_ip }:8083/connectors' \
--header 'Content-Type: application/json' \
--data-raw '{
"name": "mm2-cpc",
"config":
{
"name": "mm2-cpc",
"connector.class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
"clusters": "sourcecpc, targetcpc",
"source.cluster.alias": "sourcecpc",
"source.cluster.bootstrap.servers": "{ source_kafka_ip1 }:{ source_kafka_port1 },{ source_kafka_ip2 }:{ source_kafka_port2 }, …",
"source.cluster.security.protocol": "SASL_PLAINTEXT",
"source.cluster.sasl.mechanism": "SCRAM-SHA-256",
"source.cluster.sasl.jaas.config": "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"{ source_kafka_username }\" password=\"{ source_kafka_password }\";",
"target.cluster.alias": "targetcpc",
"target.cluster.bootstrap.servers": "{ target_kafka_ip1 }:{ target_kafka_port1 },{ target_kafka_ip2 }:{ target_kafka_port2 }, …",
"target.cluster.security.protocol": "SASL_PLAINTEXT",
"target.cluster.sasl.mechanism": "SCRAM-SHA-256",
"target.cluster.sasl.jaas.config": "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"{ target_kafka_username }\" password=\"{ target_kafka_password }\";",
"tasks.max": "3",
"key.converter": " org.apache.kafka.connect.converters.ByteArrayConverter",
"value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
"checkpoints.topic.replication.factor": "3",
"emit.checkpoints.enabled": "true",
"emit.checkpoints.interval.seconds": "20",
"sync.group.offsets.enabled": "true",
"sync.group.offsets.interval.seconds": "20",
"refresh.groups.interval.seconds": "20",
"producer.override.bootstrap.servers": "{ target_kafka_ip1 }:{ target_kafka_port1 },{ target_kafka_ip2 }:{ target_kafka_port2 }, …",
"producer.override.security.protocol": "SASL_PLAINTEXT",
"producer.override.sasl.mechanism": "SCRAM-SHA-256",
"producer.override.sasl.jaas.config": "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"{ target_kafka_username }\" password=\"{ target_kafka_password }\";"
}
}'
MirrorHeartbeatConnector
curl --request POST '{ kafka_connect_ip }:8083/connectors' \
--header 'Content-Type: application/json' \
--data-raw '{
"name": "mm2-hbc",
"config":
{
"name": "mm2-hbc",
"connector.class": "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector",
"clusters": "sourcehbc, targethbc",
"source.cluster.alias": "sourcehbc",
"source.cluster.bootstrap.servers": "{ source_kafka_ip1 }:{ source_kafka_port1 },{ source_kafka_ip2 }:{ source_kafka_port2 }, …",
"source.cluster.security.protocol": "SASL_PLAINTEXT",
"source.cluster.sasl.mechanism": "SCRAM-SHA-256",
"source.cluster.sasl.jaas.config": "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"{ source_kafka_username }\" password=\"{ source_kafka_password }\";",
"target.cluster.alias": "targethbc",
"target.cluster.bootstrap.servers": "{ target_kafka_ip1 }:{ target_kafka_port1 },{ target_kafka_ip2 }:{ target_kafka_port2 }, …",
"target.cluster.security.protocol": "SASL_PLAINTEXT",
"target.cluster.sasl.mechanism": "SCRAM-SHA-256",
"target.cluster.sasl.jaas.config": "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"{ target_kafka_username }\" password=\"{ target_kafka_password }\";",
"tasks.max": "3",
"key.converter": " org.apache.kafka.connect.converters.ByteArrayConverter",
"value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
"heartbeats.topic.replication.factor": "3",
"emit.heartbeats.enabled": "true",
"emit.heartbeats.interval.seconds": "1",
"producer.override.bootstrap.servers": "{ target_kafka_ip1 }:{ target_kafka_port1 },{ target_kafka_ip2 }:{ target_kafka_port2 }, …",
"producer.override.security.protocol": "SASL_PLAINTEXT",
"producer.override.sasl.mechanism": "SCRAM-SHA-256",
"producer.override.sasl.jaas.config": "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"{ target_kafka_username }\" password=\"{ target_kafka_password }\";"
}
}'
Connector Producer와 Consumer 설정
| Connector | Role | Description | Config |
|---|---|---|---|
| Source Connector | Producer | topic 메시지를 target Kafka 클러스터로 보냅니다. 대량의 데이터를 처리할 때 이 생산자의 config을 튜닝하는 것이 좋습니다. | mirrors.sourceConnector.config: producer.override.* |
| Source Connector | Producer | 복제된 topic 파티션에 대한 source 및 target 오프셋을 매핑하는 offset-syncs 항목에 씁니다. | mirrors.sourceConnector.config: producer.* |
| Source Connector | Consumer | source Kafka 클러스터에서 topic 메시지를 검색합니다. | mirrors.sourceConnector.config: consumer.* |
| Checkpoint Connector | Producer | 소비자 오프셋 체크포인트를 내보냅니다. | mirrors.checkpointConnector.config: producer.override.* |
| Checkpoint Connector | Consumer | offset-syncs topic을 로드합니다. | mirrors.checkpointConnector.config: consumer.* |
| Heartbeat Connector | Producer | 하트비트를 내보냅니다. | mirrors.heartbeatConnector.config: producer.override.* |
Connector 확인
curl -X GET { kafka_connect_ip }:8083/connectors?expand=status |python -m json.tool
{
"mm2-cpc": {
"status": {
"connector": {
"state": "RUNNING",
"worker_id": "{ kafka_connect_ip }:8083"
},
"name": "mm2-cpc",
"tasks": [],
"type": "source"
}
},
"mm2-hbc": {
"status": {
"connector": {
"state": "RUNNING",
"worker_id": "{ kafka_connect_ip }:8083"
},
"name": "mm2-hbc",
"tasks": [
{
"id": 0,
"state": "RUNNING",
"worker_id": "{ kafka_connect_ip }:8083"
}
],
"type": "source"
}
},
"mm2-msc": {
"status": {
"connector": {
"state": "RUNNING",
"worker_id": "{ kafka_connect_ip }:8083"
},
"name": "mm2-msc",
"tasks": [
{
"id": 0,
"state": "RUNNING",
"worker_id": "{ kafka_connect_ip }:8083"
},
{
"id": 1,
"state": "RUNNING",
"worker_id": "{ kafka_connect_ip }:8083"
}
],
"type": "source"
}
}
}
MM2 내부 Topics
MM2의 내부적인 운영을 위해 자동으로 생성되는 Topic이며, 복제 프로세스의 관리와 모니터링에 필수적인 역할을 합니다.
| Topic Name | Description |
|---|---|
| heartbeats | 클러스터의 상태를 모니터링하고, MirrorMaker 2 인스턴스가 정상적으로 작동하고 있는지 확인 |
| mm2-offset-syncs.source.internal | Source와 target kafka의 복제된 topic 파티션에 대한 클러스터 간 offset 매핑 정보 |
| source.checkpoints.internal | source 클러스터의 각 파티션에 대한 복제 진행 상황을 기록하는 데 사용 |

