본문 바로가기
IT & Tech 정보

🚀 비동기 작업 큐 처리: Python + Celery + RabbitMQ

by 지식과 지혜의 나무 2025. 5. 25.
반응형


안녕하세요, 즐로그 운영자입니다!
오늘은 백그라운드 작업을 안정적으로 처리하는 핵심 기술인
Celery + RabbitMQ를 활용해 비동기 작업 큐를 구현하는 법을 단계별로 보여드립니다.
코드 스니펫을 그대로 복사해 붙여넣으면 바로 실습 OK!



📋 목차
1. 왜 비동기 작업 큐인가?
2. 프로젝트 세팅 & RabbitMQ 준비
3. Celery 앱·태스크 정의
4. 워커(Worker) 실행 & 태스크 호출
5. 모니터링(Flower)으로 실시간 확인
6. 실전 팁 & 주의사항
7. 마무리



1️⃣ 왜 비동기 작업 큐인가?
• 응답 속도 개선: 웹 요청 처리 시 무거운 작업은 백그라운드로 이관
• 재시도·스케줄링: 실패 태스크 자동 재시도, 정기 작업 스케줄링 지원
• 수평 확장: 워커 수만큼 동시 처리량 확대



2️⃣ 프로젝트 세팅 & RabbitMQ 준비

# 1) 가상환경 & 패키지 설치
python3 -m venv venv && source venv/bin/activate
pip install celery==5.5.0 pika flask

# 2) RabbitMQ 도커로 띄우기
cat <<EOF > docker-compose.yml
version: '3'
services:
  rabbitmq:
    image: rabbitmq:3-management
    ports:
      - "5672:5672"      # AMQP
      - "15672:15672"    # 관리 UI
EOF
docker-compose up -d

• RabbitMQ 관리 UI: http://localhost:15672 (guest/guest)



3️⃣ Celery 앱·태스크 정의

# tasks.py
from celery import Celery
import time

# 1) Celery 인스턴스 생성
app = Celery(
    'tasks',
    broker='amqp://guest:guest@localhost:5672//',   # RabbitMQ 브로커 URL
    backend='rpc://'                                # 결과 저장 방식
)

# 2) 간단한 태스크 정의
@app.task(bind=True, autoretry_for=(Exception,), retry_backoff=True, retry_kwargs={'max_retries':3})
def long_task(self, n):
    """n초 동안 대기 후 제곱값 반환"""
    try:
        time.sleep(n)
        return n * n
    except Exception as exc:
        raise self.retry(exc=exc)

• bind=True: 태스크 내부에서 self.retry() 호출 가능
• retry_backoff: 지수 백오프 자동 적용



4️⃣ 워커(Worker) 실행 & 태스크 호출

# 워커 실행 (터미널1)
celery -A tasks worker --loglevel=info --concurrency=4

# 간단한 Flask 앱으로 호출 (app.py)
from flask import Flask, request, jsonify
from tasks import long_task

app = Flask(__name__)

@app.route('/compute', methods=['POST'])
def compute():
    data = request.json or {}
    n = data.get('n', 5)
    # 비동기 태스크 호출
    task = long_task.apply_async(args=(n,))
    return jsonify({'task_id': task.id}), 202

@app.route('/status/<task_id>')
def status(task_id):
    res = long_task.AsyncResult(task_id)
    return jsonify({
        'status': res.status,
        'result': res.result if res.ready() else None
    })

if __name__ == '__main__':
    app.run(port=5000)

• 테스트

curl -X POST -H "Content-Type: application/json" -d '{"n":3}' http://localhost:5000/compute
curl http://localhost:5000/status/<태스크ID>





5️⃣ 모니터링(Flower)으로 실시간 확인

# Flower 설치·실행
pip install flower
celery -A tasks flower --port=5555

• 접속 주소: http://localhost:5555
• 태스크 진행 상태, 큐 길이, 워커 상태 등 실시간 대시보드 제공



6️⃣ 실전 팁 & 주의사항
• 배포 환경 분리
• 개발·스테이징·프로덕션용 브로커/백엔드 URL 별도 관리
• 타임아웃 설정
• soft_time_limit·time_limit 옵션으로 과도 작업 차단
• 로드 밸런싱
• 워커마다 큐별 binding 키(Queue routing) 지정 → 작업 유형별 분리 처리
• 오토스케일링
• Kubernetes HPA(Horizontal Pod Autoscaler)로 워커 수 자동 조절
• 보안
• RabbitMQ 사용자·권한 최소화, TLS 암호화 연결 권장



7️⃣ 마무리

오늘은 Celery + RabbitMQ 기반으로
• 백그라운드 태스크 정의
• 워커 실행
• 모니터링
• 재시도·백오프까지

전 과정을 실습해봤습니다.
비동기 작업 큐는 웹 애플리케이션뿐 아니라 데이터 파이프라인·알림 시스템 등
다양한 분야에 적용할 수 있으니,
여러분 프로젝트에 맞춰 튜닝해 사용해보세요!

다음 포스트에서 **GraphQL 서버 설계·구현 (Apollo Server + TypeScript)**로 이어갑니다~
행복한 개발 되세요! 👋

반응형