当前位置:首页 > 技术 > 正文内容

Java中生产者消费者模式的三种实现方法

访客 技术 2026年6月20日 1

引言

生产者消费者模型是并发编程中的基础问题之一,描述了两类线程如何协同工作:生产者生成数据并放入共享缓冲区,消费者从该缓冲区取出数据进行处理。当缓冲区满时,生产者必须等待;当缓冲区空时,消费者需要阻塞,直到有新数据可用。

在 Java 并发体系中,解决此类问题主要有以下三种方式:

  • 利用 Object 类的 wait() 和 notify()/notifyAll() 实现线程通信
  • 借助 ReentrantLock 与 Condition 接口完成精确唤醒
  • 直接使用线程安全的 BlockingQueue 队列结构

本文将围绕这三种机制分别展开,并通过代码示例展示其具体应用。

1. 基于 synchronized 与 wait/notify 的实现

Java 中每个对象都拥有一个内置监视器(monitor),可通过 synchronized 关键字获取对象锁。配合 wait()notify()notifyAll() 方法,可实现线程间的协作控制。

  • wait():释放当前持有的对象锁,使线程进入等待状态,直至被其他线程唤醒。
  • notify():随机唤醒一个正在等待该对象锁的线程。
  • notifyAll():唤醒所有等待此对象锁的线程,由调度器决定哪个线程继续执行。

注意:这些方法必须在同步上下文中调用,否则会抛出 IllegalMonitorStateException 异常。

public class PCBySynchronized {
    private int itemCount = 0;
    private static final int MAX_CAPACITY = 10;

    public static void main(String[] args) {
        PCBySynchronized instance = new PCBySynchronized();
        for (int i = 0; i < 20; i++) {
            new Thread(instance.new ProducerTask()).start();
            new Thread(instance.new ConsumerTask()).start();
        }
    }

