MirrorMaker 2 Connector를 활용한 Apache Kafka DR

MirrorMaker 2 Connector를 활용한 Apache Kafka DR

개요

Mirror Maker 2(이하 MM2)는 Kafka Connect Framework 를 기반으로 하여 서로 다른 두 개의 Apache Kafka 클러스터 간에 토픽을 복제하는 애플리케이션입니다. Apache Kafka(이하 Kafka)에서 자체적으로 제공하는 MirrorMaker를 사용하는 방법과 수동으로 MM2의 Connector를 설정하는 방법이 있습니다. Kafka에서 기본적으로 제공하는 자체 MirrorMaker 구성은 분산 모드를 지원하지 않습니다. 따라서 더 안정적이고 확장 가능한 솔루션을 구축하기 위해 본 문서에서는 Kafka Connect Distributed Mode 를 통해 분산 모드 구성을 하고, MM2 에서 사용하는 3 개의 Connector 를 수동으로 구성하는 방법을 살펴봅니다.

MM2 특징

  • Cluster 간 데이터 복제
  • 새로운 Topic, Partition 감지
  • Topic 및 Consumer group offset 동기화

MM2를 활용한 Kafka DR 구성

Source Kafka Cluster 에서 Target Kafka Cluster 로 데이터를 복제하는 아키텍처를 기반으로 합니다. 시스템에 장애가 발생할 경우, Target Kafka Cluster 는 백업 데이터 소스로서의 역할을 하여 데이터 복구를 가능하게 합니다. (본 문서에서는 Network 구성에 대해서는 다루지 않습니다.)

MM2를 활용한 Kafka DR 구성 예시
그림. MM2를 활용한 Kafka DR 구성 예시

MM2는 Kafka Connect를 활용하여 구성된 클러스터 내에서 작동하며, 각 Worker는 클러스터 내의 독립적인 노드로서 하나 이상의 Connector 인스턴스를 실행합니다. 각 Connector들은 다시 하나 이상의 Task를 가질 수 있으며, 해당 Task들은 데이터를 복제하고 처리하는 실질적인 작업을 담당합니다.

Kafka Connect Cluster 상세
그림. Kafka Connect Cluster 상세

MM2 Connectors

Connectors

Connector NameDescription
MirrorSourceConnectorKafka 레코드 복제 (source→target) 수행
MirrorCheckpointConnectorSource Kafka Cluster의 Consumer group offset을 Target Kafka Cluster로 복제
MirrorHeartbeatConnectorHeartbeat 데이터를 기록하며 remote Kafka가 정상 동작하여 연결이 되는지 주기적 모니터링

Connector 주요 설정

ConfigurationDescriptionSourceConnectorCheckpointConnectorHeartbeatConnector
admin.timeout.ms새 topic 탐지와 같은 관리자 작업에 대한 타임아웃
replication.policy.class원격 topic 이름 지정 규칙을 정의하는 정책
replication.policy.separatortarget 클러스터에서 이름 지정에 사용되는 구분 기호
consumer.poll.timeout.mssource 클러스터를 폴링할 때 제한 시간
offset-syncs.topic.locationoffset-syncs topic의 위치
topic.filter.class항목을 필터링하여 복제할 topic를 선택
config.property.filter.classtopic필터를 사용하여 복제할 topic config 속성을 선택
config.properties.exclude복제해서는 안 되는 topic config 속성. 쉼표로 구분된 속성 이름 및 정규식을 지원
offset.lag.max원격 파티션이 동기화되기 전에 최대 허용 가능(동기화되지 않음) 오프셋이 지연
offset-syncs.topic.replication.factor내부 오프셋 동기화 topic에 대한 replication factor
refresh.topics.enabled새 topic와 파티션에 대한 검사를 활성화
refresh.topics.interval.secondsTopic 새로 고침 빈도
replication.factor새 topic의 replication factor
sync.topic.acls.enabledSource 클러스터의 ACL 동기화를 활성화. User Operator와 호환되지 않음
sync.topic.acls.interval.secondsACL 동기화 빈도
sync.topic.configs.enabledSource 클러스터에서 topic config의 동기화를 활성화
sync.topic.configs.interval.secondstopic config 동기화의 빈도
checkpoints.topic.replication.factor내부 체크포인트 topic에 대한 replication factor
emit.checkpoints.enabledTarget 클러스터와 consumer 오프셋의 동기화를 활성화
emit.checkpoints.interval.secondsConsumer 오프셋 동기화의 빈도
group.filter.class복제할 consumer group을 선택하는 그룹 필터
refresh.groups.enabled새 consumer group에 대한 검사를 활성화
refresh.groups.interval.secondsConsumer group 새로 고침 빈도
sync.group.offsets.enabledTarget 클러스터 __consumer_offsets topic에 대한 consumer group 오프셋의 동기화를 활성화
sync.group.offsets.interval.secondsConsumer group 오프셋 동기화의 빈도
emit.heartbeats.enabledTarget 클러스터에서 연결 검사를 활성화
emit.heartbeats.interval.seconds연결 확인 빈도
heartbeats.topic.replication.factor내부 heartbeat topic의 replication factor

