본문 바로가기

리눅스

Python으로 Kafka Consumer 생성

반응형

Python으로 Kafka Consumer 생성

Python에서는 confluent_kafka 라이브러리를 주로 사용하여 Kafka 클러스터와 상호작용합니다.

Apache Kafka - Cluster Architecture

출처-https://www.tutorialspoint.com/apache_kafka/images/cluster_architecture.jpg

1. 간단한 Kafka 컨슈머를 만들기

confluent_kafka 라이브러리를 설치합니다.

pip install confluent_kafka

Python Kafka Consumer 예제

  • Topic : test-topic-1
  • Kafka Broker Server : node1:9092, node2:9092, node3:9092
  • Consumers Group ID : my-consumer-group
vim python_kafka_consumer.py
from confluent_kafka import Consumer, KafkaException

# Kafka Consumer 설정
conf = {
    'bootstrap.servers': 'node1:9092,node2:9092,node3:9092',  # Kafka broker 목록
    'group.id': 'my-consumer-group',  # 컨슈머 그룹
    'auto.offset.reset': 'earliest',  # 시작 시점 (earliest: 처음부터, latest: 최신 메시지부터)
}

# Consumer 객체 생성
consumer = Consumer(conf)

# 구독할 토픽 설정(토픽 이름)
consumer.subscribe(['test-topic-1'])

try:
    while True:
        # 메시지 소비
        msg = consumer.poll(timeout=1.0)  # 1초 동안 메시지를 기다림

        if msg is None:
            continue
        if msg.error():
            raise KafkaException(msg.error())
        
        # 메시지 출력
        print(f"Received message: {msg.value().decode('utf-8')}")

except KeyboardInterrupt:
    pass
finally:
    # 컨슈머 종료
    consumer.close()

실행 권한 부여

chmod +x python_kafka_consumer.py

2. 프로듀서 메시지 전송(Send Message)

Topic : test-topic-1

/opt/kafka/bin/kafka-console-producer.sh --topic test-topic-1 --bootstrap-server node1:9092

3. 컨슈머 메시지 확인(Consume Message)

python python_kafka_consumer.py
$ python python_kafka_consumer.py 

Received message: value consumer test

kafka_produce-consumers

 

Kafka 클러스터의 브로커 주소와 토픽 이름을 지정하고 메시지를 실시간으로 소비하여 출력하는 방식으로 작동합니다.

 

728x90
반응형