250x250
Notice
Recent Posts
Recent Comments
Link
일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | 5 | 6 | 7 |
8 | 9 | 10 | 11 | 12 | 13 | 14 |
15 | 16 | 17 | 18 | 19 | 20 | 21 |
22 | 23 | 24 | 25 | 26 | 27 | 28 |
29 | 30 | 31 |
Tags
- RabbitMQ
- 일정
- 여행 #
- IT
- 메시지 큐
- 서버
- 예약
- 유럽여행
- 1달살기
- JAVA #언어 #프로그래밍 #코딩 #static #정적함수 #정적변수 #클래스
- 내심정
- 여행
- 유럽
- ip
- #DB#SQLD#자격증
- 샐러리
- 겨울
- 배낭여행
- 리눅스
- 인프라
- 실비용
- 파이썬
- 추억
- JAVA #언어 #프로그래밍 #IT #개발 #코딩
- JAVA #객체지향 #프로그래밍 #언어 #IT #기초
- 계획
- 이탈리아
- 영국
- 준비
- 경험
Archives
- Today
- Total
YoonWould!!
[메시지 큐 만들기]RabbitMQ를 이용한 메시지 큐 2탄 본문
728x90
지난 시간에 이어서 진행되는 점 참고바랍니다.
1. 작업 분배 : 큐에 넣고, 여러 개의 워커가 가져가고, 작업 종료 확인하기
퍼블리셔 => 큐 => 컨슈머 1,2
퍼블리셔가 생성한 메시지를 메시지 큐에서 컨슈머 1과2에 나워 전달합니다.
지난 시간에 한 예제와 규모를 조절하면 비슷하나 이번 예제는 컨슈머가 작업한 후 해당 작업이 정상적으로 완료되었는지 확인하는 것과 퍼블리셔, 메시지 큐 서버, 컨슈머 중 어느 하나에 문제가 생겼을 때 해당 데이터를 어떻게 보존시킬 지입니다.
#메시지 센더 구현
#new_sender.py
import pika
# 무작위 수를 생성하는 random 모듈을 임포트합니다.
import random
# 서버와 연결을 맺습니다.
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
# 연결 안에서 채널을 만듭니다.
channel = connection.channel()
# 채널 안에서 큐를 선언합니다. 새 큐를 만든다고 볼 수 있습니다.
# \[코드 13-1\]과 차이점은 durable 옵션으로
# 서버가 죽었다 살아 났을 때도 상태를 유지시킵니다.
channel.queue\_declare(queue='task\_queue', durable=True)
# 큐에 쌓아둘 메시지 리스트를 만듭니다.
# 총 100개의 메시지를 만들며 각각의 메시지는 1~10 사이의 정수입니다.
# 메시지 숫자가 곧 작업에 걸리는 시간이라고 생각해봅시다.
# 여기서 메시지의 형태는 "N:M"입니다.
# N번째로 생성되었고, 0.M초가 걸리는 작업이란 이야기입니다.
msgs = \[str(i) + ":" + str(random.randrange(1, 11)) for i in range(100)\]
# 메시지를 한번에 여러 개 보낼 거니까 적당히 함수 하나로 깔끔하게 묶어줍니다.
def send\_msg(msg):
channel.basic\_publish(exchange='', routing\_key='task\_queue', body=str(msg),
# 아까의 예제와 다른 부분입니다.
properties=pika.BasicProperties(
# 이 프로퍼티를 적용함으로서 메시지를 디스크에 저장해 사라지지 않게 합니다.
# 즉, 서버가 다시 시작되어도 메시지는 살아남습니다.
delivery\_mode = 2, )
)
# 메시지들을 큐에다 쌓아줍시다!
for msg in msgs:
send\_msg(msg)
print(" # 메시지를 보냈습니다: %r" % msg)
# 메시지를 다 보낸 후 닫아줍니다.
connection.close()
#메시지 리시버 구현
#new_reciver.py
import pika
import time
import datetime
# 서버와 연결을 맺습니다.
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
# 연결 안에서 채널을 만듭니다.
channel = connection.channel()
# [코드 13-2]와 차이점은 durable 옵션으로
# 메시지 큐 서버가 죽었다 살아 났을 때도 상태를 유지시킵니다.
channel.queue_declare(queue='task_queue', durable=True)
print(' # 메시지를 기다리고 있습니다. 종료하려면 CTRL+C를 누르세요.')
# 메시지를 처리할 콜백 함수를 지정합니다.
def callback(ch, method, properties, body):
# 받아오는 메시지는 바이트 문자열입니다.
# 따라서 UTF-8로 인코딩해서 msg에 저장합니다.
msg = str(body, "utf8").split(":")
# 몇 번째로 생성된 메시지인지 표시합니다.
print(" # [%s] %s 메시지를 받았습니다.\n %r" % (datetime.datetime.now(), msg[0], body))
# 받은 숫자대로 잠깐 멈춥니다.
# 여기서는 받은 숫자를 10으로 나눠서 최대 1초만 걸리게 했습니다.
time.sleep(int(str(msg[1]))/10)
print(" # [%s] 완료하였습니다."% datetime.datetime.now())
# 메시지 큐 서버에 완료했다는 응답을 보냅니다.
# 이 응답이 가야 새로운 큐가 새로운 메시지를 보내줍니다.
ch.basic_ack(delivery_tag = method.delivery_tag)
# 컨슈머는 메시지를 미리 가져오는데, 얼마나 가져오게 할지 결정합니다.
# 만약 이 설정이 없다면 컨슈머가 큐에 메시지를 요청할 때 무제한으로 가져옵니다.
# 또한 중간에 새로운 컨슈머를 실행하면 기존에 큐에 들어가 있던 메시지를 분배하지 않습니다.
channel.basic_qos(prefetch_count=1)
# 이 클라이언트가 수립한 채널이 어떤 큐에서 어떤 함수로 메시지를 보낼지 설정합니다.
channel.basic_consume(callback, queue='task_queue')
# 메시지 처리를 시작합니다.
channel.start_consuming()
728x90
'<인턴생활> > [python 예제]' 카테고리의 다른 글
[데이터분석] 팬더스 활용하기 (0) | 2019.04.30 |
---|---|
[메시지 큐 만들기]RabbitMQ를 이용한 메시지 큐 (0) | 2019.04.29 |
[크롤링] 스크래피 프로젝트 생성 (0) | 2019.04.24 |
[크롤링]크롤링 애플리케이션 만들기 (0) | 2019.04.24 |