登录
首页 >  文章 >  python教程

Python生产者消费者模型详解

时间:2026-01-08 18:36:44 162浏览 收藏

欢迎各位小伙伴来到golang学习网,相聚于此都是缘哈哈哈!今天我给大家带来《Python任务调度模型:生产者消费者模式详解》,这篇文章主要讲到等等知识,如果你对文章相关的知识非常感兴趣或者正在自学,都可以关注我,我会持续更新相关文章!当然,有什么建议也欢迎在评论留言提出!一起学习!

生产者消费者模式是解耦任务生成与执行的并发模型,由生产者线程向线程安全队列put任务、消费者线程get并处理,配合task_done和join实现同步,适用于日志处理等高吞吐场景。

Python任务调度模型教程_生产者消费者模式

什么是生产者消费者模式

生产者消费者模式是一种经典的并发编程模型,用于解耦任务的生成与执行。在Python任务调度场景中,它表现为:一个或多个线程(或进程)负责生成任务(生产者),把任务放入共享队列;另一个或多个工作线程(消费者)持续从队列中取出任务并执行。这种结构天然适合异步、批量、高吞吐的任务调度系统,比如日志处理、定时爬虫、消息推送等。

核心组件:队列 + 线程/进程协作

Python标准库中的 queue.Queue 是实现该模式的关键——它线程安全,内置阻塞与超时机制,无需手动加锁。搭配 threadingmultiprocessing 模块即可快速搭建调度骨架。

  • 使用 queue.Queue(maxsize=0) 创建无限长队列;设为正整数可控制缓冲区大小,避免内存溢出
  • 生产者调用 put(item) 入队,可选 block=True, timeout=2 防止无限等待
  • 消费者调用 get() 出队,配合 task_done()join() 实现任务完成同步
  • 推荐用守护线程(daemon=True)运行消费者,主程序退出时自动终止

一个轻量可用的调度示例

下面是一个基于多线程的最小可行调度器,模拟“每秒生成3个任务,由2个工人并发处理”:

<font size="2"><code>import queue
import threading
import time
import random
<h1>共享任务队列</h1><p>task_queue = queue.Queue()</p><p>def producer():
for i in range(10):
task = f"task-{i}"
print(f"[生产] {task}")
task_queue.put(task)
time.sleep(1)  # 模拟不均匀生产节奏</p><p>def worker(worker_id):
while True:
try:
task = task_queue.get(timeout=3)  # 3秒无任务则退出
print(f"[工人{worker_id}] 正在处理 {task}")
time.sleep(random.uniform(0.5, 2))  # 模拟耗时操作
task_queue.task_done()
except queue.Empty:
break</p><h1>启动1个生产者、2个消费者</h1><p>threading.Thread(target=producer).start()
for i in range(2):
threading.Thread(target=worker, args=(i+1,), daemon=True).start()</p><h1>等待所有任务完成</h1><p>task_queue.join()
print("全部任务执行完毕")
</p></code></font>

进阶建议:应对真实生产环境

简单示例满足学习,但上线需考虑健壮性与可观测性:

  • 任务应封装为可序列化对象(如字典或dataclass),便于持久化或跨进程传递
  • 加入异常捕获与重试逻辑,避免单个失败任务阻塞整个队列
  • logging 替代 print,记录任务ID、开始/结束时间、耗时、错误堆栈
  • 必要时替换为 redis Queue(如RQ)或 Apache Airflow,支持分布式、持久化、Web监控
  • 若任务IO密集(如HTTP请求),优先用 threading;若CPU密集,考虑 multiprocessingconcurrent.futures.ProcessPoolExecutor

理论要掌握,实操不能落!以上关于《Python生产者消费者模型详解》的详细介绍,大家都掌握了吧!如果想要继续提升自己的能力,那么就来关注golang学习网公众号吧!

前往漫画官网入口并下载 ➜
相关阅读
更多>
最新阅读
更多>
课程推荐
更多>