登录
推荐 文章 Go 技术 课程 下载 专题 AI
首页 >  文章 >  python教程

Python asyncio 超时与取消实战:用 timeout 和 TaskGroup 管住慢任务

来源:17golang原创

时间:2026-06-12 20:37:12 457浏览 收藏

异步代码最容易出问题的地方,不是语法,而是“慢任务怎么办”。一个接口聚合三个上游服务,任意一个服务变慢,都可能把请求拖到超时;如果取消处理没有写好,还会留下半完成任务、未关闭连接和难以复现的偶发告警。

本文用一个可运行的小例子,梳理 Python asyncio 中常用的超时与取消手段:asyncio.timeout()asyncio.wait_for()TaskGroupshield()。目标不是把 API 背一遍,而是能在真实项目里给异步任务划边界、做清理、让故障更容易定位。

摘要

我们会从一个“订单详情聚合接口”出发,先写出会被慢上游拖住的版本,再逐步加入超时边界、结构化并发、取消清理和关键任务保护。读完后,你可以把同样的思路迁移到爬虫、消息消费、异步 Web 接口和批量任务调度中。

适合人群

适合已经会写基础 async defawait,但在项目里遇到慢请求、任务取消、异步资源释放问题的 Python 开发者。示例推荐使用 Python 3.11 及以上版本运行,因为 TaskGroupasyncio.timeout() 在这些版本里更适合组织结构化并发。

目录

  1. 先复现一个慢上游问题
  2. 用 timeout 给整段流程划边界
  3. wait_for 适合包住单个等待点
  4. TaskGroup 让一组任务同生共退
  5. 取消传播后一定要做资源清理
  6. 什么时候使用 shield
  7. 常见坑和排查思路

一、先复现一个慢上游问题

假设订单详情页需要同时读取订单、库存和优惠信息。为了模拟网络调用,下面的函数用 asyncio.sleep() 代表上游耗时:

import asyncio
from dataclasses import dataclass


@dataclass
class ServiceResult:
    name: str
    value: str


async def query_service(name: str, delay: float) -> ServiceResult:
    await asyncio.sleep(delay)
    return ServiceResult(name=name, value=f"{name}-ok")


async def build_order_detail() -> list[ServiceResult]:
    order_task = asyncio.create_task(query_service("order", 0.2))
    stock_task = asyncio.create_task(query_service("stock", 0.4))
    coupon_task = asyncio.create_task(query_service("coupon", 3.0))
    return await asyncio.gather(order_task, stock_task, coupon_task)


async def main() -> None:
    result = await build_order_detail()
    print(result)


if __name__ == "__main__":
    asyncio.run(main())

这段代码可以正常返回,但 coupon 慢到 3 秒时,整个聚合结果也会被拖到 3 秒。如果这是一个对外接口,请求线程池、网关超时和用户体验都会受到影响。

asyncio 聚合接口的超时边界示意图

二、用 timeout 给整段流程划边界

当你希望“这段业务最多只能花多少时间”时,可以把整段逻辑放进 asyncio.timeout()。它像一个时间盒,时间到了就取消当前等待中的任务,并抛出超时异常。

async def build_order_detail_with_limit() -> list[ServiceResult]:
    try:
        async with asyncio.timeout(1.0):
            return await asyncio.gather(
                query_service("order", 0.2),
                query_service("stock", 0.4),
                query_service("coupon", 3.0),
            )
    except TimeoutError:
        return [
            ServiceResult(name="order", value="fallback"),
            ServiceResult(name="stock", value="fallback"),
            ServiceResult(name="coupon", value="timeout"),
        ]

这里的重点是边界放在哪里。如果把边界放在整段聚合外层,业务表现是“超过 1 秒就走兜底”;如果只给某一个上游设置边界,业务表现就是“慢上游单独降级,其它结果照常返回”。

三、wait_for 适合包住单个等待点

asyncio.wait_for() 更适合包住一个明确的等待点,比如某个上游、某个锁、某个队列读取。下面的写法让优惠服务最多等待 0.8 秒,订单和库存仍然按原计划返回:

async def safe_coupon() -> ServiceResult:
    try:
        return await asyncio.wait_for(query_service("coupon", 3.0), timeout=0.8)
    except TimeoutError:
        return ServiceResult(name="coupon", value="timeout")


async def build_order_detail_partly_degraded() -> list[ServiceResult]:
    return await asyncio.gather(
        query_service("order", 0.2),
        query_service("stock", 0.4),
        safe_coupon(),
    )

外层时间盒强调“整体耗时不能超过多少”,单点等待强调“这个上游不能拖累其它结果”。在接口聚合、批量读取和多数据源组合场景里,通常会同时使用这两种策略。

四、TaskGroup 让一组任务同生共退

如果一组任务属于同一个业务步骤,推荐用 TaskGroup 表达这种关系。它的好处是结构清楚:只要组内某个任务异常退出,组里的其它任务也会被取消,调用者可以在一个地方处理结果。

