一、为什么需要异步编程?¶
在开始FastAPI的异步之旅前,我们先理解一个概念:同步编程和异步编程的区别。
想象你在一家咖啡店点单:
- 同步:你站在吧台前,一直等到咖啡做好才离开。期间吧台无法接待其他客人,效率很低。
- 异步:你点单后拿到一个号码,然后去旁边坐着等待叫号。期间吧台可以接待其他客人,你只需要在叫号时回来取咖啡。
异步编程的核心就是让程序在等待I/O操作(如数据库查询、API请求、文件读写)时,不阻塞当前任务,而是去处理其他事情。这对于处理大量并发请求(比如用户访问、数据同步)的API服务来说非常重要。
FastAPI的异步支持让它能在高并发场景下保持高效,尤其适合处理I/O密集型任务(如与数据库交互、调用第三方API)。
二、FastAPI中的异步基础¶
2.1 异步函数的定义¶
在FastAPI中,定义异步视图函数需要使用async def关键字,并且调用异步函数时必须用await。
from fastapi import FastAPI
app = FastAPI()
# 异步视图函数
@app.get("/")
async def read_root():
return {"message": "Hello, Async FastAPI!"} # 无需await,因为没有异步操作
注意:当函数中没有异步I/O操作时,async def和普通函数def的效果相同,但异步函数允许内部调用await。
2.2 使用await调用异步函数¶
如果你的函数需要调用另一个异步函数(比如异步数据库查询),必须用await关键字等待其完成,否则会报错。
import asyncio
async def async_database_query():
await asyncio.sleep(1) # 模拟异步数据库查询耗时1秒
return {"user": "Alice", "age": 30}
@app.get("/user")
async def get_user():
# 调用异步函数必须用await
result = await async_database_query()
return result
三、异步视图与同步视图的区别¶
| 类型 | 定义方式 | 适用场景 | 特点 |
|---|---|---|---|
| 同步视图 | def |
简单逻辑、CPU密集型任务 | 直接执行,无事件循环阻塞 |
| 异步视图 | async def |
I/O密集型任务(数据库、API) | 非阻塞,可同时处理多个请求 |
示例对比:
# 同步视图(用requests请求外部API)
import requests
from fastapi import FastAPI
app = FastAPI()
@app.get("/sync-data")
def sync_data():
response = requests.get("https://api.example.com/data") # 同步请求会阻塞
return response.json()
# 异步视图(用aiohttp请求外部API)
import aiohttp
from fastapi import FastAPI
app = FastAPI()
@app.get("/async-data")
async def async_data():
async with aiohttp.ClientSession() as session: # 异步会话
async with session.get("https://api.example.com/data") as response:
return await response.json() # 用await等待响应
四、异步数据库操作¶
在FastAPI中,异步数据库操作需要配合异步数据库驱动和ORM工具。以下是最常用的两种方案:
4.1 使用SQLAlchemy(1.4+支持异步)¶
SQLAlchemy 1.4版本后引入了异步支持,需要搭配asyncpg(PostgreSQL)或aiosqlite(SQLite)等驱动。
步骤1:安装依赖
pip install sqlalchemy asyncpg # PostgreSQL示例
步骤2:配置异步数据库会话
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker
DATABASE_URL = "postgresql+asyncpg://user:password@localhost/dbname"
engine = create_async_engine(DATABASE_URL)
AsyncSessionLocal = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
# 依赖项:获取异步数据库会话
async def get_db():
async with AsyncSessionLocal() as session:
yield session
await session.commit()
步骤3:异步查询示例
from sqlalchemy import select
from sqlalchemy.orm import Session
from fastapi import Depends, FastAPI
app = FastAPI()
class User(Base): # Base为SQLAlchemy的声明基类
__tablename__ = "users"
id = Column(Integer, primary_key=True)
name = Column(String)
@app.get("/users/{user_id}")
async def get_user(user_id: int, db: AsyncSession = Depends(get_db)):
# 异步查询用户
result = await db.execute(select(User).filter(User.id == user_id))
user = result.scalars().first()
if user:
return {"id": user.id, "name": user.name}
return {"error": "User not found"}
4.2 使用Tortoise-ORM(更简单的异步ORM)¶
如果你觉得SQLAlchemy配置复杂,可以用Tortoise-ORM(专为异步设计),代码更简洁。
安装依赖:
pip install tortoise-orm
定义模型与查询:
from tortoise import BaseDBAsyncClient, fields, Tortoise
from tortoise.contrib.fastapi import register_tortoise
from fastapi import FastAPI
app = FastAPI()
# 定义User模型
class User(Tortoise.Model):
id = fields.IntField(pk=True)
name = fields.CharField(max_length=100)
# 初始化Tortoise ORM
register_tortoise(
app,
db_url="sqlite://db.sqlite3", # 数据库路径
modules={"models": ["__main__"]}, # 包含模型的模块
generate_schemas=True, # 自动创建表结构
)
@app.get("/users/{user_id}")
async def get_user(user_id: int):
user = await User.get(id=user_id) # 异步查询
return {"id": user.id, "name": user.name}
五、异步任务处理¶
FastAPI的异步能力不仅限于API请求,还能处理后台任务(如发送邮件、生成报表)。以下是简单示例:
5.1 使用asyncio处理简单异步任务¶
import asyncio
from fastapi import FastAPI
app = FastAPI()
# 模拟耗时任务(如发送邮件)
async def send_email_task(to: str, content: str):
await asyncio.sleep(2) # 模拟发送耗时
return f"Email sent to {to}"
@app.post("/send-email")
async def send_email(to: str, content: str):
# 创建后台任务(不阻塞API响应)
task = asyncio.create_task(send_email_task(to, content))
return {"status": "Email queued", "task_id": id(task)} # task_id仅作演示
5.2 结合异步任务队列(进阶)¶
对于长时间运行的任务(如批量处理数据),推荐使用异步任务队列(如Celery+Redis)。FastAPI负责接收请求,异步任务队列处理耗时操作:
# 简单示例:FastAPI + Celery异步任务
from fastapi import FastAPI
from celery import Celery
import time
app = FastAPI()
# 初始化Celery(需配置Redis/RabbitMQ)
celery = Celery(
"tasks",
broker="redis://localhost:6379/0",
backend="redis://localhost:6379/0"
)
# 定义异步任务
@celery.task
def long_running_task():
time.sleep(5) # 模拟CPU密集型任务
return "Task completed"
@app.post("/run-task")
async def run_task():
task = long_running_task.delay() # 发送任务到队列
return {"task_id": task.id, "status": "running"}
六、实践注意事项¶
6.1 避免阻塞事件循环¶
- 不要使用
time.sleep():会完全阻塞事件循环,导致服务器无法响应其他请求。
import time
from fastapi import FastAPI
app = FastAPI()
@app.get("/bad-sleep")
async def bad_sleep():
time.sleep(5) # ❌ 阻塞事件循环!
return {"error": "This is bad"}
@app.get("/good-sleep")
async def good_sleep():
await asyncio.sleep(5) # ✅ 非阻塞,事件循环可处理其他请求
return {"success": "This is good"}
- 远离CPU密集型操作:异步适合I/O,不适合CPU密集(如计算百万级数据),此时应使用多进程。
6.2 合理使用asyncio任务¶
- 用
asyncio.gather()并行执行多个异步任务:
async def task1():
await asyncio.sleep(1)
return "Task 1 done"
async def task2():
await asyncio.sleep(2)
return "Task 2 done"
@app.get("/parallel-tasks")
async def parallel_tasks():
results = await asyncio.gather(task1(), task2()) # 并行执行,总耗时≈2秒
return {"results": results}
七、简单异步应用示例¶
以下是一个完整的FastAPI异步应用,包含:
- 异步数据库查询(用Tortoise-ORM)
- 异步任务处理
- 简单的JSON响应
from tortoise import fields, Tortoise
from tortoise.contrib.fastapi import register_tortoise
from fastapi import FastAPI
app = FastAPI()
# 1. 定义Tortoise模型
class User(Tortoise.Model):
id = fields.IntField(pk=True)
name = fields.CharField(max_length=100)
age = fields.IntField()
# 2. 初始化Tortoise ORM
register_tortoise(
app,
db_url="sqlite://async_demo.db",
modules={"models": ["__main__"]},
generate_schemas=True,
)
# 3. 异步数据库任务(创建用户)
@app.post("/users")
async def create_user(name: str, age: int):
user = await User.create(name=name, age=age)
return {"id": user.id, "name": user.name, "age": user.age}
# 4. 异步API端点(获取用户列表)
@app.get("/users")
async def get_users():
users = await User.all() # 异步查询所有用户
return [{"id": u.id, "name": u.name, "age": u.age} for u in users]
# 5. 异步任务处理(耗时操作)
import asyncio
async def async_task(delay: int):
await asyncio.sleep(delay)
return f"Task done after {delay}s"
@app.post("/start-task")
async def start_task(delay: int = 2):
task = asyncio.create_task(async_task(delay))
return {"task_id": id(task), "status": "started"}
八、总结¶
FastAPI的异步编程能显著提升I/O密集型应用的并发能力,核心要点:
- 定义异步视图:用
async def+await,避免阻塞事件循环。 - 异步数据库:用SQLAlchemy 1.4+或Tortoise-ORM,简化配置。
- 任务处理:小任务用
asyncio,大任务用异步任务队列。 - 避免陷阱:不用
time.sleep(),远离CPU密集型操作。
通过本文的基础和示例,你已经可以尝试在FastAPI中编写异步应用了!