登录
首页 >  文章 >  java教程

Java简易MQ实现:基于wait/notify的生产者消费者模型

时间:2026-05-09 19:39:49 450浏览 收藏

本文深入剖析了如何用 Java 原生 wait/notify 机制手写一个线程安全、带容量限制的简易内存消息队列,直击裸用 wait/notify 的典型陷阱——如非法监视器异常、虚假唤醒、唤醒错对象等,并给出可落地的实操规范:必须用 synchronized 保护共享状态、用 while 而非 if 循环检测条件、始终 notifyAll 以避免线程饥饿;同时阐明手写目的并非替代 BlockingQueue,而是透彻理解阻塞语义底层逻辑,适用于学习、单元测试或极简嵌入场景;文末还点破上线前极易忽视的三大坑:中断状态未恢复、容器选型不当导致性能骤降、小容量队列高并发下锁争抢恶化吞吐——帮你避开从原理到实践的全链路雷区。

如何在Java中编写简易的消息队列(MQ)内存版_wait/notify机制与生产者消费者模型整合

为什么不用 wait/notify 直接写生产者消费者队列?

因为裸用 wait/notify 极易死锁或虚假唤醒,且无法保证线程安全的入队/出队原子性。比如没加 synchronized 块就调用 wait(),会抛 IllegalMonitorStateException;而只在 if 里判断队列空/满,不改用 while,就会漏掉唤醒后条件已失效的情况。

实操建议:

  • 所有共享状态(如队列、计数器)必须被同一把锁保护,推荐用 synchronized 修饰方法或代码块,锁对象最好是队列本身(this 或显式 private final Object lock = new Object()
  • 入队前必须 while (queue.size() >= capacity) 循环等待,不能用 if
  • 每次 notifyAll() 而非 notify()——避免唤醒错类型线程(比如只唤醒另一个生产者,消费者还在挂起)
  • 出队同理:用 while (queue.isEmpty()) + wait() + notifyAll()

BlockingQueueput()/take() 已经封装好了,为什么还要手写?

手写不是为了替代,而是为了理解阻塞语义怎么落地。比如 ArrayBlockingQueue 内部正是基于 ReentrantLock + Condition 实现的,比原始 wait/notify 更精细(可分离“非空”和“非满”两个等待队列)。但如果你只是临时需要一个无依赖、轻量、可控的内存队列(比如单元测试模拟 MQ 行为),手写反而是最省事的。

实操建议:

  • 别自己造轮子去支持超时、中断、公平策略——这些交给 java.util.concurrent 就行
  • 若仅需 FIFO + 固定容量,用 new ArrayBlockingQueue(1024) 是最稳选择;手写只适合学习或极简嵌入场景
  • 注意:手写版无法像 LinkedBlockingQueue 那样动态扩容,容量写死就得提前评估峰值吞吐

一个能跑通的最小内存队列示例长什么样?

下面这个类去掉日志和注释,实际不到 30 行,但覆盖了核心逻辑:线程安全、阻塞、容量控制、异常防护。

public class SimpleMemoryMQ<T> {
    private final Queue<T> queue;
    private final int capacity;

    public SimpleMemoryMQ(int capacity) {
        this.queue = new LinkedList<>();
        this.capacity = capacity;
    }

    public void put(T item) throws InterruptedException {
        synchronized (queue) {
            while (queue.size() == capacity) {
                queue.wait(); // 等待有空位
            }
            queue.offer(item);
            queue.notifyAll(); // 唤醒可能等待取数据的消费者
        }
    }

    public T take() throws InterruptedException {
        synchronized (queue) {
            while (queue.isEmpty()) {
                queue.wait(); // 等待有数据
            }
            T item = queue.poll();
            queue.notifyAll(); // 唤醒可能等待入队的生产者
            return item;
        }
    }
}

注意点:

  • queue 是锁对象,不是 this——避免外部同步干扰内部逻辑
  • 没做 null 检查,实际使用中 put(null) 会导致后续 take() 返回 null,容易引发 NPE,建议加 Objects.requireNonNull(item)
  • 没处理中断,真实场景应检查 Thread.interrupted() 并抛 InterruptedException

上线前最容易被忽略的三个细节

不是语法错,而是运行时才暴露的问题:

  • 忘记在 catch (InterruptedException e) 后恢复中断状态:Thread.currentThread().interrupt()——否则上层无法感知中断意图
  • ArrayList 替代 LinkedList 当底层容器:虽然都实现 Queue,但 ArrayList.remove(0) 是 O(n),消费性能断崖下跌
  • 多线程反复 put/take 时没压测边界:比如容量为 1 的队列,在高并发下 notifyAll() 唤醒大量线程争抢锁,CPU 利用率飙升但吞吐反而下降——这时该换 LockSupport 或直接上 Disruptor

真要长期用,别卡在 wait/notify 这一层;但第一次写清楚它怎么动起来,是绕不开的一步。

今天带大家了解了的相关知识,希望对你有所帮助;关于文章的技术知识我们会一点点深入介绍,欢迎大家关注golang学习网公众号,一起学习编程~

资料下载
相关阅读
更多>
最新阅读
更多>
课程推荐
更多>