文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

JUC之阻塞队列BlockingQueue竟然有8种类型?

2024-12-03 16:20

关注

下面是Queue类的继承关系图:


Queue


Queue:队列的上层接口,提供了插入、删除、获取元素这3种类型的方法,而且对每一种类型都提供了两种方式,先来看看插入方法:

add和offer作为插入方法的唯一不同就在于队列满了之后的处理方式。add抛出异常,而offer返回false。

再来看看删除和获取元素方法(和插入方法类似):

如果队列是空,remove和element方法会抛出异常,而poll和peek返回null。

Deque

Deque在Queue的基础上,增加了以下几个方法:

可以发现,其实很多方法的效果都是一样的,只不过名字不同。比如Deque为了实现Stack的语义,定义了push和pop两个方法。

BlockingQueue阻塞队列

BlockingQueue(阻塞队列),在Queue的基础上实现了阻塞等待的功能。它是JDK 1.5中加入的接口,它是指这样的一个队列:当生产者向队列添加元素但队列已满时,生产者会被阻塞;当消费者从队列移除元素但队列为空时,消费者会被阻塞。

BlockingQueue ,是java.util.concurrent 包提供的用于解决并发 生产者 — 消费者 问题的最有用的类,很好的解决了多线程中,如何高效安全“传输”数据的问题。它的特性是在任意时刻只有一个线程可以进行take或者put操作,并且 BlockingQueue 提供类超时 return null 的机制,在许多生产场景里都可以看到这个工具的身影。

总体认识

一般我们用到的阻塞队列有哪些?可以通过下面一个类图来总体看下:


可以看到 BlockingQueue 是一个接口,继承它的另外还有两个接口 BlockingDeque(双端队列)、TransferQueue(两个线程之间传递元素)。

阻塞队列的成员如下:


队列类型

  1. 无限队列(unbounded queue)— 几乎可以无限增长
  2. 有限队列(bounded queue)— 定义了最大容量

队列数据结构

队列实质就是一种存储数据的结构


常见的5种阻塞队列

BlockingQueue API

