文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

阻塞队列—PriorityBlockingQueue源码分析

2024-12-03 16:37

关注

 前言


PriorityBlockingQueue 优先级队列,线程安全(添加、读取都进行了加锁)、无界、读阻塞的队列,底层采用的堆结构实现(二叉树),默认是小根堆,最小的或者最大的元素会一直置顶,每次获取都取最顶端的数据

队列创建

小根堆

  1. PriorityBlockingQueue<Integer> concurrentLinkedQueue = new PriorityBlockingQueue<Integer>(); 

大根堆

  1. PriorityBlockingQueue<Integer> concurrentLinkedQueue = new PriorityBlockingQueue<Integer>(10, new Comparator<Integer>() { 
  2.  @Override 
  3.  public int compare(Integer o1, Integer o2) { 
  4.   return o2 - o1; 
  5.  } 
  6. }); 

 应用场景

有任务要执行,可以对任务加一个优先级的权重,这样队列会识别出来,对该任务优先进行出队。

我们来看一个具体例子,例子中定义了一个将要放入“优先阻塞队列”的任务类,并且定义了一个任务工场类和一个任务执行类,在任务工场类中产生了各种不同优先级的任务,将其添加到队列中,在任务执行类中,任务被一个个取出并执行。

  1. package com.niuh.queue.priority; 
  2.  
  3. import java.util.ArrayList; 
  4. import java.util.List; 
  5. import java.util.Queue; 
  6. import java.util.Random; 
  7. import java.util.concurrent.ExecutorService; 
  8. import java.util.concurrent.Executors; 
  9. import java.util.concurrent.PriorityBlockingQueue; 
  10. import java.util.concurrent.TimeUnit; 
  11.  
  12.  
  13. public class PriorityBlockingQueueDemo { 
  14.  
  15.     public static void main(String[] args) throws Exception { 
  16.         Random random = new Random(47); 
  17.         ExecutorService exec = Executors.newCachedThreadPool(); 
  18.         PriorityBlockingQueue queue = new PriorityBlockingQueue<>(); 
  19.         exec.execute(new PrioritizedTaskProducer(queue, exec)); // 这里需要注意,往PriorityBlockingQueue中添加任务和取出任务的 
  20.         exec.execute(new PrioritizedTaskConsumer(queue)); // 步骤是同时进行的,因而输出结果并不一定是有序的 
  21.     } 
  22.  
  23. class PrioritizedTask implements Runnable, Comparable { 
  24.     private Random random = new Random(47); 
  25.     private static int counter = 0; 
  26.     private final int id = counter++; 
  27.     private final int priority; 
  28.  
  29.     protected static List sequence = new ArrayList<>(); 
  30.  
  31.     public PrioritizedTask(int priority) { 
  32.         this.priority = priority; 
  33.         sequence.add(this); 
  34.     } 
  35.  
  36.     @Override 
  37.     public int compareTo(PrioritizedTask o) { 
  38.         return priority < o.priority ? 1 : (priority > o.priority ? -1 : 0);  // 定义优先级计算方式 
  39.     } 
  40.  
  41.     @Override 
  42.     public void run() { 
  43.         try { 
  44.             TimeUnit.MILLISECONDS.sleep(random.nextInt(250)); 
  45.         } catch (InterruptedException e) { 
  46.         } 
  47.         System.out.println(this); 
  48.     } 
  49.  
  50.     @Override 
  51.     public String toString() { 
  52.         return String.format("[%1$-3d]", priority) + " Task " + id; 
  53.     } 
  54.  
  55.     public String summary() { 
  56.         return "(" + id + ": " + priority + ")"
  57.     } 
  58.  
  59.     public static class EndSentinel extends PrioritizedTask { 
  60.         private ExecutorService exec
  61.  
  62.         public EndSentinel(ExecutorService exec) { 
  63.             super(-1); 
  64.             this.exec = exec
  65.         } 
  66.  
  67.         @Override 
  68.         public void run() { 
  69.             int count = 0; 
  70.             for (PrioritizedTask pt : sequence) { 
  71.                 System.out.print(pt.summary()); 
  72.                 if (++count % 5 == 0) { 
  73.                     System.out.println(); 
  74.                 } 
  75.             } 
  76.             System.out.println(); 
  77.             System.out.println(this + " Calling shutdownNow()"); 
  78.             exec.shutdownNow(); 
  79.         } 
  80.     } 
  81.  
  82. class PrioritizedTaskProducer implements Runnable { 
  83.     private Random random = new Random(47); 
  84.     private Queue queue; 
  85.     private ExecutorService exec
  86.  
  87.     public PrioritizedTaskProducer(Queue queue, ExecutorService exec) { 
  88.         this.queue = queue; 
  89.         this.exec = exec
  90.     } 
  91.  
  92.     @Override 
  93.     public void run() { 
  94.         for (int i = 0; i < 20; i++) { 
  95.             queue.add(new PrioritizedTask(random.nextInt(10))); // 往PriorityBlockingQueue中添加随机优先级的任务 
  96.             Thread.yield(); 
  97.         } 
  98.         try { 
  99.             for (int i = 0; i < 10; i++) { 
  100.                 TimeUnit.MILLISECONDS.sleep(250); 
  101.                 queue.add(new PrioritizedTask(10)); // 往PriorityBlockingQueue中添加优先级为10的任务 
  102.             } 
  103.             for (int i = 0; i < 10; i++) { 
  104.                 queue.add(new PrioritizedTask(i));// 往PriorityBlockingQueue中添加优先级为1-10的任务 
  105.             } 
  106.             queue.add(new PrioritizedTask.EndSentinel(exec)); 
  107.         } catch (InterruptedException e) { 
  108.         } 
  109.         System.out.println("Finished PrioritizedTaskProducer"); 
  110.     } 
  111.  
  112. class PrioritizedTaskConsumer implements Runnable { 
  113.     private PriorityBlockingQueue queue; 
  114.  
  115.     public PrioritizedTaskConsumer(PriorityBlockingQueue queue) { 
  116.         this.queue = queue; 
  117.     } 
  118.  
  119.     @Override 
  120.     public void run() { 
  121.         try { 
  122.             while (!Thread.interrupted()) { 
  123.                 queue.take().run(); // 任务的消费者,从PriorityBlockingQueue中取出任务执行 
  124.             } 
  125.         } catch (InterruptedException e) { 
  126.         } 
  127.         System.out.println("Finished PrioritizedTaskConsumer"); 
  128.     } 

 工作原理

PriorityBlockingQueue 是 JDK1.5 的时候出来的一个阻塞队列。但是该队列入队的时候是不会阻塞的,永远会加到队尾。下面我们介绍下它的几个特点:

注意:

  1. 堆结构实际上是一种完全二叉树。关于二叉树可以查看 《树、二叉树、二叉搜索树的实现和特性》
  2. 堆又分为大顶堆和小顶堆 。大顶堆中第一个元素肯定是所有元素中最大的,小顶堆中第一个元素是所有元素中最小的。关于二叉堆可以查看《堆和二叉堆的实现和特性》

源码分析

定义

PriorityBlockingQueue的类继承关系如下:

其包含的方法定义如下:

成员属性

从下面的字段我们可以知道,该队列可以排序,使用显示锁来保证操作的原子性,在空队列时,出队线程会堵塞等。 

  1.  
  2. private static final int DEFAULT_INITIAL_CAPACITY = 11; 
  3.  
  4.  
  5. private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8; 
  6.  
  7.  
  8. private transient Object[] queue; 
  9.  
  10.  
  11. private transient int size
  12.  
  13.  
  14. private transient Comparator comparator; 
  15.  
  16.  
  17. private final ReentrantLock lock; 
  18.  
  19.  
  20. private final Condition notEmpty; 

 构造函数 

  1.  
  2. public PriorityBlockingQueue() { 
  3.     this(DEFAULT_INITIAL_CAPACITY, null); 
  4.  
  5. public PriorityBlockingQueue(int initialCapacity) { 
  6.     this(initialCapacity, null); 
  7.  
  8. public PriorityBlockingQueue(int initialCapacity, 
  9.                              Comparator comparator) { 
  10.     if (initialCapacity < 1) 
  11.         throw new IllegalArgumentException(); 
  12.     this.lock = new ReentrantLock(); 
  13.     this.notEmpty = lock.newCondition(); 
  14.     this.comparator = comparator; 
  15.     this.queue = new Object[initialCapacity]; 
  16.  
  17. public PriorityBlockingQueue(Collection c) { 
  18.     this.lock = new ReentrantLock(); 
  19.     this.notEmpty = lock.newCondition(); 
  20.     boolean heapify = true; // true if not known to be in heap order 
  21.     boolean screen = true;  // true if must screen for nulls 
  22.     if (c instanceof SortedSet) { 
  23.         SortedSet ss = (SortedSet) c; 
  24.         this.comparator = (Comparator) ss.comparator(); 
  25.         heapify = false
  26.     } 
  27.     else if (c instanceof PriorityBlockingQueue) { 
  28.         PriorityBlockingQueue pq = 
  29.             (PriorityBlockingQueue) c; 
  30.         this.comparator = (Comparator) pq.comparator(); 
  31.         screen = false
  32.         if (pq.getClass() == PriorityBlockingQueue.class) // exact match 
  33.             heapify = false
  34.     } 
  35.     Object[] a = c.toArray(); 
  36.     int n = a.length; 
  37.     // If c.toArray incorrectly doesn't return Object[], copy it. 
  38.     if (a.getClass() != Object[].class) 
  39.         a = Arrays.copyOf(a, n, Object[].class); 
  40.     if (screen && (n == 1 || this.comparator != null)) { 
  41.         for (int i = 0; i < n; ++i) 
  42.             if (a[i] == null
  43.                 throw new NullPointerException(); 
  44.     } 
  45.     this.queue = a; 
  46.     this.size = n; 
  47.     if (heapify) 
  48.         heapify(); 

 入队方法

入队方法,下面可以看到 put 方法最终会调用 offer 方法,所以我们只看 offer 方法即可。

offer(E e)

  1. public void put(E e) { 
  2.     offer(e); // never need to block 
  3.  
  4. public boolean offer(E e) { 
  5.     //判断是否为空 
  6.     if (e == null
  7.         throw new NullPointerException(); 
  8.     //显示锁 
  9.     final ReentrantLock lock = this.lock; 
  10.     lock.lock(); 
  11.     //定义临时对象 
  12.     int n, cap; 
  13.     Object[] array; 
  14.     //判断数组是否满了 
  15.     while ((n = size) >= (cap = (array = queue).length)) 
  16.         //数组扩容 
  17.         tryGrow(array, cap); 
  18.     try { 
  19.         //拿到比较器 
  20.         Comparator cmp = comparator; 
  21.         //判断是否有自定义比较器 
  22.         if (cmp == null
  23.             //堆上浮 
  24.             siftUpComparable(n, e, array); 
  25.         else 
  26.             //使用自定义比较器进行堆上浮 
  27.             siftUpUsingComparator(n, e, array, cmp); 
  28.         //队列长度 +1 
  29.         size = n + 1; 
  30.         //唤醒休眠的出队线程 
  31.         notEmpty.signal(); 
  32.     } finally { 
  33.         //释放锁 
  34.         lock.unlock(); 
  35.     } 
  36.     return true

 siftUpComparable(int k, T x, Object[] array)

上浮调整比较器方法的实现

  1. private static  void siftUpComparable(int k, T x, Object[] array) { 
  2.         Comparable key = (Comparable) x; 
  3.         while (k > 0) { 
  4.          //无符号向左移,目的是找到放入位置的父节点 
  5.             int parent = (k - 1) >>> 1; 
  6.             //拿到父节点的值 
  7.             Object e = array[parent]; 
  8.             //比较是否大于该元素,不大于就没比较交换 
  9.             if (key.compareTo((T) e) >= 0) 
  10.                 break; 
  11.             //以下都是元素位置交换 
  12.             array[k] = e; 
  13.             k = parent; 
  14.         } 
  15.         array[k] = key
  16.     } 

 根据上面的代码,可以看出这是完全二叉树在进行上浮调整。调整入队的元素,找出最小的,将元素排列有序化。简单理解就是:父节点元素值一定要比它的子节点得小,如果父节点大于子节点了,那就两者位置进行交换。

入队图解

例子:85 添加到二叉堆中(大顶堆)

  1. package com.niuh.queue.priority; 
  2.  
  3. import java.util.Comparator; 
  4. import java.util.concurrent.PriorityBlockingQueue; 
  5.  
  6.  
  7. public class TestPriorityBlockingQueue { 
  8.  
  9.     public static void main(String[] args) throws InterruptedException { 
  10.         // 大顶堆 
  11.         PriorityBlockingQueue<Integer> concurrentLinkedQueue = new PriorityBlockingQueue<Integer>(10, new Comparator<Integer>() { 
  12.             @Override 
  13.             public int compare(Integer o1, Integer o2) { 
  14.                 return o2 - o1; 
  15.             } 
  16.         }); 
  17.  
  18.         concurrentLinkedQueue.offer(90); 
  19.         concurrentLinkedQueue.offer(80); 
  20.         concurrentLinkedQueue.offer(70); 
  21.         concurrentLinkedQueue.offer(60); 
  22.         concurrentLinkedQueue.offer(40); 
  23.         concurrentLinkedQueue.offer(30); 
  24.         concurrentLinkedQueue.offer(20); 
  25.         concurrentLinkedQueue.offer(10); 
  26.         concurrentLinkedQueue.offer(50); 
  27.         concurrentLinkedQueue.offer(85); 
  28.         //输出元素排列 
  29.         concurrentLinkedQueue.stream().forEach(e-> System.out.print(e+"  ")); 
  30.         //取出元素 
  31.         Integer take = concurrentLinkedQueue.take(); 
  32.         System.out.println(); 
  33.         concurrentLinkedQueue.stream().forEach(e-> System.out.print(e+"  ")); 
  34.     } 

 

操作的细节分为两步:

  

 85 按照上面讲的先插入到堆的尾部,也就是一维数组的尾部,一维数组的尾部的话就上图的位置,因为这是一个完全二叉树,所以它的尾部就是50后面这个结点。插进来之后这个时候就破坏了堆,它的每一个结点都要大于它的儿子的这种属性了,接下来要做的事情就是要把 85 依次地向上浮动,怎么浮动?就是 85 大于它的父亲结点,那么就和父亲结点进行交换,直到走到根如果大于根的话,就和根也进行交换。


85 再继续往前走之后,它要和 80 再进行比较,同理可得:也就是说这个结点每次和它的父亲比,如果它大于它的父亲的话就交换,直到它不再大于它的父亲。

 

出队方法

入队列的方法说完后,我们来说说出队列的方法。PriorityBlockingQueue提供了多种出队操作的实现来满足不同情况下的需求,如下:

poll 和 peek 与上面类似,这里不做说明

take()

出队方法,该方法会阻塞

  1. public E take() throws InterruptedException { 
  2.  //显示锁 
  3.     final ReentrantLock lock = this.lock; 
  4.     //可中断锁 
  5.     lock.lockInterruptibly(); 
  6.     //结果接收对象 
  7.     E result; 
  8.     try { 
  9.      //判断队列是否为空 
  10.         while ( (result = dequeue()) == null
  11.          //线程阻塞 
  12.             notEmpty.await(); 
  13.     } finally { 
  14.         lock.unlock(); 
  15.     } 
  16.     return result; 

 dequeue()

我们再来看看具体出队方法的实现,dequeue方法

  1. private E dequeue() { 
  2. //长度减少 1 
  3.    int n = size - 1; 
  4.    //判断队列中是否有元素 
  5.    if (n < 0) 
  6.        return null
  7.    else { 
  8.     //队列对象 
  9.        Object[] array = queue; 
  10.        //取出第一个元素 
  11.        E result = (E) array[0]; 
  12.        //拿出最后一个元素 
  13.        E x = (E) array[n]; 
  14.        //置空 
  15.        array[n] = null
  16.        Comparator cmp = comparator; 
  17.        if (cmp == null
  18.         //下沉调整 
  19.            siftDownComparable(0, x, array, n); 
  20.        else 
  21.            siftDownUsingComparator(0, x, array, n, cmp); 
  22.        //成功则减少队列中的元素数量 
  23.        size = n; 
  24.        return result; 
  25.    } 

 总体就是找到父节点与两个子节点中最小的一个节点,然后进行交换位置,不断重复,由上而下的交换。

siftDownComparable(int k, T x, Object[] array, int n)

再来看看下沉比较器方法的实现

  1. private static  void siftDownComparable(int k, T x, Object[] array, 
  2.                                                int n) { 
  3.     //判断队列长度 
  4.     if (n > 0) { 
  5.         Comparable key = (Comparable)x; 
  6.         //找到队列最后一个元素的父节点的索引。 
  7.         int half = n >>> 1;           // loop while a non-leaf 
  8.         while (k < half) { 
  9.          //拿到 k 节点下的左子节点 
  10.             int child = (k << 1) + 1; // assume left child is least 
  11.             //取得子节点对应的值 
  12.             Object c = array[child]; 
  13.             //取得 k 右子节点的索引 
  14.             int right = child + 1; 
  15.             //比较右节点的索引是否小于队列长度和左右子节点的值进行比较 
  16.             if (right < n && 
  17.                 ((Comparable) c).compareTo((T) array[right]) > 0) 
  18.                 c = array[child = right]; 
  19.             //比较父节点值是否大于子节点 
  20.             if (key.compareTo((T) c) <= 0) 
  21.                 break; 
  22.             //下面都是元素替换 
  23.             array[k] = c; 
  24.             k = child; 
  25.         } 
  26.         array[k] = key
  27.     } 

 出队图解

将堆尾元素替换到顶部(即堆顶被替代删除掉)

依次从根部向下调整整个堆的结构(一直到堆尾即可) HeapifyDown

例子:90 从二叉堆中删除(大顶堆)


总结

PriorityBlockingQueue 真的是个神奇的队列,可以实现优先出队。最特别的是它只有一个锁,入队操作永远成功,而出队只有在空队列的时候才会进行线程阻塞。可以说有一定的应用场景吧,比如:有任务要执行,可以对任务加一个优先级的权重,这样队列会识别出来,对该任务优先进行出队。

 

来源:今日头条内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