Kafka_SASL_SCRAM 인증 기반 접속

Kafka_SASL_SCRAM 인증 기반 접속

개요

Apache Kafka는 대용량, 대규모 메시지 데이터를 빠르게 처리하도록 개발된 분산 메시징 플랫폼으로, Samsung Cloud Platform(이하 SCP)에서는 Kafka 클러스터를 자동 배포하고 모니터링 기능과 지표, 로그 및 인증 관리하는 기능을 서비스로 제공하고 있습니다. 일반적으로 인증은 ID/Password 또는 Token 기반 사용자가 서비스를 이용할 수 있도록 허용하는 것을 의미합니다. Kafka는 기본적으로 미인증 방식으로 동작하므로 클라이언트 접속 시 보안성 확보를 위해서 SASL(Simple Authentication and Security Layer)과 SCRAM(Salted Challenge Response Authentication Mechanism) 방식을 활용한 인증이 필요합니다. 이 문서에서는 SASL_SCRAM인증 기반 Kafka 클라이언트 접속 방법에 대해 소개합니다.

DB서비스 APACHE KAFKA 서버 내 인증 설정

SASL 인증

Kafka및 zookeeper는 기본적으로 미인증 방식으로 동작하기 때문에 보안을 위해선 인증설정을 진행해야 합니다. Kafka및 zookeeper가 제공하는 인증방식 중 SASL은 id와 password 기반으로 사용자를 인증하는 방식입니다. Kafka 및 zookeeper서버 SASL인증 설정을 위해선 아래와 같이 설정파일을 생성합니다.

[ Kafka 인증 파일 ]

KafkaServer {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="kafka 계정"
password="kafka 패스워드"
};

SCRAM 암호화

Kafka서버에 SASL인증 후 통신시 계정정보를 PLAINTEXT형태의 평문으로 통신하게 됩니다. 보안을 위해서는 계정정보는 암호화하여 보내야 합니다. Kafka는 SCRAM-256 및 SCRAM-512 방식의 암호화를 제공합니다.

[ Kafka 서버설정 내 SASL_SCRAM 설정 ]

listeners=SASL_PLAINTEXT://:9092
security.inter.broker.protocol=SCRAM-SHASHA-256
sasl.enabled.mechanisms=SCRAM-SHA-256

DB서비스 Apache Kafka 서버 인증

Apache Kafka 서비스는 SASL_SCRAM인증 설정이 되어있습니다. 계정정보는 아래의 서비스 신청화면의 Zookeeper 및 Kafka SASL 정보를 기반으로 생성되며, 클라이언트로 접속하기 위해선 이 문서의의 아래 내용을 참고하여 설정 후 진행해야 됩니다.

Apache Kafka 신청화면
그림. Apache Kafka 신청화면

SASL_SCRAM 설정 파일 생성

SASL 인증 파일 생성

