登录
首页 >  文章 >  python教程

Python logging 实战:用 contextvars 把 request_id 串到底

来源:Python 博主原创

时间:2026-06-08 14:31:54 427浏览 收藏

线上排查 Python 服务问题时,最怕的不是报错,而是日志里只有一句 pay failed,没有 request_id、没有订单号、没有异常栈。同一个请求进了异步任务、线程池、后台补偿后,日志上下文断成几段,搜索平台里只能按时间窗口猜。

这篇文章从一次支付失败排障写起:用 Python 标准库里的 loggingcontextvarsQueueHandlerQueueListener 做一套生产可用的日志链路。示例适合 Python 3.12/3.13,也能在更高版本继续使用。

Python 生产日志治理思维导图
日志治理不是只换格式:上下文、结构化字段、异常栈、队列写出和上线检查要一起收口。

故障现场:request_id 一会儿有,一会儿没

订单服务收到 x-request-id=rid-7f3c2e1a,入口日志能看到它;业务函数里打了一句 create order ok,没有订单号;支付失败只打印了 str(exc);后台补偿任务又生成了新的 ID。最后排障时,大家只能把多个系统的日志按秒拼在一起。

# 问题写法:并发下会串,异常栈也丢了
current_request_id = "-"

logger.info("create order ok")
logger.error("pay failed: %s", exc)

全局变量不适合保存当前请求。即使换成 threading.local(),在 asyncio 任务切换和线程池边界上也容易断。更稳的做法是把请求上下文放进 ContextVar

Python 日志请求链路流程图
一次请求从入口到业务任务,再到日志队列和 JSON 输出,request_id 应该贯穿整个链路。

第一步:用 contextvars 保存请求上下文

ContextVar 的值属于当前执行上下文,不会像普通全局变量一样被另一个并发请求直接覆盖。生产代码里要把变量放在模块顶层,并在请求结束时恢复旧值。

# logging_context.py
from __future__ import annotations

import contextvars
import uuid

request_id_var = contextvars.ContextVar("request_id", default="-")

def new_request_id(header_value: str | None) -> str:
    return header_value[:80] if header_value else f"rid-{uuid.uuid4().hex[:12]}"

async def run_with_request_id(headers, handler):
    rid = new_request_id(headers.get("x-request-id"))
    token = request_id_var.set(rid)
    try:
        return await handler()
    finally:
        request_id_var.reset(token)

不要省略 reset。请求入口、消息消费入口、定时任务入口都应该是“设置上下文、执行业务、恢复上下文”的结构。Python 3.14 以后 token 有上下文管理器用法,但在 Python 3.12/3.13 里显式 try/finally 最清楚。

第二步:用 Filter 统一给日志加字段

业务代码不应该每次都手工拼 request_id。让 logging.Filter 统一给 LogRecord 补字段,再由 JSON formatter 输出稳定结构。

# logging_setup.py
import json
import logging
from datetime import datetime, timezone

from logging_context import request_id_var

class RequestContextFilter(logging.Filter):
    def filter(self, record: logging.LogRecord) -> bool:
        record.request_id = request_id_var.get()
        return True

class JsonFormatter(logging.Formatter):
    def format(self, record: logging.LogRecord) -> str:
        data = {
            "ts": datetime.now(timezone.utc).isoformat(),
            "level": record.levelname,
            "logger": record.name,
            "request_id": getattr(record, "request_id", "-"),
            "message": record.getMessage(),
        }
        for key in ("event", "order_id", "user_id"):
            value = getattr(record, key, None)
            if value is not None:
                data[key] = value
        if record.exc_info:
            data["exc_info"] = self.formatException(record.exc_info)
        return json.dumps(data, ensure_ascii=False)

字段名要少而稳定。我的最低要求是 tslevelloggerrequest_idmessage;订单号、用户 ID、事件名用 extra 补充,便于日志平台检索。

第三步:异常日志保留完整堆栈

只记录 str(exc),线上基本等于丢线索。真正处理异常的边界用 logger.exception,中间层不要层层重复打 ERROR。

logger = logging.getLogger("service.pay")

async def pay_order(order_id: str, user_id: str) -> None:
    try:
        await call_gateway(order_id)
    except TimeoutError:
        logger.exception(
            "payment gateway timeout",
            extra={
                "event": "pay_failed",
                "order_id": order_id,
                "user_id": user_id,
            },
        )
        raise
Python 日志问题写法和推荐写法对照
代码审查重点:上下文不丢、异常栈完整、请求线程不被慢 I/O 拖住、字段可以检索。

第四步:把慢 handler 放到队列后面

文件轮转、网络发送、慢磁盘写入都可能拖慢请求线程。标准库的 QueueHandler 负责入队,QueueListener 在后台线程里交给真正 handler 处理。

import logging
import queue
from logging.handlers import QueueHandler, QueueListener, RotatingFileHandler

def configure_logging() -> QueueListener:
    log_queue = queue.Queue(maxsize=10000)

    root = logging.getLogger()
    root.setLevel(logging.INFO)
    root.handlers.clear()

    queue_handler = QueueHandler(log_queue)
    queue_handler.addFilter(RequestContextFilter())
    root.addHandler(queue_handler)

    file_handler = RotatingFileHandler(
        "logs/app.jsonl",
        maxBytes=256 * 1024 * 1024,
        backupCount=10,
        encoding="utf-8",
    )
    file_handler.setFormatter(JsonFormatter())

    listener = QueueListener(log_queue, file_handler, respect_handler_level=True)
    listener.start()
    return listener

队列不是无限缓冲。上线前要确认队列满了怎么办、进程退出前是否 listener.stop()、容器日志由谁采集、是否会重复采集。如果用 multiprocessing.Queue,还要避免 multiprocessing 自己的内部日志和 QueueHandler 互相递归。

线程池边界:复制当前上下文

asyncio 任务内的上下文传播通常符合预期,但把函数丢进线程池时,最好显式复制上下文,避免子线程日志没有 request_id。

import contextvars
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=8)

def submit_with_context(fn, *args, **kwargs):
    ctx = contextvars.copy_context()
    return executor.submit(ctx.run, fn, *args, **kwargs)

上线检查清单

  • 入口统一接收或生成 x-request-id,并在 finally 中 reset。
  • ContextVar 在模块顶层创建,不在闭包里动态创建。
  • 日志输出 JSON Lines,字段名稳定,业务字段通过 extra 补充。
  • 异常边界使用 logger.exception,避免只记录 str(exc)
  • 慢 handler 放到 QueueListener 后面,请求线程只负责入队。
  • 进程退出前停止 listener,压测时观察队列长度、落盘延迟和丢日志策略。
  • 线程池任务用 contextvars.copy_context() 显式传播上下文。

总结

Python 日志治理的核心不是“多打日志”,而是让每条日志都能串起来、搜出来、解释清楚。contextvars 解决上下文隔离,logging.Filter 统一补字段,QueueHandlerQueueListener 把慢处理挪出请求路径。

如果只改一件事,先把 request_id 做成标准字段,并保证异步任务、线程池和异常日志都不丢它。等链路能串起来,再谈采样、告警和成本优化,团队排障效率会立刻提升。

声明:本文转载于:Python 博主原创 如有侵犯,请联系study_golang@163.com删除
相关阅读
更多>
最新阅读
更多>
课程推荐
更多>