文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

LinkedBlockingQueue链式阻塞队列的使用和原理解析

2022-11-13 19:00

关注

概览

1. 基于链表的可选有界阻塞队列。根据FIFO的出入队顺序,从队列头部检索和获取元素,在队列尾部插入新元素。

2. 当作为有界阻塞队列,在队列空间不足时,put方法将会一直阻塞直到有多余空间才会执行插入元素操作,take方法则相反,只到队列内元素不为空时,才将队列元素逐个取出。

3. 队列容量不指定时,默认为Integer.MAX_VALUE,此时可以看作无界队列。

4. 使用非公平锁进行并发控制。所有方法都是线程安全的。

使用方法

下面的文章给出了阻塞队列的四种基本用法:

了解BlockingQueue 大体框架和实现思路

LinkedBlockingQueue实现了BlockingQueue类。

在BlockingQueue中,方法被分为如下四类:

 

Throws exception

Special value

Blocks

Times out

新增

add(E e)

offer(E e)

put(E e)

offer(E e, long timeout, TimeUnit unit)

删除

remove()

poll()

take()

poll(long timeout, TimeUnit unit)

查询

element()

peek()

  

1. add | remove | element

这三个方法在BlockingQueue的定义中,都会在操作未实现时,抛出异常。

LinkedBlockingQueue<Integer> blockingQue = new LinkedBlockingQueue<>();
blockingQue.add(1);
blockingQue.remove(1);
blockingQue.remove(); // NoSuchElementException
blockingQue.element(); // NoSuchElementException

2. offer | poll | peek

根据操作的实际情况,返回特定值,例如null、false(这些失败可能是线程中断、队列为空引起的)

LinkedBlockingQueue<Integer> blockingQue = new LinkedBlockingQueue<>();
blockingQue.offer(1);
blockingQue.poll();
Integer peek = blockingQue.peek(); // 返回null

3. put | take

阻塞当前线程,直到当前线程可以成功执行。

注意:阻塞时,不会解除锁占用。

LinkedBlockingQueue<Integer> blockingQue = new LinkedBlockingQueue<>();
try {
    blockingQue.put(1);
    blockingQue.take();
} catch (InterruptedException e) {
    // 线程被中断
    e.printStackTrace();
}

4. offer | poll (timeout)

尝试指定时间后,放弃执行

LinkedBlockingQueue<Integer> blockingQue = new LinkedBlockingQueue<>();
try {
    blockingQue.offer(1, 100, TimeUnit.MILLISECONDS);
    blockingQue.poll(100, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
    e.printStackTrace();
}

当然除了上述的阻塞队列的基本操作外,LinkedBlockingQueue还具有集合Collection的性质。因此集合中的通用方法也可以使用。

源码解析

说明

本次源码分析主要按照下面几个步骤进行:

1. 保存队列数据的容器以及出入队方法

2. 主要成员变量以及作用

3. 主要方法分析

队列容器

结构图

仅有数据item和后继next的单向节点,结构简单。

static class Node<E> {
    E item;
 
    Node<E> next;
 
    Node(E x) { item = x; }
}

next三种情况

A 普通节点的真实后继

B 真正的队首节点,item=null(队首节点恒为head.next)

C 队尾节点,next=null

入队操作

    // 入队
    private void enqueue(Node<E> node) {
        // assert putLock.isHeldByCurrentThread();
        // assert last.next == null;
        last = last.next = node; // last.next = node; last = node;
    }

步骤1

 步骤2

出队操作

 
    // 出队
    private E dequeue() {
        // assert takeLock.isHeldByCurrentThread();
        // assert head.item == null;
        Node<E> h = head;
        Node<E> first = h.next;
        // 形成引用链闭环,JVM根据可达性分析时,GC root的引用链与该对象之间不可达,进行GC
        h.next = h; // help GC
        head = first;
        E x = first.item;
        first.item = null;
        return x;
    }

步骤1

 步骤2

关键成员变量

队列入队和出队锁分离,都使用了非公平锁。

这里的count属性需要注意下,这里使用了原子类保证操作的原子性。后面的入队和出队,将会频繁使用它。


private final int capacity;
 

private final AtomicInteger count = new AtomicInteger();
 

transient Node<E> head;
 

private transient Node<E> last;
 

private final ReentrantLock takeLock = new ReentrantLock();
 

private final Condition notEmpty = takeLock.newCondition(); 
 

private final ReentrantLock putLock = new ReentrantLock();
 

private final Condition notFull = putLock.newCondition();

初始化

三个构造函数

    public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }
 
    // 自定义容量
    public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);
    }
 
    // 初始化时,批量添加集合中的元素
    public LinkedBlockingQueue(Collection<? extends E> c) {
        this(Integer.MAX_VALUE);
        final ReentrantLock putLock = this.putLock;
        putLock.lock(); // Never contended, but necessary for visibility
        try {
            int n = 0;
            for (E e : c) {
                if (e == null)
                    throw new NullPointerException();
                if (n == capacity)
                    throw new IllegalStateException("Queue full");
                enqueue(new Node<E>(e));
                ++n;
            }
            count.set(n);
        } finally {
            putLock.unlock();
        }
    }

