文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

Java并发编程之ConcurrentLinkedQueue源码详解

2024-04-02 19:55

关注

一、ConcurrentLinkedQueue介绍

并编程中,一般需要用到安全的队列,如果要自己实现安全队列,可以使用2种方式:
方式1:加锁,这种实现方式就是我们常说的阻塞队列。
方式2:使用循环CAS算法实现,这种方式实现队列称之为非阻塞队列。
从点到面, 下面我们来看下非阻塞队列经典实现类:ConcurrentLinkedQueue (JDK1.8版)

ConcurrentLinkedQueue 是一个基于链接节点的无界线程安全的队列。当我们添加一个元素的时候,它会添加到队列的尾部,当我们获取一个元素时,它会返回队列头部的元素。它采用了“wait-free”算法来实现,用CAS实现了非阻塞的线程安全队列。当多个线程共享访问一个公共 collection 时,ConcurrentLinkedQueue 是一个恰当的选择。此队列不允许使用 null 元素,因为移除元素时实际是将节点中item置为null,如果元素本身为null,则跟删除有冲突

我们首先看一下ConcurrentLinkedQueue的类图结构先,好有一个内部逻辑有一个大概的印象,如下图所示: 

主要属性head节点,tail节点


// 链表头节点
private transient volatile Node<E> head;
// 链表尾节点
private transient volatile Node<E> tail;

主要内部类Node

类Node在static方法里获取到item和next的内存偏移量,之后通过casItem和casNext更改item值和next节点


private static class Node<E> {
    volatile E item;
    volatile Node<E> next;
 
    
    Node(E item) {
        //将item存放在本节点的itemOffset偏移量位置的内存里
        UNSAFE.putObject(this, itemOffset, item);//设置this对象的itemoffset位置
    }
    //更新item值
    boolean casItem(E cmp, E val) {
       //this对象的itemoffset位置存放的值如果和期望值cmp相等,则替换为val
        return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
    }
 
    void lazySetNext(Node<E> val) {
      //this对象的nextOffset位置存入val
        UNSAFE.putOrderedObject(this, nextOffset, val);
    }
    //更新next节点值
    boolean casNext(Node<E> cmp, Node<E> val) {
     //this对象的nextOffset位置存放的值如果和期望值cmp相等,则替换为val
        return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
    }
 
    // Unsafe mechanics
 
    private static final sun.misc.Unsafe UNSAFE;
    //当前节点存放的item的内存偏移量
    private static final long itemOffset;
    //当前节点的next节点的内存偏移量
    private static final long nextOffset;
 
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> k = Node.class;
            itemOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("item"));
            nextOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("next"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
}

concurrentlinkedqueue同样在static方法里获取到head和tail的内存偏移量:之后通过casHead和casTail更改head节点和tail节点值


static {
    try {
        UNSAFE = sun.misc.Unsafe.getUnsafe();
        Class<?> k = ConcurrentLinkedQueue.class;
        headOffset = UNSAFE.objectFieldOffset
            (k.getDeclaredField("head"));
        tailOffset = UNSAFE.objectFieldOffset
            (k.getDeclaredField("tail"));
    } catch (Exception e) {
        throw new Error(e);
    }
}
 
private boolean casTail(Node<E> cmp, Node<E> val) {
    return UNSAFE.compareAndSwapObject(this, tailOffset, cmp, val);
}
 
private boolean casHead(Node<E> cmp, Node<E> val) {
    return UNSAFE.compareAndSwapObject(this, headOffset, cmp, val);
}

二、构造方法


//无参构造函数,head=tail=new Node<E>(null)=空节点
//初始一个为空的ConcurrentLinkedQueue,此时head和tail都指向一个item为null的节点
public ConcurrentLinkedQueue() {
    // 初始化头尾节点
    head = tail = new Node<E>(null);
}
 
//集合构造函数:就是将集合中的元素挨个链起来
public ConcurrentLinkedQueue(Collection<? extends E> c) {
    Node<E> h = null, t = null;
    for (E e : c) {
        checkNotNull(e);
        Node<E> newNode = new Node<E>(e);
        if (h == null)
            h = t = newNode;
        else {
            t.lazySetNext(newNode);//可以理解为一种懒加载,  将t的next值设置为newNode
            t = newNode;
        }
    }
    if (h == null)
        h = t = new Node<E>(null);
    head = h;
    tail = t;
}
 
private static void checkNotNull(Object v) {
    if (v == null)
        throw new NullPointerException();
}
 
//putObjectVolatile的内存非立即可见版本,
//写后结果并不会被其他线程看到,通常是几纳秒后被其他线程看到,这个时间比较短,所以代价可以接收
void lazySetNext(Node<E> val) {
    UNSAFE.putOrderedObject(this, nextOffset, val);
}

三、入队 

获取到当前尾节点p=tail:


