반응형
Kafka에서 메시지를 수신하고 해당 메시지를 Slack으로 보내는 파이썬 코드를 작성하기
Kafka 메시지 소비 : kafka-python 라이브러리를 사용할 수 있습니다.
Slack으로 메시지 전송 : Slack의 Webhook을 사용하여 메시지를 보냅니다.
Python 설치
기존 Python 제거
sudo apt-get remove --purge python3
Python 설치
sudo apt-get update
sudo apt-get install -y python3 python3-pip python3-venv
가상 환경 생성 및 패키지 설치
가상 환경을 생성하고 필요한 패키지를 설치할 수 있습니다.
가상 환경 생성
python3 -m venv myenv
가상 환경 활성화 (Linux/Mac)
source myenv/bin/activate
필요한 패키지 설치
pip install kafka-python requests six
코드 작성
vim python_kafka_consumer.py
import json
import requests
from kafka import KafkaConsumer
def send_slack_message(webhook_url, message):
"""슬랙으로 메시지를 전송합니다."""
headers = {
'Content-Type': 'application/json',
}
data = {
"text": message
}
response = requests.post(webhook_url, headers=headers, data=json.dumps(data))
if response.status_code == 200:
print("슬랙으로 메시지가 성공적으로 전송되었습니다!")
else:
print(f"슬랙으로 메시지 전송 실패: {response.status_code}, {response.text}")
def consume_kafka_messages(kafka_servers, topic, slack_webhook_url):
"""Kafka에서 메시지를 소비하고 슬랙으로 전송합니다."""
consumer = KafkaConsumer(
topic,
bootstrap_servers=kafka_servers,
auto_offset_reset='earliest', # 'latest'로 변경하면 마지막 오프셋부터 시작
enable_auto_commit=True,
group_id='slack-consumer-group',
value_deserializer=lambda x: x.decode('utf-8')
)
print(f"Kafka 토픽 '{topic}'에서 메시지를 소비 중...")
try:
# Kafka에서 메시지 수신 및 Slack 전송
for message in consumer:
print(f"수신된 메시지: {message.value}")
send_slack_message(slack_webhook_url, message.value)
except KeyboardInterrupt:
print("\n종료 중입니다...")
finally:
consumer.close()
print("Kafka 소비자가 종료되었습니다.")
if __name__ == "__main__":
# 설정 값
kafka_bootstrap_servers = ['node3:9092'] # Kafka 서버 주소
topic = 'test-topic' # Kafka 토픽 이름
slack_webhook_url = 'https://hooks.slack.com/services/your/webhook/url' # Slack Webhook URL
# Kafka 메시지를 소비하고 Slack으로 전송
consume_kafka_messages(kafka_bootstrap_servers, topic, slack_webhook_url)
코드 실행
python python_kafka_consumer.py
$ python python_kafka_consumer.py
Kafka 토픽 'test-topic'에서 메시지를 소비 중...
수신된 메시지: hi
슬랙으로 메시지가 성공적으로 전송되었습니다!
^C
종료 중입니다...
Kafka 소비자가 종료되었습니다.
Ctrl+C로 코드를 종료합니다.
가상 환경 비활성화 (Linux/Mac)
deactivate
728x90
반응형
'리눅스' 카테고리의 다른 글
우분투에서 고정 IP 주소를 설정하는 방법 (3) | 2024.10.15 |
---|---|
리눅스에서 LISTEN 포트를 확인하는 방법 (0) | 2024.10.15 |
우분투에서 PHP 8.3으로 업그레이드하고 기존의 PHP 8.1을 삭제하는 방법 (0) | 2024.10.14 |
Shell 스크립트에서 CRLF와 LF 문제를 해결하는 방법 (0) | 2024.10.11 |
Python으로 Kafka Consumer 생성 (0) | 2024.10.11 |