登录
首页 >  文章 >  python教程

Quart实现SSE的完整步骤详解

时间:2026-04-26 12:49:13 112浏览 收藏

本文深入剖析了在Quart框架中实现Server-Sent Events(SSE)的关键难点与生产级解决方案,直击Nginx和Hypercorn等中间件默认缓冲机制导致的实时响应失效痛点——前端长时间收不到流式数据、仅在连接断开时批量刷出;文章不仅揭示了问题根源(代理层缓存与SSE逐块推送特性的根本冲突),更提供了即开即用的异步队列驱动SSE示例代码,并强调了`X-Accel-Buffering: no`这一决定性响应头、Nginx反向代理配置要点、心跳保活策略及并发安全实践,助你快速构建低延迟、高可靠的服务端实时推送能力。

Quart 中实现 Server-Sent Events(SSE)的完整指南

本文详解如何在 Quart 框架中正确实现 Server-Sent Events,重点解决事件流被 Nginx/Hypercorn 缓冲导致前端收不到实时响应的问题,并提供可直接运行的异步队列驱动 SSE 示例。

本文详解如何在 Quart 框架中正确实现 Server-Sent Events,重点解决事件流被 Nginx/Hypercorn 缓冲导致前端收不到实时响应的问题,并提供可直接运行的异步队列驱动 SSE 示例。

Server-Sent Events(SSE)是一种基于 HTTP 的单向实时通信机制,适用于服务端向客户端持续推送更新(如通知、日志、状态变更等)。Quart 作为异步 Python Web 框架,天然支持 SSE,但实际部署中常因反向代理(如 Nginx)或 ASGI 服务器(如 Hypercorn)的默认缓冲策略,导致响应体被截断或延迟发送——表现为 Postman 或浏览器长时间挂起、无任何 event-stream 数据输出,仅在服务中断时才“刷出”累积内容。

根本原因在于:Nginx 默认启用 proxy_buffering on,会暂存后端响应直到缓冲区满或连接关闭;而 SSE 要求逐块即时传输(chunked encoding + keep-alive),必须显式禁用代理层缓冲。

✅ 正确解决方案是:在 Quart 响应头中添加 X-Accel-Buffering: no(Nginx 专用指令),强制其绕过缓冲,直通流式数据。

以下是一个生产就绪的 Quart SSE 实现示例:

from quart import Quart, request, make_response, abort
from asyncio import Queue
from dataclasses import dataclass
import asyncio

app = Quart(__name__)

@dataclass
class ServerSentEvent:
    data: str
    event: str = "message"

    def encode(self) -> bytes:
        # 严格遵循 SSE 规范:每条消息以 \r\n\r\n 结尾,字段为 data: / event: / id: 等
        lines = [f"data: {self.data}"]
        if self.event != "message":
            lines.append(f"event: {self.event}")
        lines.append("")  # 空行分隔消息
        return "\r\n".join(lines).encode("utf-8")

# 全局事件队列(生产环境建议使用 Redis Pub/Sub 或更健壮的消息总线)
global_event_queue: Queue = Queue()

@app.route("/sse")
async def sse_endpoint():
    # 客户端必须声明接受 text/event-stream
    if "text/event-stream" not in request.accept_mimetypes:
        abort(400, "Accept header must include text/event-stream")

    async def event_stream():
        # 初始订阅确认事件
        yield ServerSentEvent(event="connected", data="SSE connection established").encode()

        # 持续监听队列并推送事件
        while True:
            try:
                # 使用 timeout 避免永久阻塞(可选,增强健壮性)
                event = await asyncio.wait_for(global_event_queue.get(), timeout=30.0)
                yield event.encode()
            except asyncio.TimeoutError:
                # 心跳保活:发送空注释(SSE 注释以 : 开头,客户端忽略)
                yield b": heartbeat\n\n"

    # 构造响应:关键!添加 X-Accel-Buffering: no
    response = await make_response(
        event_stream(),
        {
            "Content-Type": "text/event-stream",
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",  # ? 核心修复:禁用 Nginx 缓冲
        }
    )
    response.timeout = None  # 禁用 Quart 响应超时
    return response

# 示例:模拟后端触发事件(如 WebSocket 收到消息、数据库变更等)
@app.route("/trigger-event", methods=["POST"])
async def trigger_event():
    form = await request.form
    data = form.get("data", "default message")
    event_name = form.get("event", "update")

    await global_event_queue.put(ServerSentEvent(data=data, event=event_name))
    return {"status": "event queued"}

? 关键注意事项:

  • Nginx 配置补充(如使用): 除响应头外,还需在 location /sse 块中配置:
    proxy_buffering off;
    proxy_cache off;
    proxy_http_version 1.1;
    proxy_set_header Connection '';
    chunked_transfer_encoding off;  # 确保 Hypercorn 的 chunked 不被覆盖
  • ASGI 服务器兼容性: Hypercorn 默认支持流式响应,但需确保未启用 --worker-class sync 等同步模式;推荐使用默认 asyncio worker。
  • 客户端连接管理: 浏览器 EventSource 会在断连后自动重试(默认 3s),服务端应处理重复连接(如通过 request.sid 或 token 鉴权)。
  • 错误处理与心跳: 示例中加入了超时心跳,避免长连接被中间设备(如负载均衡器)静默关闭;生产环境建议结合 retry: 字段控制重连间隔。
  • 并发安全: asyncio.Queue 是协程安全的,多路 event_stream() 可共享同一队列,无需额外锁。

总结:Quart 完全胜任 SSE 场景,问题不在框架本身,而在 HTTP 中间件的流式语义适配。只需牢记——X-Accel-Buffering: no 是 Nginx 环境下 SSE 生效的黄金钥匙,配合规范的事件编码与异步流生成,即可构建低延迟、高可靠的服务端推送能力。

以上就是本文的全部内容了,是否有顺利帮助你解决问题?若是能给你带来学习上的帮助,请大家多多支持golang学习网!更多关于文章的相关知识,也可关注golang学习网公众号。

资料下载
相关阅读
更多>
最新阅读
更多>
课程推荐
更多>