    class ProducerTask implements Runnable {
        @Override
        public void run() {
            try { Thread.sleep(300); } catch (InterruptedException e) { e.printStackTrace(); }

            synchronized (this) {
                while (itemCount == MAX_CAPACITY) {
                    try {
                        System.out.println("缓冲区已满,生产者等待...");
                        wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                itemCount++;
                System.out.println("生产完成,当前物品数量:" + itemCount);
                notifyAll(); // 通知所有等待线程
            }
        }
    }

    class ConsumerTask implements Runnable {
        @Override
        public void run() {
            try { Thread.sleep(300); } catch (InterruptedException e) { e.printStackTrace(); }

            synchronized (this) {
                while (itemCount == 0) {
                    try {
                        System.out.println("缓冲区为空,消费者等待...");
                        wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                itemCount--;
                System.out.println("消费完成,当前物品数量:" + itemCount);
                notifyAll();
            }
        }
    }
}

2. 使用 ReentrantLock 与 Condition 的方式

相比隐式锁 synchronizedReentrantLock 提供了更灵活的显式锁控制。结合 Condition 接口,可以创建多个等待条件,实现精准唤醒特定线程组,避免不必要的线程竞争。

  • lock():手动加锁
  • unlock():必须放在 finally 块中释放锁
  • await():使当前线程等待并释放锁
  • signal():唤醒一个等待线程
  • signalAll():唤醒所有等待线程
import java.util.concurrent.locks.*;

public class PCByLockCondition {
    private int itemQuantity = 0;
    private static final int BUFFER_LIMIT = 10;
    private final Lock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();

    public static void main(String[] args) {
        PCByLockCondition manager = new PCByLockCondition();
        for (int i = 0; i < 50; i++) {
            new Thread(manager.new ProducerJob()).start();
            new Thread(manager.new ConsumerJob()).start();
        }
    }

    class ProducerJob implements Runnable {
        @Override
        public void run() {
            try { Thread.sleep(300); } catch (InterruptedException e) { e.printStackTrace(); }

            lock.lock();
            try {
                while (itemQuantity == BUFFER_LIMIT) {
                    System.out.println("队列已满,生产暂停");
                    notFull.await();
                }
                itemQuantity++;
                System.out.println("新增一项,总量为:" + itemQuantity);
                notEmpty.signal(); // 通知消费者
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
    }

    class ConsumerJob implements Runnable {
        @Override
        public void run() {
            try { Thread.sleep(300); } catch (InterruptedException e) { e.printStackTrace(); }

            lock.lock();
            try {
                while (itemQuantity == 0) {
                    System.out.println("无货可取,消费等待");
                    notEmpty.await();
                }
                itemQuantity--;
                System.out.println("消费一项,剩余:" + itemQuantity);
                notFull.signal(); // 通知生产者
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
    }
}

3. 利用 BlockingQueue 简化实现

Java 并发包提供了多种阻塞队列实现,如 ArrayBlockingQueueLinkedBlockingQueue 等。它们内部已封装了线程安全逻辑和等待/通知机制,极大简化了生产者消费者的编码复杂度。

常用方法包括:

  • put(item):插入元素,若队列满则阻塞
  • take():取出元素,若队列空则阻塞
import java.util.concurrent.*;

public class PCByBlockingQueue {
    private static final int CAPACITY = 10;
    private final BlockingQueue<String> buffer = new ArrayBlockingQueue<>(CAPACITY);

    public static void main(String[] args) {
        PCByBlockingQueue system = new PCByBlockingQueue();
        for (int i = 0; i < 20; i++) {
            new Thread(system.new ProducerUnit()).start();
            new Thread(system.new ConsumerUnit()).start();
        }
    }

    class ProducerUnit implements Runnable {
        @Override
        public void run() {
            try {
                Thread.sleep(300);
                buffer.put("product"); // 自动阻塞
                System.out.println("生产成功,当前数量:" + buffer.size());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    class ConsumerUnit implements Runnable {
        @Override
        public void run() {
            try {
                Thread.sleep(300);
                buffer.take(); // 自动阻塞
                System.out.println("消费成功,当前数量:" + buffer.size());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
}
标签: Synchronized

相关文章

Linux crontab 详解

1) crontab 是什么cron 是 Linux 的定时任务守护进程;crontab 是用来编辑/查看“按时间周期执行命令”的表(cron table)。常见两类:用户 crontab:每个用户一份(crontab -e 编辑)系统级 crontab / cron.d:可指定执行用户(/etc/crontab、/etc/cron.d/*)2) crontab 时间...

富文本里可以允许的 HTML 属性

一、所有标签默认允许的安全属性(极少)class        (可选)id           (通常建议禁用)title️ 注意:id 容易被滥用做锚点注入,很多系统直接禁用class 允许的话最好只允许固定前缀(如 editor-*)二、a 标签允许属性<a href="" t...

Mac 安装 Node.js 指南

方法一:通过官网安装包(最简单,适合初学者)如果你只是想快速安装并开始使用,这是最直接的方法。访问 Node.js 官网。页面会显示两个版本:LTS (Recommended For Most Users):长期支持版,最稳定。建议选这个。Current:最新特性版,包含最新功能但可能不够稳定。下载 .pkg 安装包并运行。按照安装向导点击“下一步”即可完成。方法二:使用 Homebrew 安装(...

Dom\HTML_NO_DEFAULT_NS 的副作用:自动加闭合标签

在使用Dom\HTMLDocument时,Dom\HTML_NO_DEFAULT_NS 将禁止在解析过程中设置元素的命名空间, 此设置是为了与DOMDocument向后兼容而存在的。当使用它时,已知的一个副作用就是:自动加闭合标签例如 </img> 为什么会这样?当你使用:Dom\HTML_NO_DEFAULT_NS文档会变成 无命名空间模式,此时内部更接近 XML...

Laravel 事件和监听器创建

在 Laravel 中,使用 Artisan 命令创建 Events(事件) 和 Listeners(监听器) 是非常高效的。你可以通过以下几种方式来实现:1. 手动创建单个 Event如果你只想创建一个事件类,可以使用 make:event 命令:Bashphp artisan make:event UserRegistered执行后,文件将生成在 app/Even...

自定义域名解析神器 dnsmasq

什么是 dnsmasq?dnsmasq 是一个轻量级、功能强大的网络服务工具,专为小型和中等规模网络设计。它是一个综合的网络基础设施解决方案[1]。dnsmasq 能做什么?功能说明应用场景DNS 转发与缓存将 DNS 查询转发到上游服务器(ISP、Google DNS 等),并在本地缓存结果加快 DNS 查询速度,减少外部 DNS 流量本地 DNS解析本地网络设备的主机名,无需编辑&n...

发表评论

访客

◎欢迎参与讨论,请在这里发表您的看法和观点。