文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

Java线程池优化的方法是什么

2023-06-29 17:53

关注

这篇文章主要介绍“Java线程池优化的方法是什么”的相关知识,小编通过实际案例向大家展示操作过程,操作方法简单快捷,实用性强,希望这篇“Java线程池优化的方法是什么”文章能帮助大家解决问题。

升级版线程池的优化

新增了4种拒绝策略。分别为:MyAbortPolicy、MyDiscardPolicy、MyDiscardOldestPolicy、MyCallerRunsPolicy

对线程池MyThreadPoolExecutor的构造方法进行优化,增加了参数校验,防止乱传参数现象。

这是最重要的一个优化。

线程池构造器

public MyThreadPoolExecutor(){        this(5,new ArrayBlockingQueue<>(10), Executors.defaultThreadFactory(),defaultHandle);    }    public MyThreadPoolExecutor(int corePoolSize, BlockingQueue<Runnable> waitingQueue,ThreadFactory threadFactory) {        this(corePoolSize,waitingQueue,threadFactory,defaultHandle);    }    public MyThreadPoolExecutor(int corePoolSize, BlockingQueue<Runnable> waitingQueue,ThreadFactory threadFactory,MyRejectedExecutionHandle handle) {        this.workers=new HashSet<>(corePoolSize);        if(corePoolSize>=0&&waitingQueue!=null&&threadFactory!=null&&handle!=null){            this.corePoolSize=corePoolSize;            this.waitingQueue=waitingQueue;            this.threadFactory=threadFactory;            this.handle=handle;        }else {            throw new NullPointerException("线程池参数不合法");        }    }

线程池拒绝策略

策略接口:MyRejectedExecutionHandle

package com.springframework.concurrent;public interface MyRejectedExecutionHandle {    void rejectedExecution(Runnable runnable,MyThreadPoolExecutor threadPoolExecutor);}

策略内部实现类

    //抛异常策略(默认)    public static class MyAbortPolicy implements MyRejectedExecutionHandle{        public MyAbortPolicy(){        }        @Override        public void rejectedExecution(Runnable r, MyThreadPoolExecutor t) {            throw new MyRejectedExecutionException("任务-> "+r.toString()+"被线程池-> "+t.toString()+" 拒绝");        }    }    //默默丢弃策略    public static class MyDiscardPolicy implements MyRejectedExecutionHandle{        public MyDiscardPolicy() {        }        @Override        public void rejectedExecution(Runnable runnable, MyThreadPoolExecutor threadPoolExecutor) {        }    }    //丢弃掉最老的任务策略    public static class MyDiscardOldestPolicy implements MyRejectedExecutionHandle{        public MyDiscardOldestPolicy() {        }        @Override        public void rejectedExecution(Runnable runnable, MyThreadPoolExecutor threadPoolExecutor) {            if(!threadPoolExecutor.isShutdown()){ //如果线程池没被关闭                threadPoolExecutor.getWaitingQueue().poll();//丢掉最老的任务,此时就有位置当新任务了                threadPoolExecutor.execute(runnable); //把新任务加入到队列中            }        }    }    //由调用者调用策略    public static class MyCallerRunsPolicy implements MyRejectedExecutionHandle{        public MyCallerRunsPolicy(){        }        @Override        public void rejectedExecution(Runnable runnable, MyThreadPoolExecutor threadPoolExecutor) {            if(!threadPoolExecutor.isShutdown()){//判断线程池是否被关闭                runnable.run();            }        }    }

封装拒绝方法

protected final void reject(Runnable runnable){        this.handle.rejectedExecution(runnable, this);    }    protected final void reject(Runnable runnable,MyThreadPoolExecutor threadPoolExecutor){        this.handle.rejectedExecution(runnable, threadPoolExecutor);    }

execute方法

