1. 前言
阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作是:
- 在队列为空时,获取元素的线程会等待队列变为非空
- 当队列满时,存储元素的线程会等待队列可用
阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。
2. 什么是生产者-消费者模型
生产者消费者模型是一种多线程并发协作的模型,由两类线程和一个缓冲区组成:生产者线程生产数据并把数据放在缓冲区,消费者线程从缓冲区取数据并消费。生产者和消费者在同一时间段内共用同一个存储空间,生产者往存储空间中添加产品,消费者从存储空间中取走产品。当存储空间为空时,消费者阻塞;当存储空间满时,生产者阻塞。
3. 生产者-消费者模型的意义
- 解耦合
- 削峰填谷
3.1 解耦合
两个模块的联系越紧密,耦合度就越高,耦合度越高就意味着两个模块的影响程度很大,当一个模块出现问题的时候,另一个模块也会受到影响而导致出现问题,特别是对分布式系统来说:解耦合是非常重要的。
假设上面是一个简单的分布式系统,服务器 A 和服务器 B 之间直接进行交互(服务器 A 向服务器 B 发送请求并接收服务器 B 返回的信息,服务器 B 向服务器 A 发送请求,以及接收服务器 A 返回的信息),服务器 A 和服务器 B 之间的耦合度比较高,当两个服务器之间的一个发生故障的时候就会导致两个服务器都无法使用。
不仅如此,当我们想要再添加一个服务器 C 与服务器 A 之间进行交互的时候,不仅需要对服务器 C 做出修改,还需要对服务器 A 作出修改。
相比上面的情况,如果我们使用生产者-消费者模型的话就可以解决上面的耦合度过高的问题。
服务器 A 接收到客户端发来的请求不是直接发送给服务器 B ,而是将接收到的请求加入到阻塞队列中,然后服务器 B 从阻塞队列中获取到请求,这样就避免了两个服务器之间进行直接的交互,降低了耦合性;不仅如此,当我们需要额外添加一个服务器 C 的时候,就不需要对服务器 A 做出修改,而是直接从阻塞队列获取请求信息。
3.2 削峰填谷
当客户端向服务器 A 短时间发出大量请求信息的话,那么当服务器 A 接收到客户端发来的请求的时候,就会立即将收到的所有信息都发送给服务器 B ,但是由于虽然服务器 A 能够接收的请求量可以很多,但是服务器 B 却不能一次接收这么多请求,就会导致服务器 B 会挂掉。
如果使用生产者-消费者莫模型的话,当客户端向服务器 A 短时间发送大量请求的话,服务器 A 不会将请求发送给 B ,而是发送给阻塞队列中,当阻塞队列满了的时候,服务器 A 就会停止向阻塞队列中发送请求,陷入阻塞状态,等服务器 B 向阻塞队列中受到请求使得阻塞队列容量减少的时候,服务器 A 才会继续向阻塞队列中发送收到的请求,这样就避免了服务器 B 短时间内受到大量的请求而挂掉的情况;如果阻塞队列中收到的请求信息被读取完的时候,服务器 B 就会停止从阻塞队列中读取请求,进入阻塞状态,直到服务器 A 向阻塞队列中发送请求。
4. 如何使用Java标准库提供的阻塞队列
当知道了生产者-消费者模型的意义之后,我们就来看看如何使用阻塞队列。在Java标准库中提供了阻塞队列 BlockingQueue 可以直接使用。
因为 BlockingQueu 是一个接口,无法实例化,所以需要创建出实现了 BlockingQueue 接口的类,而 ArrayBlockingQueue 和 LinkedBlockingQueue 实现了这个接口。
我们还可以观察到,BlockingQueue 还继承了 Queue ,也就是说我们也可以使用 Queue 中的方法,比如 offer 和 poll 等,但是在阻塞队列中不使用这两个方法,因为这两个方法不具有阻塞特性,而是使用 put 和 take 方法。
public class Demo1 { public static void main(String[] args) throws InterruptedException { BlockingQueue<String> queue = new LinkedBlockingQueue<>(); queue.put("123"); queue.put("234"); queue.put("345"); queue.put("456"); System.out.println(queue.take()); System.out.println(queue.take()); System.out.println(queue.take()); System.out.println(queue.take()); System.out.println(queue.take()); }}
这里向阻塞队列中加入了四个数据,但是读取的时候读取了五次,所以看到线程进入了阻塞状态。
5. 自己实现一个阻塞队列
阻塞队列是建立在队列的基础上的,所以要想实现一个阻塞队列,首先需要实现出来一个队列,那么就先来看看如何实现出一个循环队列。
5.1 实现出循环队列
队列比较容易实现,但是循环队列该如何实现呢?当数据到达数组的最后的时候,将数组的下标修改为0,这样就可以达到循环的目的。
当 tail == head 的时候有两种情况:
- 队列中没有数据的时候
- 队列满了的时候
为了区分这两种情况,我们可以使用两种方法:
- 浪费一个空间,当tail++之后,如果tail + 1 == head,则表示队列满了,将 tail 修改为 0
- 定义一个size变量来表示队列中有效数据的个数,当size == queue.length的时候,表示队列满了
class MyQueue { private final String[] data = new String[1000]; private int size; private int head = 0; private int tail = 0; public void put(String str) { //当添加数据的时候需要判断队列的容量是否已经满了 if(size == data.length) return; data[tail++] = str; size++; if(tail == data.length) tail = 0; } public String take() { //读取数据的时候需要判断队列是否为空 if(size == 0) return null; String ret = data[head++]; size--; if(head == data.length) head = 0; return ret; }}
5.2 实现阻塞队列
阻塞队列就是在队列为空时,获取元素的线程会等待队列变为非空;当队列满时,存储元素的线程会等待队列可用。并且因为阻塞队列运用的环境是多线程,需要考虑到线程安全的问题。
5.2.1 加锁
当需要进行查询和修改的操作时,需要对该操作进行加锁。因为我们的 put 和 take 基本上都在查询和修改数据,所以可以将这两个操作直接进行加锁操作。
class MyBlockingQueue { private final String[] data = new String[1000]; private int size; private int head = 0; private int tail = 0; public void put(String str) { synchronized (this) { if(size == data.length) return; data[tail++] = str; size++; if(tail == data.length) tail = 0; } } public String take() { synchronized (this) { if(size == 0) return null; String ret = data[head++]; size--; if(head == data.length) head = 0; return ret; } }}
5.2.2 进行阻塞操作
当进行完加锁操作之后,我们还需要实现阻塞的作用,当添加数据的时候,如果队列中容量满了的时候就进入阻塞等待状态,直到进行了 take 读取数据操作删除数据的时候,才停止等待;当读取数据的时候,如果队列为空,那么该线程就进入阻塞等待状态,直到进行了 put 操作。
class MyBlockingQueue { private final String[] data = new String[1000]; private int size; private int head = 0; private int tail = 0; public void put(String str) throws InterruptedException { synchronized (this) { if(size == data.length) { this.wait(); } data[tail++] = str; size++; if(tail == data.length) tail = 0; //这个 notify 用来唤醒 take 操作的等待 this.notify(); } } public String take() throws InterruptedException { synchronized (this) { if(size == 0) { this.wait(); } String ret = data[head++]; size--; if(head == data.length) head = 0; //这个 notify 用来唤醒 put 操作的等待 this.notify(); return ret; } }}
5.2.3 解决因被 interrupt 唤醒 waiting 状态的问题
当使用了 wait 和 notify 对 put 和 take 操作进行了阻塞等待和唤醒操作之后,我们还需要注意,难道只有 notify 才会唤醒 WAITING 状态吗?前面我们学习了使用 interrupt 来终止线程,但是 interrup 还会唤醒处于 WAITING 状态的线程,也就是说这里的 WAITING 状态的线程不仅可以被 notify 唤醒,还可以被 interrupt 唤醒。
- 当线程是因为 put 操作队列满了的时候进入阻塞等待状态的时候,如果是因为被 interrupt 唤醒而不是 take 操作的 notify 唤醒的时候就意味着此时队列还是满的,当进行添加操作的时候,就会将有效的数据覆盖掉;
- 当线程是因为 take 操作队列为空的时候进入阻塞等待状态的时候,如果是因为被 interrupt 唤醒而不是 put 操作的 notify 唤醒的时候就意味着此时队列还是空的,如果进行删除操作,并没有意义。
为了解决 WAITING 状态被 interrupt 唤醒而造成的问题,当线程被唤醒的时候,需要进行判断 size 是否还等于 0 或者 queue.length,如果还等于,就继续进入 WAITING 状态,但是光一次判断是不够的,因为还可能是被 interrupt 唤醒的,所以需要进行多次判断,可以用 while 循环来解决。
class MyBlockingQueue { private final String[] data = new String[1000]; private int size; private int head = 0; private int tail = 0; public void put(String str) throws InterruptedException { synchronized (this) { while(size == data.length) { this.wait(); } data[tail++] = str; size++; if(tail == data.length) tail = 0; //这个 notify 用来唤醒 take 操作的等待 this.notify(); } } public String take() throws InterruptedException { synchronized (this) { while(size == 0) { this.wait(); } String ret = data[head++]; size--; if(head == data.length) head = 0; //这个 notify 用来唤醒 put 操作的等待 this.notify(); return ret; } }}
5.2.4 解决因指令重排序造成的问题
因为 put 和 take 操作要进行读和写的操作,可能会因为指令重排序的问题造成其他问题,这里就需要使用 volatile 解决指令重排序问题。
class MyBlockingQueue { private final String[] data = new String[1000]; private volatile int size; private volatile int head = 0; private volatile int tail = 0; public void put(String str) throws InterruptedException { synchronized (this) { while(size == data.length) { this.wait(); } data[tail++] = str; size++; if(tail == data.length) tail = 0; //这个 notify 用来唤醒 take 操作的等待 this.notify(); } } public String take() throws InterruptedException { synchronized (this) { while(size == 0) { this.wait(); } String ret = data[head++]; size--; if(head == data.length) head = 0; //这个 notify 用来唤醒 put 操作的等待 this.notify(); return ret; } }}
测试实现的阻塞队列
public class Demo4 { public static void main(String[] args) { MyBlockingQueue queue = new MyBlockingQueue(); Thread t1 = new Thread(() -> { while(true) { try { System.out.println("消费元素" + queue.take()); } catch (InterruptedException e) { throw new RuntimeException(e); } } }); Thread t2 = new Thread(() -> { int count = 1; while(true) { try { queue.put(count + ""); System.out.println("生产元素" + count); count++; Thread.sleep(100); } catch (InterruptedException e) { throw new RuntimeException(e); } } }); t1.start(); t2.start(); }}
让生产速度较慢,使得读取操作阻塞等待插入数据才执行。
public class Demo4 { public static void main(String[] args) { MyBlockingQueue queue = new MyBlockingQueue(); Thread t1 = new Thread(() -> { while(true) { try { System.out.println("消费元素" + queue.take()); Thread.sleep(100); } catch (InterruptedException e) { throw new RuntimeException(e); } } }); Thread t2 = new Thread(() -> { int count = 1; while(true) { try { queue.put(count + ""); System.out.println("生产元素" + count); count++; } catch (InterruptedException e) { throw new RuntimeException(e); } } }); t1.start(); t2.start(); }}
让生产 put 操作进入阻塞等待状态。
来源地址:https://blog.csdn.net/m0_73888323/article/details/132862839