线上排查 Python 服务问题时,最怕的不是报错,而是日志里只有一句 pay failed,没有 request_id、没有订单号、没有异常栈。同一个请求进了异步任务、线程池、后台补偿后,日志上下文断成几段,搜索平台里只能按时间窗口猜。
这篇文章从一次支付失败排障写起:用 Python 标准库里的 logging、contextvars、QueueHandler 和 QueueListener 做一套生产可用的日志链路。示例适合 Python 3.12/3.13,也能在更高版本继续使用。
故障现场:request_id 一会儿有,一会儿没
订单服务收到 x-request-id=rid-7f3c2e1a,入口日志能看到它;业务函数里打了一句 create order ok,没有订单号;支付失败只打印了 str(exc);后台补偿任务又生成了新的 ID。最后排障时,大家只能把多个系统的日志按秒拼在一起。
# 问题写法:并发下会串,异常栈也丢了
current_request_id = "-"
logger.info("create order ok")
logger.error("pay failed: %s", exc)
全局变量不适合保存当前请求。即使换成 threading.local(),在 asyncio 任务切换和线程池边界上也容易断。更稳的做法是把请求上下文放进 ContextVar。
第一步:用 contextvars 保存请求上下文
ContextVar 的值属于当前执行上下文,不会像普通全局变量一样被另一个并发请求直接覆盖。生产代码里要把变量放在模块顶层,并在请求结束时恢复旧值。
# logging_context.py
from __future__ import annotations
import contextvars
import uuid
request_id_var = contextvars.ContextVar("request_id", default="-")
def new_request_id(header_value: str | None) -> str:
return header_value[:80] if header_value else f"rid-{uuid.uuid4().hex[:12]}"
async def run_with_request_id(headers, handler):
rid = new_request_id(headers.get("x-request-id"))
token = request_id_var.set(rid)
try:
return await handler()
finally:
request_id_var.reset(token)
不要省略 reset。请求入口、消息消费入口、定时任务入口都应该是“设置上下文、执行业务、恢复上下文”的结构。Python 3.14 以后 token 有上下文管理器用法,但在 Python 3.12/3.13 里显式 try/finally 最清楚。
第二步:用 Filter 统一给日志加字段
业务代码不应该每次都手工拼 request_id。让 logging.Filter 统一给 LogRecord 补字段,再由 JSON formatter 输出稳定结构。
# logging_setup.py
import json
import logging
from datetime import datetime, timezone
from logging_context import request_id_var
class RequestContextFilter(logging.Filter):
def filter(self, record: logging.LogRecord) -> bool:
record.request_id = request_id_var.get()
return True
class JsonFormatter(logging.Formatter):
def format(self, record: logging.LogRecord) -> str:
data = {
"ts": datetime.now(timezone.utc).isoformat(),
"level": record.levelname,
"logger": record.name,
"request_id": getattr(record, "request_id", "-"),
"message": record.getMessage(),
}
for key in ("event", "order_id", "user_id"):
value = getattr(record, key, None)
if value is not None:
data[key] = value
if record.exc_info:
data["exc_info"] = self.formatException(record.exc_info)
return json.dumps(data, ensure_ascii=False)
字段名要少而稳定。我的最低要求是 ts、level、logger、request_id、message;订单号、用户 ID、事件名用 extra 补充,便于日志平台检索。
第三步:异常日志保留完整堆栈
只记录 str(exc),线上基本等于丢线索。真正处理异常的边界用 logger.exception,中间层不要层层重复打 ERROR。
logger = logging.getLogger("service.pay")
async def pay_order(order_id: str, user_id: str) -> None:
try:
await call_gateway(order_id)
except TimeoutError:
logger.exception(
"payment gateway timeout",
extra={
"event": "pay_failed",
"order_id": order_id,
"user_id": user_id,
},
)
raise
第四步:把慢 handler 放到队列后面
文件轮转、网络发送、慢磁盘写入都可能拖慢请求线程。标准库的 QueueHandler 负责入队,QueueListener 在后台线程里交给真正 handler 处理。
import logging
import queue
from logging.handlers import QueueHandler, QueueListener, RotatingFileHandler
def configure_logging() -> QueueListener:
log_queue = queue.Queue(maxsize=10000)
root = logging.getLogger()
root.setLevel(logging.INFO)
root.handlers.clear()
queue_handler = QueueHandler(log_queue)
queue_handler.addFilter(RequestContextFilter())
root.addHandler(queue_handler)
file_handler = RotatingFileHandler(
"logs/app.jsonl",
maxBytes=256 * 1024 * 1024,
backupCount=10,
encoding="utf-8",
)
file_handler.setFormatter(JsonFormatter())
listener = QueueListener(log_queue, file_handler, respect_handler_level=True)
listener.start()
return listener
队列不是无限缓冲。上线前要确认队列满了怎么办、进程退出前是否 listener.stop()、容器日志由谁采集、是否会重复采集。如果用 multiprocessing.Queue,还要避免 multiprocessing 自己的内部日志和 QueueHandler 互相递归。
线程池边界:复制当前上下文
asyncio 任务内的上下文传播通常符合预期,但把函数丢进线程池时,最好显式复制上下文,避免子线程日志没有 request_id。
import contextvars
from concurrent.futures import ThreadPoolExecutor
executor = ThreadPoolExecutor(max_workers=8)
def submit_with_context(fn, *args, **kwargs):
ctx = contextvars.copy_context()
return executor.submit(ctx.run, fn, *args, **kwargs)
上线检查清单
- 入口统一接收或生成
x-request-id,并在finally中 reset。 ContextVar在模块顶层创建,不在闭包里动态创建。- 日志输出 JSON Lines,字段名稳定,业务字段通过
extra补充。 - 异常边界使用
logger.exception,避免只记录str(exc)。 - 慢 handler 放到
QueueListener后面,请求线程只负责入队。 - 进程退出前停止 listener,压测时观察队列长度、落盘延迟和丢日志策略。
- 线程池任务用
contextvars.copy_context()显式传播上下文。
总结
Python 日志治理的核心不是“多打日志”,而是让每条日志都能串起来、搜出来、解释清楚。contextvars 解决上下文隔离,logging.Filter 统一补字段,QueueHandler 和 QueueListener 把慢处理挪出请求路径。
如果只改一件事,先把 request_id 做成标准字段,并保证异步任务、线程池和异常日志都不丢它。等链路能串起来,再谈采样、告警和成本优化,团队排障效率会立刻提升。