FastAPI异步编程:从基础到简单应用的实践指南

一、为什么需要异步编程?

在开始FastAPI的异步之旅前,我们先理解一个概念:同步编程异步编程的区别。

想象你在一家咖啡店点单:
- 同步:你站在吧台前,一直等到咖啡做好才离开。期间吧台无法接待其他客人,效率很低。
- 异步:你点单后拿到一个号码,然后去旁边坐着等待叫号。期间吧台可以接待其他客人,你只需要在叫号时回来取咖啡。

异步编程的核心就是让程序在等待I/O操作(如数据库查询、API请求、文件读写)时,不阻塞当前任务,而是去处理其他事情。这对于处理大量并发请求(比如用户访问、数据同步)的API服务来说非常重要。

FastAPI的异步支持让它能在高并发场景下保持高效,尤其适合处理I/O密集型任务(如与数据库交互、调用第三方API)。

二、FastAPI中的异步基础

2.1 异步函数的定义

在FastAPI中,定义异步视图函数需要使用async def关键字,并且调用异步函数时必须用await

from fastapi import FastAPI

app = FastAPI()

# 异步视图函数
@app.get("/")
async def read_root():
    return {"message": "Hello, Async FastAPI!"}  # 无需await,因为没有异步操作

注意:当函数中没有异步I/O操作时,async def和普通函数def的效果相同,但异步函数允许内部调用await

2.2 使用await调用异步函数

如果你的函数需要调用另一个异步函数(比如异步数据库查询),必须用await关键字等待其完成,否则会报错。

import asyncio

async def async_database_query():
    await asyncio.sleep(1)  # 模拟异步数据库查询耗时1秒
    return {"user": "Alice", "age": 30}

@app.get("/user")
async def get_user():
    # 调用异步函数必须用await
    result = await async_database_query()
    return result

三、异步视图与同步视图的区别

类型 定义方式 适用场景 特点
同步视图 def 简单逻辑、CPU密集型任务 直接执行,无事件循环阻塞
异步视图 async def I/O密集型任务(数据库、API) 非阻塞,可同时处理多个请求

示例对比

# 同步视图(用requests请求外部API)
import requests
from fastapi import FastAPI

app = FastAPI()

@app.get("/sync-data")
def sync_data():
    response = requests.get("https://api.example.com/data")  # 同步请求会阻塞
    return response.json()

# 异步视图(用aiohttp请求外部API)
import aiohttp
from fastapi import FastAPI

app = FastAPI()

@app.get("/async-data")
async def async_data():
    async with aiohttp.ClientSession() as session:  # 异步会话
        async with session.get("https://api.example.com/data") as response:
            return await response.json()  # 用await等待响应

四、异步数据库操作

在FastAPI中,异步数据库操作需要配合异步数据库驱动和ORM工具。以下是最常用的两种方案:

4.1 使用SQLAlchemy(1.4+支持异步)

SQLAlchemy 1.4版本后引入了异步支持,需要搭配asyncpg(PostgreSQL)或aiosqlite(SQLite)等驱动。

步骤1:安装依赖

pip install sqlalchemy asyncpg  # PostgreSQL示例

步骤2:配置异步数据库会话

from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker

DATABASE_URL = "postgresql+asyncpg://user:password@localhost/dbname"

engine = create_async_engine(DATABASE_URL)
AsyncSessionLocal = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)

# 依赖项:获取异步数据库会话
async def get_db():
    async with AsyncSessionLocal() as session:
        yield session
        await session.commit()

步骤3:异步查询示例

from sqlalchemy import select
from sqlalchemy.orm import Session

from fastapi import Depends, FastAPI

app = FastAPI()

class User(Base):  # Base为SQLAlchemy的声明基类
    __tablename__ = "users"
    id = Column(Integer, primary_key=True)
    name = Column(String)

@app.get("/users/{user_id}")
async def get_user(user_id: int, db: AsyncSession = Depends(get_db)):
    # 异步查询用户
    result = await db.execute(select(User).filter(User.id == user_id))
    user = result.scalars().first()
    if user:
        return {"id": user.id, "name": user.name}
    return {"error": "User not found"}

4.2 使用Tortoise-ORM(更简单的异步ORM)

如果你觉得SQLAlchemy配置复杂,可以用Tortoise-ORM(专为异步设计),代码更简洁。

安装依赖

pip install tortoise-orm

定义模型与查询

from tortoise import BaseDBAsyncClient, fields, Tortoise
from tortoise.contrib.fastapi import register_tortoise
from fastapi import FastAPI

app = FastAPI()

# 定义User模型
class User(Tortoise.Model):
    id = fields.IntField(pk=True)
    name = fields.CharField(max_length=100)

# 初始化Tortoise ORM
register_tortoise(
    app,
    db_url="sqlite://db.sqlite3",  # 数据库路径
    modules={"models": ["__main__"]},  # 包含模型的模块
    generate_schemas=True,  # 自动创建表结构
)

