一、为什么请求会超时?¶
想象一下,你点了一份外卖,等了15分钟还没送到,系统会提示“配送超时”。同样,如果你的FastAPI接口处理请求时耗时太长,用户也会觉得“等得太久”,甚至直接放弃。请求超时的本质是服务器处理请求的时间超过了客户端的等待阈值,常见原因包括:
- 用户网络卡顿:客户端网络不稳定,提前终止请求。
- 服务器负载过高:同时有大量请求,服务器处理不过来,导致响应延迟。
- 接口本身耗时:比如调用数据库查询、文件读写等IO操作,代码阻塞导致处理时间过长。
超时的危害很明显:用户体验差、请求失败、甚至引发下游服务连锁反应。因此,我们需要在FastAPI中主动设置超时,并通过异步处理提升响应速度。
二、FastAPI如何设置请求超时?¶
FastAPI提供了简单的方式设置请求超时,核心是通过timeout参数控制单个请求的最大处理时间。以下是两种常见场景:
1. 路由级超时设置¶
在异步路由中,直接在async def函数中设置timeout(单位:秒)。如果请求处理超过该时间,FastAPI会立即返回超时错误。
from fastapi import FastAPI
import asyncio
app = FastAPI()
# 模拟耗时15秒的异步任务(比如数据库查询、API调用)
@app.get("/slow-task")
async def slow_task(timeout: int = 10): # 设置超时10秒
try:
await asyncio.sleep(15) # 模拟耗时操作
return {"status": "任务完成"}
except TimeoutError:
return {"error": "请求超时!服务器处理时间超过10秒"}
2. 全局超时设置¶
如果希望所有请求统一设置超时,可以通过app的lifespan 或中间件实现。以下是一个简单的全局超时中间件示例:
from fastapi import Request, Response
from fastapi.responses import JSONResponse
class TimeoutMiddleware:
def __init__(self, app):
self.app = app
async def __call__(self, scope, receive, send):
# 设置全局超时为10秒(注意:需结合异步框架使用)
timeout = 10
try:
return await asyncio.wait_for(
self.app(scope, receive, send),
timeout=timeout
)
except asyncio.TimeoutError:
return JSONResponse(
status_code=504, # 504表示网关超时
content={"error": "全局超时!请稍后重试"}
)
app.add_middleware(TimeoutMiddleware)
三、异步处理:让FastAPI更快的秘诀¶
为什么异步能解决超时问题?同步处理会让代码“干等”,而异步可以在等待时处理其他请求。比如烧水时,同步是“一直盯着水壶等水开”,异步是“设置闹钟后去做别的事,水开时再回来”。
1. 同步vs异步:代码对比¶
- 同步路由(阻塞,适合CPU密集型任务):
import time
def sync_task():
time.sleep(5) # 同步耗时操作会阻塞整个线程
return "同步完成"
- 异步路由(非阻塞,适合IO密集型任务):
import asyncio
async def async_task():
await asyncio.sleep(5) # 异步等待,不阻塞线程
return "异步完成"
2. FastAPI中异步路由的写法¶
FastAPI推荐用async def定义异步路由,配合await调用异步操作。例如:
@app.get("/async-route")
async def async_route():
# 调用异步IO操作(如异步数据库查询)
await asyncio.sleep(3) # 模拟异步耗时
return {"message": "异步处理完成"}
3. 异步任务:后台执行不阻塞¶
如果任务不需要立即返回结果(比如发送邮件、生成报表),可以用BackgroundTasks或asyncio.create_task在后台执行:
from fastapi import BackgroundTasks
@app.post("/send-email")
async def send_email(background_tasks: BackgroundTasks):
# 将耗时任务加入后台,不阻塞主请求
background_tasks.add_task(send_async, "user@example.com")
return {"status": "邮件已加入发送队列,立即返回"}
async def send_async(to_email):
await asyncio.sleep(10) # 异步发送邮件(需异步库支持)
print(f"邮件已发送给{to_email}")
四、性能优化实战:从代码到部署¶
优化性能不能只靠超时和异步,还需要从代码、数据库、服务器等层面综合优化。
1. 缓存:减少重复计算¶
对高频但不常变化的数据(如热门商品列表),用lru_cache或Redis缓存结果:
from functools import lru_cache
# 缓存最近100次调用结果
@lru_cache(maxsize=100)
def get_cached_data(param: int):
# 实际场景:从数据库查询
return param * 2 # 模拟计算
@app.get("/cached-data/{param}")
async def cached_data(param: int):
data = get_cached_data(param)
return {"data": data}
2. 数据库连接池¶
数据库操作是最常见的IO瓶颈。使用连接池可以避免频繁创建/关闭连接:
import asyncpg
# 连接池配置(PostgreSQL示例)
async def init_db_pool():
return await asyncpg.create_pool(
user="user",
password="password",
database="mydb",
host="localhost",
max_size=20 # 最大连接数,根据服务器性能调整
)
# 路由中调用连接池查询
@app.get("/db-data")
async def db_data(pool=Depends(init_db_pool)):
async with pool.acquire() as connection:
result = await connection.fetchrow("SELECT * FROM users LIMIT 10")
return {"data": result}
3. 数据库优化¶
- 避免N+1查询:用
SELECT ... JOIN代替多次单条查询。 - 索引优化:给高频查询字段加索引(如用户ID、订单号)。
- 批量操作:多数据写入时用
INSERT INTO ... VALUES (), ()代替循环单条插入。
4. 部署层面:负载均衡¶
单机性能有限时,通过Nginx、Gunicorn+Uvicorn多进程部署,或使用云服务的负载均衡:
- Uvicorn多进程:uvicorn main:app --workers 4 --reload(4个工作进程)。
- 负载均衡:将请求分发到多台服务器(如AWS ELB、阿里云SLB)。
五、总结:如何综合优化?¶
- 先解决超时:用
timeout参数设置合理阈值(如10-30秒),避免用户干等。 - 异步处理IO密集型任务:所有数据库、API调用等耗时操作必须用异步库(如
asyncpg、httpx.AsyncClient)。 - 缓存高频数据:用
lru_cache或Redis减少重复计算。 - 优化数据库连接:配置连接池,避免频繁连接开销。
- 部署扩展:多进程、负载均衡提升并发处理能力。
通过以上步骤,你的FastAPI接口将既能避免超时,又能高效处理高并发请求,让用户体验和系统稳定性同时提升。