Etc Programming
FastAPI
Celery vs FastAPI Background Task

FastAPI의 비동기 처리와 백그라운드 작업 완벽 가이드

개요

FastAPI에서 엔드포인트를 정의할 때 defasync def 중 어떤 것을 사용해야 하는지, 그리고 왜 그런 선택을 해야 하는지에 대한 완벽한 가이드입니다.

FastAPI 공식 문서의 핵심 원칙

FastAPI 공식 문서에 따르면:

async def 대신 일반 def로 엔드포인트를 선언하면, 서버를 블록하지 않기 위해 외부 스레드풀에서 실행되고 대기됩니다.

그리고:

외부 라이브러리(데이터베이스, API, 파일 시스템 등)와 통신하지만 await를 지원하지 않는 경우(현재 대부분의 데이터베이스 라이브러리가 이에 해당), 엔드포인트를 일반 def로 선언하세요.

애플리케이션이 다른 곳과 통신하고 응답을 기다릴 필요가 없다면 async def를 사용하세요.

def vs async def: 내부 동작 원리

1. def (동기) 엔드포인트

@app.get("/ping")
def ping(request: Request):
    print("Hello")
    time.sleep(5)  # 블로킹 작업
    print("bye")
    return "pong"

동작 방식:

  • FastAPI는 이 함수를 외부 스레드풀에서 실행
  • 메인 이벤트 루프를 블록하지 않음
  • 각 요청마다 새로운 스레드를 생성하거나 기존 스레드 재사용
  • 여러 요청을 동시에 처리 가능

2. async def (비동기) 엔드포인트

@app.get("/ping")
async def ping(request: Request):
    print("Hello")
    await asyncio.sleep(5)  # 논블로킹 작업
    print("bye")
    return "pong"

동작 방식:

  • 이벤트 루프에서 직접 실행 (단일 스레드)
  • await 호출 시 다른 작업에 제어권 양보
  • 진정한 비동기 동시성 제공

⚠️ 잘못된 async def 사용 예시

@app.get("/ping")
async def ping(request: Request):
    print("Hello")
    time.sleep(5)  # ❌ 블로킹 작업 - 전체 서버가 멈춤!
    print("bye")
    return "pong"

이 경우 time.sleep()이 이벤트 루프를 블록하여 전체 서버가 멈춥니다.

언제 무엇을 사용해야 할까?

def 사용 시기

  • 블로킹 I/O 작업 (파일 읽기, 데이터베이스 쿼리)
  • CPU 집약적 작업
  • await를 지원하지 않는 라이브러리 사용
  • 복잡한 JSON 인코딩 작업

async def 사용 시기

  • 비동기 라이브러리 사용 (httpx, aiofiles 등)
  • 간단한 데이터 반환
  • await 호출이 있는 I/O 작업
  • 비동기 데이터베이스 드라이버 사용

중요한 개념 설명

1. 이벤트 루프 (Event Loop)

  • 정의: 비동기 작업을 관리하는 단일 스레드 실행 환경
  • 역할: await 키워드를 만나면 다른 작업으로 제어권 전환
  • 특징: 한 번에 하나의 코루틴만 실행

2. 스레드풀 (ThreadPool)

  • 정의: 미리 생성된 스레드들의 집합
  • FastAPI 기본값: 40개 스레드
  • 용도: def 엔드포인트 실행

3. GIL (Global Interpreter Lock)

  • 정의: Python 인터프리터를 한 번에 하나의 스레드만 사용하도록 제한하는 뮤텍스
  • 영향: CPU 집약적 작업에서는 멀티스레딩 효과 제한
  • 해결책: 멀티프로세싱 사용

블로킹 작업 처리 방법

방법 1: def 엔드포인트로 변경

@app.post("/process")
def process_file(file: UploadFile = File(...)):
    contents = file.file.read()  # 동기 읽기
    result = cpu_intensive_task(contents)
    return result

방법 2: run_in_threadpool() 사용

from fastapi.concurrency import run_in_threadpool
 
@app.post("/process")
async def process_file(file: UploadFile = File(...)):
    contents = await file.read()
    result = await run_in_threadpool(cpu_intensive_task, contents)
    return result