async def build_with_task_group() -> dict[str, ServiceResult]:
    tasks = {}
    try:
        async with asyncio.timeout(1.2):
            async with asyncio.TaskGroup() as group:
                tasks["order"] = group.create_task(query_service("order", 0.2))
                tasks["stock"] = group.create_task(query_service("stock", 0.4))
                tasks["coupon"] = group.create_task(query_service("coupon", 3.0))
    except TimeoutError:
        return {
            "order": ServiceResult("order", "fallback"),
            "stock": ServiceResult("stock", "fallback"),
            "coupon": ServiceResult("coupon", "timeout"),
        }

    return {name: task.result() for name, task in tasks.items()}

和手动保存多个任务相比,TaskGroup 更像一个任务作用域。作用域结束时,任务要么完成,要么被明确取消,不容易出现“外层函数已经返回,里面还有任务偷偷运行”的情况。

asyncio TaskGroup 取消传播和资源清理流程图

五、取消传播后一定要做资源清理

任务被取消时,会在等待点收到 asyncio.CancelledError。不要把它当作普通异常随手吞掉。更稳妥的做法是:清理资源,然后继续抛出,让上层知道任务已经取消。

async def stream_rows(name: str) -> int:
    opened = False
    try:
        opened = True
        count = 0
        while True:
            await asyncio.sleep(0.2)
            count += 1
    except asyncio.CancelledError:
        print(f"{name}: task cancelled, close cursor")
        raise
    finally:
        if opened:
            print(f"{name}: resource closed")

如果你在 except asyncio.CancelledError 里返回默认值,上层会以为任务正常完成,排查时很难知道它其实被取消过。除非你非常明确地要把取消转成业务结果,否则应当清理后继续抛出。

六、什么时候使用 shield

asyncio.shield() 可以保护某个等待对象,不让外层取消直接打断它。它适合很少数需要“尽量完成”的短操作,比如写审计日志、释放租约、提交本地状态。不要把大段业务逻辑都包进 shield(),否则超时边界会变得难以理解。

async def write_audit_log(order_id: str) -> None:
    await asyncio.sleep(0.2)
    print(f"audit saved: {order_id}")


async def handle_request(order_id: str) -> str:
    try:
        async with asyncio.timeout(1.0):
            await query_service("order", 0.8)
            return "ok"
    finally:
        await asyncio.shield(write_audit_log(order_id))

这段代码表达的是:主流程有 1 秒边界,但收尾日志尽量落下。真实项目里还要控制日志写入自己的超时时间,避免保护动作本身又变成新的慢点。

七、常见坑和排查思路

1. 只设置网关超时,没有设置协程超时

网关断开连接后,如果应用内部任务没有收到明确的取消或超时,它可能还会继续占用连接池、锁和内存。应用层也要有自己的时间边界。

2. 把 CancelledError 当普通异常吞掉

取消是一种控制信号。清理资源可以,直接吞掉要谨慎。吞掉取消信号后,外层任务状态会变得不真实,监控也容易误判。

3. 所有上游共用一个超时时间

核心上游和非核心上游的策略应当不同。订单主数据可以等待更久,推荐信息、优惠角标这类非核心结果可以更快降级。

4. 没有记录慢点来源

建议在每个上游调用旁边记录耗时、超时次数和降级原因。日志里至少要有业务 ID、上游名、耗时、是否降级,排查时会省很多时间。

完整示例

下面把整体超时、单点降级和结构化任务组织合在一起,便于你复制到本地运行:

import asyncio
from dataclasses import dataclass


@dataclass
class ServiceResult:
    name: str
    value: str


async def query_service(name: str, delay: float) -> ServiceResult:
    await asyncio.sleep(delay)
    return ServiceResult(name, f"{name}-ok")


async def safe_call(name: str, delay: float, limit: float) -> ServiceResult:
    try:
        return await asyncio.wait_for(query_service(name, delay), timeout=limit)
    except TimeoutError:
        return ServiceResult(name, "timeout")


async def build_order_detail() -> dict[str, ServiceResult]:
    async with asyncio.timeout(1.5):
        async with asyncio.TaskGroup() as group:
            order = group.create_task(safe_call("order", 0.2, 1.0))
            stock = group.create_task(safe_call("stock", 0.4, 1.0))
            coupon = group.create_task(safe_call("coupon", 3.0, 0.6))

    return {
        "order": order.result(),
        "stock": stock.result(),
        "coupon": coupon.result(),
    }


async def main() -> None:
    try:
        detail = await build_order_detail()
    except TimeoutError:
        detail = {"error": ServiceResult("all", "timeout")}
    print(detail)


if __name__ == "__main__":
    asyncio.run(main())

总结

asyncio 的超时治理可以按三层理解:整体流程用 asyncio.timeout() 划边界,单个等待点用 asyncio.wait_for() 控制慢上游,一组强相关任务用 TaskGroup 管生命周期。任务被取消时,先清理资源,再把取消信号交回上层。这样写出来的异步代码,慢点更可控,故障也更容易定位。

参考资料

本文示例参考了 Python 官方文档中关于 asyncio 任务、超时、任务组和取消行为的说明,并结合接口聚合场景重新组织为原创示例。

声明:本文转载于:17golang原创 如有侵犯,请联系study_golang@163.com删除
相关阅读
更多>
最新阅读
更多>
课程推荐
更多>