BlockingQueue的核心方法

  1. public interface BlockingQueue extends Queue { 
  2.  
  3.     //将给定元素设置到队列中,如果设置成功返回true, 否则返回false。如果是往限定了长度的队列中设置值,推荐使用offer()方法。 
  4.     boolean add(E e); 
  5.  
  6.     //将给定的元素设置到队列中,如果设置成功返回true, 否则返回false. e的值不能为空,否则抛出空指针异常。 
  7.     boolean offer(E e); 
  8.  
  9.     //将元素设置到队列中,如果队列中没有多余的空间,该方法会一直阻塞,直到队列中有多余的空间。 
  10.     void put(E e) throws InterruptedException; 
  11.  
  12.     //将给定元素在给定的时间内设置到队列中,如果设置成功返回true, 否则返回false
  13.     boolean offer(E e, long timeout, TimeUnit unit) 
  14.         throws InterruptedException; 
  15.  
  16.     //从队列中获取值,如果队列中没有值,线程会一直阻塞,直到队列中有值,并且该方法取得了该值。 
  17.     E take() throws InterruptedException; 
  18.  
  19.     //在给定的时间里,从队列中获取值,时间到了直接调用普通的poll方法,为null则直接返回null。 
  20.     E poll(long timeout, TimeUnit unit) 
  21.         throws InterruptedException; 
  22.  
  23.     //获取队列中剩余的空间。 
  24.     int remainingCapacity(); 
  25.  
  26.     //从队列中移除指定的值。 
  27.     boolean remove(Object o); 
  28.  
  29.     //判断队列中是否拥有该值。 
  30.     public boolean contains(Object o); 
  31.  
  32.     //将队列中值,全部移除,并发设置到给定的集合中。 
  33.     int drainTo(Collection c); 
  34.  
  35.     //指定最多数量限制将队列中值,全部移除,并发设置到给定的集合中。 
  36.     int drainTo(Collection c, int maxElements); 

 BlockingQueue 接口的所有方法可以分为两大类:负责向队列添加元素的方法和 检索这些元素的方法。在队列满/空的情况下,来自这两个组的每个方法的行为都不同。

添加元素


检索元素 


BlockingQueue最重要的也就是关于阻塞等待的几个方法,而这几个方法正好可以用来实现生产-消费的模型。

ArrayBlockingQueue

ArrayBlockingQueue 由数组支持的有界阻塞队列,队列基于数组实现,容量大小在创建 ArrayBlockingQueue 对象时已经定义好。 此队列按照先进先出(FIFO)的原则对元素进行排序。支持公平锁和非公平锁,默认采用非公平锁。

ArrayBlockingQueue 内部由ReentrantLock来实现线程安全,由Condition的await和signal来实现等待唤醒的功能。它的数据结构是数组,准确的说是一个循环数组(可以类比一个圆环),所有的下标在到达最大长度时自动从0继续开始。

深入理解ArrayBlockingQueue可以阅读《阻塞队列 — ArrayBlockingQueue源码分析》


LinkedBlockingQueue

LinkedBlockingQueue 由链表节点支持的可选有界队列,是一个基于链表的无界队列(理论上有界),队列按照先进先出的顺序进行排序。LinkedBlockingQueue不同于ArrayBlockingQueue,它如果不指定容量,默认为 Integer.MAX_VALUE,也就是无界队列。所以为了避免队列过大造成机器负载或者内存爆满的情况出现,我们在使用的时候建议手动传一个队列的大小。

LinkedBlockingQueue 内部由单链表实现,只能从head取元素,从tail添加元素。添加元素和获取元素都有独立的锁,也就是说LinkedBlockingQueue是读写分离的,读写操作可以并行执行。LinkedBlockingQueue采用可重入锁(ReentrantLock)来保证在并发情况下的线程安全。

向无限队列添加元素的所有操作都将永远不会阻塞,[注意这里不是说不会加锁保证线程安全],因此它可以增长到非常大的容量。

使用无限 BlockingQueue 设计生产者 - 消费者模型时最重要的是 消费者应该能够像生产者向队列添加消息一样快地消费消息。否则,内存可能会填满,然后就会得到一个 OutOfMemory 异常。

深入理解LinkedBlockingQueue可以阅读《阻塞队列 — LinkedBlockingQueue源码分析》


PriorityBlockingQueue

PriorityBlockingQueue 优先级队列,线程安全(添加、读取都进行了加锁)、无界、读阻塞的队列,底层采用的堆结构实现(二叉树),默认是小根堆,最小的或者最大的元素会一直置顶,每次获取都取最顶端的数据。可以实现优先出队。最特别的是它只有一个锁,入队操作永远成功,而出队只有在空队列的时候才会进行线程阻塞。可以说有一定的应用场景吧,比如:有任务要执行,可以对任务加一个优先级的权重,这样队列会识别出来,对该任务优先进行出队。

深入理解PriorityBlockingQueue可以阅读《阻塞队列 —PriorityBlockingQueue源码分析》


DelayQueue

DelayQueue 由优先级支持的、基于时间的调度队列,内部使用非线程安全的优先队列(PriorityQueue)实现,而无界队列基于数组的扩容实现。在创建元素时,可以指定多久才能从队列中获取当前元素。只有延时期满后才能从队列中获取元素。

深入理解DelayQueue可以阅读《阻塞队列 — DelayQueue源码分析》

SynchronousQueue

SynchronousQueue 一个不存储元素的阻塞队列,每一个 put 操作必须等待 take 操作,否则不能继续添加元素。支持公平锁和非公平锁2种策略来访问队列。默认是采用非公平性策略访问队列。公平性策略底层使用了类似队列的数据结构,而非公平策略底层使用了类似栈的数据结构。SynchronousQueue的吞吐量高于LinkedBlockingQueue和ArrayBlockingQueue。

深入理解SynchronousQueue可以阅读《阻塞队列 — SynchronousQueue源码分析》


LinkedTransferQueue

LinkedTransferQueue 是一个由链表结构组成的无界阻塞传输队列,它是一个很多队列的结合体(ConcurrentLinkedQueue,LinkedBlockingQueue,SynchronousQueue),在除了有基本阻塞队列的功能(但是这个阻塞队列没有使用锁)之外;队列实现了TransferQueue接口重写了transfer 和 tryTransfer 方法,这组方法和SynchronousQueue公平模式的队列类似,具有匹配的功能。

深入理解LinkedTransferQueue可以阅读《阻塞队列 — LinkedTransferQueue源码分析》


LinkedBlockingDeque

LinkedBlockingDeque 一个由于链表结构组成的双向阻塞队列,队列头部和尾部都可以添加和移除元素,多线程并发时,可以将锁的竞争对多降到一半。

深入理解LinkedBlockingDeque可以阅读《阻塞队列 — LinkedBlockingDeque源码分析》


DelayedWorkQueue

DelayedWorkQueue 也是一种设计为定时任务的延迟队列,其实现原理和DelayQueue 基本一样,核心数据结构是二叉最小堆的优先队列,队列满时会自动扩容,不过是将优先级队列和DelayQueue的实现过程迁移到本身方法体中,从而可以在该过程当中灵活的加入定时任务特有的方法调用。

深入理解DelayedWorkQueue可以阅读《阻塞队列 — DelayedWorkQueue源码分析》


对比分析

LinkedBlockingQueue与ArrayBlockingQueue区别

LinkedTransferQueue和SynchronousQueue(公平模式)区别

LinkedBlockingDeque与LinkedList区别

  1. package com.niuh.deque; 
  2.  
  3. import java.util.Iterator; 
  4. import java.util.LinkedList; 
  5. import java.util.Queue; 
  6. import java.util.concurrent.LinkedBlockingDeque; 
  7.  
  8.  
  9. public class LinkedBlockingDequeRunner { 
  10.  
  11.     // TODO: queue是LinkedList对象时,程序会出错。 
  12.     // private static Queue queue = new LinkedList(); 
  13.     private static Queue queue = new LinkedBlockingDeque(); 
  14.  
  15.     public static void main(String[] args) { 
  16.  
  17.         // 同时启动两个线程对queue进行操作! 
  18.         new MyThread("A").start(); 
  19.         new MyThread("B").start(); 
  20.     } 
  21.  
  22.     private static void printAll() { 
  23.         String value; 
  24.         Iterator iter = queue.iterator(); 
  25.         while (iter.hasNext()) { 
  26.             value = (String) iter.next(); 
  27.             System.out.print(value + ", "); 
  28.         } 
  29.         System.out.println(); 
  30.     } 
  31.  
  32.     private static class MyThread extends Thread { 
  33.         MyThread(String name) { 
  34.             super(name); 
  35.         } 
  36.  
  37.         @Override 
  38.         public void run() { 
  39.             int i = 0; 
  40.             while (i++ < 6) { 
  41.                 // “线程名” + "-" + "序号" 
  42.                 String val = Thread.currentThread().getName() + i; 
  43.                 queue.add(val); 
  44.                 // 通过“Iterator”遍历queue。 
  45.                 printAll(); 
  46.             } 
  47.         } 
  48.     } 

 输出结果: 

  1. A1,  
  2. A1, A2,  
  3. A1, A2, A3,  
  4. A1, A2, A3, A4,  
  5. A1, A2, A3, A4, A5,  
  6. A1, A2, A3, A4, A5, A6,  
  7. A1, A2, A3, A4, A5, A6, B1,  
  8. A1, A2, A3, A4, A5, A6, B1, B2,  
  9. A1, A2, A3, A4, A5, A6, B1, B2, B3,  
  10. A1, A2, A3, A4, A5, A6, B1, B2, B3, B4,  
  11. A1, A2, A3, A4, A5, A6, B1, B2, B3, B4, B5,  
  12. A1, A2, A3, A4, A5, A6, B1, B2, B3, B4, B5, B6,  

 结果说明:示例程序中,启动两个线程(线程A和线程B)分别对LinkedBlockingDeque进行操作:

以线程A而言,它会先获取“线程名”+“序号”,然后将该字符串添加到LinkedBlockingDeque中;

接着,遍历并输出LinkedBlockingDeque中的全部元素。

线程B的操作和线程A一样,只不过线程B的名字和线程A的名字不同。

当queue是LinkedBlockingDeque对象时,程序能正常运行。

如果将queue改为LinkedList时,程序会产生ConcurrentModificationException异常。

BlockingQueue应用

多线程生产者-消费者示例

接下来我们创建一个由两部分组成的程序: 生产者 ( Producer ) 和消费者 ( Consumer ) 。

生产者(Producer)

生产者将生成一个 0 到 100 的随机数(十全大补丸的编号),并将该数字放在 BlockingQueue 中。我们将创建 16 个线程(潘金莲)用于生成随机数并使用 put() 方法阻塞,直到队列中有可用空间。

需要记住的重要一点是,我们需要阻止我们的消费者线程无限期地等待元素出现在队列中。

从生产者(潘金莲)向消费者(武大郎)发出信号的好方法是,不需要处理消息,而是发送称为毒 ( poison ) 丸 ( pill ) 的特殊消息。 我们需要发送尽可能多的毒 ( poison ) 丸 ( pill ) ,因为我们有消费者(武大郎)。然后当消费者从队列中获取特殊的毒 ( poison ) 丸 ( pill )消息时,它将优雅地完成执行。

以下生产者的代码:

  1. package com.niuh.queue; 
  2.  
  3. import lombok.extern.slf4j.Slf4j; 
  4.  
  5. import java.util.concurrent.BlockingQueue; 
  6. import java.util.concurrent.ThreadLocalRandom; 
  7.  
  8.  
  9. @Slf4j 
  10. public class NumbersProducer implements Runnable { 
  11.     private BlockingQueue<Integer> numbersQueue; 
  12.     private final int poisonPill; 
  13.     private final int poisonPillPerProducer; 
  14.  
  15.     public NumbersProducer(BlockingQueue<Integer> numbersQueue, int poisonPill, int poisonPillPerProducer) { 
  16.         this.numbersQueue = numbersQueue; 
  17.         this.poisonPill = poisonPill; 
  18.         this.poisonPillPerProducer = poisonPillPerProducer; 
  19.     } 
  20.  
  21.     public void run() { 
  22.         try { 
  23.             generateNumbers(); 
  24.         } catch (InterruptedException e) { 
  25.             Thread.currentThread().interrupt(); 
  26.         } 
  27.     } 
  28.  
  29.     private void generateNumbers() throws InterruptedException { 
  30.         for (int i = 0; i < 100; i++) { 
  31.             numbersQueue.put(ThreadLocalRandom.current().nextInt(100)); 
  32.             log.info("潘金莲-{}号,给武大郎的泡药!", Thread.currentThread().getId()); 
  33.         } 
  34.  
  35.          
  36.  
  37.         for (int j = 0; j < poisonPillPerProducer; j++) { 
  38.             numbersQueue.put(poisonPill); 
  39.             log.info("潘金莲-{}号,往武大郎的药里放入第{}颗毒丸!", Thread.currentThread().getId(), j + 1); 
  40.         } 
  41.     } 

 我们的生成器构造函数将 BlockingQueue 作为参数,用于协调生产者和使用者之间的处理,我们看到方法generateNumbers() 将 100 个元素(生产100副药给武大郎吃)放入队列中。它还需要有毒 ( poison ) 丸 ( pill ) (潘金莲给武大郎下毒)消息,以便知道在执行完成时放入队列的消息类型。该消息需要将 poisonPillPerProducer 次放入队列中。

消费者(Consumer)

每个消费者将使用 take() 方法从 BlockingQueue 获取一个元素,因此它将阻塞,直到队列中有一个元素。从队列中取出一个 Integer 后,它会检查该消息是否是毒 ( poison ) 丸 ( pill )(武大郎看潘金莲有没有下毒) ,如果是,则完成一个线程的执行。否则,它将在标准输出上打印出结果以及当前线程的名称。

  1. package com.niuh.queue; 
  2.  
  3. import lombok.extern.slf4j.Slf4j; 
  4.  
  5. import java.util.concurrent.BlockingQueue; 
  6.  
  7.  
  8. @Slf4j 
  9. public class NumbersConsumer implements Runnable { 
  10.     private BlockingQueue<Integer> queue; 
  11.     private final int poisonPill; 
  12.  
  13.     public NumbersConsumer(BlockingQueue<Integer> queue, int poisonPill) { 
  14.         this.queue = queue; 
  15.         this.poisonPill = poisonPill; 
  16.     } 
  17.  
  18.     public void run() { 
  19.         try { 
  20.             while (true) { 
  21.                 Integer number = queue.take(); 
  22.                 if (number.equals(poisonPill)) { 
  23.                     return
  24.                 } 
  25.                 log.info("武大郎-{}号,喝药-编号:{}", Thread.currentThread().getId(), number); 
  26.             } 
  27.         } catch (InterruptedException e) { 
  28.             Thread.currentThread().interrupt(); 
  29.         } 
  30.     } 

 需要注意的重要事项是队列的使用。与生成器构造函数中的相同,队列作为参数传递。我们可以这样做,是因为 BlockingQueue 可以在线程之间共享而无需任何显示同步。

验证测试

既然我们有生产者和消费者,我们就可以开始我们的计划。我们需要定义队列的容量,并将其设置为 10个元素。 我们创建4 个生产者线程,并且创建等于可用处理器数量的消费者线程:

  1. package com.niuh.queue; 
  2.  
  3. import java.util.concurrent.ArrayBlockingQueue; 
  4. import java.util.concurrent.BlockingQueue; 
  5. import java.util.concurrent.LinkedBlockingQueue; 
  6.  
  7.  
  8. public class Main { 
  9.  
  10.     public static void main(String[] args) { 
  11.         int BOUND = 10; 
  12.         int N_PRODUCERS = 16; 
  13.         int N_CONSUMERS = Runtime.getRuntime().availableProcessors(); //=8 
  14.         int poisonPill = Integer.MAX_VALUE; 
  15.         int poisonPillPerProducer = N_CONSUMERS / N_PRODUCERS; // =0 
  16.         int mod = N_CONSUMERS % N_PRODUCERS;//0+8=8 
  17.  
  18.         BlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(BOUND); 
  19.  
  20.         //潘金莲给武大郎熬药 
  21.         for (int i = 1; i < N_PRODUCERS; i++) { 
  22.             new Thread(new NumbersProducer(queue, poisonPill, poisonPillPerProducer)).start(); 
  23.         } 
  24.  
  25.         //武大郎开始喝药 
  26.         for (int j = 0; j < N_CONSUMERS; j++) { 
  27.             new Thread(new NumbersConsumer(queue, poisonPill)).start(); 
  28.         } 
  29.  
  30.         try { 
  31.             Thread.sleep(5000); 
  32.         } catch (InterruptedException e) { 
  33.             e.printStackTrace(); 
  34.         } 
  35.  
  36.         //潘金莲开始投毒,武大郎喝完毒药GG 
  37.         new Thread(new NumbersProducer(queue, poisonPill, poisonPillPerProducer + mod)).start(); 
  38.     } 
  39.  

 BlockingQueue 是使用具有容量的构造创建的。我们正在创造 4 个生产者和 N 个消费者(武大郎)。我们将我们的毒 ( poison ) 丸 ( pill )消息指定为 Integer.MAX_VALUE,因为我们的生产者在正常工作条件下永远不会发送这样的值。这里要注意的最重要的事情是 BlockingQueue 用于协调它们之间的工作。

PS:以上代码提交在 Github :

https://github.com/Niuh-Study/niuh-juc-final.git

 

来源:今日头条内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