방법 3: asyncio.to_thread() 사용 (Python 3.9+)

import asyncio
 
@app.post("/process")
async def process_file(file: UploadFile = File(...)):
    contents = await file.read()
    result = await asyncio.to_thread(cpu_intensive_task, contents)
    return result

방법 4: ProcessPoolExecutor 사용 (CPU 집약적 작업)

import asyncio
import concurrent.futures
 
@app.post("/process")
async def process_file(file: UploadFile = File(...)):
    contents = await file.read()
    loop = asyncio.get_running_loop()
    
    with concurrent.futures.ProcessPoolExecutor() as pool:
        result = await loop.run_in_executor(pool, cpu_intensive_task, contents)
    
    return result

성능 최적화 팁

1. 워커 프로세스 증가

uvicorn main:app --workers 4

2. 스레드풀 크기 조정

# 기본값: min(32, os.cpu_count() + 4)
# 필요에 따라 커스텀 ThreadPoolExecutor 생성

3. 적절한 엔드포인트 타입 선택

  • 간단한 JSON 반환: async def
  • 복잡한 데이터 처리: def
  • 비동기 I/O: async def + await

브라우저 테스트 시 주의사항

동일한 브라우저 탭에서 연속 요청 시 브라우저가 요청을 순차적으로 보낼 수 있습니다.

해결 방법:

  1. 시크릿 모드 새 탭 사용
  2. 다른 브라우저 사용
  3. httpx를 이용한 비동기 테스트
import httpx
import asyncio
 
async def test_concurrent_requests():
    async with httpx.AsyncClient() as client:
        tasks = [
            client.get("http://127.0.0.1:8000/ping"),
            client.get("http://127.0.0.1:8000/ping")
        ]
        responses = await asyncio.gather(*tasks)
        return responses

백그라운드 작업 대안

1. FastAPI BackgroundTasks

from fastapi import BackgroundTasks
 
@app.post("/send-email")
async def send_email(background_tasks: BackgroundTasks):
    background_tasks.add_task(send_email_task, "user@example.com")
    return {"message": "Email sent in background"}

2. Celery (분산 작업 큐)

# 무거운 계산 작업이나 별도 프로세스가 필요한 경우
celery_app.send_task('heavy_computation', args=[data])

3. APScheduler

# 스케줄된 작업이나 주기적 작업
from apscheduler.schedulers.asyncio import AsyncIOScheduler

실제 사용 예시

파일 업로드 + CPU 집약 작업

@app.post("/process-document")
async def process_document(file: UploadFile = File(...)):
    # 1. 파일 읽기 (비동기)
    contents = await file.read()
    
    # 2. CPU 집약적 처리 (별도 프로세스)
    loop = asyncio.get_running_loop()
    with concurrent.futures.ProcessPoolExecutor() as pool:
        result = await loop.run_in_executor(
            pool, analyze_document, contents
        )
    
    # 3. 결과 저장 (비동기 DB)
    await save_to_database(result)
    
    return {"status": "processed", "result": result}

외부 API 호출 + 데이터베이스 저장

@app.post("/fetch-and-save")
async def fetch_and_save(url: str):
    # 1. 외부 API 호출 (비동기)
    async with httpx.AsyncClient() as client:
        response = await client.get(url)
    
    # 2. 데이터 처리 (필요시 스레드풀)
    if response.headers.get('content-type') == 'application/json':
        data = response.json()
    else:
        # 복잡한 파싱이 필요한 경우
        data = await run_in_threadpool(parse_complex_data, response.content)
    
    # 3. 데이터베이스 저장 (비동기)
    await save_to_database(data)
    
    return {"status": "saved"}

결론

FastAPI에서 올바른 엔드포인트 타입을 선택하는 것은 성능에 매우 중요합니다:

  • 간단한 작업: async def
  • 블로킹 I/O: def 또는 async def + run_in_threadpool
  • CPU 집약적: ProcessPoolExecutor
  • 혼합 작업: 상황에 맞는 적절한 조합

핵심은 이벤트 루프를 블록하지 않는 것입니다. 올바른 선택으로 FastAPI의 고성능 비동기 처리 능력을 최대한 활용할 수 있습니다.


출처