public boolean offer (E e){
    //先检查元素是否为null,是null则抛出异常 不是null,则构造新节点准备入队
    checkNotNull(e);
    final Node<E> newNode = new Node<E>(e);
    //初始p指针和t指针都指向尾节点,p指针用来向队列后面推移,t指针用来判断尾节点是否改变
    Node<E> t = tail, p = t;
    for (; ; ) {
        Node<E> q = p.next;
        if (q == null) {//p.next为null,则代表p为尾节点,则将p.next指向新节点 
            // p is last node
            if (p.casNext(null, newNode)) {
                
                if (p != t)
                    casTail(t, newNode);  // Failure is OK.
                return true;
            }
            // Lost CAS race to another thread; re-read next
        } else if (p == q)
        
 
            p = (t != (t = tail)) ? t : head;
        else
        
            // Check for tail updates after two hops.
            p = (p != t && t != (t = tail)) ? t : q;
    }
}

四、出队

poll出队:
获取到当前头节点p=head:如果成功设置了item为null,即p.catItem(item,null),
如果此时被其他线程抢走消费了,此时需要p=p.next,向后继续争抢消费,直到成功执行p.catItem(item,null),此时检查p是不是head节点,如果不是更新p.next为头结点


public E poll() {
    restartFromHead:
    for (;;) {
        // p节点表示首节点,即需要出队的节点
        for (Node<E> h = head, p = h, q;;) {
            E item = p.item;
 
            // 如果p节点的元素不为null,则通过CAS来设置p节点引用的元素为null,如果成功则返回p节点的元素
            if (item != null && p.casItem(item, null)) {
                // Successful CAS is the linearization point
                // for item to be removed from this queue.
                // 如果p != h,则更新head
                if (p != h) // hop two nodes at a time
                    updateHead(h, ((q = p.next) != null) ? q : p);
                return item;
            }
            // 如果头节点的元素为空或头节点发生了变化,这说明头节点已经被另外一个线程修改了。
            // 那么获取p节点的下一个节点,如果p节点的下一节点为null,则表明队列已经空了
            else if ((q = p.next) == null) {
                // 更新头结点
                updateHead(h, p);
                return null;
            }
            // p == q,则使用新的head重新开始
            else if (p == q)
                continue restartFromHead;
            // 如果下一个元素不为空,则将头节点的下一个节点设置成头节点
            else
                p = q;
        }
    }
}

五、总结

offer:

找到尾节点,将新节点链入到尾节点后面,tail.next=newNode,

由于多线程操作,所以拿到p=tail后cas操作执行p.next=newNode可能由于被其他线程抢去而执行不成功,此时需要p=p.next向后遍历,直到找到p.next=null的目标节点。继续尝试向其后面添加元素,添加成功后检查p是否是tail,如果不是tail,则更新tail=p,添加不成功继续向后next遍历

poll:

获取到当前头节点p=head:如果成功设置了item为null,即p.catItem(item,null),

如果此时被其他线程抢走消费了,此时需要p=p.next,向后继续争抢消费,直到成功执行p.catItem(item,null),此时检查p是不是head节点,如果不是更新头结点head=p.next(因为p已经删除了)

更新tail和head:

不是每次添加都更新tail,而是间隔一次更新一次(head也是一样道理):第一个抢到的线程拿到tail执行成功tail.next=newNode1此时不更新tail,那么第二个线程再执行成功添加p.next=newNode2会判断出p是newNode1而不是tail,所以就更新tail为newNode2。

tail节点不总是最后一个,head节点不总是第一个设计初衷:

让tail节点永远作为队列的尾节点,这样实现代码量非常少,而且逻辑非常清楚和易懂。但是这么做有个缺点就是每次都需要使用循环CAS更新tail节点。如果能减少CAS更新tail节点的次数,就能提高入队的效率。

在JDK 1.7的实现中,doug lea使用hops变量来控制并减少tail节点的更新频率,并不是每次节点入队后都将 tail节点更新成尾节点,而是当tail节点和尾节点的距离大于等于常量HOPS的值(默认等于1)时才更新tail节点,tail和尾节点的距离越长使用CAS更新tail节点的次数就会越少,但是距离越长带来的负面效果就是每次入队时定位尾节点的时间就越长,因为循环体需要多循环一次来定位出尾节点,但是这样仍然能提高入队的效率,因为从本质上来看它通过增加对volatile变量的读操作来减少了对volatile变量的写操作,而对volatile变量的写操作开销要远远大于读操作,所以入队效率会有所提升。

在JDK 1.8的实现中,tail的更新时机是通过p和t是否相等来判断的,其实现结果和JDK 1.7相同,即当tail节点和尾节点的距离大于等于1时,更新tail。

到此这篇关于Java并发编程之ConcurrentLinkedQueue源码详解的文章就介绍到这了,更多相关Java ConcurrentLinkedQueue源码内容请搜索编程网以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程网!

阅读原文内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