文章详情

短信预约信息系统项目管理师 报名、考试、查分时间动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

线程池02-LinkedBlockingQueue 阻塞队列

2021-10-24 23:05

关注


	线程池02-LinkedBlockingQueue 阻塞队列
[数据库教程]

首先,我们先了解一下什么是阻塞队列:

常用到的方法

技术图片

上面是对阻塞队列的简单了解,下面重点分析一下LinkedBlockingQueue。

源码分析

Node节点

static class Node {
    E item;
    Node next;
    Node(E x) { item = x; }
}

构造方法和参数

   
   private final int capacity;

    
    private final AtomicInteger count = new AtomicInteger();

    
    transient Node head;

    
    private transient Node last;

    
    private final ReentrantLock takeLock = new ReentrantLock();

    
    private final Condition notEmpty = takeLock.newCondition();

    
    private final ReentrantLock putLock = new ReentrantLock();

    
    private final Condition notFull = putLock.newCondition();

public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }

 public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node(null);//初始化的时候设置头节点和尾节点为两个空节点
    }

插入

put 方法

public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        int c = -1;
        Node node = new Node(e);//创建新节点
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();//获取put锁
        try {
            //判断存入的元素个数和配置的数量是否相等,如果相等。那么将当前线程放入到条件队列中
            while (count.get() == capacity) {
                notFull.await();
            }
            enqueue(node);//将节点插入到末尾
            c = count.getAndIncrement();//元素数量+1
            if (c + 1 < capacity)//当前元素数量小于容量的时候,唤醒“存入条件队列”的头节点到同步队列
                notFull.signal();
        } finally {
            putLock.unlock();释放put锁
        }
        // 唤醒获取条件队列的头节点
        if (c == 0)
            signalNotEmpty();
    }

//将节点设置为尾节点
private void enqueue(Node node) {
    last = last.next = node;
}
// 唤醒“获取条件队列”中的首节点
private void signalNotEmpty() {
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();//t获取ake锁
        try {
            notEmpty.signal();//唤醒“获取条件队列”中的首节点
        } finally {
            takeLock.unlock();
        }
    }

offer 方法

  public boolean offer(E e) {
        if (e == null) throw new NullPointerException();
        final AtomicInteger count = this.count;
        if (count.get() == capacity)//如果超过容量直接返回false,表示不能再插入数据
            return false;
        int c = -1;
        Node node = new Node(e);//新建节点
        final ReentrantLock putLock = this.putLock;
        putLock.lock();//获取put锁
        try {
            if (count.get() < capacity) {//小于容量时增加节点,并唤醒“存入条件队列”头节点到同步队列
                enqueue(node);
                c = count.getAndIncrement();
                if (c + 1 < capacity)//当前元素数量小于容量的时候,唤醒“存入条件队列”的头节点到同步队列
                    notFull.signal();
            }
        } finally {
            putLock.unlock();
        }
        // 唤醒获取条件队列的头节点
        if (c == 0)
            signalNotEmpty();
        return c >= 0;
    }

take 方法

public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();//获取锁
        try {
            while (count.get() == 0) {//如果为空则放入“获取条件队列”
                notEmpty.await();
            }
            x = dequeue();//将节点加入到链表最后
            c = count.getAndDecrement();//数量减1
            if (c > 1)//如果元素书大于1,则调用“获取条件队列”中的元素放入同步队列
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        // 唤醒存入条件队列的头节点
        if (c == capacity)
            signalNotFull();
        return x;//返回头节点
    }

// 获取头节点元素
private E dequeue() {
        Node h = head;
        Node first = h.next;
        h.next = h; // help GC
        head = first;
        E x = first.item;
        first.item = null;
        return x;
    }


// 唤醒“存入条件队列”的头节点到同步队列
 private void signalNotFull() {
      final ReentrantLock putLock = this.putLock;
      putLock.lock();
      try {
          notFull.signal();
      } finally {
          putLock.unlock();
      }
  }

poll方法

   public E poll() {
        final AtomicInteger count = this.count;
        if (count.get() == 0)
            return null;
        E x = null;
        int c = -1;
        final ReentrantLock takeLock = this.takeLock;//获取锁
        takeLock.lock();
        try {
            if (count.get() > 0) {//当前容器数量大于0时
                x = dequeue();
                c = count.getAndDecrement();
                if (c > 1)
                    notEmpty.signal();
            }
        } finally {
            takeLock.unlock();
        }
        // 唤醒存入条件队列的头节点
        if (c == capacity)
            signalNotFull();
        return x;
    }

总结

1.如何保证当队列没有消息或者消息满了的时候,进行监听?

上面看代码的时候,两段代码刚开始是有点懵的。

1.存入的方法
  // 唤醒获取条件队列的头节点
  if (c == 0) signalNotEmpty();

2.获取的方法
  // 唤醒存入条件队列的头节点
  if (c == capacity) signalNotFull();

其实这就监听的重要环节。
逻辑是这样的。以存入为例:
1.如果当前节点为0,说明队列中没有任务;
2.唤醒“获取条件队列”的头节点,去尝试获取元素。如果获取到则执行,如果没有,则依然放入到“获取条件队列”的末尾;
3.这样就可以保证在存入数据的时候,实时监听获取节点元素了。

线程池02-LinkedBlockingQueue 阻塞队列

原文地址:https://www.cnblogs.com/perferect/p/13723857.html

阅读原文内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-数据库
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