Java中生产者消费者模式的三种实现方法
引言
生产者消费者模型是并发编程中的基础问题之一,描述了两类线程如何协同工作:生产者生成数据并放入共享缓冲区,消费者从该缓冲区取出数据进行处理。当缓冲区满时,生产者必须等待;当缓冲区空时,消费者需要阻塞,直到有新数据可用。
在 Java 并发体系中,解决此类问题主要有以下三种方式:
- 利用 Object 类的 wait() 和 notify()/notifyAll() 实现线程通信
- 借助 ReentrantLock 与 Condition 接口完成精确唤醒
- 直接使用线程安全的 BlockingQueue 队列结构
本文将围绕这三种机制分别展开,并通过代码示例展示其具体应用。
1. 基于 synchronized 与 wait/notify 的实现
Java 中每个对象都拥有一个内置监视器(monitor),可通过 synchronized 关键字获取对象锁。配合 wait()、notify() 和 notifyAll() 方法,可实现线程间的协作控制。
- wait():释放当前持有的对象锁,使线程进入等待状态,直至被其他线程唤醒。
- notify():随机唤醒一个正在等待该对象锁的线程。
- notifyAll():唤醒所有等待此对象锁的线程,由调度器决定哪个线程继续执行。
注意:这些方法必须在同步上下文中调用,否则会抛出 IllegalMonitorStateException 异常。
public class PCBySynchronized {
private int itemCount = 0;
private static final int MAX_CAPACITY = 10;
public static void main(String[] args) {
PCBySynchronized instance = new PCBySynchronized();
for (int i = 0; i < 20; i++) {
new Thread(instance.new ProducerTask()).start();
new Thread(instance.new ConsumerTask()).start();
}
}
class ProducerTask implements Runnable {
@Override
public void run() {
try { Thread.sleep(300); } catch (InterruptedException e) { e.printStackTrace(); }
synchronized (this) {
while (itemCount == MAX_CAPACITY) {
try {
System.out.println("缓冲区已满,生产者等待...");
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
itemCount++;
System.out.println("生产完成,当前物品数量:" + itemCount);
notifyAll(); // 通知所有等待线程
}
}
}
class ConsumerTask implements Runnable {
@Override
public void run() {
try { Thread.sleep(300); } catch (InterruptedException e) { e.printStackTrace(); }
synchronized (this) {
while (itemCount == 0) {
try {
System.out.println("缓冲区为空,消费者等待...");
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
itemCount--;
System.out.println("消费完成,当前物品数量:" + itemCount);
notifyAll();
}
}
}
}
2. 使用 ReentrantLock 与 Condition 的方式
相比隐式锁 synchronized,ReentrantLock 提供了更灵活的显式锁控制。结合 Condition 接口,可以创建多个等待条件,实现精准唤醒特定线程组,避免不必要的线程竞争。
- lock():手动加锁
- unlock():必须放在 finally 块中释放锁
- await():使当前线程等待并释放锁
- signal():唤醒一个等待线程
- signalAll():唤醒所有等待线程
import java.util.concurrent.locks.*;
public class PCByLockCondition {
private int itemQuantity = 0;
private static final int BUFFER_LIMIT = 10;
private final Lock lock = new ReentrantLock();
private final Condition notFull = lock.newCondition();
private final Condition notEmpty = lock.newCondition();
public static void main(String[] args) {
PCByLockCondition manager = new PCByLockCondition();
for (int i = 0; i < 50; i++) {
new Thread(manager.new ProducerJob()).start();
new Thread(manager.new ConsumerJob()).start();
}
}
class ProducerJob implements Runnable {
@Override
public void run() {
try { Thread.sleep(300); } catch (InterruptedException e) { e.printStackTrace(); }
lock.lock();
try {
while (itemQuantity == BUFFER_LIMIT) {
System.out.println("队列已满,生产暂停");
notFull.await();
}
itemQuantity++;
System.out.println("新增一项,总量为:" + itemQuantity);
notEmpty.signal(); // 通知消费者
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
class ConsumerJob implements Runnable {
@Override
public void run() {
try { Thread.sleep(300); } catch (InterruptedException e) { e.printStackTrace(); }
lock.lock();
try {
while (itemQuantity == 0) {
System.out.println("无货可取,消费等待");
notEmpty.await();
}
itemQuantity--;
System.out.println("消费一项,剩余:" + itemQuantity);
notFull.signal(); // 通知生产者
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
}
3. 利用 BlockingQueue 简化实现
Java 并发包提供了多种阻塞队列实现,如 ArrayBlockingQueue、LinkedBlockingQueue 等。它们内部已封装了线程安全逻辑和等待/通知机制,极大简化了生产者消费者的编码复杂度。
常用方法包括:
- put(item):插入元素,若队列满则阻塞
- take():取出元素,若队列空则阻塞
import java.util.concurrent.*;
public class PCByBlockingQueue {
private static final int CAPACITY = 10;
private final BlockingQueue<String> buffer = new ArrayBlockingQueue<>(CAPACITY);
public static void main(String[] args) {
PCByBlockingQueue system = new PCByBlockingQueue();
for (int i = 0; i < 20; i++) {
new Thread(system.new ProducerUnit()).start();
new Thread(system.new ConsumerUnit()).start();
}
}
class ProducerUnit implements Runnable {
@Override
public void run() {
try {
Thread.sleep(300);
buffer.put("product"); // 自动阻塞
System.out.println("生产成功,当前数量:" + buffer.size());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
class ConsumerUnit implements Runnable {
@Override
public void run() {
try {
Thread.sleep(300);
buffer.take(); // 自动阻塞
System.out.println("消费成功,当前数量:" + buffer.size());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}