클라라이언트용 SASL 인증파일은 conf확장자로 생성하며 파일 내부에는 KafkaClient 파라미터가 있습니다. KafkaClient는 Kafka용 인증을 담당합니다. 파라미터에는 인증 클래스를 할당해야 되는데, KafkaClient는 SCRAM용 클래스를 설정합니다. Kafka접속정보는 Apache Kafka DB서비스 상품 신청시 입력하였던 id/pw 정보를 작성합니다. (https://docs.confluent.io/platform/current/kafka/authentication_sasl/authentication_sasl_scram.html#security-considerations-for-sasl-scram)

[ Kafka SASL 인증 파일 ]

KafkaClient {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="Kafka 유저정보"
password="Kafka 패스워드"
};

프로토콜 설정 파일 생성

Kafka클라이언트는 SASL_SCRAM 접속시 프로토콜 설정파일이 필요합니다. 파일확장자는 properties이며, 내용은 SASL인증 프로토콜 및 SCRAM암호화 클래스를 작성합니다. [ Kafka 프로토콜프로토콜 설정설정 파일파일 ]

security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256

인증용 환경변수 등록

Kafka 클라이언트 실행시 이전 단계에서 생성한 SASL인증파일을 인식하도록 OS환경변수에 등록합니다. 변수명은 KAFKA_OPT이며, 내용은 인증용 클래스에 SASL인증파일 경로를 작성합니다. [ SASL SASL 환경환경 변수변수 ]

KAFKA_OPTS=Djava.security.auth.login.config="SASL 인증파일 경로"

Kafka 클라이언트 접속

Kafka 클라이언트 및 Java 설치

Apache Kafka 클라이언트는 https://kafka.apache.org/downloads 에서 서버 버전에 맞게 다운로드 후 원하는 경로에 압축해제 하여 설치를 진행합니다. DB서비스용 Apache Apache Kafka는 3.1버전 Scala 2.13버전이므로 해당 버전을 다운로드 합니다. JDK버전은 11버전 이상을 다운로드 후 압축해제 및 JAVA_HOME환경 변수를 등록합니다. 설치된 Kafka 클라이언트의 bin경로에 OS버전에 맞는 클라이언트들이 있습니다. .sh확장자는 리눅스, .bat확장자는 윈도우에서 사용가능합니다.

Kafka 파라미터 조회

Kafka 클라이언트에서 kafka-configs.sh 프로그램으로 Kafka 관련된 설정 및 파라미터 조회 및 수정을 진행 할 수 있습니다. 해당 프로그램을 실행하기 위해선 SASL 및 프로토콜 설정파일 생성 및 환경변수 등록이 먼저 선행되어야 합니다. 프로토콜 설정파일은 command-config 에 설정하여 SASL_SCRAM 설정 적용되도록 해야 합니다.

[ Kafka Cluster내 특정 Broker의 전체 파라미터 조회 ]

./kafka-configs.sh --bootstrap-server kafka 서버 IP”:”kakfa 서버 Port
--entity-type brokers --entity-name Kafka 서버 ID
--describe all --command-config 프로토콜 설정 파일


All configs for broker 1 are:
log.cleaner.min.compaction.lag.ms=0 sensitive=false
synonyms={DEFAULT_CONFIG:log.cleaner.min.compaction.lag.ms=0}
offsets.topic.num.partitions=50 sensitive=false
synonyms={DEFAULT_CONFIG:offsets.topic.num.partitions=50}
sasl.oauthbearer.jwks.endpoint .refresh.ms=3600000 sensitive=false
synonyms={DEFAULT_CONFIG:sasl.oauthbearer.jwks.endpoint.refresh.ms=3600000}
log.flush.interval.messages=9223372036854775807 sensitive=false
synonyms={DEFAULT_CONFIG:log.flush.interval.messages=9223372036854775807}

Kafka Topic 관리

Kafka 클라이언트에서 kafka-topics.sh 프로그램으로 Topic 및 partition 생성 및 관리가 가능합니다 다른 클라이언트와 마찬가지로 설정파일 및 환경변수 등록이 선행되어야 합니다

[sdstopic Topic 생성]

./kafka-topics.sh --bootstrap-server kafka 서버 IP”:”kakfa 서버 Port
--create topic sdstopic --replication-factor 1 --partitions 8 --command-config 프로토콜 설정 파일

Created topic sdstopic.

[sdstopic Topic Partition 조회]

./kafka-topics.sh --bootstrap-server kafka 서버 IP”:”kakfa 서버 Port
--describe-topic sdstopic --command-config  프로토콜 설정 파일 


Topic: sdstopic TopicId: jVevO5k_Quek1xP4sAYgJw PartitionCount: 10
ReplicationFactor: 2
Configs:
Topic: sdstopic Partition: 0 Leader: 1 Replicas: 1,2 Isr: 1,2
Topic: sdstopic Partition: 1 Leader: 1 Replicas: 1,2 I sr: 1,2
Topic: sdstopic Partition: 2 Leader: 1 Replicas: 1,2 Isr: 1,2
Topic: sdstopic Partition: 3 Leader: 1 Replicas: 1,2 Isr: 1,2
Topic: sdstopic Partition: 4 Leader: 2 Replicas: 2,3 Isr: 2,3
Topic: sdstopic Partition: 5 Leader: 2 Replicas: 2,3 Isr: 2,3
Topic: sdstopic Partition: 6 Leader: 2 Replicas: 2,3 Isr: 2,3
Topic: sdstopic Partition: 7 Leader: 3 Replicas: 3,1 Isr: 1,3
Topic: sds topic Partition: 8 Leader: 3 Replicas: 3,1 Isr: 1,3
Topic: sdstopic Partition: 9 Leader: 3 Replicas: 3,1 Isr: 1,3

Java 클라이언트 접속

Java용 SASL_SCRAM 설정을 위해선 SASL_SCRAM 기반 properties 파일 생성 후 Kafka 접속 클래스에서 producer 및 consumer 객체에 해당정보를 설정합니다 Kafka 접속 클래스 초기화 방식은 프로젝트 환경에 맞게 진행하며 properties 파일에는 접속에 필요한 정보를 추가할 수 있습니다

[ Java용용 properties 파일 ]

classname= org.apache.kafka.common.security.scram.ScramLoginModule required
username=kafka 유저정보
password=kafka 패스워드
broker=kafka IP:Port
protocol=SASL_PLAINTEXT
mechanism=SCRAM-SHA-256
  • SCRAM접속을 위해 class 및 mechanism은 SCRAM으로 설정
  • Protocol은 SASL인증 설정
  • Id/pw 및 broker정보는 Apache Kafka 상품 신청 정보 활용하여 설정

[ Kafka 접속용 클래스 예제 ]

@PropertySource(value = { "classpath:kafka.properties"
public class kafkaInit {
@Autowired
private static Environment env;
private static Properties props;
private static Producer<String, String> producer;
private static Consumer<String, String> consumer;
public static void init() {
props = new Properties();
String jaas = String.format("%s username= username=\"%s \" password=\"%s\";"
,env.getProperty("classname")
,env.getProperty("username")
,env.getProperty("password")
props.put("bootstrap.servers", env.getProperty("brokers"));
props.put("security.protocol", env.getProperty("protocol"));
props.put("sasl.mechanism", env.getProperty("mechanism"));
props.put("sasl.jaas.config", jaas);
producer = new KafkaProducer<String, String>(props);
consumer = new KafkaConsumer<String, String>(props);
}

(https://docs.confluent.io/platform/current/kafka/authentication_sasl/index.html#enabling-multiple-sasl-mechanisms)