RabbitMQ

RabbitMQ는 Message Queue의 일종으로, Message Queue에 대한 내용은 celery + Mesage Queue 사용하기 포스트에서 간단히 다뤘으므로, 이번 포스트에서는 Message Queue 자체에 대한 특성은 다루지 않으려 한다.

 

파이썬으로 애플리케이션을 개발하다 보면 비동기 처리를 해야할 때가 있는데, 그럴 때 나는 메시지큐, 그 중에서도 RabbitMQ를 주로 사용하는 편이다.

 

RabbitMQ 는 AMQP 프로토콜을 베이스로 지원하며(플러그인을 설치하면 MQTT 등도 지원한다), python 에서는 AMQP로 메시지를 송수신 할 수 있게 해주는 pika 라이브러리를 사용할 수 있다.

 

pika를 사용하기에 앞서 RabbitMQ와 AMQP에 대해 간략하게 알아보고 넘어가자.

RabbitMQ는 조금 특이하게, Message 를 Publish 하게 되면 바로 Queue에 들어가지 않고 Exchange라는 곳을 통하게 된다. 그리고 Exchange Type  Binding 규칙에 따라 적절한 Queue 로 전달된다. 이후, Queue와 Connection을 맺고 Consume을 하는 형태로 작업이 진행된다.

 

이 과정을 수행할 때 알아야 할 용어들은 다음과 같다.

 

Exchange Type

1) Direct exchange
1:1 관계로, Exchange에 바인딩 된 Queue 중에서 메시지의 라우팅 키와 매핑되어 있는 Queue로 메시지를 전달한다
 
2) Fanout exchange
1:N 관계로, 메시지의 라우팅 키를 무시하고 Exchange에 바인딩 된 모든 Queue에 메시지를 전달한다
 
3) Topic exchange

마찬가지로 1:N 관계이지만, Exchange에 바인딩 된 Queue 중에서 메시지의 라우팅 키가 패턴에 맞는 Queue에게만 모두 메시지를 전달한다.


4) Headers exchange
라우팅 키 대신 메시지 헤더에 여러 속성들을 더해 속성들이 매칭되는 큐에 메시지를 전달한다.

 

Binding

생성된 Exchange 에는 전달 받은 메시지를 원하는 Queue 로 전달하기 위해 정의하는 규칙이다.

direct exchange의 경우 Queue 이름만으로 Binding 할 수도 있고, topic exchange의 경우 routing key 를 지정해서 메시지를 필터링 한 후 지정한 Queue 에만 보내도록 Binding 할 수 있다.

 

Connection

 

  • RabbitMQ에서 지원하는 모든 프로토콜은 TCP 기반이다.
  • 효율성을 위해 긴 연결을 가정한다. (프로토콜 작업당 새 연결이 열리지 않음.)
  • 하나의 클라이언트 연결은 단일 TCP 연결을 사용한다.
  • 연결이 더 이상 필요하지 않은 경우, 리소스 절약을 위해 연결을 닫아야 한다. 이를 수행하지 못하는 클라이언트는 리소스의 대상 노드를 고갈시킬 위험이 있다.

 

Channel

RabbitMQ는 Connection 외에도, Channel 이라는 특이한 개념을 가지고 있다. 

  • 단일 TCP 연결을 공유하는 논리적인 개념의 경량 연결로 다중화된다.
  • 클라이언트가 수행하는 모든 프로토콜 작업은 채널에서 발생한다.
  • 채널 안에 연결할 Queue를 선언할 수 있으며, 채널 하나당 하나의 Queue만 선언이 가능하다.
  • 특정 채널의 통신은 다른 채널의 통신과 완전히 분리되어 있기 때문에 프로토콜은 채널 ID와 같은 식별자를 포함시켜 전달한다.
  • 채널 ID를 통해 클라이언트나 브로커 모두 채널에 대한 파악이 가능하다.
  • 채널은 Connection Context에만 존재하기 때문에 Connection이 닫히면, 연결된 모든 채널도 닫힌다.
  • 클라이언트에서 처리를 위해 멀티 프로세스/스레드를 사용한다면, 프로세스/스레드 별로 새 채널을 열고 공유하지 않는 것이 일반적이다.

Connection 과 Channel 의 관계를 정리하면,

  • Connection은 물리적인 연결이다.
  • Connection은 단일 TCP 연결만 가능하다.
  • Channel은 Connection Context를 공유하며, 하나 이상의 경량 연결이 가능하다.
  • Channel 하나당 하나의 Queue만 연결이 가능하다.

 

Pika

pika의 공식문서를 보면 위에서 적어 놓은 AMQP의 개념들을 class와 method 로 감싸서 추상화 해놓은 것을 볼 수 있다.

 

