YoonWould!!

[메시지 큐 만들기]RabbitMQ를 이용한 메시지 큐 2탄 본문

<인턴생활>/[python 예제]

[메시지 큐 만들기]RabbitMQ를 이용한 메시지 큐 2탄

Hading 2019. 4. 29. 18:07
728x90

지난 시간에 이어서 진행되는 점 참고바랍니다.

1. 작업 분배 : 큐에 넣고, 여러 개의 워커가 가져가고, 작업 종료 확인하기

퍼블리셔 => 큐 => 컨슈머 1,2

퍼블리셔가 생성한 메시지를 메시지 큐에서 컨슈머 1과2에 나워 전달합니다.

지난 시간에 한 예제와 규모를 조절하면 비슷하나 이번 예제는 컨슈머가 작업한 후 해당 작업이 정상적으로 완료되었는지 확인하는 것과 퍼블리셔, 메시지 큐 서버, 컨슈머 중 어느 하나에 문제가 생겼을 때 해당 데이터를 어떻게 보존시킬 지입니다.

#메시지 센더 구현
#new_sender.py

import pika  

# 무작위 수를 생성하는 random 모듈을 임포트합니다.  
import random  

# 서버와 연결을 맺습니다.  
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))  

# 연결 안에서 채널을 만듭니다.  
channel = connection.channel()  

# 채널 안에서 큐를 선언합니다. 새 큐를 만든다고 볼 수 있습니다.  
# \[코드 13-1\]과 차이점은 durable 옵션으로  
# 서버가 죽었다 살아 났을 때도 상태를 유지시킵니다.  
channel.queue\_declare(queue='task\_queue', durable=True)  

# 큐에 쌓아둘 메시지 리스트를 만듭니다.  
# 총 100개의 메시지를 만들며 각각의 메시지는 1~10 사이의 정수입니다.  
# 메시지 숫자가 곧 작업에 걸리는 시간이라고 생각해봅시다.  
# 여기서 메시지의 형태는 "N:M"입니다.  
# N번째로 생성되었고, 0.M초가 걸리는 작업이란 이야기입니다.  
msgs = \[str(i) + ":" + str(random.randrange(1, 11)) for i in range(100)\]  

# 메시지를 한번에 여러 개 보낼 거니까 적당히 함수 하나로 깔끔하게 묶어줍니다.  
def send\_msg(msg):  
    channel.basic\_publish(exchange='', routing\_key='task\_queue', body=str(msg),  
        # 아까의 예제와 다른 부분입니다.  
        properties=pika.BasicProperties(  
            # 이 프로퍼티를 적용함으로서 메시지를 디스크에 저장해 사라지지 않게 합니다.  
            # 즉, 서버가 다시 시작되어도 메시지는 살아남습니다.  
            delivery\_mode = 2, )  
    )  

# 메시지들을 큐에다 쌓아줍시다!  
for msg in msgs:  
    send\_msg(msg)  
    print(" # 메시지를 보냈습니다: %r" % msg)

# 메시지를 다 보낸 후 닫아줍니다.  
connection.close()
#메시지 리시버 구현
#new_reciver.py
import pika
import time
import datetime

# 서버와 연결을 맺습니다.
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))

# 연결 안에서 채널을 만듭니다.
channel = connection.channel()

# [코드 13-2]와 차이점은 durable 옵션으로
# 메시지 큐 서버가 죽었다 살아 났을 때도 상태를 유지시킵니다.
channel.queue_declare(queue='task_queue', durable=True)
print(' # 메시지를 기다리고 있습니다. 종료하려면 CTRL+C를 누르세요.')

# 메시지를 처리할 콜백 함수를 지정합니다.
def callback(ch, method, properties, body):
    # 받아오는 메시지는 바이트 문자열입니다.
    # 따라서 UTF-8로 인코딩해서 msg에 저장합니다.
    msg = str(body, "utf8").split(":")

    # 몇 번째로 생성된 메시지인지 표시합니다.
    print(" # [%s] %s 메시지를 받았습니다.\n %r" % (datetime.datetime.now(), msg[0], body))

    # 받은 숫자대로 잠깐 멈춥니다.
    # 여기서는 받은 숫자를 10으로 나눠서 최대 1초만 걸리게 했습니다.
    time.sleep(int(str(msg[1]))/10)
    print(" # [%s] 완료하였습니다."% datetime.datetime.now())

    # 메시지 큐 서버에 완료했다는 응답을 보냅니다.
    # 이 응답이 가야 새로운 큐가 새로운 메시지를 보내줍니다.
    ch.basic_ack(delivery_tag = method.delivery_tag)

# 컨슈머는 메시지를 미리 가져오는데, 얼마나 가져오게 할지 결정합니다.
# 만약 이 설정이 없다면 컨슈머가 큐에 메시지를 요청할 때 무제한으로 가져옵니다.
# 또한 중간에 새로운 컨슈머를 실행하면 기존에 큐에 들어가 있던 메시지를 분배하지 않습니다.
channel.basic_qos(prefetch_count=1)

# 이 클라이언트가 수립한 채널이 어떤 큐에서 어떤 함수로 메시지를 보낼지 설정합니다.
channel.basic_consume(callback, queue='task_queue')

# 메시지 처리를 시작합니다.
channel.start_consuming()
728x90