put方法

put方法几个关注点

这几点是整个阻塞操作的核心,可以在下面的分析中仔细观察。

注:由于阻塞队列就是基于生产者-消费者模型的,因此,下文中都把调用put方法的线程称为生产者,调用take方法的线程称为消费者。

总体分析

public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    // Note: convention in all put/take/etc is to preset local var
    // holding count negative to indicate failure unless set.
    int c = -1; // -1代表操作异常
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    // 如果线程没有被标记为中断,则获取锁
    putLock.lockInterruptibly();
    try {
        while (count.get() == capacity) {
            // 这里是线程在执行put操作时唯一一个执行过程中释放锁的地方
            notFull.await(); // 容量已满,等待被消费后唤醒
        }
        // 添加元素,更新容量
        enqueue(node);
        c = count.getAndIncrement();
        // 队列容量有余时,在这里再次唤醒一个其他的生产者线程(或者说消费者消费速度大于生产)
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        // 释放锁
        putLock.unlock();
    }
    // 唤醒一个消费者
    if (c == 0)
        signalNotEmpty();
}

count属性并发问题

这里需要重点关注count,由于有两把锁,count可以同时被putLock、takeLock操作,那么这里是否会产生并发问题。

分析如下:

A. 只有putLock或takeLock一把锁操作:就是单线程操作,没影响,不产生并发问题。

其他所有put操作都处于await的状态或者竞争锁状态,其他线程也因为获取不到锁而无法执行,只有等该节点添加完成释放锁,其他线程才有机会继续执行。

        while (count.get() == capacity) {
            notFull.await(); // 容量已满,等待被消费后唤醒
        }

B. putLock和takeLock同时操作:我们假设两个线程一个获取到putLock,一个获取到了takeLock(同时最多也只有两个线程操作count)。

// put            
while (count.get() == capacity) {
     notFull.await();
}
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
 
 
 
// take
while (count.get() == 0) {
    notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
    notEmpty.signal();

由于count是原子类那么count的所有读写操作必然是一个串联的操作,而非并行操作,因此也不存在并发问题,如下图(顺序可能不同):

唤醒消费者

代码的最后一段,会有唤醒一个消费者的操作。

// 唤醒一个等待中的消费者
private void signalNotEmpty() {
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
}

刚开始看到的时候很疑惑,为什么是c == 0才唤醒。如果生产者入队成功,那么c应该为如下值:

c = count.getAndIncrement();

后面看了一下count.getAndIncrement()方法定义才发现自己记混了,count.getAndIncrement()是一个原子操作,且返回值的是操作前的值

ok,现在没问题了。

count >= 0,也就是说,只有在生产者入队前队列为空,入队成功之后才会唤醒一个消费者消费。

take方法

take方法与put方法大致相似,只是与put做相反操作。

public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {
        // 队列元素为空,停止消费,让出锁并等待被唤醒
        while (count.get() == 0) {
            notEmpty.await();
        }
        // 移除队首元素,并更新容量
        x = dequeue();
        c = count.getAndDecrement();
        // 生产速度大于消费速度,唤醒一个其他消费者进行消费
        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    // 消费之前队列已满,消费后唤醒一个生产者
    if (c == capacity)
        signalNotFull();
    return x;
}
 
// 唤醒生产者

private void signalNotFull() {
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        notFull.signal();
    } finally {
        putLock.unlock();
    }
}

总结

总体上看LinkedBlockingQueue类不难,整个生产-消费的流程实现也比较简单。源码已经把该介绍的东西都讲得很明白了,我这属于依葫芦画瓢顺着源码注释写出来的。这么一写,自己这个类的印象就很深刻了。

以上为个人经验,希望能给大家一个参考,也希望大家多多支持编程网。

阅读原文内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