FastAPI의 비동기 처리와 백그라운드 작업 완벽 가이드
개요
FastAPI에서 엔드포인트를 정의할 때 def
와 async def
중 어떤 것을 사용해야 하는지, 그리고 왜 그런 선택을 해야 하는지에 대한 완벽한 가이드입니다.
FastAPI 공식 문서의 핵심 원칙
FastAPI 공식 문서에 따르면:
async def
대신 일반def
로 엔드포인트를 선언하면, 서버를 블록하지 않기 위해 외부 스레드풀에서 실행되고 대기됩니다.
그리고:
외부 라이브러리(데이터베이스, API, 파일 시스템 등)와 통신하지만
await
를 지원하지 않는 경우(현재 대부분의 데이터베이스 라이브러리가 이에 해당), 엔드포인트를 일반def
로 선언하세요.
애플리케이션이 다른 곳과 통신하고 응답을 기다릴 필요가 없다면
async def
를 사용하세요.
def vs async def: 내부 동작 원리
1. def
(동기) 엔드포인트
@app.get("/ping")
def ping(request: Request):
print("Hello")
time.sleep(5) # 블로킹 작업
print("bye")
return "pong"
동작 방식:
- FastAPI는 이 함수를 외부 스레드풀에서 실행
- 메인 이벤트 루프를 블록하지 않음
- 각 요청마다 새로운 스레드를 생성하거나 기존 스레드 재사용
- 여러 요청을 동시에 처리 가능
2. async def
(비동기) 엔드포인트
@app.get("/ping")
async def ping(request: Request):
print("Hello")
await asyncio.sleep(5) # 논블로킹 작업
print("bye")
return "pong"
동작 방식:
- 이벤트 루프에서 직접 실행 (단일 스레드)
await
호출 시 다른 작업에 제어권 양보- 진정한 비동기 동시성 제공
⚠️ 잘못된 async def 사용 예시
@app.get("/ping")
async def ping(request: Request):
print("Hello")
time.sleep(5) # ❌ 블로킹 작업 - 전체 서버가 멈춤!
print("bye")
return "pong"
이 경우 time.sleep()
이 이벤트 루프를 블록하여 전체 서버가 멈춥니다.
언제 무엇을 사용해야 할까?
def
사용 시기
- 블로킹 I/O 작업 (파일 읽기, 데이터베이스 쿼리)
- CPU 집약적 작업
await
를 지원하지 않는 라이브러리 사용- 복잡한 JSON 인코딩 작업
async def
사용 시기
- 비동기 라이브러리 사용 (
httpx
,aiofiles
등) - 간단한 데이터 반환
await
호출이 있는 I/O 작업- 비동기 데이터베이스 드라이버 사용
중요한 개념 설명
1. 이벤트 루프 (Event Loop)
- 정의: 비동기 작업을 관리하는 단일 스레드 실행 환경
- 역할:
await
키워드를 만나면 다른 작업으로 제어권 전환 - 특징: 한 번에 하나의 코루틴만 실행
2. 스레드풀 (ThreadPool)
- 정의: 미리 생성된 스레드들의 집합
- FastAPI 기본값: 40개 스레드
- 용도:
def
엔드포인트 실행
3. GIL (Global Interpreter Lock)
- 정의: Python 인터프리터를 한 번에 하나의 스레드만 사용하도록 제한하는 뮤텍스
- 영향: CPU 집약적 작업에서는 멀티스레딩 효과 제한
- 해결책: 멀티프로세싱 사용
블로킹 작업 처리 방법
방법 1: def
엔드포인트로 변경
@app.post("/process")
def process_file(file: UploadFile = File(...)):
contents = file.file.read() # 동기 읽기
result = cpu_intensive_task(contents)
return result
방법 2: run_in_threadpool()
사용
from fastapi.concurrency import run_in_threadpool
@app.post("/process")
async def process_file(file: UploadFile = File(...)):
contents = await file.read()
result = await run_in_threadpool(cpu_intensive_task, contents)
return result
방법 3: asyncio.to_thread()
사용 (Python 3.9+)
import asyncio
@app.post("/process")
async def process_file(file: UploadFile = File(...)):
contents = await file.read()
result = await asyncio.to_thread(cpu_intensive_task, contents)
return result
방법 4: ProcessPoolExecutor 사용 (CPU 집약적 작업)
import asyncio
import concurrent.futures
@app.post("/process")
async def process_file(file: UploadFile = File(...)):
contents = await file.read()
loop = asyncio.get_running_loop()
with concurrent.futures.ProcessPoolExecutor() as pool:
result = await loop.run_in_executor(pool, cpu_intensive_task, contents)
return result
성능 최적화 팁
1. 워커 프로세스 증가
uvicorn main:app --workers 4
2. 스레드풀 크기 조정
# 기본값: min(32, os.cpu_count() + 4)
# 필요에 따라 커스텀 ThreadPoolExecutor 생성
3. 적절한 엔드포인트 타입 선택
- 간단한 JSON 반환:
async def
- 복잡한 데이터 처리:
def
- 비동기 I/O:
async def
+await
브라우저 테스트 시 주의사항
동일한 브라우저 탭에서 연속 요청 시 브라우저가 요청을 순차적으로 보낼 수 있습니다.
해결 방법:
- 시크릿 모드 새 탭 사용
- 다른 브라우저 사용
httpx
를 이용한 비동기 테스트
import httpx
import asyncio
async def test_concurrent_requests():
async with httpx.AsyncClient() as client:
tasks = [
client.get("http://127.0.0.1:8000/ping"),
client.get("http://127.0.0.1:8000/ping")
]
responses = await asyncio.gather(*tasks)
return responses
백그라운드 작업 대안
1. FastAPI BackgroundTasks
from fastapi import BackgroundTasks
@app.post("/send-email")
async def send_email(background_tasks: BackgroundTasks):
background_tasks.add_task(send_email_task, "user@example.com")
return {"message": "Email sent in background"}
2. Celery (분산 작업 큐)
# 무거운 계산 작업이나 별도 프로세스가 필요한 경우
celery_app.send_task('heavy_computation', args=[data])
3. APScheduler
# 스케줄된 작업이나 주기적 작업
from apscheduler.schedulers.asyncio import AsyncIOScheduler
실제 사용 예시
파일 업로드 + CPU 집약 작업
@app.post("/process-document")
async def process_document(file: UploadFile = File(...)):
# 1. 파일 읽기 (비동기)
contents = await file.read()
# 2. CPU 집약적 처리 (별도 프로세스)
loop = asyncio.get_running_loop()
with concurrent.futures.ProcessPoolExecutor() as pool:
result = await loop.run_in_executor(
pool, analyze_document, contents
)
# 3. 결과 저장 (비동기 DB)
await save_to_database(result)
return {"status": "processed", "result": result}
외부 API 호출 + 데이터베이스 저장
@app.post("/fetch-and-save")
async def fetch_and_save(url: str):
# 1. 외부 API 호출 (비동기)
async with httpx.AsyncClient() as client:
response = await client.get(url)
# 2. 데이터 처리 (필요시 스레드풀)
if response.headers.get('content-type') == 'application/json':
data = response.json()
else:
# 복잡한 파싱이 필요한 경우
data = await run_in_threadpool(parse_complex_data, response.content)
# 3. 데이터베이스 저장 (비동기)
await save_to_database(data)
return {"status": "saved"}
결론
FastAPI에서 올바른 엔드포인트 타입을 선택하는 것은 성능에 매우 중요합니다:
- 간단한 작업:
async def
- 블로킹 I/O:
def
또는async def
+run_in_threadpool
- CPU 집약적:
ProcessPoolExecutor
- 혼합 작업: 상황에 맞는 적절한 조합
핵심은 이벤트 루프를 블록하지 않는 것입니다. 올바른 선택으로 FastAPI의 고성능 비동기 처리 능력을 최대한 활용할 수 있습니다.