@app.get("/users/{user_id}")
async def get_user(user_id: int):
    user = await User.get(id=user_id)  # 异步查询
    return {"id": user.id, "name": user.name}

五、异步任务处理

FastAPI的异步能力不仅限于API请求,还能处理后台任务(如发送邮件、生成报表)。以下是简单示例:

5.1 使用asyncio处理简单异步任务

import asyncio
from fastapi import FastAPI

app = FastAPI()

# 模拟耗时任务(如发送邮件)
async def send_email_task(to: str, content: str):
    await asyncio.sleep(2)  # 模拟发送耗时
    return f"Email sent to {to}"

@app.post("/send-email")
async def send_email(to: str, content: str):
    # 创建后台任务(不阻塞API响应)
    task = asyncio.create_task(send_email_task(to, content))
    return {"status": "Email queued", "task_id": id(task)}  # task_id仅作演示

5.2 结合异步任务队列(进阶)

对于长时间运行的任务(如批量处理数据),推荐使用异步任务队列(如Celery+Redis)。FastAPI负责接收请求,异步任务队列处理耗时操作:

# 简单示例:FastAPI + Celery异步任务
from fastapi import FastAPI
from celery import Celery
import time

app = FastAPI()

# 初始化Celery(需配置Redis/RabbitMQ)
celery = Celery(
    "tasks",
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/0"
)

# 定义异步任务
@celery.task
def long_running_task():
    time.sleep(5)  # 模拟CPU密集型任务
    return "Task completed"

@app.post("/run-task")
async def run_task():
    task = long_running_task.delay()  # 发送任务到队列
    return {"task_id": task.id, "status": "running"}

六、实践注意事项

6.1 避免阻塞事件循环

  • 不要使用time.sleep():会完全阻塞事件循环,导致服务器无法响应其他请求。
  import time
  from fastapi import FastAPI

  app = FastAPI()

  @app.get("/bad-sleep")
  async def bad_sleep():
      time.sleep(5)  # ❌ 阻塞事件循环!
      return {"error": "This is bad"}

  @app.get("/good-sleep")
  async def good_sleep():
      await asyncio.sleep(5)  # ✅ 非阻塞,事件循环可处理其他请求
      return {"success": "This is good"}
  • 远离CPU密集型操作:异步适合I/O,不适合CPU密集(如计算百万级数据),此时应使用多进程。

6.2 合理使用asyncio任务

  • asyncio.gather()并行执行多个异步任务
  async def task1():
      await asyncio.sleep(1)
      return "Task 1 done"

  async def task2():
      await asyncio.sleep(2)
      return "Task 2 done"

  @app.get("/parallel-tasks")
  async def parallel_tasks():
      results = await asyncio.gather(task1(), task2())  # 并行执行,总耗时≈2秒
      return {"results": results}

七、简单异步应用示例

以下是一个完整的FastAPI异步应用,包含:
- 异步数据库查询(用Tortoise-ORM)
- 异步任务处理
- 简单的JSON响应

from tortoise import fields, Tortoise
from tortoise.contrib.fastapi import register_tortoise
from fastapi import FastAPI

app = FastAPI()

# 1. 定义Tortoise模型
class User(Tortoise.Model):
    id = fields.IntField(pk=True)
    name = fields.CharField(max_length=100)
    age = fields.IntField()

# 2. 初始化Tortoise ORM
register_tortoise(
    app,
    db_url="sqlite://async_demo.db",
    modules={"models": ["__main__"]},
    generate_schemas=True,
)

# 3. 异步数据库任务(创建用户)
@app.post("/users")
async def create_user(name: str, age: int):
    user = await User.create(name=name, age=age)
    return {"id": user.id, "name": user.name, "age": user.age}

# 4. 异步API端点(获取用户列表)
@app.get("/users")
async def get_users():
    users = await User.all()  # 异步查询所有用户
    return [{"id": u.id, "name": u.name, "age": u.age} for u in users]

# 5. 异步任务处理(耗时操作)
import asyncio

async def async_task(delay: int):
    await asyncio.sleep(delay)
    return f"Task done after {delay}s"

@app.post("/start-task")
async def start_task(delay: int = 2):
    task = asyncio.create_task(async_task(delay))
    return {"task_id": id(task), "status": "started"}

八、总结

FastAPI的异步编程能显著提升I/O密集型应用的并发能力,核心要点:

  1. 定义异步视图:用async def + await,避免阻塞事件循环。
  2. 异步数据库:用SQLAlchemy 1.4+或Tortoise-ORM,简化配置。
  3. 任务处理:小任务用asyncio,大任务用异步任务队列。
  4. 避免陷阱:不用time.sleep(),远离CPU密集型操作。

通过本文的基础和示例,你已经可以尝试在FastAPI中编写异步应用了!

小夜