@Override    public boolean execute(Runnable runnable)    {        if (!this.waitingQueue.offer(runnable)) {            this.reject(runnable);            return false;        }        else {            if(this.workers!=null&&this.workers.size()<corePoolSize){//这种情况才能添加线程                MyWorker worker = new MyWorker(); //通过构造方法添加线程            }            return true;        }    }

可以看出只有当往线程池放任务时才会创建线程对象。

手写线程池源码

MyExecutorService

package com.springframework.concurrent;import java.util.concurrent.BlockingQueue;public interface MyExecutorService {    boolean execute(Runnable runnable);    void shutdown();    void shutdownNow();    boolean isShutdown();    BlockingQueue<Runnable> getWaitingQueue();}

MyRejectedExecutionException

package com.springframework.concurrent;public class MyRejectedExecutionException extends RuntimeException {    public MyRejectedExecutionException() {    }    public MyRejectedExecutionException(String message) {        super(message);    }    public MyRejectedExecutionException(String message, Throwable cause) {        super(message, cause);    }    public MyRejectedExecutionException(Throwable cause) {        super(cause);    }}

MyRejectedExecutionHandle

package com.springframework.concurrent;public interface MyRejectedExecutionHandle {    void rejectedExecution(Runnable runnable,MyThreadPoolExecutor threadPoolExecutor);}

核心类MyThreadPoolExecutor

package com.springframework.concurrent;import java.util.HashSet;import java.util.Set;import java.util.concurrent.ArrayBlockingQueue;import java.util.concurrent.BlockingQueue;import java.util.concurrent.Executors;import java.util.concurrent.ThreadFactory;import java.util.concurrent.atomic.AtomicInteger;public class MyThreadPoolExecutor implements MyExecutorService{    private static final AtomicInteger taskcount=new AtomicInteger(0);//执行任务次数    private static final AtomicInteger threadNumber=new AtomicInteger(0); //线程编号    private static volatile int corePoolSize; //核心线程数    private final HashSet<MyWorker> workers; //工作线程    private final BlockingQueue<Runnable> waitingQueue; //等待队列    private static final String THREADPOOL_NAME="MyThread-Pool-";//线程名称    private volatile boolean isRunning=true; //是否运行    private volatile boolean STOPNOW=false; //是否立刻停止    private volatile ThreadFactory threadFactory; //线程工厂    private static final MyRejectedExecutionHandle defaultHandle=new MyThreadPoolExecutor.MyAbortPolicy();//默认拒绝策略    private volatile MyRejectedExecutionHandle handle; //拒绝紫略    public MyThreadPoolExecutor(){        this(5,new ArrayBlockingQueue<>(10), Executors.defaultThreadFactory(),defaultHandle);    }    public MyThreadPoolExecutor(int corePoolSize, BlockingQueue<Runnable> waitingQueue,ThreadFactory threadFactory) {        this(corePoolSize,waitingQueue,threadFactory,defaultHandle);    }    public MyThreadPoolExecutor(int corePoolSize, BlockingQueue<Runnable> waitingQueue,ThreadFactory threadFactory,MyRejectedExecutionHandle handle) {        this.workers=new HashSet<>(corePoolSize);        if(corePoolSize>=0&&waitingQueue!=null&&threadFactory!=null&&handle!=null){            this.corePoolSize=corePoolSize;            this.waitingQueue=waitingQueue;            this.threadFactory=threadFactory;            this.handle=handle;        }else {            throw new NullPointerException("线程池参数不合法");        }    }        //抛异常策略(默认)    public static class MyAbortPolicy implements MyRejectedExecutionHandle{        public MyAbortPolicy(){        }        @Override        public void rejectedExecution(Runnable r, MyThreadPoolExecutor t) {            throw new MyRejectedExecutionException("任务-> "+r.toString()+"被线程池-> "+t.toString()+" 拒绝");        }    }    //默默丢弃策略    public static class MyDiscardPolicy implements MyRejectedExecutionHandle{        public MyDiscardPolicy() {        }        @Override        public void rejectedExecution(Runnable runnable, MyThreadPoolExecutor threadPoolExecutor) {        }    }    //丢弃掉最老的任务策略    public static class MyDiscardOldestPolicy implements MyRejectedExecutionHandle{        public MyDiscardOldestPolicy() {        }        @Override        public void rejectedExecution(Runnable runnable, MyThreadPoolExecutor threadPoolExecutor) {            if(!threadPoolExecutor.isShutdown()){ //如果线程池没被关闭                threadPoolExecutor.getWaitingQueue().poll();//丢掉最老的任务,此时就有位置当新任务了                threadPoolExecutor.execute(runnable); //把新任务加入到队列中            }        }    }    //由调用者调用策略    public static class MyCallerRunsPolicy implements MyRejectedExecutionHandle{        public MyCallerRunsPolicy(){        }        @Override        public void rejectedExecution(Runnable runnable, MyThreadPoolExecutor threadPoolExecutor) {            if(!threadPoolExecutor.isShutdown()){//判断线程池是否被关闭                runnable.run();            }        }    }    //call拒绝方法    protected final void reject(Runnable runnable){        this.handle.rejectedExecution(runnable, this);    }    protected final void reject(Runnable runnable,MyThreadPoolExecutor threadPoolExecutor){        this.handle.rejectedExecution(runnable, threadPoolExecutor);    }        private final class MyWorker implements Runnable{        final Thread thread; //为每个MyWorker        MyWorker(){            Thread td = threadFactory.newThread(this);            td.setName(THREADPOOL_NAME+threadNumber.getAndIncrement());            this.thread=td;            this.thread.start();            workers.add(this);        }        //执行任务        @Override        public void run() {            //循环接收任务                while (true)                {                    //循环退出条件:                    //1:当isRunning为false并且waitingQueue的队列大小为0(也就是无任务了),会优雅的退出。                    //2:当STOPNOW为true,则说明调用了shutdownNow方法进行暴力退出。                    if((!isRunning&&waitingQueue.size()==0)||STOPNOW)                    {                        break;                    }else {                        //不断取任务,当任务!=null时则调用run方法处理任务                        Runnable runnable = waitingQueue.poll();                        if(runnable!=null){                            runnable.run();                            System.out.println("task==>"+taskcount.incrementAndGet());                        }                    }                }        }    }    //往线程池中放任务    @Override    public boolean execute(Runnable runnable)    {        if (!this.waitingQueue.offer(runnable)) {            this.reject(runnable);            return false;        }        else {            if(this.workers!=null&&this.workers.size()<corePoolSize){//这种情况才能添加线程                MyWorker worker = new MyWorker(); //通过构造方法添加线程            }            return true;        }    }    //优雅的关闭    @Override    public void shutdown()    {        this.isRunning=false;    }    //暴力关闭    @Override    public void shutdownNow()    {        this.STOPNOW=true;    }    //判断线程池是否关闭    @Override    public boolean isShutdown() {        return !this.isRunning||STOPNOW;    }    //获取等待队列    @Override    public BlockingQueue<Runnable> getWaitingQueue() {        return this.waitingQueue;    }}

线程池测试类

package com.springframework.test;import com.springframework.concurrent.MyThreadPoolExecutor;import java.util.concurrent.ArrayBlockingQueue;import java.util.concurrent.Executors;public class ThreadPoolTest {  public static void main(String[] args) {//      MyThreadPoolExecutor myThreadPoolExecutor = new MyThreadPoolExecutor//              (5,new ArrayBlockingQueue<>(6), Executors.defaultThreadFactory(),new MyThreadPoolExecutor.MyAbortPolicy());//      MyThreadPoolExecutor myThreadPoolExecutor = new MyThreadPoolExecutor//              (5,new ArrayBlockingQueue<>(6), Executors.defaultThreadFactory(),new MyThreadPoolExecutor.MyDiscardPolicy());//      MyThreadPoolExecutor myThreadPoolExecutor = new MyThreadPoolExecutor//              (5,new ArrayBlockingQueue<>(6), Executors.defaultThreadFactory(),new MyThreadPoolExecutor.MyDiscardOldestPolicy());      MyThreadPoolExecutor myThreadPoolExecutor = new MyThreadPoolExecutor              (5,new ArrayBlockingQueue<>(6), Executors.defaultThreadFactory(),new MyThreadPoolExecutor.MyCallerRunsPolicy());      for(int i=0;i<11;i++){          int finalI = i;          myThreadPoolExecutor.execute(()->{              System.out.println(Thread.currentThread().getName()+">>>>"+ finalI);          });      }      myThreadPoolExecutor.shutdown();//      myThreadPoolExecutor.shutdownNow();  }}

好了升级版线程池就优化到这了,后面可能还会出完善版,不断进行优化。

关于“Java线程池优化的方法是什么”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识,可以关注编程网行业资讯频道,小编每天都会为大家更新不同的知识点。

阅读原文内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