文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

Java并发编程之阻塞队列深入详解

2024-04-02 19:55

关注

1. 什么是阻塞队列

 阻塞队列是一种特殊的队列,和数据结构中普通的队列一样,也遵守先进先出的原则同时,阻塞队列是一种能保证线程安全的数据结构,并且具有以下两种特性:当队列满的时候,继续向队列中插入元素就会让队列阻塞,直到有其他线程从队列中取走元素;当队列为空的时候,继续出队列也会让队列阻塞,直到有其他线程往队列中插入元素

补充:线程阻塞的意思指代码此时不会被执行,即操作系统在此时不会把这个线程调度到CPU上去执行了

2. 阻塞队列的代码使用


import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.BlockingDeque;
public class Test {
    public static void main(String[] args) throws InterruptedException {
        //不能直接newBlockingDeque,因为它是一个接口,要向上转型
        //LinkedBlockingDeque内部是基于链表方式来实现的
        BlockingDeque<String> queue=new LinkedBlockingDeque<>(10);//此处可以指定一个具体的数字,这里的的10代表队列的最大容量
        queue.put("hello");
        String elem=queue.take();
        System.out.println(elem);
        elem=queue.take();
        System.out.println(elem);
    }
}

注意: put方法带有阻塞功能,但是offer不具有,所以一般用put方法(能使用offer方法的原因是 BlockingDeque继承了Queue

在这里插入图片描述


打印结果如上所示,当打印了hello后,队列为空,代码执行到elem=queue.take();就不会继续往下执行了,此时线程进入阻塞等待状态,什么也不会打印了,直到有其他线程给队列中放入新的元素为止

3. 生产者消费者模型

生产者消费者模型是在服务器开发和后端开发中比较常用的编程手段,一般用于解耦合和削峰填谷。

高耦合度:两个代码模块的关联关系比较高
高内聚:一个代码模块内各个元素彼此结合的紧密
因此,我们一般追求高内聚低耦合,这样会加快执行效率,而使用生产者消费者模型就可以解耦合

(1)应用一:解耦合

我们以实际生活中的情况为例,这里有两台服务器:A服务器和B服务器,当A服务器传输数据给B时,要是直接传输的话,那么不是A向B推送数据,就是B从A中拉取数据,都是需要A和B直接交互,所以A和B存在依赖关系(A和B的耦合度比较高)。未来如果服务器需要扩展,比如加一个C服务器,让A给C传数据,那么改动就比较复杂,且会降低效率。这时我们可以加一个队列,这个队列为阻塞队列,如果A把数据写到队列里,B从中取,那么队列相当于是中转站(或者说交易场所),A相当于生产者(提供数据),B相当于消费者(接收数据),此时就构成了生产者消费者模型,这样会让代码耦合度更低,维护更方便,执行效率更高。

在这里插入图片描述

在计算机中,生产者充当其中一组线程,而消费者充当另一组线程,而交易场所就可以使用阻塞队列了

(2)应用二:削峰填谷

在这里插入图片描述

实际生活中
在河道中大坝算是一个很重要的组成部分了,如果没有大坝,大家试想一下结果:当汛期来临后上游的水很大时,下游就会涌入大量的水发生水灾让庄稼被淹没;而旱期的话下游的水会很少可能会引发旱灾。若有大坝的话,汛期时大坝把多余的水存到大坝中,关闸蓄水,让上游的水按一定速率往下流,避免突然一波大雨把下游淹了,这样下游不至于出现水灾。旱期时大坝把之前储存好的水放出来,还是让让水按一定速率往下流,避免下流太缺水,这样既可以避免汛期发生洪涝又可以避免旱期发生旱灾了。
峰:相当于汛期
谷:相当于旱期
计算机中
这样的情况在计算机中也是很典型的,尤其是在服务器开发中,网关通常会把互联网中的请求转发给业务服务器,比如一些商品服务器,用户服务器,商家服务器(存放商家的信息),直播服务器。但因为互联网过来的请求数量是多是少不可控,相当于上游的水,如果突然来了一大波请求,网关即使能扛得住,后续的很多服务器收到很多请求也就会崩溃(处理一个请求涉及到一系列的数据库操作,因为数据库相关操作效率本身比较低,这样请求多了就处理不过来了,因此就会崩溃)

在这里插入图片描述

所以实际情况中网关和业务服务器之间往往用一个队列来缓冲,这个队列就是阻塞队列(交易场所),用这个队列来实现生产者(网关)消费者(业务服务器)模型,把请求缓存到队列中,后面的消费者(业务服务器)按照自己固定的速率去读请求。这样当请求很多时,虽然队列服务器可能会稍微受到一定压力,但能保证业务服务器的安全。

(3)相关代码


import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class TestDemo {
    public static void main(String[] args) {
        // 使用一个 BlockingQueue 作为交易场所
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        // 此线程作为消费者
        Thread customer = new Thread() {
            @Override
            public void run() {
                while (true) {
                    // 取队首元素
                    try {
                        Integer value = queue.take();
                        System.out.println("消费元素: " + value);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        };
        customer.start();
        // 此线程作为生产者
        Thread producer = new Thread() {
            @Override
            public void run() {
                for (int i = 1; i <= 10000; i++) {
                    System.out.println("生产了元素: " + i);
                    try {
                        queue.put(i);
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        };
        producer.start();
        try {
            customer.join();
            producer.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

在这里插入图片描述

打印如上(此代码是让生产者通过sleep每过1秒生产一个元素,而消费者不使用sleep,所以每当生产一个元素时,消费者都会立马消费一个元素)

4.阻塞队列和生产者消费者模型功能的实现

在学会如何使用BlockingQueue后,那么如何自己去实现一个呢?
主要思路:


public class Test2 {
    static class BlockingQueue {
    private int[] items = new int[1000];    // 此处的1000相当于队列的最大容量, 此处暂时不考虑扩容的问题.
    private int head = 0;//定义队头
    private int tail = 0;//定义队尾
    private int size = 0;//数组大小
    private Object locker = new Object();

    // put 用来入队列
    public void put(int item) throws InterruptedException {
        synchronized (locker) {
            while (size == items.length) {
                // 队列已经满了,阻塞队列开始阻塞
                locker.wait();
            }
            items[tail] = item;
            tail++;
            // 如果到达末尾, 就回到起始位置.
            if (tail >= items.length) {
                tail = 0;
            }
            size++;
            locker.notify();
        }
    }
    // take 用来出队列
    public int take() throws InterruptedException {
        int ret = 0;
        synchronized (locker) {
            while (size == 0) {
                // 对于阻塞队列来说, 如果队列为空, 再尝试取元素, 就要阻塞
                locker.wait();
            }
            ret = items[head];
            head++;
            if (head >= items.length) {
                head = 0;
            }
            size--;
            // 此处的notify 用来唤醒 put 中的 wait
            locker.notify();
        }
        return ret;
    }
}

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue queue = new BlockingQueue();
        // 消费者线程
        Thread consumer = new Thread() {
            @Override
            public void run() {
                while (true) {
                    try {
                        int elem = queue.take();
                        System.out.println("消费元素: " + elem);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        };
        consumer.start();

        // 生产者线程
        Thread producer = new Thread() {
            @Override
            public void run() {
                for (int i = 1; i < 10000; i++) {
                    System.out.println("生产元素: " + i);
                    try {
                        queue.put(i);
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        };
        producer.start();
        consumer.join();
        producer.join();
    }
}

在这里插入图片描述

运行结果如上。
注意:

到此这篇关于Java并发编程之阻塞队列深入详解的文章就介绍到这了,更多相关Java 阻塞队列内容请搜索编程网以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程网!

阅读原文内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     801人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     348人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     311人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     432人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     220人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