MM2 Connector 구성 방법

Kafka Connect의 REST API를 통해 MM2에서 사용되는 세 가지 Connector를 생성하는 방법과 이후 생성된 Connector 정보를 조회하는 방법을 설명합니다.

SourceConnector

curl --request POST '{ kafka_connect_ip }:8083/connectors' \
--header 'Content-Type: application/json' \
--data-raw '{
    "name": "mm2-msc",
    "config":
    {
        "name": "mm2-msc",
        "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
        "clusters": "sourcemsc, targetmsc",
        "source.cluster.alias": "sourcemsc",
        "source.cluster.bootstrap.servers":  "{ source_kafka_ip1 }:{ source_kafka_port1 },{ source_kafka_ip2 }:{ source_kafka_port2 }, …",
        "source.cluster.security.protocol": "SASL_PLAINTEXT",
        "source.cluster.sasl.mechanism": "SCRAM-SHA-256",
        "source.cluster.sasl.jaas.config":  "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"{ source_kafka_username }\" password=\"{ source_kafka_password }\";",
        "target.cluster.alias": "targetmsc",
        "target.cluster.bootstrap.servers":  "{ target_kafka_ip1 }:{ target_kafka_port1 },{ target_kafka_ip2 }:{ target_kafka_port2 }, …",
        "target.cluster.security.protocol": "SASL_PLAINTEXT",
        "target.cluster.sasl.mechanism": "SCRAM-SHA-256",
        "target.cluster.sasl.jaas.config": "org.apache.kafka.common.security.scram.ScramLoginModule required  username=\"{ target_kafka_username }\" password=\"{ target_kafka_password }\";",
        "topics": ".*",
        "groups": ".*",
        "topics.blacklist": ".*[\\-\\.]internal, .*\\.replica, __consumer_offsets",
        "groups.blacklist": "console-consumer-.*, connect-.*, __.*",
        "tasks.max": "3",
        "key.converter": " org.apache.kafka.connect.converters.ByteArrayConverter",
        "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
        "offset-syncs.topic.location": "target",
        "offset-syncs.topic.replication.factor": "3",
        "sync.topic.configs.interval.seconds": "20",
        "refresh.topics.interval.seconds": "20",
        "producer.enable.idempotence": "true",
        "sync.topic.acls.enabled": "true",
        "replication.policy.class": "org.apache.kafka.connect.mirror.IdentityReplicationPolicy",
        "producer.override.bootstrap.servers":  "{ target_kafka_ip1 }:{ target_kafka_port1 },{ target_kafka_ip2 }:{ target_kafka_port2 }, …",
        "producer.override.security.protocol": "SASL_PLAINTEXT",
        "producer.override.sasl.mechanism": "SCRAM-SHA-256",
        "producer.override.sasl.jaas.config": "org.apache.kafka.common.security.scram.ScramLoginModule required  username=\"{ target_kafka_username }\" password=\"{ target_kafka_password }\";"
  }
}'

MirrorCheckpointConnector