Subscribe

 BlockingConnection 을 맺어 queue 와 연결을 맺고, message를 받아 처리하는 예제 코드이다. 

import pika

class AmqpClient:
    def __init__(self, broker_url, queue):
        self.queue = queue
        parsed = self.parse_url(broker_url_or_connection)
        self.address = parsed['address']
        self.username = parsed['username']
        self.password = parsed['password']
        self.port = parsed['port']
        
        self.connection = self.connect()
        self.channel = self.assign_channel()

	# amqp://guest:guest@localhost:5672 등의 url을 받아 파싱
    def parse_url(self, url):
        scheme, rest = url.split('://')
        assert scheme in ['amqp', 'amqps']

        auth, rest = rest.split('@')
        username, password = auth.split(':')

        if ':' in rest:
            address, port = rest.split(':')
        else:
            address = rest
            port = '5432'
        port = int(port)
        return {
            'scheme': scheme, 'username': username, 'password': password,
            'address': address, 'port': port
        }

	# BlockingConnection 연결, channel 할당은 추가적으로 필요
    def connect(self):
        credentials = pika.PlainCredentials(self.username, self.password)
        parameters = pika.ConnectionParameters(host=self.address, port=self.port, credentials=credentials)
        connection = pika.BlockingConnection(parameters)

        return connection

	# channel 할당 및 queue 선언
    def assign_channel(self, connection):
        channel = connection.channel()
        channel.queue_declare(queue=self.queue, durable=True, exclusive=False, auto_delete=False)
        channel.confirm_delivery()

        return channel

	# connection이 묶였는지 확인, 끊겼다면 다시 연결
    def ensure_connection(self, exchange, routing_key):
        try:
            self.channel.queue_bind(self.queue, exchange, routing_key)
        except StreamLostError:
            connection = self.connect()
            self.channel = self.assign_channel(connection)

	# packet을 어떻게 처리할지에 대한 callback 함수
    def on_message(self):
        pass

    def subscribe(self, topic, exchange):
        self.channel.queue_bind(self.queue, exchange, routing_key=topic)
        self.channel.basic_consume(
            queue=self.queue, on_message_callback=self.on_message, auto_ack=True)

        self.start_consuming(self.channel.connection)

	# connection이 끊겼다면 다시 consume을 다시 시작하도록
    def start_consuming(self, connection):
        try:
            while True:
                connection.process_data_events(None)
        except pika.exceptions.ConnectionClosed:
            # Handle connection closed
            # Re-establish the connection and restart the consuming process
            self.start_consuming(connection)

pika를 사용하면서 가장 헷갈렸던 부분이 hearbeat timeout이다. 

pika는 기본적으로 60초의 heartbeat timeout을 가지고 있는데, 이 시간 동안 packet을 보내지 않으면 connection을 더 이상 유지할 필요가 없다고 판단하여 끊어버린다. 이를 해결하기 위해 heartbeat time을 늘리거나 없애는 것도 방법이지만 권장되지 않는다. packet을 보내는 주기가 길다면 connectino을 끊고 packet을 보낼 때 다시 연결하는 것이 자연스러운 듯 하다.

 

위 예제에서는 고려되지 않았지만, packet을 받아서 처리하는 callback method의 처리 속도가 heartbeat timeout을 넘어가는 경우에도, connection을 끊어버리는 문제가 발생할 수 있다. 이 때는 상황이 조금 달라, callback method를 처리하는 동안 sub thread를 할당하여 RabbitMQ 서버에 수동으로 ack를 해주거나, heartbeat timeout 시간을 늘리는 방법을 생각해봐야 한다.

 

Publish

Subscribe 부분에서 정의한 AmqpClient 클래스를 활용하여 topic exchange 타겟으로 메시지를 publish 하는 예제코드이다.

import pika

def main():
    pub_client = AmqpClient(pub_broker_url, queue)
    pub_client.ensure_connection('amq.topic', queue)
    pub_client.channel.basic_publish(
        exchange='amq.topic',
        routing_key=routing_key,
        body=payload,
        properties=props,
    )
    ...

 

References

https://pika.readthedocs.io/en/stable/index.html

 

Introduction to Pika — pika 1.2.1 documentation

© Copyright 2009-2017, Tony Garnock-Jones, Gavin M. Roy, Pivotal Software, Inc and contributors. Revision 741cfc4c.

pika.readthedocs.io

https://jonnung.dev/rabbitmq/2019/02/06/about-amqp-implementtation-of-rabbitmq/#gsc.tab=0

 

조은우 개발 블로그

조은우 개발자 블로그

jonnung.dev

https://stackoverflow.com/questions/57650669/how-to-change-timeout-using-rabbitmq-pika-basic-consume-in-python

+ Recent posts