Apache Kafka와 ZooKeeper 클러스터를 구성하는 방법
ZooKeeper는 Kafka의 클러스터 메타데이터를 관리하기 위해 필요하며 Kafka 브로커와 ZooKeeper의 클러스터 구성은 분산 환경에서 높은 가용성과 확장성을 제공합니다.
테스트 환경
Hostname | IP | Role | 비고 |
node1 | 192.168.10.111 | ||
node2 | 192.168.10.112 | ||
node3 | 192.168.10.113 |
1. Kafka 및 ZooKeeper 클러스터 구조 개요
ZooKeeper는 Kafka 클러스터의 상태를 관리하고 브로커 간의 협력을 조율합니다.
Kafka 브로커는 메시지를 저장하고 클라이언트(프로듀서 및 컨슈머)로부터 데이터를 송수신합니다. 여러 개의 브로커가 함께 작동하여 클러스터를 구성합니다.
2. 필수 조건
서버 : 각 브로커와 ZooKeeper 인스턴스를 실행할 물리적 또는 가상 서버.
Kafka 및 ZooKeeper 소프트웨어 : 동일한 Kafka 버전 및 ZooKeeper 버전을 각 서버에 설치.
Java 설치 : Kafka는 Java로 실행되므로 각 서버에 JDK 8 이상이 필요합니다.
3. 필수 패키지 설치
sudo apt-get install -y wget
4. 호스트 설정
우분투에서는 호스트 파일에 포함된 127.0.1.1 IP 주소를 주석 처리합니다.
sed -i '/^127\.0\.1\.1/s/^/#/' /etc/hosts
cat <<EOF | sudo tee -a /etc/hosts
#Zookeeper Cluster
192.168.10.111 node1
192.168.10.112 node2
192.168.10.113 node3
EOF
5. Java 설치
cd /usr/local/src
wget https://download.oracle.com/java/17/latest/jdk-17_linux-x64_bin.tar.gz
mkdir -p /opt/java
tar xf jdk-17_linux-x64_bin.tar.gz -C /opt/java --strip-components=1
환경 변수 파일에 JAVA_HOME 추가
echo "export JAVA_HOME=/opt/java" >> ~/.bashrc
echo "export PATH=\$JAVA_HOME/bin:\$PATH" >> ~/.bashrc
source ~/.bashrc
java --version
6. ZooKeeper 설치 및 설정
6.1. ZooKeeper 설치
각 서버에서 ZooKeeper를 다운로드하고 설치합니다.
cd /usr/local/src
wget https://downloads.apache.org/zookeeper/zookeeper-3.8.4/apache-zookeeper-3.8.4-bin.tar.gz
mkdir -p /opt/zookeeper
tar xf apache-zookeeper-3.8.4-bin.tar.gz -C /opt/zookeeper --strip-components=1
6.2. ZooKeeper 설정 파일 수정
각 서버에서 ZooKeeper의 설정 파일(zoo.cfg)을 수정하여 클러스터 구성을 설정합니다. 설정 파일은 conf 디렉토리 안에 있습니다.
cd /opt/zookeeper/conf
cp zoo_sample.cfg zoo.cfg
---
cat zoo_sample.cfg
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
dataDir=/tmp/zookeeper
# the port at which the clients will connect
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the
# administrator guide before turning on autopurge.
#
# https://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1
## Metrics Providers
#
# https://prometheus.io Metrics Exporter
#metricsProvider.className=org.apache.zookeeper.metrics.prometheus.PrometheusMetricsProvider
#metricsProvider.httpHost=0.0.0.0
#metricsProvider.httpPort=7000
#metricsProvider.exportJvmInfo=true
---
mkdir -p /opt/zookeeper/data
chmod -R 755 /opt/zookeeper/data
zoo.cfg 파일을 열고 다음 내용을 수정합니다.
vim /opt/zookeeper/conf/zoo.cfg
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/opt/zookeeper/data
clientPortAddress=0.0.0.0
clientPort=2181
maxClientCnxns=50
minSessionTimeout=2000
maxSessionTimeout=10000
4lw.commands.whitelist=stat, ruok, conf, isro, srvr
server.1=node1:2888:3888
server.2=node2:2888:3888
server.3=node3:2888:3888
또는
cat <<EOF | sudo tee /opt/zookeeper/conf/zoo.cfg
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/opt/zookeeper/data
clientPortAddress=0.0.0.0
clientPort=2181
maxClientCnxns=50
minSessionTimeout=2000
maxSessionTimeout=10000
4lw.commands.whitelist=stat, ruok, conf, isro, srvr
server.1=node1:2888:3888
server.2=node2:2888:3888
server.3=node3:2888:3888
EOF
- tickTime : ZooKeeper의 기본 단위 시간(밀리초)입니다.
- initLimit : 팔로워가 리더를 인식할 때까지의 초기화 제한 시간.
- syncLimit : 팔로워가 리더와 동기화할 때까지의 시간 제한.
- dataDir : ZooKeeper 데이터가 저장될 디렉토리.
- clientPort : 클라이언트가 ZooKeeper에 접속할 포트.
- maxClientCnxns : 클라이언트당 최대 연결 수.
- server.X : 각 ZooKeeper 서버의 호스트 이름과 포트 설정.
- 첫 번째 포트(2888)는 ZooKeeper가 다른 ZooKeeper 서버와 통신하는 데 사용되는 포트.
- 두 번째 포트(3888)는 리더 선출에 사용되는 포트.
6.3. 각 ZooKeeper 서버에 서버 ID 설정
각 ZooKeeper 서버의 dataDir에 myid 파일을 만들어 각 노드에 고유한 ID를 할당합니다.
서버 1에서는 1
echo "1" > /opt/zookeeper/data/myid
서버 2에서는 2
echo "2" > /opt/zookeeper/data/myid
서버 3에서는 3
echo "3" > /opt/zookeeper/data/myid
6.4. ZooKeeper 시작
모든 서버에서 ZooKeeper를 시작합니다.
/opt/zookeeper/bin/zkServer.sh start
$ /opt/zookeeper/bin/zkServer.sh start
ZooKeeper JMX enabled by default
Using config: /opt/zookeeper/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
ZooKeeper Stop
/opt/zookeeper/bin/zkServer.sh stop
클러스터가 정상적으로 작동하는지 확인
/opt/zookeeper/bin/zkServer.sh status
$ /opt/zookeeper/bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /opt/zookeeper/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: leader
$ /opt/zookeeper/bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /opt/zookeeper/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: follower
각 서버에서 클러스터의 리더와 팔로워 상태를 확인할 수 있습니다.
6.5 ZooKeeper 명령어로 확인
정상적으로 작동하는지 확인
- ZooKeeper 클라이언트를 실행하고 stat 명령을 사용하면 서버의 상태와 버전을 확인할 수 있습니다.
echo stat | nc <zookeeper-host> <zookeeper-port>
echo stat | nc localhost 2181
$ echo stat | nc localhost 2181
Zookeeper version: 3.8.4-9316c2a7a97e1666d8f4593f34dd6fc36ecc436c, built on 2024-02-12 22:16 UTC
Clients:
/127.0.0.1:52272[0](queued=0,recved=1,sent=0)
Latency min/avg/max: 0/0.0/0
Received: 2
Sent: 1
Connections: 1
Outstanding: 0
Zxid: 0x100000002
Mode: follower
Node count: 5
$ echo stat | nc localhost 2181
Zookeeper version: 3.8.4-9316c2a7a97e1666d8f4593f34dd6fc36ecc436c, built on 2024-02-12 22:16 UTC
Clients:
/127.0.0.1:52272[0](queued=0,recved=1,sent=0)
Latency min/avg/max: 0/0.0/0
Received: 2
Sent: 1
Connections: 1
Outstanding: 0
Zxid: 0x100000002
Mode: follower
Node count: 5
6.6. ZooKeeper 서비스 등록
---
ZooKeeper 서비스 파일 생성
sudo vim /etc/systemd/system/zookeeper.service
[Unit]
Description=Apache Zookeeper Service
Documentation=http://zookeeper.apache.org
Requires=network.target remote-fs.target
After=network.target remote-fs.target
[Service]
Environment="JAVA_HOME=/opt/java"
WorkingDirectory=/opt/zookeeper
Type=forking
ExecStart=/opt/zookeeper/bin/zkServer.sh start
ExecStop=/opt/zookeeper/bin/zkServer.sh stop
ExecReload=/opt/zookeeper/bin/zkServer.sh restart
Restart=always
RestartSec=5s
[Install]
WantedBy=multi-user.target
서비스 파일 재로드 및 시작
sudo systemctl daemon-reload
sudo systemctl start zookeeper
sudo systemctl enable zookeeper
서비스 상태 확인
sudo systemctl status zookeeper
---
7. Kafka 클러스터 구성
7.1. Kafka 설치
각 서버에서 Kafka를 다운로드하고 설치합니다.
cd /usr/local/src
wget https://downloads.apache.org/kafka/3.8.0/kafka_2.13-3.8.0.tgz
mkdir -p /opt/kafka
tar xf kafka_2.13-3.8.0.tgz -C /opt/kafka --strip-components=1
7.2. Kafka 설정 파일 수정
mkdir -p /opt/kafka/logs
cd /opt/kafka/config
vim server.properties
---
cat server.properties
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# This configuration file is intended for use in ZK-based mode, where Apache ZooKeeper is required.
# See kafka.server.KafkaConfig for additional details and defaults
#
############################# Server Basics #############################
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0
############################# Socket Server Settings #############################
# The address the socket server listens on. If not configured, the host name will be equal to the value of
# java.net.InetAddress.getCanonicalHostName(), with PLAINTEXT listener name, and port 9092.
# FORMAT:
# listeners = listener_name://host_name:port
# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092
#listeners=PLAINTEXT://:9092
# Listener name, hostname and port the broker will advertise to clients.
# If not set, it uses the value for "listeners".
#advertised.listeners=PLAINTEXT://your.host.name:9092
# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3
# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8
# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400
# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400
# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600
############################# Log Basics #############################
# A comma separated list of directories under which to store log files
log.dirs=/tmp/kafka-logs
# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1
# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1
############################# Internal Topic Settings #############################
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
############################# Log Flush Policy #############################
# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
# 1. Durability: Unflushed data may be lost if you are not using replication.
# 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
# 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.
# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000
# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000
############################# Log Retention Policy #############################
# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.
# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168
# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
#log.segment.bytes=1073741824
# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000
############################# Zookeeper #############################
# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=localhost:2181
# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=18000
############################# Group Coordinator Settings #############################
# The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
# The default value for this is 3 seconds.
# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
group.initial.rebalance.delay.ms=0
---
# 각 브로커에 고유한 ID 설정 (예: 0, 1, 2)
broker.id=0
log.dirs=/var/lib/kafka-logs
# ZooKeeper 클러스터 정보
zookeeper.connect=zookeeper1:2181,zookeeper2:2181,zookeeper3:2181
# 파티션 수 설정
num.partitions=3
# 오프셋 토픽의 복제 계수 설정 (브로커 수만큼 설정)
offsets.topic.replication.factor=3
# 기본 복제 계수
default.replication.factor=3
- broker.id : 각 Kafka 브로커에 고유한 ID를 설정합니다.
- zookeeper.connect : ZooKeeper 클러스터의 호스트 및 포트를 지정합니다.
(또는)
cat <<EOF | sudo tee /opt/kafka/config/server.properties
# 각 브로커에 고유한 ID 설정 (예: 0, 1, 2)
broker.id=0
# 네트워크 및 IO 설정
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
num.partitions=3
num.recovery.threads.per.data.dir=1
# 오프셋 토픽의 복제 계수 설정(브로커 수만큼 설정)
offsets.topic.replication.factor=3
# 기본 복제 계수
default.replication.factor=3
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
# 로그 설정
log.dirs=/opt/kafka/logs
# 로그 보관 설정
log.retention.hours=168
log.retention.check.interval.ms=300000
# Zookeeper 클러스터 정보
zookeeper.connect=node1:2181,node2:2181,node3:2181
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0
EOF
node2, node3 server.propertie
---
cat <<EOF | sudo tee /opt/kafka/config/server.properties
# 각 브로커에 고유한 ID 설정 (예: 0, 1, 2)
broker.id=1
# 네트워크 및 IO 설정
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
num.partitions=3
num.recovery.threads.per.data.dir=1
# 오프셋 토픽의 복제 계수 설정(브로커 수만큼 설정)
offsets.topic.replication.factor=3
# 기본 복제 계수
default.replication.factor=3
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
# 로그 설정
log.dirs=/opt/kafka/logs
# 로그 보관 설정
log.retention.hours=168
log.retention.check.interval.ms=300000
# Zookeeper 클러스터 정보
zookeeper.connect=node1:2181,node2:2181,node3:2181
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0
EOF
cat <<EOF | sudo tee /opt/kafka/config/server.properties
# 각 브로커에 고유한 ID 설정 (예: 0, 1, 2)
broker.id=2
# 네트워크 및 IO 설정
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
num.partitions=3
num.recovery.threads.per.data.dir=1
# 오프셋 토픽의 복제 계수 설정(브로커 수만큼 설정)
offsets.topic.replication.factor=3
# 기본 복제 계수
default.replication.factor=3
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
# 로그 설정
log.dirs=/opt/kafka/logs
# 로그 보관 설정
log.retention.hours=168
log.retention.check.interval.ms=300000
# Zookeeper 클러스터 정보
zookeeper.connect=node1:2181,node2:2181,node3:2181
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0
EOF
---
7.3. Kafka 시작
모든 서버에서 Kafka 브로커를 시작합니다.
/opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties
Kafka가 정상적으로 시작되었는지 확인하려면 로그 파일을 확인하거나 kafka-broker-api-versions.sh 스크립트를 실행합니다.
Kafka 중지
/opt/kafka/bin/kafka-server-stop.sh
7.4 Kafka 서비스 상태 확인
Kafka가 실행 중인 서버에서 Kafka 브로커가 정상적으로 작동 중인지 확인하려면 kafka-broker-api-versions.sh 명령을 사용해 브로커 버전을 조회합니다.
/opt/kafka/bin/kafka-broker-api-versions.sh --bootstrap-server localhost:9092
7.5 Kafka 서비스 등록
---
Kafka 서비스 파일 생성
sudo vim /etc/systemd/system/kafka.service
[Unit]
Description=Apache Kafka Service
Documentation=http://kafka.apache.org/documentation.html
Requires=network.target remote-fs.target
After=network.target remote-fs.target
[Service]
Environment="JAVA_HOME=/opt/java"
Type=simple
ExecStart=/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties
ExecStop=/opt/kafka/bin/kafka-server-stop.sh
Restart=on-abnormal
[Install]
WantedBy=multi-user.target
서비스 파일 재로드 및 시작
sudo systemctl daemon-reload
sudo systemctl start kafka
sudo systemctl enable kafka
서비스 상태 확인
sudo systemctl status kafka
---
8. Kafka 클러스터 상태 확인
Kafka 클러스터가 올바르게 동작하는지 확인하려면 다음 명령을 사용할 수 있습니다.
8.1. 토픽 생성 및 확인
Kafka 클러스터에서 새로운 토픽을 생성하여 브로커들이 올바르게 통신하고 있는지 확인합니다.
/opt/kafka/bin/kafka-topics.sh --create --topic test-topic --bootstrap-server node1:9092 --replication-factor 3 --partitions 3
8.2. 토픽 목록 조회
토픽이 정상적으로 생성되었는지 확인합니다.
/opt/kafka/bin/kafka-topics.sh --list --bootstrap-server node1:9092
8.3. 메시지 프로듀서 및 컨슈머 테스트
프로듀서를 실행하여 메시지를 전송합니다(메시지 보내기).
/opt/kafka/bin/kafka-console-producer.sh --topic test-topic --bootstrap-server node1:9092
컨슈머를 실행하여 메시지를 수신합니다(메시지 읽기).
/opt/kafka/bin/kafka-console-consumer.sh --topic test-topic --from-beginning --bootstrap-server node1:9092
/opt/kafka/bin/kafka-console-consumer.sh --topic test-topic --from-beginning --bootstrap-server node2:9092
/opt/kafka/bin/kafka-console-consumer.sh --topic test-topic --from-beginning --bootstrap-server node3:9092
Kafka와 ZooKeeper 클러스터가 완성되었습니다.
참고URL
- ORACLE : Java downloads(java17)
- Apache Software Foundation projects : kafka
- Apache Software Foundation projects : zookeeper
'리눅스' 카테고리의 다른 글
Kafka-UI 도구를 사용하여 Kafka 클러스터를 관리하는 방법 (0) | 2024.10.11 |
---|---|
127.0.1.1의 의미 (1) | 2024.10.10 |
동적 라이브러리와 정적 라이브러리의 차이점 (0) | 2024.10.07 |
CentOS 7에 YUM을 사용하여 MySQL을 설치하는 방법 (3) | 2024.10.06 |
ss 명령어 (0) | 2024.10.06 |