KAFKA CONNECT 분산 모드 구성 가이드
KAFKA CONNECT 분산 모드 구성 가이드
개요
최근 데이터 통합과 스트리밍 분야에서 데이터 파이프라인 관리와 운영에 있어 Apache Kafka가 핵심 기술로 떠오르고 있습니다. Kafka Connect는 이러한 Apache Kafka의 핵심 구성 요소 중 하나로, 외부 시스템과 Apache Kafka 클러스터 간의 스트림 데이터를 실시간으로 통합하고 처리하는 서비스를 제공합니다. 이 문서는 사용자가 Samsung Cloud Platform(이하 SCP)에서 Kafka Connect 분산 모드를 효과적으로 구성하도록 지원하는 것을 목적으로 하고 있습니다. 이를 통해 사용자는 보다 쉽고 효율적으로 데이터 소스와 Kafka 클러스터 간 신뢰성 있는 데이터 통합을 수행할 수 있을 것입니다.
Kafka Connect 분산 모드 구성 가이드
Kafka Connect는 데이터베이스, 키-값 저장소, 검색 색인 및 파일 시스템 간의 간단한 데이터 통합을 위한 중앙 집중식 데이터 허브 역할을 하는 Apache Kafka의 무료 오픈 소스 구성 요소입니다. Kafka Connect를 사용하여 Apache Kafka와 다른 데이터 시스템 간에 데이터를 스트리밍하고 Kafka 안팎으로 대규모 데이터 세트를 이동하는 Connector를 빠르게 만들 수 있습니다. Kafka Connect 는 하나의 Connect만 사용하는 단일 모드(Standalone)와 여러개의 Connect를 한개의 클러스트로 묶어서 사용하는 분산 모드(Distributed)가 있습니다. 이 문서에서는 분산 모드(Distributed)로 구성하는 방법을 소개합니다.
Source Connector, Sink Connector
- Source Connector: Data source에 담긴 데이터를 Apache Kafka의 topic에 담는 역할(Producer)을 하는 Connector
- Sink Connector: Apache Kafka topic에 담긴 데이터를 특정 Data source로 보내는 역할(Consumer)을 하는 Connector
파일 다운로드
Apache kafka 패키지를 공식 웹 사이트( https://kafka.apache.org/download )에서 다운로드 가능하며, Kafka Connect 설치 또한 이를 통해 수행할 수 있습니다. 현재 SCP에서 제공하는 Apache Kafka 버전은 3.8.0과 3.5.0이며, Kafka Connect도 이와 동일 버전의 사용이 권장됩니다.
connect-distributed.properties 설정
{kafka_dir}/config에 기본 제공되는 connect-distributed.properties 파일의 설정 값을 변경합니다.
bootstrap.servers={kafka_ip1}:{port},{kafka_ip2}:{port},{kafka_ip3}:{port}
group.id=connect-cluster
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.topic=connect-offsets
offset.storage.replication.factor=3
config.storage.topic=connect-configs
config.storage.replication.factor=3
status.storage.topic=connect-status
status.storage.replication.factor=3
offset.flush.interval.ms=10000
listeners=HTTP://{connect_ip}:{connect_port}
rest.advertised.port={connect_port}
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="username" password="password";
sasl.mechanism=SCRAM-SHA-256
security.protocol=SASL_PLAINTEXT
producer.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="username" password="password";
producer.sasl.mechanism=SCRAM-SHA-256
producer.security.protocol=SASL_PLAINTEXT
consumer.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="username" password="password";
consumer.sasl.mechanism=SCRAM-SHA-256
consumer.security.protocol=SASL_PLAINTEXT
connector.client.config.override.policy=All
주요 config 설명
- bootstrap.servers: Kafka 브로커의 위치 지정
- group.id: Kafka Connect 클러스터 내에서 작업자(worker)를 그룹화하는 역할. 분산 모드에서 클러스터 구성을 위해 이 값을 반드시 동일하게 설정.
- key.converter / value.converter: 메세지의 key와 value의 변환 클래스 입력. JsonConverter가 default 값
- offset.storage.topic: offset이 저장될 topic 설정
- config.storage.topic: config가 저장될 topic 설정
- status.storage.topic: 상태가 저장될 topic 설정
- listener: Kafka Connect worker가 사용할 REST API 엔드포인트의 호스트 이름과 포트 설정. 엔드포인트는 connector 구성 및 관리를 위해 사용됨.
- SASL 설정: SCP에서 현재 제공 중인 Apache Kafka 서비스로 SASL_PLAINTEXT 보안 프로토콜 사용
- connector.client.config.override.policy: Connector가 Kafka 클라이언트 설정을 재정의(override)할 수 있는 정책
Connect 실행
실행 방법
Connect 서버에 접속한 후 ‘connect-distributed.sh
/{kafka_dir}/bin/connect-distributed.sh /{kafka_dir}/config/connect-distributed.properties
실행 확인
curl –X GET http://{connect_ip}:{connect_port}
{"version":"3.1.0","commit":"c97b88d5db4de28d","kafka_cluster_id":"3Xxgk_3wSjq9OdEUIar6Aw"}
Connect topic 확인
Connect 설정 중 bootstrap.servers에 설정한 Apache Kafka 브로커 서버에 접속하여, connect 관련 topic이 생성된 것을 확인할 수 있다.
/{kafka_dir}/bin/kafka-topics.sh --bootstrap-server {ip}:{port} --list --command-config {client_authentication_properties_file_path}
__consumer_offsets
connect-configs
connect-offsets
connect-status
| Topic Name | Description |
|---|---|
| connect-configs | Kafka Connect의 구성 정보를 저장합니다. 여기에는 각 Connector의 구성 설정이 포함되어 있으며, Kafka Connect 클러스터 내의 모든 노드가 이 정보를 사용하여 어떤 Connector가 실행되어야 하는지를 알 수 있습니다. |
| connect-offsets | Source Connector가 마지막으로 읽은 데이터의 위치(오프셋)를 추적하는 데 사용됩니다. 이 토픽은 Kafka Connect가 중단 후 재시작될 때 중복 또는 데이터 손실 없이 정확한 위치에서 읽기를 재개할 수 있도록 합니다. |
| connect-status | Connector와 작업의 상태 정보가 저장되는 토픽입니다. 여기에는 각 Connector 및 작업의 상태(예: RUNNING, PAUSED, FAILED 등)가 포함됩니다. 이 정보는 Kafka Connect의 작업을 모니터링하고 관리하는 데 사용됩니다. |
자주 쓰는 REST API
Connector API
| Connector API | Description |
|---|---|
| 목록 조회 | curl -X GET "http://{connect_ip}:{connect_port}/connectors/" |
| 상세 정보 조회 | curl -X GET "http://{connect_ip}:{connect_port}/connectors?expand=status&expand=info" |
| Config 조회 | curl -X GET "http://{connect_ip}:{connect_port}/connectors/{connector_name}/config" |
| 재시작 | curl -X POST "http://{connect_ip}:{connect_port}/connectors/{connector_name}/restart" |
| 일시 중지 (pause) | curl -X PUT "http://{connect_ip}:{connect_port}/connectors/{connector_name}/pause" |
| 복귀 (resume) | curl -X PUT "http://{connect_ip}:{connect_port}/connectors/{connector_name}/resume" |
| 삭제 | curl -X DELETE "http://{connect_ip}:{connect_port}/connectors/{connector_name}" |
Task API
| Task API | Description |
|---|---|
| 목록 조회 | curl -X GET "http://{connect_ip}:{connect_port}/connectors/{connector_name}/tasks" |
| 상태 조회 | curl -X GET "http://{connect_ip}:{connect_port}/connectors/{connector_name}/tasks/{task_id}/status" |
| 재시작 | curl -X POST "http://{connect_ip}:{connect_port}/connectors/{connector_name}/tasks/{task_id}/restart" |
File Connector 활용
FileStreamSourceConnector
Config 및 생성
curl --request POST '{connect_ip}:{connect_port}/connectors' \
--header 'Content-Type: application/json' \
--data-raw '{
"name": "file-source-connector",
"config": {
"connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
"tasks.max": "1",
"topic": "{topic_name}",
"file": "{source_file_path}"
}
}'
FileStreamSinkConnector
Config 및 생성
curl --request POST '{connect_ip}:{connect_port}/connectors' \
--header 'Content-Type: application/json' \
--data-raw '{
"name": "file-sink-connector",
"config": {
"connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
"tasks.max": "1",
"topics": "{topic_name}",
"file": "{sink_file_path}",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.storage.StringConverter"
}
}'
Connector 확인
curl -X GET {connect_ip}:{connect_port}:/connectors
[“file-sink-connector”, “file-source-connector”]
Connector 상태 상세 확인
Connector 상세 정보 조회 Rest API를 통해 생성된 Connector의 상태 확인과 task 상태, 실행 중인 Worker 노드 정보를 조회할 수 있습니다.
curl -X GET {connect_ip}:{connect_port}/connectors?expand=status |python -m json.tool
{
"file-sink-connector": {
"status": {
"connector": {
"state": "RUNNING",
"worker_id": "{connect_ip}:{connect_port}"
},
"name": "file-sink-connector",
"tasks": [
{
"id": 0,
"state": "RUNNING",
"worker_id": "{connect_ip}:{connect_port}"
},
{
"id": 1,
"state": "RUNNING",
"worker_id": "{connect_ip}:{connect_port}"
}
],
"type": "sink"
}
},
"file-source-connector": {
"status": {
"connector": {
"state": "RUNNING",
"worker_id": "{connect_ip}:{connect_port}"
},
"name": "file-source-connector",
"tasks": [
{
"id": 0,
"state": "RUNNING",
"worker_id": "{connect_ip}:{connect_port}"
}
],
"type": "source"
}
}
}
테스트
file-source-connector가 있는 Worker 노드의 Source file에 Data 입력
echo test_message1 » {source_file_path}
echo test_message2 » {source_file_path}
Source file에 입력한 내용을 Apache Kafka topic data에서 확인
{kafka_dir} /kafka-console-consumer.sh –bootstrap-server {kafka_ip}:{kafka_port} –topic “{topic_name}” –consumer.config {client_ authentication_properties_file_path} –from-beginning
{“schema”:{“type”:“string”,“optional”:false},“payload”:“test_message1”}
{“schema”:{“type”:“string”,“optional”:false},“payload”:“test_message2”}
file-sink-connector가 있는 Worker 노드의 Sink file에 data 출력 확인
cat {sink_file_path}
{“schema”:{“type”:“string”,“optional”:false},“payload”:“test_message1”}
{“schema”:{“type”:“string”,“optional”:false},“payload”:“test_message2”}
그 외 Connector
Kafka Connect는 File Connector 외에도 다양한 Connector를 통해 여러 외부 시스템과 통합이 가능하며, 공식 웹사이트(https://www.confluent.io/product/connectors/)나 다른 오픈 소스 커뮤니티를 통해 Connector들을 찾아볼 수 있습니다. 공식 사이트에서 각 Connector의 사용 방법, 구성 예제, 그리고 관련 문서를 참고할 수 있도록 제공하고 있습니다.