Django 에는 비동기로 작업을 할 수 있도록 Celery 라는 패키지가 존재한다.

 

Celery 의 공식 문서에는 다음과 같이 설명이 되어 있다.

Celery is a simple, flexible, and reliable distributed system to process vast amounts of messages, while providing operations with the tools required to maintain such a system.

It’s a task queue with focus on real-time processing, while also supporting task scheduling.

 

 실시간 작업을 지원하는 task queue 이며 스케줄링 또한 지원한다고 한다.

 

이번 포스트는 Celery + MQ 에 대해서만 다루고, 스케줄링 기능은 추후에 다뤄보려 한다.

개인적으로, 알고 있던 스케줄링 옵션이 cron 밖에 없었기 때문에 celery 를 활용한 스케줄링은 cron 과 비교했을 때 어떤 이점이 있을지 매우 궁금하다.

 

내가 Celery 를 사용하면서 의문이 들었던 것은 2가지이다.

 

  1. Task Queue 와 Message Queue는 뭐가 다른가?
  2. Celery 대신에 thread pool 을 사용할 수도 있지 않을까?

하나씩 살펴보도록 하자.

 

1번의 경우는 stack overflow 에서 답을 찾을 수 있었다. 

 

Message Queue 는 말 그대로 Queue 라는 자료구조로, FIFO에 의거하여 데이터를 받고 데이터를 보내주는 buffer 의 역할을 한다고 보면 될 것 같다.

반면 Task Queue(Celery)는 우리가 하고자 하는 비동기 작업 전체를 인터페이스화 하여 사용하기 쉽게 해준 것이다.

정리하면, producer / consumer 로 이루어진 비동기 작업을 처리하고자 할 때 producing 과정과 consuming 과정을 직접 구현하지 않도록 해주는 것이 Celery 이고, 그 내부에서 producer -> consumer 로의 연결고리가 되는 broker(buffer)의 옵션으로 AMQP(Message Queue) 가 있는 것이다.

 

뇌피셜: 이러한 관점에서 보면, FIFO 가 필요하지 않다면 Celery 의 broker 로 AMQP 를 지원하지 않는 데이터 저장소를 사용할 수도 있지 않을까. DB나 S3도 가능할지도?

 

2번은 CPU bound task 와 I/O bound task로 나누어 생각해봐야 할 것 같다.

 

우선 email 전송과 같은 I/O bound task 를 생각해보자.

task를 생성하는 App은 여러개가 될 수 있다. 하나의 App이 scale out 으로 여러개가 되었을 수도 있고, MSA의 구조로 인해 task를 생성하는 App이 여러개일 수도 있다.

이 때 Celery를 사용한다면 task 들이 하나의 Queue에 들어가고, 순차적으로 Celery 에 의해 digest가 될 것이다.

반대로, 별도의 thread를 할당한다면 순서의 상관 없이 task들이 수행될 것이다.

 

무엇보다도 가장 큰 차이점은 Celery 는 Process 이고 thread pool 은 thread 라는 점이다.

이러한 이유로, GIL로 인해 multi threading 이 불가능한 CPU bound task 들도 Celery로 multi processing 이 가능하다.

또한, 마찬가지의 이유로 Celery는 scale out 이 가능하다.

 

그럼 이제 Celery를 직접 사용해보자.

 

일단 필요한 패키지 install 이 필요하다.

pip install celery
pip install pytest-celery # celery test 를 위한 패키지

 

프로젝트 구조는 다음과 같다.

/config
    /settings
        /_init__.py
        /dev.py
        /test.py
/utils
    /__init__.py
    /signals.py
    /tasks.py
    /conftests.py
    /tests.py
/user
    /_init__.py
    /tasks.py
    /example.py

 

Celery 사용을 위한 세팅 파일을 생성한다.

 

Celery App 객체를 초기화 할 때, 어떤 Broker를 사용할 지, 어떤 Result Backend 를 사용할지에 대한 옵션을 줄 수 있다.

Broker: 메시지를 주고 받기 위한 중개자

Result Backend: 결과값을 받아서 저장하는 저장소 

아래의 예제에서는 Broker 는 RabbitMQ, Result Backend는 단순히 메시지를 리턴하는 rpc(기본 옵션)를 사용했다.

더욱 많은 옵션은 공식 문서를 참고하도록 하자.

# utils/celery.py

import os

from celery import Celery

# Celery 의 기본 세팅으로 사용할 settings 파일 지정
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'config.settings.dev')

# Celery App 이름 명시
app = Celery('utils')


# namespace='CELERY': Celery 관련 설정 key 값 들이 CELERY_로 시작해야함을 명시
app.config_from_object('django.conf:settings', namespace='CELERY')

# 모든 app 으로부터 자동으로 task 들을 지정
app.autodiscover_tasks()

 

아래의 세팅은 docker container 를 사용한다고 가정했을 때의 환경이다.

원한다면 셀러리를 호스트 OS에 설치하고 사용할 수도 있다.

또한, 로컬 환경에서 테스트를 해본다면 docker container 이름을 준 자리에 localhost를 사용해도 된다.

여기서 docker container 이름을 준 이유는, Django App 과 Message Queue 를 각각의 container 로 띄울 것이고 둘의 host가 달라지기 때문이다.

# config/settings/dev.py

# amqp://myuser:mypassword@localhost:5672/myvhost
# amqp: RabbitMQ 사용
# mq: broker 로 사용할 RabbitMQ 의 docker container 이름 -> 뒤에 나올 docker-compose.yaml 참고
CELERY_BROKER_URL = 'amqp://guest@mq:5672//'

 

Celery Worker 가 수행할 task를 작성한다.

