本篇文章给大家分享的是有关Java实现生产者消费者的两种方式分别是什么,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。
我在8年前去面试程序员的时候,一个不大的公司,里面的开发主管接待了我们,给我的题目就是写一段程序模拟生产者消费者问题,当时可把我难坏了,一下子感觉自己的知识储备竟然如此的匮乏。
而在我从事DBA工作之后,经常会有大批量并发的环境,有的需要排障,有的需要优化,在很多并发的场景中,发现生产者消费者问题可以模拟出很多实际中的问题,所以生产者消费者问题非常重要,也是我想不断改进和探索的一类问题。
引入仓库的必要性
要想使用程序来模拟,其实也不用花太多的时间,我们简单说说需要考虑的地方。首先生产者,消费者是两个实体对象,生产者生产物品,消费者消费物品,如果在生产者中定义生产的流程,在消费者中定义消费的流程,两个对象就需要彼此引用,依赖性太高,而且实际上性能也好不到哪里去,所以就需要一个缓冲器,一个中间对象,我们就叫做仓库吧,生产的物品推入仓库,消费的物品从仓库中取出,这样生产者和消费者就能够取消之间的引用,直接通过仓库引用来同步状态,降低耦合。
所以我们的一个初步设想就是生产者-->仓库<--消费者 这样的模式。
生产者消费者的几种类型和实现方式
当然生产者消费者问题有两种类型,一种就是使用某种机制来保护生产者和消费者之间的同步,另外一种和Linux中的管道思路相似。相对来说第一种类型的处理方式更为通用,大体分为三类具体的实现方式:
经典的wait(),notify()方法
await(),signal()方法
使用阻塞队列(BlockingQueue),比如LinkedBlockingQueue
通用的对象类
为了更快出成果,尽可能快速理解,我也参考了一些资料,下午下班前写了下面的程序。我就简单说明第一种和第二种吧。
因为实体类对象是通用的,我就不再重复列出了,有生产者Producer和消费者Consumer两个类。
生产者类
import com.jeanron.test1.Storage;
public class Producer extends Thread {
// 每次生产的产品数量
private int num;
// 所在放置的仓库
private Storage storage;
// 构造函数,设置仓库
public Producer(Storage storage) {
this.storage = storage;
}
public Producer(Storage storage, int num) {
this.storage = storage;
this.num = num;
}
// 线程run函数
public void run() {
produce(num);
}
// 调用仓库Storage的生产函数
public void produce(int num) {
storage.produce(num);
}
// get/set方法
public int getNum() {
return num;
}
public void setNum(int num) {
this.num = num;
}
public Storage getStorage() {
return storage;
}
public void setStorage(Storage storage) {
this.storage = storage;
}
}
消费者类
import com.jeanron.test1.Storage;
public class Consumer extends Thread
{
// 每次消费的产品数量
private int num;
// 所在放置的仓库
private Storage storage;
// 构造函数,设置仓库
public Consumer(Storage storage)
{
this.storage = storage;
}
// 构造函数,设置仓库
public Consumer(Storage storage,int num)
{
this.storage = storage;
this.num = num;
}
// 线程run函数
public void run()
{
consume(num);
}
// 调用仓库Storage的生产函数
public void consume(int num)
{
storage.consume(num);
}
// get/set方法
public int getNum()
{
return num;
}
public void setNum(int num)
{
this.num = num;
}
public Storage getStorage()
{
return storage;
}
public void setStorage(Storage storage)
{
this.storage = storage;
}
} 滴哟中
第一种实现方式
import java.util.LinkedList;
public class Storage
{
// 仓库最大存储量
private final int MAX_SIZE = 200;
// 仓库存储的载体
private LinkedList<Object> list = new LinkedList<Object>();
private static Storage storage=null;
public static Storage getInstance() {
if (storage == null) {
synchronized (Storage.class) {
if (storage == null) {
storage = new Storage();
}
}
}
return storage;
}
// 生产num个产品
public void produce(int num)
{
// 同步代码段
synchronized (list)
{
// 如果仓库剩余容量不足
while (list.size() + num > MAX_SIZE)
{
System.out.println(Thread.currentThread().getName()+"【生产者:要生产的产品数量】:" + num + "\t【库存量】:"
+ list.size() + "\t暂时不能执行生产任务!");
try
{
// 由于条件不满足,生产阻塞
list.wait();
}
catch (InterruptedException e)
{
e.printStackTrace();
}
}
// 生产条件满足情况下,生产num个产品
for (int i = 1; i <= num; ++i)
{
list.add(new Object());
}
System.out.println(Thread.currentThread().getName()+"【生产者:已经生产产品数】:" + num + "\t【现仓储量为】:" + list.size());
list.notifyAll();
}
}
// 消费num个产品
public void consume(int num)
{
// 同步代码段
synchronized (list)
{
// 如果仓库存储量不足
while (list.size() < num)
{
System.out.println(Thread.currentThread().getName()+"【消费者:要消费的产品数量】:" + num + "\t【库存量】:"
+ list.size() + "\t暂时不能执行消费任务!");
try
{
// 由于条件不满足,消费阻塞
list.wait();
}
catch (InterruptedException e)
{
e.printStackTrace();
}
}
// 消费条件满足情况下,消费num个产品
for (int i = 1; i <= num; ++i)
{
list.remove();
}
System.out.println(Thread.currentThread().getName()+"【消费者:已经消费产品数】:" + num + "\t【现仓储量为】:" + list.size());
list.notifyAll();
}
}
// get/set方法
public LinkedList<Object> getList()
{
return list;
}
public void setList(LinkedList<Object> list)
{
this.list = list;
}
public int getMAX_SIZE()
{
return MAX_SIZE;
}
}
第二种实现方式
第二种使用await和signal的方式实现,我们使用Lock来产生两个Condition对象来管理任务间的通信,一个重要的参考点就是仓库满,仓库空,这类方式最后必须有try-finally的子句,保证能够释放锁。
package com.jeanron.test2;
import java.util.LinkedList;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import com.jeanron.test2.Storage;
public class Storage
{
// 仓库最大存储量
private final int MAX_SIZE = 200;
// 仓库存储的载体
private LinkedList<Object> list = new LinkedList<Object>();
private static Storage storage=null;
public static Storage getInstance() {
if (storage == null) {
synchronized (Storage.class) {
if (storage == null) {
storage = new Storage();
}
}
}
return storage;
}
// 锁
private final Lock lock = new ReentrantLock();
// 仓库满的条件变量
private final Condition full = lock.newCondition();
// 仓库空的条件变量
private final Condition empty = lock.newCondition();
// 生产num个产品
public void produce(int num)
{
// 获得锁
lock.lock();
// 如果仓库剩余容量不足
while (list.size() + num > MAX_SIZE)
{
System.out.println(Thread.currentThread().getName()+"【要生产的产品数量】:" + num + "\t【库存量】:" + list.size()
+ "\t暂时不能执行生产任务!");
try
{
// 由于条件不满足,生产阻塞
full.await();
}
catch (InterruptedException e)
{
e.printStackTrace();
}
}
// 生产条件满足情况下,生产num个产品
for (int i = 1; i <= num; ++i)
{
list.add(new Object());
}
System.out.println(Thread.currentThread().getName()+"【已经生产产品数】:" + num + "\t【现仓储量为】:" + list.size());
// 唤醒其他所有线程
full.signalAll();
empty.signalAll();
// 释放锁
lock.unlock();
}
// 消费num个产品
public void consume(int num)
{
// 获得锁
lock.lock();
// 如果仓库存储量不足
while (list.size() < num)
{
System.out.println(Thread.currentThread().getName()+"【要消费的产品数量】:" + num + "\t【库存量】:" + list.size()
+ "\t暂时不能执行生产任务!");
try
{
// 由于条件不满足,消费阻塞
empty.await();
}
catch (InterruptedException e)
{
e.printStackTrace();
}
}
// 消费条件满足情况下,消费num个产品
for (int i = 1; i <= num; ++i)
{
list.remove();
}
System.out.println(Thread.currentThread().getName()+"【已经消费产品数】:" + num + "\t【现仓储量为】:" + list.size());
// 唤醒其他所有线程
full.signalAll();
empty.signalAll();
// 释放锁
lock.unlock();
}
// set/get方法
public int getMAX_SIZE()
{
return MAX_SIZE;
}
public LinkedList<Object> getList()
{
return list;
}
public void setList(LinkedList<Object> list)
{
测试类
测试类算是用用,我们直接使用多线程调度器Executor来做。对于生产者消费者使用随机数的方式来初始化物品数,仓库使用单例模式。
package com.jeanron.main;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import com.jeanron.entity1.Consumer;
import com.jeanron.entity1.Producer;
import com.jeanron.test1.Storage;
public class Test1
{
public static void main(String[] args)
{
ExecutorService exec = Executors.newCachedThreadPool();
for(int i=0;i<5;i++){
exec.execute(new Producer(Storage.getInstance(),new Random().nextInt(10)*10));
}
for(int i=0;i<10;i++){
exec.execute(new Consumer(Storage.getInstance(),new Random().nextInt(10)*10));
}
}
}
测试结果
生产者是5个线程,消费者是10个线程,我们来看看调用之后的信息,这个可以简单分析下日志即可,不具有典型性和代表性。
第一类实现方式的输出如下:
pool-1-thread-1【生产者:已经生产产品数】:10 【现仓储量为】:10
pool-1-thread-3【生产者:已经生产产品数】:50 【现仓储量为】:60
pool-1-thread-2【生产者:已经生产产品数】:50 【现仓储量为】:110
pool-1-thread-5【生产者:已经生产产品数】:0 【现仓储量为】:110
pool-1-thread-4【生产者:已经生产产品数】:80 【现仓储量为】:190
pool-1-thread-4【消费者:已经消费产品数】:40 【现仓储量为】:150
pool-1-thread-1【消费者:已经消费产品数】:10 【现仓储量为】:140
pool-1-thread-3【消费者:已经消费产品数】:20 【现仓储量为】:120
pool-1-thread-2【消费者:已经消费产品数】:10 【现仓储量为】:110
pool-1-thread-5【消费者:已经消费产品数】:40 【现仓储量为】:70
pool-1-thread-6【消费者:已经消费产品数】:30 【现仓储量为】:40
pool-1-thread-4【消费者:要消费的产品数量】:90 【库存量】:40 暂时不能执行消费任务!
pool-1-thread-1【消费者:要消费的产品数量】:50 【库存量】:40 暂时不能执行消费任务!
pool-1-thread-3【消费者:要消费的产品数量】:90 【库存量】:40 暂时不能执行消费任务!
pool-1-thread-7【消费者:已经消费产品数】:20 【现仓储量为】:20
pool-1-thread-3【消费者:要消费的产品数量】:90 【库存量】:20 暂时不能执行消费任务!
pool-1-thread-1【消费者:要消费的产品数量】:50 【库存量】:20 暂时不能执行消费任务!
pool-1-thread-4【消费者:要消费的产品数量】:90 【库存量】:20 暂时不能执行消费任务!
第二类实现方式的输出如下:
pool-1-thread-1【已经生产产品数】:16 【现仓储量为】:16
pool-1-thread-3【已经生产产品数】:60 【现仓储量为】:76
pool-1-thread-2【已经生产产品数】:67 【现仓储量为】:143
pool-1-thread-5【要生产的产品数量】:90 【库存量】:143 暂时不能执行生产任务!
pool-1-thread-4【已经生产产品数】:27 【现仓储量为】:170
pool-1-thread-1【已经消费产品数】:10 【现仓储量为】:160
pool-1-thread-2【已经消费产品数】:23 【现仓储量为】:137
pool-1-thread-6【已经消费产品数】:18 【现仓储量为】:119
pool-1-thread-5【要生产的产品数量】:90 【库存量】:119 暂时不能执行生产任务!
pool-1-thread-3【已经消费产品数】:97 【现仓储量为】:22
pool-1-thread-1【要消费的产品数量】:57 【库存量】:22 暂时不能执行生产任务!
pool-1-thread-6【要消费的产品数量】:62 【库存量】:22 暂时不能执行生产任务!
pool-1-thread-3【要消费的产品数量】:35 【库存量】:22 暂时不能执行生产任务!
pool-1-thread-4【已经消费产品数】:17 【现仓储量为】:5
pool-1-thread-2【已经消费产品数】:1 【现仓储量为】:4
pool-1-thread-7【要消费的产品数量】:93 【库存量】:4 暂时不能执行生产任务!
pool-1-thread-2【已经消费产品数】:4 【现仓储量为】:0
pool-1-thread-5【已经生产产品数】:90 【现仓储量为】:90
pool-1-thread-1【已经消费产品数】:57 【现仓储量为】:33
pool-1-thread-6【要消费的产品数量】:62 【库存量】:33 暂时不能执行生产任务!
pool-1-thread-3【要消费的产品数量】:35 【库存量】:33 暂时不能执行生产任务!
pool-1-thread-4【要消费的产品数量】:45 【库存量】:33 暂时不能执行生产任务!
pool-1-thread-7【要消费的产品数量】:93 【库存量】:33 暂时不能执行生产任务!
pool-1-thread-10【要消费的产品数量】:81 【库存量】:33 暂时不能执行生产任务!
pool-1-thread-2【要消费的产品数量】:65 【库存量】:33 暂时不能执行生产任务!
pool-1-thread-5【要消费的产品数量】:38 【库存量】:33 暂时不能执行生产任务!
pool-1-thread-8【已经消费产品数】:33 【现仓储量为】:0
pool-1-thread-12【要消费的产品数量】:21 【库存量】:0 暂时不能执行生产任务!
pool-1-thread-9【要消费的产品数量】:24 【库存量】:0 暂时不能执行生产任务!
pool-1-thread-11【要消费的产品数量】:54 【库存量】:0 暂时不能执行生产任务!
pool-1-thread-13【要消费的产品数量】:58 【库存量】:0 暂时不能执行生产任务!
pool-1-thread-6【要消费的产品数量】:62 【库存量】:0 暂时不能执行生产任务!
pool-1-thread-3【要消费的产品数量】:35 【库存量】:0 暂时不能执行生产任务!
pool-1-thread-4【要消费的产品数量】:45 【库存量】:0 暂时不能执行生产任务!
pool-1-thread-7【要消费的产品数量】:93 【库存量】:0 暂时不能执行生产任务!
pool-1-thread-10【要消费的产品数量】:81 【库存量】:0 暂时不能执行生产任务!
pool-1-thread-2【要消费的产品数量】:65 【库存量】:0 暂时不能执行生产任务!
pool-1-thread-5【要消费的产品数量】:38 【库存量】:0 暂时不能执行生产任务!
以上就是Java实现生产者消费者的两种方式分别是什么,小编相信有部分知识点可能是我们日常工作会见到或用到的。希望你能通过这篇文章学到更多知识。更多详情敬请关注编程网行业资讯频道。