文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

SpringBoot线程池和Java线程池怎么使用

2023-07-06 01:25

关注

这篇文章主要介绍“SpringBoot线程池和Java线程池怎么使用”,在日常操作中,相信很多人在SpringBoot线程池和Java线程池怎么使用问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”SpringBoot线程池和Java线程池怎么使用”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!

SpringBoot线程池和Java线程池的用法和实现原理

使用默认的线程池

方式一:通过@Async注解调用

public class AsyncTest {    @Async    public void async(String name) throws InterruptedException {        System.out.println("async" + name + " " + Thread.currentThread().getName());        Thread.sleep(1000);    }}

启动类上需要添加@EnableAsync注解,否则不会生效。

@SpringBootApplication//@EnableAsyncpublic class Test1Application {   public static void main(String[] args) throws InterruptedException {      ConfigurableApplicationContext run = SpringApplication.run(Test1Application.class, args);      AsyncTest bean = run.getBean(AsyncTest.class);      for(int index = 0; index <= 10; ++index){         bean.async(String.valueOf(index));      }   }}

方式二:直接注入 ThreadPoolTaskExecutor

此时可不加 @EnableAsync注解

@SpringBootTestclass Test1ApplicationTests {   @Resource   ThreadPoolTaskExecutor threadPoolTaskExecutor;   @Test   void contextLoads() {      Runnable runnable = () -> {         System.out.println(Thread.currentThread().getName());      };      for(int index = 0; index <= 10; ++index){         threadPoolTaskExecutor.submit(runnable);      }   }}

线程池默认配置信息

SpringBoot线程池的常见配置:

spring:  task:    execution:      pool:        core-size: 8        max-size: 16                          # 默认是 Integer.MAX_VALUE        keep-alive: 60s                       # 当线程池中的线程数量大于 corePoolSize 时,如果某线程空闲时间超过keepAliveTime,线程将被终止        allow-core-thread-timeout: true       # 是否允许核心线程超时,默认true        queue-capacity: 100                   # 线程队列的大小,默认Integer.MAX_VALUE      shutdown:        await-termination: false              # 线程关闭等待      thread-name-prefix: task-               # 线程名称的前缀

SpringBoot 线程池的实现原理

TaskExecutionAutoConfiguration 类中定义了 ThreadPoolTaskExecutor,该类的内部实现也是基于java原生的 ThreadPoolExecutor类。initializeExecutor()方法在其父类中被调用,但是在父类中 RejectedExecutionHandler 被定义为了 private RejectedExecutionHandler rejectedExecutionHandler = new ThreadPoolExecutor.AbortPolicy(); ,并通过initialize()方法将AbortPolicy传入initializeExecutor()中。

注意在TaskExecutionAutoConfiguration 类中,ThreadPoolTaskExecutor类的bean的名称为: applicationTaskExecutor 和 taskExecutor

// TaskExecutionAutoConfiguration#applicationTaskExecutor()@Lazy@Bean(name = { APPLICATION_TASK_EXECUTOR_BEAN_NAME,      AsyncAnnotationBeanPostProcessor.DEFAUL          T_TASK_EXECUTOR_BEAN_NAME })@ConditionalOnMissingBean(Executor.class)public ThreadPoolTaskExecutor applicationTaskExecutor(TaskExecutorBuilder builder) {   return builder.build();}
// ThreadPoolTaskExecutor#initializeExecutor()@Overrideprotected ExecutorService initializeExecutor(      ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {   BlockingQueue<Runnable> queue = createQueue(this.queueCapacity);   ThreadPoolExecutor executor;   if (this.taskDecorator != null) {      executor = new ThreadPoolExecutor(            this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,            queue, threadFactory, rejectedExecutionHandler) {         @Override         public void execute(Runnable command) {            Runnable decorated = taskDecorator.decorate(command);            if (decorated != command) {               decoratedTaskMap.put(decorated, command);            }            super.execute(decorated);         }      };   }   else {      executor = new ThreadPoolExecutor(            this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,            queue, threadFactory, rejectedExecutionHandler);   }   if (this.allowCoreThreadTimeOut) {      executor.allowCoreThreadTimeOut(true);   }   this.threadPoolExecutor = executor;   return executor;}
// ExecutorConfigurationSupport#initialize()public void initialize() {   if (logger.isInfoEnabled()) {      logger.info("Initializing ExecutorService" + (this.beanName != null ? " '" + this.beanName + "'" : ""));   }   if (!this.threadNamePrefixSet && this.beanName != null) {      setThreadNamePrefix(this.beanName + "-");   }   this.executor = initializeExecutor(this.threadFactory, this.rejectedExecutionHandler);}