task 가 작성될 파이썬 파일은 tasks.py 로 작성하는 것이 원칙이다.

tasks.py 안에 작성한 함수를 celery app 에 등록하는 방법으로는 2가지가 있다.

  1. @app.task
  2. @shared_task

둘의 큰 차이는 1번의 경우 어떤 app을 사용하는 것인지 task 쪽에서 등록 하게 되고, 2번의 경우는 app 쪽에서 등록하게 된다는 것이다.

따라서, 확장성과 유연성을 위해서 @shared_task 를 사용하는 것을 권장하고 싶다. 참고

from celery import shared_task
from django.core.mail import send_mail


@shared_task
def send_email(subject, plain_message, html_message, from_email, emails):
    send_mail(
        subject=subject,
        message=plain_message,
        html_message=html_message,
        from_email=from_email,
        recipient_list=emails,
    )

 

아래와 같은 세팅으로 app이 항상 import가 될 것이고, shared_task 는 해당 app을 바라보게 된다.

task를 직접 등록하고 싶다면, 생략해도 된다.

# utils.__init__.py

from .celery import app as celery_app

__all__ = ('celery_app',)

 

이제 작성한 task 를 Message Queue에 밀어 넣을 차례다.

굉장히 간단하다. delay 함수를 사용하면 된다. delay 외에 apply_async() 함수를 줄 수도 있지만, delay 메소드의 구현을 까보면 내부적으로 apply_async() 를 호출하게 된다. 용도에  따라 사용하면 될 것 같다.

또한, 필요에 의해 block 으로 호출되는 apply() 함수를 사용할 수도 있다.

# user/example.py

from user import tasks as t
...

def push_to_mq():
    ...
    t.send_email.delay(subject, plain_message, html_message, from_email, emails)

 

이제 Message Queue 서버를 띄워보자.

여기서는 docker container를 사용했다.

$ docker run -d --name mq -p 5672:5672 rabbitmq

 

push_to_mq가 호출될 때마다 Message Queue 에 task가 쌓이게 될 것이니, 이제 다음의 명령으로 task를 digest 하는 Celery Worker 프로세스를 시작하면 된다.

$ celery -A proj worker -l INFO

 

Celery에 대한 test 환경을 구축해보자

Celery 는 test 환경을 위해 pytest-celery 패키지를 제공한다. 패키지는 위에서 이미 설치해 두었으니, 사용법을 알아보자.

 

Celery 를 사용하는 옵션 중에, task_always_eager 라는 옵션이 있다. 해당 옵션은 broker 를 거치지 않고 task를 바로 전달하는, 말하자면 delay() 메소드를 사용하지만 호출하고자 하는 함수를 직접 호출하는 것과 동일한 결과를 얻을 수 있다.

 

pytest-celery 패키지는 해당 옵션을 테스트 케이스 개별적으로 줄 수 있도록 지원한다.

utils/conftest.py

@pytest.fixture(scope='session')
def celery_config():
    return {
        'task_always_eager': True
    }

 

다음과 같이 이메일을 전송하는 함수에 대한 test를 작성했다. (자세한 구현은 생략한다.)

 

주의할 점은, test 함수 인자로 celery_app, celery_worker 을 주어야 한다는 것이다.

이 부분에서 많이 헤맨 것이, conftest.py 에 celery_config 함수만 재정의를 했기 때문에 처음에 함수 인자로 celery_config 함수만 넘겨주었다. 그런데 실제로 테스트에 사용하는 것은 app과 worker이기 때문에 celery_app과 celery_worker를 넘겨줘야 하는 것이다.

# utils/tests.py

class TestEmail:
    @pytest.mark.django_db(transaction=True)
    def test_sending_email_should_increase_mail_outbox(self, celery_app, celery_worker):
        send_email.delay(...)
        assert 1 == len(mail.outbox)

 

 

이제 이 Celery 와 MQ 를 운영환경에 올려보자.

 

App 서버, MQ 서버, Worker 서버 이렇게 3개가 필요할 것이다.

Celery Worker에게 주어진 task는 App 서버에 구현이 되어 있기 때문에, App 서버를 2개 띄우고 하나를 Worker 서버로 사용하면 되겠다.

 

다음과 같이 docker-compose.yaml 파일을 작성했다. 도커 관련 자세한 내용은  생략한다.

version: "3.9"
services:
  app:
    build:
      context: .
      dockerfile: dockerfile
    restart: always
  mq:
    image: rabbitmq:management
    ports:
      - "5672:5672"
      - "15672:15672"
    restart: always
    environment:
      RABBITMQ_USER: guest
      RABBITMQ_PASSWORD: guest

  worker:
    build:
      context: .
      dockerfile: dockerfile
    restart: always
    command: celery -A maf_core worker --loglevel=info
    depends_on:
      - mq
      - app

 

References

https://docs.celeryq.dev/en/stable/userguide/testing.html

 

Testing with Celery — Celery 5.2.7 documentation

This document describes the current stable version of Celery (5.2). For development docs, go here. Testing with Celery Tasks and unit tests To test task behavior in unit tests the preferred method is mocking. Eager mode The eager mode enabled by the task_a

docs.celeryq.dev

https://devlog.jwgo.kr/2019/07/02/using-celery-with-django-1/

 

장고(Django)에서 셀러리(Celery) 사용하기 1편 · Tonic

사이트 운영에 도움을 주실 수 있습니다. 고맙습니다. --> 장고(Django)에서 셀러리(Celery) 사용하기 1편 2019년 07월 02일 이 문서는 [Celery 공식 문서](http://docs.celeryproject.org/en/latest/django/)를 번역한 것

devlog.jwgo.kr

https://kangprog.tistory.com/124

+ Recent posts