一、爲什麼需要異步編程?¶
在開始FastAPI的異步之旅前,我們先理解一個概念:同步編程和異步編程的區別。
想象你在一家咖啡店點單:
- 同步:你站在吧檯前,一直等到咖啡做好才離開。期間吧檯無法接待其他客人,效率很低。
- 異步:你點單後拿到一個號碼,然後去旁邊坐着等待叫號。期間吧檯可以接待其他客人,你只需要在叫號時回來取咖啡。
異步編程的核心就是讓程序在等待I/O操作(如數據庫查詢、API請求、文件讀寫)時,不阻塞當前任務,而是去處理其他事情。這對於處理大量併發請求(比如用戶訪問、數據同步)的API服務來說非常重要。
FastAPI的異步支持讓它能在高併發場景下保持高效,尤其適合處理I/O密集型任務(如與數據庫交互、調用第三方API)。
二、FastAPI中的異步基礎¶
2.1 異步函數的定義¶
在FastAPI中,定義異步視圖函數需要使用async def關鍵字,並且調用異步函數時必須用await。
from fastapi import FastAPI
app = FastAPI()
# 異步視圖函數
@app.get("/")
async def read_root():
return {"message": "Hello, Async FastAPI!"} # 無需await,因爲沒有異步操作
注意:當函數中沒有異步I/O操作時,async def和普通函數def的效果相同,但異步函數允許內部調用await。
2.2 使用await調用異步函數¶
如果你的函數需要調用另一個異步函數(比如異步數據庫查詢),必須用await關鍵字等待其完成,否則會報錯。
import asyncio
async def async_database_query():
await asyncio.sleep(1) # 模擬異步數據庫查詢耗時1秒
return {"user": "Alice", "age": 30}
@app.get("/user")
async def get_user():
# 調用異步函數必須用await
result = await async_database_query()
return result
三、異步視圖與同步視圖的區別¶
| 類型 | 定義方式 | 適用場景 | 特點 |
|---|---|---|---|
| 同步視圖 | def |
簡單邏輯、CPU密集型任務 | 直接執行,無事件循環阻塞 |
| 異步視圖 | async def |
I/O密集型任務(數據庫、API) | 非阻塞,可同時處理多個請求 |
示例對比:
# 同步視圖(用requests請求外部API)
import requests
from fastapi import FastAPI
app = FastAPI()
@app.get("/sync-data")
def sync_data():
response = requests.get("https://api.example.com/data") # 同步請求會阻塞
return response.json()
# 異步視圖(用aiohttp請求外部API)
import aiohttp
from fastapi import FastAPI
app = FastAPI()
@app.get("/async-data")
async def async_data():
async with aiohttp.ClientSession() as session: # 異步會話
async with session.get("https://api.example.com/data") as response:
return await response.json() # 用await等待響應
四、異步數據庫操作¶
在FastAPI中,異步數據庫操作需要配合異步數據庫驅動和ORM工具。以下是最常用的兩種方案:
4.1 使用SQLAlchemy(1.4+支持異步)¶
SQLAlchemy 1.4版本後引入了異步支持,需要搭配asyncpg(PostgreSQL)或aiosqlite(SQLite)等驅動。
步驟1:安裝依賴
pip install sqlalchemy asyncpg # PostgreSQL示例
步驟2:配置異步數據庫會話
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker
DATABASE_URL = "postgresql+asyncpg://user:password@localhost/dbname"
engine = create_async_engine(DATABASE_URL)
AsyncSessionLocal = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
# 依賴項:獲取異步數據庫會話
async def get_db():
async with AsyncSessionLocal() as session:
yield session
await session.commit()
步驟3:異步查詢示例
from sqlalchemy import select
from sqlalchemy.orm import Session
from fastapi import Depends, FastAPI
app = FastAPI()
class User(Base): # Base爲SQLAlchemy的聲明基類
__tablename__ = "users"
id = Column(Integer, primary_key=True)
name = Column(String)
@app.get("/users/{user_id}")
async def get_user(user_id: int, db: AsyncSession = Depends(get_db)):
# 異步查詢用戶
result = await db.execute(select(User).filter(User.id == user_id))
user = result.scalars().first()
if user:
return {"id": user.id, "name": user.name}
return {"error": "User not found"}
4.2 使用Tortoise-ORM(更簡單的異步ORM)¶
如果你覺得SQLAlchemy配置複雜,可以用Tortoise-ORM(專爲異步設計),代碼更簡潔。
安裝依賴:
pip install tortoise-orm
定義模型與查詢:
from tortoise import BaseDBAsyncClient, fields, Tortoise
from tortoise.contrib.fastapi import register_tortoise
from fastapi import FastAPI
app = FastAPI()
# 定義User模型
class User(Tortoise.Model):
id = fields.IntField(pk=True)
name = fields.CharField(max_length=100)
# 初始化Tortoise ORM
register_tortoise(
app,
db_url="sqlite://db.sqlite3", # 數據庫路徑
modules={"models": ["__main__"]}, # 包含模型的模塊
generate_schemas=True, # 自動創建表結構
)
@app.get("/users/{user_id}")
async def get_user(user_id: int):
user = await User.get(id=user_id) # 異步查詢
return {"id": user.id, "name": user.name}
五、異步任務處理¶
FastAPI的異步能力不僅限於API請求,還能處理後臺任務(如發送郵件、生成報表)。以下是簡單示例:
5.1 使用asyncio處理簡單異步任務¶
import asyncio
from fastapi import FastAPI
app = FastAPI()
# 模擬耗時任務(如發送郵件)
async def send_email_task(to: str, content: str):
await asyncio.sleep(2) # 模擬發送耗時
return f"Email sent to {to}"
@app.post("/send-email")
async def send_email(to: str, content: str):
# 創建後臺任務(不阻塞API響應)
task = asyncio.create_task(send_email_task(to, content))
return {"status": "Email queued", "task_id": id(task)} # task_id僅作演示
5.2 結合異步任務隊列(進階)¶
對於長時間運行的任務(如批量處理數據),推薦使用異步任務隊列(如Celery+Redis)。FastAPI負責接收請求,異步任務隊列處理耗時操作:
# 簡單示例:FastAPI + Celery異步任務
from fastapi import FastAPI
from celery import Celery
import time
app = FastAPI()
# 初始化Celery(需配置Redis/RabbitMQ)
celery = Celery(
"tasks",
broker="redis://localhost:6379/0",
backend="redis://localhost:6379/0"
)
# 定義異步任務
@celery.task
def long_running_task():
time.sleep(5) # 模擬CPU密集型任務
return "Task completed"
@app.post("/run-task")
async def run_task():
task = long_running_task.delay() # 發送任務到隊列
return {"task_id": task.id, "status": "running"}
六、實踐注意事項¶
6.1 避免阻塞事件循環¶
- 不要使用
time.sleep():會完全阻塞事件循環,導致服務器無法響應其他請求。
import time
from fastapi import FastAPI
app = FastAPI()
@app.get("/bad-sleep")
async def bad_sleep():
time.sleep(5) # ❌ 阻塞事件循環!
return {"error": "This is bad"}
@app.get("/good-sleep")
async def good_sleep():
await asyncio.sleep(5) # ✅ 非阻塞,事件循環可處理其他請求
return {"success": "This is good"}
- 遠離CPU密集型操作:異步適合I/O,不適合CPU密集(如計算百萬級數據),此時應使用多進程。
6.2 合理使用asyncio任務¶
- 用
asyncio.gather()並行執行多個異步任務:
async def task1():
await asyncio.sleep(1)
return "Task 1 done"
async def task2():
await asyncio.sleep(2)
return "Task 2 done"
@app.get("/parallel-tasks")
async def parallel_tasks():
results = await asyncio.gather(task1(), task2()) # 並行執行,總耗時≈2秒
return {"results": results}
七、簡單異步應用示例¶
以下是一個完整的FastAPI異步應用,包含:
- 異步數據庫查詢(用Tortoise-ORM)
- 異步任務處理
- 簡單的JSON響應
from tortoise import fields, Tortoise
from tortoise.contrib.fastapi import register_tortoise
from fastapi import FastAPI
app = FastAPI()
# 1. 定義Tortoise模型
class User(Tortoise.Model):
id = fields.IntField(pk=True)
name = fields.CharField(max_length=100)
age = fields.IntField()
# 2. 初始化Tortoise ORM
register_tortoise(
app,
db_url="sqlite://async_demo.db",
modules={"models": ["__main__"]},
generate_schemas=True,
)
# 3. 異步數據庫任務(創建用戶)
@app.post("/users")
async def create_user(name: str, age: int):
user = await User.create(name=name, age=age)
return {"id": user.id, "name": user.name, "age": user.age}
# 4. 異步API端點(獲取用戶列表)
@app.get("/users")
async def get_users():
users = await User.all() # 異步查詢所有用戶
return [{"id": u.id, "name": u.name, "age": u.age} for u in users]
# 5. 異步任務處理(耗時操作)
import asyncio
async def async_task(delay: int):
await asyncio.sleep(delay)
return f"Task done after {delay}s"
@app.post("/start-task")
async def start_task(delay: int = 2):
task = asyncio.create_task(async_task(delay))
return {"task_id": id(task), "status": "started"}
八、總結¶
FastAPI的異步編程能顯著提升I/O密集型應用的併發能力,核心要點:
- 定義異步視圖:用
async def+await,避免阻塞事件循環。 - 異步數據庫:用SQLAlchemy 1.4+或Tortoise-ORM,簡化配置。
- 任務處理:小任務用
asyncio,大任務用異步任務隊列。 - 避免陷阱:不用
time.sleep(),遠離CPU密集型操作。
通過本文的基礎和示例,你已經可以嘗試在FastAPI中編寫異步應用了!