curl --request POST '{ kafka_connect_ip }:8083/connectors' \
--header 'Content-Type: application/json' \
--data-raw '{
    "name": "mm2-cpc",
    "config":
    {
        "name": "mm2-cpc",
        "connector.class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
        "clusters": "sourcecpc, targetcpc",
        "source.cluster.alias": "sourcecpc",
        "source.cluster.bootstrap.servers":  "{ source_kafka_ip1 }:{ source_kafka_port1 },{ source_kafka_ip2 }:{ source_kafka_port2 }, …",
        "source.cluster.security.protocol": "SASL_PLAINTEXT",
        "source.cluster.sasl.mechanism": "SCRAM-SHA-256",
        "source.cluster.sasl.jaas.config":  "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"{ source_kafka_username }\" password=\"{ source_kafka_password }\";",
        "target.cluster.alias": "targetcpc",
        "target.cluster.bootstrap.servers":  "{ target_kafka_ip1 }:{ target_kafka_port1 },{ target_kafka_ip2 }:{ target_kafka_port2 }, …",
        "target.cluster.security.protocol": "SASL_PLAINTEXT",
        "target.cluster.sasl.mechanism": "SCRAM-SHA-256",
        "target.cluster.sasl.jaas.config":  "org.apache.kafka.common.security.scram.ScramLoginModule required  username=\"{ target_kafka_username }\" password=\"{ target_kafka_password }\";",
        "tasks.max": "3",
        "key.converter": " org.apache.kafka.connect.converters.ByteArrayConverter",
        "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
        "checkpoints.topic.replication.factor": "3",
        "emit.checkpoints.enabled": "true",
        "emit.checkpoints.interval.seconds": "20",
        "sync.group.offsets.enabled": "true",
        "sync.group.offsets.interval.seconds": "20",
        "refresh.groups.interval.seconds": "20",
        "producer.override.bootstrap.servers":  "{ target_kafka_ip1 }:{ target_kafka_port1 },{ target_kafka_ip2 }:{ target_kafka_port2 }, …",
        "producer.override.security.protocol": "SASL_PLAINTEXT",
        "producer.override.sasl.mechanism": "SCRAM-SHA-256",
        "producer.override.sasl.jaas.config":  "org.apache.kafka.common.security.scram.ScramLoginModule required  username=\"{ target_kafka_username }\" password=\"{ target_kafka_password }\";"
    }
  }'

MirrorHeartbeatConnector

curl --request POST '{ kafka_connect_ip }:8083/connectors' \
--header 'Content-Type: application/json' \
--data-raw '{
    "name": "mm2-hbc",
    "config":
    {
        "name": "mm2-hbc",
        "connector.class": "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector",
        "clusters": "sourcehbc, targethbc",
        "source.cluster.alias": "sourcehbc",
        "source.cluster.bootstrap.servers":  "{ source_kafka_ip1 }:{ source_kafka_port1 },{ source_kafka_ip2 }:{ source_kafka_port2 }, …",
        "source.cluster.security.protocol": "SASL_PLAINTEXT",
        "source.cluster.sasl.mechanism": "SCRAM-SHA-256",
        "source.cluster.sasl.jaas.config":   "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"{ source_kafka_username }\" password=\"{ source_kafka_password }\";",
        "target.cluster.alias": "targethbc",
        "target.cluster.bootstrap.servers":  "{ target_kafka_ip1 }:{ target_kafka_port1 },{ target_kafka_ip2 }:{ target_kafka_port2 }, …",
        "target.cluster.security.protocol": "SASL_PLAINTEXT",
        "target.cluster.sasl.mechanism": "SCRAM-SHA-256",
        "target.cluster.sasl.jaas.config":  "org.apache.kafka.common.security.scram.ScramLoginModule required  username=\"{ target_kafka_username }\" password=\"{ target_kafka_password }\";",
        "tasks.max": "3",
        "key.converter": " org.apache.kafka.connect.converters.ByteArrayConverter",
        "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
        "heartbeats.topic.replication.factor": "3",
        "emit.heartbeats.enabled": "true",
        "emit.heartbeats.interval.seconds": "1",
        "producer.override.bootstrap.servers":  "{ target_kafka_ip1 }:{ target_kafka_port1 },{ target_kafka_ip2 }:{ target_kafka_port2 }, …",
        "producer.override.security.protocol": "SASL_PLAINTEXT",
        "producer.override.sasl.mechanism": "SCRAM-SHA-256",
        "producer.override.sasl.jaas.config":  "org.apache.kafka.common.security.scram.ScramLoginModule required  username=\"{ target_kafka_username }\" password=\"{ target_kafka_password }\";"
    }
  }'

