본문 바로가기

퍼블릭 클라우드

Slack 메시지 포맷(AWS Lambda 함수)

반응형

Slack 메시지 포맷(AWS Lambda 함수)

AWS Lambda 함수를 사용하여 CloudWatch 경보를 Slack 채널로 전송하는 예시입니다.

코드 작동 설명

  1. CloudWatch 경보의 알림이 SNS 주제로 전송됩니다.
  2. Lambda 함수는 SNS 메시지를 수신하고 해당 메시지를 CloudWatch 경보 이벤트로 해석합니다.
  3. Slack 메시지를 구성하고 설정된 Slack 채널로 Webhook을 사용하여 메시지를 전송합니다.

Slack 메시지

Slack_메시지

  • SLACK_CHANNEL : Slack 메시지를 전송할 채널입니다. Lambda 함수의 환경 변수로 설정됩니다.
  • HOOK_URL: Slack Webhook URL입니다. Lambda 함수의 환경 변수로 설정됩니다.
  • lambda_handler : Lambda 함수의 핵심 핸들러 함수로 CloudWatch 경보를 받아 Slack 메시지를 구성하고 전송합니다.

lambda function

import json
import logging
import os
from urllib.request import Request, urlopen
from urllib.error import URLError, HTTPError

logger = logging.getLogger()
logger.setLevel(logging.INFO)

def send_slack_message(channel, webhook_url, pretext, color, text):
    slack_message = {
        'channel': channel,
        'username': 'CloudWatch',
        'pretext': pretext,
        'color': color,
        'text': text
    }

    req = Request(webhook_url, json.dumps(slack_message).encode('utf-8'))
    try:
        response = urlopen(req)
        response.read()
        logger.info("Message posted to %s", channel)
    except HTTPError as e:
        logger.error("Request failed: %d %s", e.code, e.reason)
    except URLError as e:
        logger.error("Server connection failed: %s", e.reason)

def lambda_handler(event, context):
    logger.info("Event: %s", json.dumps(event))
    message = json.loads(event['Records'][0]['Sns']['Message'])
    logger.info("Message: %s", json.dumps(message))

    alarm_name = message['AlarmName']
    new_state = message['NewStateValue']
    reason = message['NewStateReason']

    color = '00e200' if new_state != 'ALARM' else '#ff0000'
    pretext = f"{new_state}: {alarm_name}"
    text = f"{alarm_name} state is now {new_state}: {reason}"

    send_slack_message(os.environ['slackChannel'], os.environ['hookUrl'], pretext, color, text)

이 코드에서 send_slack_message 함수는 Slack 메시지를 구성하고 전송하는 역할을 수행합니다. lambda_handler 함수에서는 CloudWatch 이벤트를 처리하고 send_slack_message 함수를 호출하여 Slack으로 메시지를 전송합니다.

 

더보기

---

import boto3
import json
import logging
import os

from base64 import b64decode
from urllib.request import Request, urlopen
from urllib.error import URLError, HTTPError

# The Slack channel to send a message to stored in the slackChannel environment variable
SLACK_CHANNEL = os.environ['slackChannel']
HOOK_URL = os.environ['hookUrl']

logger = logging.getLogger()
logger.setLevel(logging.INFO)


def lambda_handler(event, context):
    logger.info("Event: " + str(event))
    message = json.loads(event['Records'][0]['Sns']['Message'])
    logger.info("Message: " + str(message))

    alarm_name = message['AlarmName']
    #old_state = message['OldStateValue']
    new_state = message['NewStateValue']
    reason = message['NewStateReason']

    color = '00e200'
    username = 'CloudWatch'
    
    if new_state == 'ALARM':
        color = '#ff0000'

    slack_message = {
        'channel': SLACK_CHANNEL,
        'username': username,
        'pretext': "%s: %s" % (new_state, alarm_name),
        'color': color,
        'text': "%s state is now %s: %s" % (alarm_name, new_state, reason)
    }
    
    req = Request(HOOK_URL, json.dumps(slack_message).encode('utf-8'))
    try:
        response = urlopen(req)
        response.read()
        logger.info("Message posted to %s", slack_message['channel'])
    except HTTPError as e:
        logger.error("Request failed: %d %s", e.code, e.reason)
    except URLError as e:
        logger.error("Server connection failed: %s", e.reason)

---

 

728x90
반응형