这期内容当中小编将会给大家带来有关怎么在Java中使用线程工厂监控线程池,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。
ThreadFactory
线程池中的线程从哪里来呢?就是ThreadFoctory
public interface ThreadFactory { Thread newThread(Runnable r);}
Threadfactory里面有个接口,当线程池中需要创建线程就会调用该方法,也可以自定义线程工厂
public class ThreadfactoryText { public static void main(String[] args) { Runnable runnable=new Runnable() { @Override public void run() { int num=new Random().nextInt(10); System.out.println(Thread.currentThread().getId()+"--"+System.currentTimeMillis()+"--睡眠"+num); try { TimeUnit.SECONDS.sleep(num); } catch (InterruptedException e) { e.printStackTrace(); } } }; //创建线程池 使用自定义线程工厂 采用默认的拒绝策略 ExecutorService executorService=new ThreadPoolExecutor(5, 5, 0, TimeUnit.SECONDS, new SynchronousQueue<>(), new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread t=new Thread(r); t.setDaemon(true);//设置为守护线程,当主线程运行结束,线程池中线程也会被释放 System.out.println("创建了线程"+t); return t; } }); //提交五个任务 for (int i = 0; i < 5; i++) { executorService.submit(runnable); } }}
当线程提交超过五个任务时,线程池会默认抛出异常
监控线程池
ThreadPoolExcutor提供了一组方法用于监控线程池
int getActiveCount()//获得线程池只当前的获得线程数量long getCompletedTaskCount()//返回线程池完成任务数量int getCorePoolSize()//线程池中核心任务数量int getLargestPoolSize() //返回线程池中曾经达到线程的最大数int getMaximumPoolSize()//返回线程池的最大容量int getPoolSize()//返回线程大小BlockingQueue<Runnable> getQueue()//返回阻塞队列long getTaskCount()//返回线程池收到任务总数
public class Text { public static void main(String[] args) throws InterruptedException { Runnable runnable = new Runnable() { @Override public void run() { System.out.println(Thread.currentThread().getId() + "线程开始执行--" + System.currentTimeMillis()); try { Thread.sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); } } }; //创建线程池 使用默认线程工厂 有界队列 采用DiscardPolicy策略 ThreadPoolExecutor executorService = new ThreadPoolExecutor(2, 5, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(5),Executors.defaultThreadFactory(),new ThreadPoolExecutor.DiscardPolicy()); //提交五个任务 for (int i = 0; i < 30; i++) { executorService.submit(runnable); System.out.println("当前线程核心线程数"+executorService.getCorePoolSize()+",最大线程数:"+executorService.getMaximumPoolSize()+",当前线程池大小:"+executorService.getPoolSize()+"活动线程数:"+executorService.getActiveCount()+",收到任务:"+executorService.getTaskCount()+"完成任务数:"+executorService.getCompletedTaskCount()+"等待任务数:"+executorService.getQueue().size()); TimeUnit.MILLISECONDS.sleep(500); } System.out.println("-------------------"); while (executorService.getActiveCount()>=0)//继续对线程池进行检测 { System.out.println("当前线程核心线程数"+executorService.getCorePoolSize()+",最大线程数:"+executorService.getMaximumPoolSize()+",当前线程池大小:"+executorService.getPoolSize()+"活动线程数:"+executorService.getActiveCount()+",收到任务:"+executorService.getTaskCount()+"完成任务数:"+executorService.getCompletedTaskCount()+"等待任务数:"+executorService.getQueue().size()); Thread.sleep(1000);//每1秒检测一次 } }}
当线程池大小达到了核心线程数,线程会被放在等待队列。当线程池等待队列已满会开启新的线程。当当前线程大小达到最大线程数,等待队列也满了,再提交的话会执行DiscardPolicy策略,直接丢弃这个无法处理的任务,最后30个任务只剩下15个了。
原理如图:
扩展线程池
有时候需要对线程池进行扩展,如在监控每个任务开始和结束时间,或者自定义其他增强功能。
ThreadPoolExecutor线程池提供了两个方法:
protected void beforeExecute(Thread t, Runnable r) { }protected void afterExecute(Runnable r, Throwable t) { }
线程池执行某个任务前会执行beforeExecute()方法,执行后会调用afterExecute()方法
查看ThreadPoolExecutor源码,在该类中定义了一个内部类Worker,ThreadPoolExecutor线程池的工作线程就是Worker类的实例,Worker实例在执行时会调用beforeExecute与afterExecute方法。
public void run() { runWorker(this);}final void runWorker(Worker w) { try { beforeExecute(wt, task); try { task.run(); afterExecute(task, null); } catch (Throwable ex) { afterExecute(task, ex); throw ex; } } finally { task = null; w.completedTasks++; w.unlock(); } } }
部分代码已省略,线程执行前会调用beforeExecute,执行后会调用afterExecute方法。
扩展线程池示例
package com;import java.util.concurrent.ExecutorService;import java.util.concurrent.LinkedBlockingDeque;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;public class Text07 { public static void main(String[] args) { //定义扩展线程池 定义线程池类继承ThreadPoolExecutor,然后重写其他方法 ExecutorService threadPoolExecutor= new ThreadPoolExecutor(5,5,0, TimeUnit.SECONDS,new LinkedBlockingDeque<>()){ //在内部类重写开始方法 @Override protected void beforeExecute(Thread t, Runnable r) { System.out.println(t.getId()+"线程准备执行任务"+((Mytask)r).name); } //在内部类重写结束方法 @Override protected void afterExecute(Runnable r, Throwable t) { System.out.println(((Mytask)r).name+"执行完成"); } //线程池退出 @Override protected void terminated() { System.out.println("线程池退出"); } }; for (int i = 0; i < 5; i++) { Mytask mytask=new Mytask("Thread"+i); threadPoolExecutor.execute(mytask); } } private static class Mytask implements Runnable { private String name; public Mytask(String name) { this.name=name; } @Override public void run() { System.out.println(name+"正在被执行"+Thread.currentThread().getId()); try { Thread.sleep(1000);//模拟任务时长 } catch (InterruptedException e) { e.printStackTrace(); } } }}
优化线程池大小
线程池大小对系统性能有一定影响,过大或者过小都无法方法发挥系统最佳性能,不需要非常精确,只要避免极大或者极小就可以了,一般来说线程池大小大姚考虑CPU数量
线程池大小=CPU数量 * 目标CPU使用率*(1+等待时间与计算时间的比)
线程池死锁
如果线程池执行中,任务A在执行过程中提交了任务B,任务B添加到线程池中的等待队列,如果A的结束需要B的执行结果,而B线程需要等待A线程执行完毕,就可能会使其他所有工作线程都处于等待状态,待这些任务在阻塞队列中执行。线程池中没有可以对阻塞队列进行处理的线程,就会一直等待下去照成死锁。
适合给线程池提交相互独立的任务,而不是彼此依赖的任务,对于彼此依赖的任务,可以考虑分别提交给不同的线程池来处理。
线程池异常信息捕获
import java.util.concurrent.ExecutorService;import java.util.concurrent.SynchronousQueue;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;public class Text09 { public static void main(String[] args) { //创建线程池 ExecutorService executorService=new ThreadPoolExecutor(5,5,0, TimeUnit.SECONDS,new SynchronousQueue<>()); //向线程池中添加两个数相处计算的任务 for (int i = 0; i <5 ; i++) { executorService.submit(new Text(10,i)); } } private static class Text implements Runnable { private int x; private int y; public Text(int x,int y) { this.x=x; this.y=y; } @Override public void run() { System.out.println(Thread.currentThread().getName()+"线程x/y结果的为"+x+"/"+y+"="+(x/y)); } }}
可以看到只有四条结果,实际向线程池提交了五个任务,但是当i==0时,产生了算术异常,线程池把该异常吃掉了,导致我们对该异常一无所知
解决办法:
把submit改为execute
对线程池进行扩展,对submit进行包装
package com;import java.util.concurrent.*;public class Text09 { public static void main(String[] args) { //创建线程池 使用自定义的线程池 ExecutorService executorService=new TranceThreadPoorExcuter(5,5,0, TimeUnit.SECONDS,new SynchronousQueue<>()); //向线程池中添加两个数相处计算的任务 for (int i = 0; i <5 ; i++) { executorService.submit(new Text(10,i)); } } public static class Text implements Runnable { public int x; public int y; public Text(int x,int y) { this.x=x; this.y=y; } @Override public void run() { System.out.println(Thread.currentThread().getName()+"线程x/y结果的为"+x+"/"+y+"="+(x/y)); } } //自定义线程池类 对TranceThreadPoorExcuter进行扩展 private static class TranceThreadPoorExcuter extends ThreadPoolExecutor { public TranceThreadPoorExcuter(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); } //定义一个方法用于传入两个参数 第一个是要接受的任务 第二个是Exception public Runnable warp(Runnable r,Exception e) { return new Runnable() { @Override public void run() { try { r.run(); } catch (Exception e1) { e.printStackTrace(); throw e1; } } }; } //重写submit方法 @Override public Future<?> submit(Runnable task) { return super.submit(warp(task,new Exception("客户跟踪异常"))); } //还可以重写excute方法 }}
上述就是小编为大家分享的怎么在Java中使用线程工厂监控线程池了,如果刚好有类似的疑惑,不妨参照上述分析进行理解。如果想知道更多相关知识,欢迎关注编程网行业资讯频道。