文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

彻底搞懂java并发ThreadPoolExecutor使用

2023-02-28 17:21

关注

前言

线程池是Java中使用较多的并发框架,合理使用线程池,可以:降低资源消耗提高响应速度提高线程的可管理性

本篇文章将从线程池简单原理线程池的创建线程池执行任务关闭线程池进行使用学习。

正文

一. 线程池的简单原理

当一个任务提交到线程池ThreadPoolExecutor时,该任务的执行如下图所示。

由于ThreadPoolExecutor存储工作线程使用的集合是HashSet,因此执行上述步骤1和步骤3时需要获取全局锁来保证线程安全,而获取全局锁会导致线程池性能瓶颈,因此通常情况下,线程池完成预热后(当前线程数大于等于corePoolSize),线程池的execute() 方法都是执行步骤2。

二. 线程池的创建

通过ThreadPoolExecutor能够创建一个线程池,ThreadPoolExecutor的构造函数签名如下。

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue)
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory)
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          RejectedExecutionHandler handler)
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler)

通过ThreadPoolExecutor创建线程池时,需要指定线程池的核心线程数最大线程数线程保活时间线程保活时间单位任务阻塞队列,并按需指定线程工厂饱和拒绝策略,如果不指定线程工厂饱和拒绝策略,则ThreadPoolExecutor会使用默认的线程工厂饱和拒绝策略。下面分别介绍这些参数的含义。

参数含义
corePoolSize核心线程数,即线程池的基本大小。当一个任务被提交到线程池时,如果线程池的线程数小于corePoolSize,那么无论其余线程是否空闲,也需创建一个新线程来执行任务。
maximumPoolSize最大线程数。当线程池中线程数大于等于corePoolSize时,新提交的任务会加入任务阻塞队列,但是如果任务阻塞队列已满且线程数小于maximumPoolSize,此时会继续创建新的线程来执行任务。该参数规定了线程池允许创建的最大线程数
keepAliveTime线程保活时间。当线程池的线程数大于核心线程数时,多余的空闲线程会最大存活keepAliveTime的时间,如果超过这个时间且空闲线程还没有获取到任务来执行,则该空闲线程会被回收掉。
unit线程保活时间单位。通过TimeUnit指定线程保活时间的时间单位,可选单位有DAYS(天),HOURS(时),MINUTES(分),SECONDS(秒),MILLISECONDS(毫秒),MICROSECONDS(微秒)和NANOSECONDS(纳秒),但无论指定什么时间单位,ThreadPoolExecutor统一会将其转换为NANOSECONDS
workQueue任务阻塞队列。线程池的线程数大于等于corePoolSize时,新提交的任务会添加到workQueue中,所有线程执行完上一个任务后,会循环从workQueue中获取任务来执行。
threadFactory创建线程的工厂。可以通过线程工厂给每个创建出来的线程设置更有意义的名字。
handler饱和拒绝策略。如果任务阻塞队列已满且线程池中的线程数等于maximumPoolSize,说明线程池此时处于饱和状态,应该执行一种拒绝策略来处理新提交的任务。

三. 线程池执行任务

1. 执行无返回值任务

通过ThreadPoolExecutorexecute() 方法,能执行Runnable任务,示例如下。

public class ThreadPoolExecutorTest {
    @Test
    public void ThreadPoolExecutor执行简单无返回值任务() throws Exception {
        // 创建一个线程池
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 4,
                60, TimeUnit.SECONDS, new ArrayBlockingQueue&lt;&gt;(300));
        // 创建两个任务
        Runnable firstRunnable = new Runnable() {
            @Override
            public void run() {
                System.out.println("第一个任务执行");
            }
        };
        Runnable secondRunnable = new Runnable() {
            @Override
            public void run() {
                System.out.println("第二个任务执行");
            }
        };
        // 让线程池执行任务
        threadPoolExecutor.execute(firstRunnable);
        threadPoolExecutor.execute(secondRunnable);
        // 让主线程睡眠1秒,等待线程池中的任务被执行完毕
        Thread.sleep(1000);
    }
}

运行测试程序,结果如下。

2. 执行有返回值任务

通过ThreadPoolExecutorsubmit() 方法,能够执行Callable任务,通过submit() 方法返回的RunnableFuture能够拿到异步执行的结果。示例如下。

public class ThreadPoolExecutorTest {
    @Test
    public void ThreadPoolExecutor执行简单有返回值任务() throws Exception {
        // 创建一个线程池
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 4,
                60, TimeUnit.SECONDS, new ArrayBlockingQueue&lt;&gt;(300));
        // 创建两个任务,任务执行完有返回值
        Callable&lt;String&gt; firstCallable = new Callable&lt;String&gt;() {
            @Override
            public String call() throws Exception {
                return "第一个任务返回值";
            }
        };
        Callable&lt;String&gt; secondCallable = new Callable&lt;String&gt;() {
            @Override
            public String call() throws Exception {
                return "第二个任务返回值";
            }
        };
        // 让线程池执行任务
        Future&lt;String&gt; firstFuture = threadPoolExecutor.submit(firstCallable);
        Future&lt;String&gt; secondFuture = threadPoolExecutor.submit(secondCallable);
        // 获取执行结果,拿不到结果会阻塞在get()方法上
        System.out.println(firstFuture.get());
        System.out.println(secondFuture.get());
    }
}