Connector Producer와 Consumer 설정

ConnectorRoleDescriptionConfig
Source ConnectorProducertopic 메시지를 target Kafka 클러스터로 보냅니다. 대량의 데이터를 처리할 때 이 생산자의 config을 튜닝하는 것이 좋습니다.mirrors.sourceConnector.config: producer.override.*
Source ConnectorProducer복제된 topic 파티션에 대한 source 및 target 오프셋을 매핑하는 offset-syncs 항목에 씁니다.mirrors.sourceConnector.config: producer.*
Source ConnectorConsumersource Kafka 클러스터에서 topic 메시지를 검색합니다.mirrors.sourceConnector.config: consumer.*
Checkpoint ConnectorProducer소비자 오프셋 체크포인트를 내보냅니다.mirrors.checkpointConnector.config: producer.override.*
Checkpoint ConnectorConsumeroffset-syncs topic을 로드합니다.mirrors.checkpointConnector.config: consumer.*
Heartbeat ConnectorProducer하트비트를 내보냅니다.mirrors.heartbeatConnector.config: producer.override.*

Connector 확인

curl -X GET { kafka_connect_ip }:8083/connectors?expand=status |python -m json.tool

{
    "mm2-cpc": {
        "status": {
            "connector": {
                "state": "RUNNING",
                "worker_id": "{ kafka_connect_ip }:8083"
            },
            "name": "mm2-cpc",
            "tasks": [],
            "type": "source"
        }
    },
    "mm2-hbc": {
        "status": {
            "connector": {
                "state": "RUNNING",
                "worker_id": "{ kafka_connect_ip }:8083"
            },
            "name": "mm2-hbc",
            "tasks": [
                {
                    "id": 0,
                    "state": "RUNNING",
                    "worker_id": "{ kafka_connect_ip }:8083"
                }
            ],
            "type": "source"
        }
    },
    "mm2-msc": {
        "status": {
            "connector": {
                "state": "RUNNING",
                "worker_id": "{ kafka_connect_ip }:8083"
            },
            "name": "mm2-msc",
            "tasks": [
                {
                    "id": 0,
                    "state": "RUNNING",
                    "worker_id": "{ kafka_connect_ip }:8083"
                },
                {
                    "id": 1,
                    "state": "RUNNING",
                    "worker_id": "{ kafka_connect_ip }:8083"
                }
            ],
            "type": "source"
        }
    }
}

MM2 내부 Topics

MM2의 내부적인 운영을 위해 자동으로 생성되는 Topic이며, 복제 프로세스의 관리와 모니터링에 필수적인 역할을 합니다.

Topic NameDescription
heartbeats클러스터의 상태를 모니터링하고, MirrorMaker 2 인스턴스가 정상적으로 작동하고 있는지 확인
mm2-offset-syncs.source.internalSource와 target kafka의 복제된 topic 파티션에 대한 클러스터 간 offset 매핑 정보
source.checkpoints.internalsource 클러스터의 각 파티션에 대한 복제 진행 상황을 기록하는 데 사용

테스트

테스트 구성
그림. 테스트 구성

Connector 목록 조회

Connector 목록 조회
그림. Connector 목록 조회

기존 Topic 복제 확인

기존 Topic 복제 확인
그림. 기존 Topic 복제 확인

신규 생성된 Topic 복제 확인

신규 생성된 Topic 복제 확인
그림. 신규 생성된 Topic 복제 확인

Topic 내 Message 복제 확인

Topic 내 Message 복제 확인
그림. Topic 내 Message 복제 확인