본문 바로가기

리눅스

Kafka에서 메시지를 수신하고 해당 메시지를 Slack으로 보내는 파이썬 코드를 작성하기

반응형

Kafka에서 메시지를 수신하고 해당 메시지를 Slack으로 보내는 파이썬 코드를 작성하기

Kafka 메시지 소비 : kafka-python 라이브러리를 사용할 수 있습니다.

Slack으로 메시지 전송 : Slack의 Webhook을 사용하여 메시지를 보냅니다.

Python 설치

기존 Python 제거

sudo apt-get remove --purge python3

Python 설치

sudo apt-get update
sudo apt-get install -y python3 python3-pip python3-venv

가상 환경 생성 및 패키지 설치

가상 환경을 생성하고 필요한 패키지를 설치할 수 있습니다.

가상 환경 생성

python3 -m venv myenv

가상 환경 활성화 (Linux/Mac)

source myenv/bin/activate

필요한 패키지 설치

pip install kafka-python requests six

코드 작성

vim python_kafka_consumer.py
import json
import requests
from kafka import KafkaConsumer

def send_slack_message(webhook_url, message):
    """슬랙으로 메시지를 전송합니다."""
    headers = {
        'Content-Type': 'application/json',
    }
    data = {
        "text": message
    }
    
    response = requests.post(webhook_url, headers=headers, data=json.dumps(data))

    if response.status_code == 200:
        print("슬랙으로 메시지가 성공적으로 전송되었습니다!")
    else:
        print(f"슬랙으로 메시지 전송 실패: {response.status_code}, {response.text}")

def consume_kafka_messages(kafka_servers, topic, slack_webhook_url):
    """Kafka에서 메시지를 소비하고 슬랙으로 전송합니다."""
    consumer = KafkaConsumer(
        topic,
        bootstrap_servers=kafka_servers,
        auto_offset_reset='earliest',  # 'latest'로 변경하면 마지막 오프셋부터 시작
        enable_auto_commit=True,
        group_id='slack-consumer-group',
        value_deserializer=lambda x: x.decode('utf-8')
    )

    print(f"Kafka 토픽 '{topic}'에서 메시지를 소비 중...")
    
    try:
        # Kafka에서 메시지 수신 및 Slack 전송
        for message in consumer:
            print(f"수신된 메시지: {message.value}")
            send_slack_message(slack_webhook_url, message.value)

    except KeyboardInterrupt:
        print("\n종료 중입니다...")

    finally:
        consumer.close()
        print("Kafka 소비자가 종료되었습니다.")

if __name__ == "__main__":
    # 설정 값
    kafka_bootstrap_servers = ['node3:9092']  # Kafka 서버 주소
    topic = 'test-topic'  # Kafka 토픽 이름
    slack_webhook_url = 'https://hooks.slack.com/services/your/webhook/url'  # Slack Webhook URL

    # Kafka 메시지를 소비하고 Slack으로 전송
    consume_kafka_messages(kafka_bootstrap_servers, topic, slack_webhook_url)

코드 실행

python python_kafka_consumer.py
$ python python_kafka_consumer.py 
Kafka 토픽 'test-topic'에서 메시지를 소비 중...

수신된 메시지: hi
슬랙으로 메시지가 성공적으로 전송되었습니다!
^C
종료 중입니다...
Kafka 소비자가 종료되었습니다.

Ctrl+C로 코드를 종료합니다.

가상 환경 비활성화 (Linux/Mac)

deactivate

 

728x90
반응형