运行测试程序,结果如下。

3. 执行有返回值任务时抛出错误

如果ThreadPoolExecutor在执行Callable任务时,在Callable任务中抛出了异常并且没有捕获,那么这个异常是可以通过Futureget() 方法感知到的。示例如下。

public class ThreadPoolExecutorTest {
    @Test
    public void ThreadPoolExecutor执行简单有返回值任务时抛出错误() {
        // 创建一个线程池
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 4,
                60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(300));
        // 创建一个任务,任务有返回值,但是执行过程中抛出异常
        Callable<String> exceptionCallable = new Callable<String>() {
            @Override
            public String call() throws Exception {
                throw new RuntimeException("发生了异常");
            }
        };
        // 让线程池执行任务
        Future<String> exceptionFuture = threadPoolExecutor.submit(exceptionCallable);
        try {
            System.out.println(exceptionFuture.get());
        } catch (Exception e) {
            System.out.println(e.getMessage());
        }
    }
}

运行测试程序,结果如下。

4. ThreadPoolExecutor通过submit方式执行Runnable

ThreadPoolExecutor可以通过submit() 方法来运行Runnable任务,并且还可以异步获取执行结果。示例如下。

public class ThreadPoolExecutorTest {
    @Test
    public void ThreadPoolExecutor通过submit的方式来提交并执行Runnable() throws Exception {
        // 创建一个线程池
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 4,
                60, TimeUnit.SECONDS, new ArrayBlockingQueue&lt;&gt;(300));
        // 创建结果对象
        MyResult myResult = new MyResult();
        // 创建Runnable对象
        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                myResult.setResult("任务执行了");
            }
        };
        // 通过ThreadPoolExecutor的submit()方法提交Runnable
        Future&lt;MyResult&gt; resultFuture = threadPoolExecutor.submit(runnable, myResult);
        // 获取执行结果
        MyResult finalResult = resultFuture.get();
        // myResult和finalResult的地址实际相同
        Assert.assertEquals(myResult, finalResult);
        // 打印执行结果
        System.out.println(resultFuture.get().getResult());
    }
    private static class MyResult {
        String result;
        public MyResult() {}
        public MyResult(String result) {
            this.result = result;
        }
        public String getResult() {
            return result;
        }
        public void setResult(String result) {
            this.result = result;
        }
    }
}

运行测试程序,结果如下。

实际上ThreadPoolExecutorsubmit() 方法无论是提交Runnable任务还是Callable任务,都是将任务封装成了RunnableFuture接口的子类FutureTask,然后调用ThreadPoolExecutorexecute() 方法来执行FutureTask

四. 关闭线程池

关闭线程池可以通过ThreadPoolExecutorshutdown() 方法,但是shutdown() 方法不会去中断正在执行任务的线程,所以如果线程池里有Worker正在执行一个永远不会结束的任务,那么shutdown() 方法是无法关闭线程池的。示例如下。

public class ThreadPoolExecutorTest {
    @Test
    public void 通过shutdown关闭线程池() {
        // 创建一个线程池
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 4,
                60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(300));
        // 创建Runnable对象
        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                while (!Thread.currentThread().isInterrupted()) {
                    LockSupport.parkNanos(1000 * 1000 * 1000);
                }
                System.out.println(Thread.currentThread().getName() + " 被中断");
            }
        };
        // 让线程池执行任务
        threadPoolExecutor.execute(runnable);
        threadPoolExecutor.execute(runnable);
        // 调用shutdown方法关闭线程池
        threadPoolExecutor.shutdown();
        // 等待3秒观察现象
        LockSupport.parkNanos(1000 * 1000 * 1000 * 3L);
    }
}

运行测试程序,会发现在主线程中等待3秒后,也没有得到预期的打印结果。如果上述测试程序中使用shutdownNow,则是可以得到预期打印结果的,示例如下。

public class ThreadPoolExecutorTest {
    @Test
    public void 通过shutdownNow关闭线程池() {
        // 创建一个线程池
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 4,
                60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(300));
        // 创建Runnable对象
        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                while (!Thread.currentThread().isInterrupted()) {
                    LockSupport.parkNanos(1000 * 1000 * 1000);
                }
                System.out.println(Thread.currentThread().getName() + " 被中断");
            }
        };
        // 让线程池执行任务
        threadPoolExecutor.execute(runnable);
        threadPoolExecutor.execute(runnable);
        // 调用shutdown方法关闭线程池
        threadPoolExecutor.shutdownNow();
        // 等待3秒观察现象
        LockSupport.parkNanos(1000 * 1000 * 1000 * 3L);
    }
}

运行测试程序,打印如下。

因为测试程序中的任务是响应中断的,而ThreadPoolExecutorshutdownNow() 方法会中断所有Worker,所以执行shutdownNow() 方法后,正在运行的任务会响应中断并结束运行,最终线程池关闭。

假如线程池中运行着一个永远不会结束的任务,且这个任务不响应中断,那么无论是shutdown() 方法还是shutdownNow() 方法,都是无法关闭线程池的。

总结

ThreadPoolExecutor的使用总结如下。

以上就是彻底搞懂java并发ThreadPoolExecutor使用的详细内容,更多关于java并发ThreadPoolExecutor的资料请关注编程网其它相关文章!

阅读原文内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