覆盖默认的线程池

覆盖默认的 taskExecutor对象,bean的返回类型可以是ThreadPoolTaskExecutor也可以是Executor

@Configurationpublic class ThreadPoolConfiguration {    @Bean("taskExecutor")    public ThreadPoolTaskExecutor taskExecutor() {        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();        //设置线程池参数信息        taskExecutor.setCorePoolSize(10);        taskExecutor.setMaxPoolSize(50);        taskExecutor.setQueueCapacity(200);        taskExecutor.setKeepAliveSeconds(60);        taskExecutor.setThreadNamePrefix("myExecutor--");        taskExecutor.setWaitForTasksToCompleteOnShutdown(true);        taskExecutor.setAwaitTerminationSeconds(60);        //修改拒绝策略为使用当前线程执行        taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());        //初始化线程池        taskExecutor.initialize();        return taskExecutor;    }}

管理多个线程池

如果出现了多个线程池,例如再定义一个线程池 taskExecutor2,则直接执行会报错。此时需要指定bean的名称即可。

@Bean("taskExecutor2")public ThreadPoolTaskExecutor taskExecutor2() {    ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();    //设置线程池参数信息    taskExecutor.setCorePoolSize(10);    taskExecutor.setMaxPoolSize(50);    taskExecutor.setQueueCapacity(200);    taskExecutor.setKeepAliveSeconds(60);    taskExecutor.setThreadNamePrefix("myExecutor2--");    taskExecutor.setWaitForTasksToCompleteOnShutdown(true);    taskExecutor.setAwaitTerminationSeconds(60);    //修改拒绝策略为使用当前线程执行    taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());    //初始化线程池    taskExecutor.initialize();    return taskExecutor;}

引用线程池时,需要将变量名更改为bean的名称,这样会按照名称查找。

@ResourceThreadPoolTaskExecutor taskExecutor2;

对于使用@Async注解的多线程则在注解中指定bean的名字即可。

@Async("taskExecutor2")    public void async(String name) throws InterruptedException {        System.out.println("async" + name + " " + Thread.currentThread().getName());        Thread.sleep(1000);    }

线程池的四种拒绝策略

JAVA常用的四种线程池

ThreadPoolExecutor 类的构造函数如下:

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,                          BlockingQueue<Runnable> workQueue) {    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,         Executors.defaultThreadFactory(), defaultHandler);}

newCachedThreadPool

不限制最大线程数(maximumPoolSize=Integer.MAX_VALUE),如果有空闲的线程超过需要,则回收,否则重用已有的线程。

new ThreadPoolExecutor(0, Integer.MAX_VALUE,                              60L, TimeUnit.SECONDS,                              new SynchronousQueue<Runnable>());

newFixedThreadPool

定长线程池,超出线程数的任务会在队列中等待。

return new ThreadPoolExecutor(nThreads, nThreads,                              0L, TimeUnit.MILLISECONDS,                              new LinkedBlockingQueue<Runnable>());

newScheduledThreadPool

类似于newCachedThreadPool,线程数无上限,但是可以指定corePoolSize。可实现延迟执行、周期执行。

public ScheduledThreadPoolExecutor(int corePoolSize) {    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,          new DelayedWorkQueue());}

周期执行:

ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);scheduledThreadPool.scheduleAtFixedRate(()->{   System.out.println("rate");}, 1, 1, TimeUnit.SECONDS);

延时执行:

scheduledThreadPool.schedule(()->{   System.out.println("delay 3 seconds");}, 3, TimeUnit.SECONDS);

newSingleThreadExecutor

单线程线程池,可以实现线程的顺序执行。

public static ExecutorService newSingleThreadExecutor() {    return new FinalizableDelegatedExecutorService        (new ThreadPoolExecutor(1, 1,                                0L, TimeUnit.MILLISECONDS,                                new LinkedBlockingQueue<Runnable>()));}

Java 线程池中的四种拒绝策略

CallerRunsPolicy

