YoonWould!!

[메시지 큐 만들기]RabbitMQ를 이용한 메시지 큐 본문

<인턴생활>/[python 예제]

[메시지 큐 만들기]RabbitMQ를 이용한 메시지 큐

Hading 2019. 4. 29. 17:40
728x90

RabbitMQ를 사용해서 메시지 큐를 만들고, 큐에 메세지를 넣고, 그 메시지를 가져가는 작업을 해보겠습니다.

1.메시지 큐

메시지 큐는 서로 다른 프로그램 사이에 공유할 수 있는 무제한 크기의 버퍼입니다. 이 큐를 이용해서 데이터를 만들고, 큐에 쌓아두고, 큐에서 데이터를 빼내어 순서대로 처리하거나, 라운드 로빈 방식으로 분배해서 처리하거나, 규칙에 따라 여러 가지 작업을 할 수 있습니다.
즉, 메시지 큐는 사용자가 입력한 메시지를 보낼 때의 중간 자료구조임을 알 수 있습니다.
위키 백과 : https://ko.wikipedia.org/wiki/메시지_큐

2.RabbitMQ 소개

RabbitMQ는 사용하기 간단하고, 대부분 운영체제에서 실행되며, 메시지 큐의 표준 중 하나인 'AMQP(Advanced Message Queuing Protocol) '를 준수하는 오픈 소스 메시지 브로커입니다. RabbitMQ 홈페이지에서는 거의 대부분 운영체제와 프로그래밍 언어의 조합을 지원한다고 소개합니다. 게다가 메시지 큐 서버에서는 큐들을 관리할 수 있는 웹 GUI를 지원합니다.

퍼블리셔가 메시지를 생성하면, 메시지 큐 서버 안의 익스체인지로 전달되어 규칙에 따라 익스체인지를 큐에 넣습니다. 컨슈머는 큐에서 메시지를 꺼내 와서 처리합니다. 익스체인지와 큐의 관계, 큐와 컨슈머의 관계에 따라 여러 변형이 나타나게 됩니다.

대표적인 메시지 큐 유형 - RabbitMQ 홈페이지에서 6가지 제시

  1. 전달
  2. 작업 분배 및 확인
  3. 브로드캐스팅
  4. 선택적 분배
  5. 패턴에 따른 분배
  6. RPC

저는 단순 메시지 전달과 라운드 로빈 방식으로 메시지 큐에서 분배하는 작업을 해보겠습니다.
단순 메시지 전달에서는 퍼블리셔와 큐, 컨슈머가 하나뿐이지만 두 번째에서는 컨슈머의 개수가 둘 이상이 될 겁니다.

3. RabbitMQ 설치와 메시지 큐 서버 실행

OS : Centos 7
python : 2.7.5

# Epel 저장소 설치.
yum install http://mirror.premi.st/epel/7/x86_64/e/epel-release-7-1.noarch.rpm
#업그레이드
yum update
#설치
yum install -y rabbitmq-server
#실행
rabbitmq-server

2개의 터미널 창을 열어서 하나는 RabbitMQ 서버를 실행하고 하나는 파이썬 가상 환경을 실행해서 작업하면 수월합니다.

3-1. pika 패키지 설치

RabbitMQ를 파이썬으로 다룰 때는 pika라는 패키지를 사용합니다. RabbitMQ 홈페이지에서도 pika 패키지를 사용하였기 때문에 안정성은 걱정할 필요가 없습니다.

pip install pika

3-2. 메시지 전달 : 큐에 넣고 가져오기

퍼블리셔 => 큐 => 컨슈머 의 단계로 전달됩니다.

메시지를 보내는 과정

  1. 메시지 브로커 서버에 연결
  2. 연결 안에 채널 만들기
  3. 채널 안에 큐 선언하기
  4. 메시지 보내기(이때 여러 가지 파라미터 등을 설정할 수 있습니다.)
  5. 연결 끊기
#메시지 센더 구현
#sender.py
# -*- coding: utf-8 -*-
# pika를 임포트합니다.
import pika

# 서버와 연결을 맺습니다.
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))

# 연결 안에서 채널을 만듭니다.
channel = connection.channel()

# 채널 안에서 큐를 선언합니다. 새 큐를 만든다고 할 수 있습니다.
channel.queue_declare(queue='hello')

# 메시지를 보냅니다. 여기서는 교환기excahnge와 routing_key를 다루지 않을 겁니다.
# channel.basic_publish(exchange='', routing_key='hello', body='Hello World!!!')
# print("# 메시지를 보냈습니다!")
for i in range(10000):
    # 10,000개의 메시지를 큐에 쌓습니다.
    channel.basic_publish(exchange='', routing_key='hello', body=str(i))
    print("# 메시지를 보냈습니다!" + str(i))

# 서버와의 연결을 끊습니다.
connection.close()
#메시지 리시버 구현
#receiver.py
# -*- coding: utf-8 -*-


# pika를 임포트합니다.
import pika

# 서버와 연결을 맺습니다.
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))

# 연결 안에서 채널을 만듭니다.
channel = connection.channel()

# 채널 안에서 큐를 선언합니다. 새 큐를 만든다고 볼 수 있습니다.
# 이미 발송기 쪽에서 큐를 만들었지만, 확실히 하기 위해서 여기서 한 번 더 만들어줍니다.
channel.queue_declare(queue='hello')

# 큐에서 가져온 메시지를 처리할 콜백 함수를 만듭니다.
# 이 함수는 단순히 body를 가져와서 출력합니다.
def callback(ch, method, properties, body):
    print(" # 메시지를 받았습니다: %r" % body)

# 메시지를 보낼 때 어떻게 할 것인지 설정합니다.
# 함수, 큐, 응답 여부(no_ack)를 지정합니다.
#basic_consume는 버전에 따라 구조가 달라서 주의하셔야 합니다.
channel.basic_consume(queue='hello',on_message_callback=callback)

print('# 메시지를 기다리고 있습니다. 종료하려면 CTRL+C를 누르세요')

# 메시지 보내기를 시작합니다. 명시적으로 종료하기 전까지 계속 실행되면서
# 큐에 메시지가 들어올 때마다 callback이 메시지를 처리합니다.
channel.start_consuming()
# -*- coding: utf-8 -*-
# pika를 임포트합니다.
import pika

# 서버와 연결을 맺습니다.
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))

# 연결 안에서 채널을 만듭니다.
channel = connection.channel()

# 채널 안에서 큐를 선언합니다. 새 큐를 만든다고 볼 수 있습니다.
# 이미 발송기 쪽에서 큐를 만들었지만, 확실히 하기 위해서 여기서 한 번 더 만들어줍니다.
channel.queue_declare(queue='hello')

# 큐에서 가져온 메시지를 처리할 콜백 함수를 만듭니다.
# 이 함수는 단순히 body를 가져와서 출력합니다.
def callback(ch, method, properties, body):
    print(" # 메시지를 받았습니다: %r" % body)

# 메시지를 보낼 때 어떻게 할 것인지 설정합니다.
# 함수, 큐, 응답 여부(no_ack)를 지정합니다.
channel.basic_consume(callback, queue='hello', no_ack=True)

print('# 메시지를 기다리고 있습니다. 종료하려면 CTRL+C를 누르세요')

# 메시지 보내기를 시작합니다. 명시적으로 종료하기 전까지 계속 실행되면서
# 큐에 메시지가 들어올 때마다 callback이 메시지를 처리합니다.
channel.start_consuming()

728x90