直接在主线程中执行了run方法。

public static class CallerRunsPolicy implements RejectedExecutionHandler {     public CallerRunsPolicy() { }     public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {        if (!e.isShutdown()) {            r.run();        }    }}

效果类似于:

Runnable thread = ()->{   System.out.println(Thread.currentThread().getName());   try {      Thread.sleep(0);   } catch (InterruptedException e) {      throw new RuntimeException(e);   }};thread.run();

AbortPolicy

直接抛出RejectedExecutionException异常,并指示任务的信息,线程池的信息。、

public static class AbortPolicy implements RejectedExecutionHandler {     public AbortPolicy() { }     public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {        throw new RejectedExecutionException("Task " + r.toString() +                                             " rejected from " +                                             e.toString());    }}

DiscardPolicy

什么也不做。

public static class DiscardPolicy implements RejectedExecutionHandler {     public DiscardPolicy() { }     public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {    }}

DiscardOldestPolicy

public static class DiscardOldestPolicy implements RejectedExecutionHandler {     public DiscardOldestPolicy() { }     public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {        if (!e.isShutdown()) {            e.getQueue().poll();            e.execute(r);        }    }}

Java 线程复用的原理

java的线程池中保存的是 java.util.concurrent.ThreadPoolExecutor.Worker 对象,该对象在 被维护在private final HashSet<Worker> workers = new HashSet<Worker>();workQueue是保存待执行的任务的队列,线程池中加入新的任务时,会将任务加入到workQueue队列中。

private final class Worker    extends AbstractQueuedSynchronizer    implements Runnable{        private static final long serialVersionUID = 6138294804551838833L;        final Thread thread;        Runnable firstTask;        volatile long completedTasks;        Worker(Runnable firstTask) {        setState(-1); // inhibit interrupts until runWorker        this.firstTask = firstTask;        this.thread = getThreadFactory().newThread(this);    }        public void run() {        runWorker(this);    }    // Lock methods    //    // The value 0 represents the unlocked state.    // The value 1 represents the locked state.    protected boolean isHeldExclusively() {        return getState() != 0;    }    protected boolean tryAcquire(int unused) {        if (compareAndSetState(0, 1)) {            setExclusiveOwnerThread(Thread.currentThread());            return true;        }        return false;    }    protected boolean tryRelease(int unused) {        setExclusiveOwnerThread(null);        setState(0);        return true;    }    public void lock()        { acquire(1); }    public boolean tryLock()  { return tryAcquire(1); }    public void unlock()      { release(1); }    public boolean isLocked() { return isHeldExclusively(); }    void interruptIfStarted() {        Thread t;        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {            try {                t.interrupt();            } catch (SecurityException ignore) {            }        }    }}

work对象的执行依赖于 runWorker(),与我们平时写的线程不同,该线程处在一个循环中,并不断地从队列中获取新的任务执行。因此线程池中的线程才可以复用,而不是像我们平常使用的线程一样执行完毕就结束。

final void runWorker(Worker w) {    Thread wt = Thread.currentThread();    Runnable task = w.firstTask;    w.firstTask = null;    w.unlock(); // allow interrupts    boolean completedAbruptly = true;    try {        while (task != null || (task = getTask()) != null) {            w.lock();            // If pool is stopping, ensure thread is interrupted;            // if not, ensure thread is not interrupted.  This            // requires a recheck in second case to deal with            // shutdownNow race while clearing interrupt            if ((runStateAtLeast(ctl.get(), STOP) ||                 (Thread.interrupted() &&                  runStateAtLeast(ctl.get(), STOP))) &&                !wt.isInterrupted())                wt.interrupt();            try {                beforeExecute(wt, task);                Throwable thrown = null;                try {                    task.run();                } catch (RuntimeException x) {                    thrown = x; throw x;                } catch (Error x) {                    thrown = x; throw x;                } catch (Throwable x) {                    thrown = x; throw new Error(x);                } finally {                    afterExecute(task, thrown);                }            } finally {                task = null;                w.completedTasks++;                w.unlock();            }        }        completedAbruptly = false;    } finally {        processWorkerExit(w, completedAbruptly);    }}

到此,关于“SpringBoot线程池和Java线程池怎么使用”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注编程网网站,小编会继续努力为大家带来更多实用的文章!

阅读原文内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