文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

多线程开发实用技巧

2024-12-01 14:30

关注

动态拓展容量 触发方式,我们可以通过 apollo 配置或者通过延迟任务定期扫描自动拓展

@Value("${corePoolSize:2}")
private Integer corePoolSize;
@Value("${corePoolSize:8}")
private Integer maximumPoolSize;



taskExecutor.setCorePoolSize(corePoolSize);
taskExecutor.setMaximumPoolSize(maximumPoolSize);

可以通过这种方式对生产环境线程池参数进行动态优化。

判断线程池执行结束

线程池的使用可以提高我们并发程序线程复用,以及提供对线程的管理能力。提高线程的利用率,提升性能。但是对于一些场景我们需要知道当前线程池中提交的任务是否执行完毕,我们可以通过一下4种方式判断。

使用 isTerminated 方法

threadPool.isTerminated() 常用来判断线程池是否结束,结束了为TRUE. 使用 threadPool.isTerminated() 方法,必须在shutdown()方法关闭线程池之后才能使用,否则isTerminated()永不为TRUE,线程将一直阻塞在该判断的地方,导致程序最终崩溃。

public class ThreadPoolCompleted {
public static void main(String[] args) {

ThreadPoolExecutor threadPool = new ThreadPoolExecutor(10, 20,
0, TimeUnit.SECONDS, new LinkedBlockingDeque<>(1024));

addTask(threadPool);

isCompleted(threadPool);

System.out.println();
System.out.println("线程池任务执行完成!");
}


private static void isCompleted(ThreadPoolExecutor threadPool) {
threadPool.shutdown();
while (!threadPool.isTerminated()) {
}
}


private static void addTask(ThreadPoolExecutor threadPool) {

final int taskCount = 5;

for (int i = 0; i < taskCount; i++) {
final int finalI = i;
threadPool.submit(new Runnable() {
@Override
public void run() {
try {

int sleepTime = new Random().nextInt(5);
TimeUnit.SECONDS.sleep(sleepTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(String.format("任务%d执行完成", finalI));
}
});
}
}
}

使用 getCompletedTaskCount 方法

我们可以通过判断线程池中的计划执行任务数和已完成任务数,来判断线程池是否已经全部执行完,如果计划执行任务数=已完成任务数,那么线程池的任务就全部执行完了,否则就未执行完。

private static void isCompletedByTaskCount(ThreadPoolExecutor threadPool) {
while (threadPool.getTaskCount() != threadPool.getCompletedTaskCount()) {
}
}

方法解析:

PS: 此实现方法的优点是无需关闭线程池。 它的缺点是getTaskCount() 和getCompletedTaskCount(),返回的是一个近似值,因为线程池中的任务和线程的状态可能在计算过程中动态变化,所以它们两个返回的都是一个近似值。

使用 CountDownLatch

CountDownLatch 是一个栅栏类似计数器,我们创建了一个包含 N 个任务的计数器,每个任务执行完计数器 -1,直到计数器减为 0 时,说明所有的任务都执行完了

ThreadPoolExecutor threadPool = new ThreadPoolExecutor(10, 20,
0, TimeUnit.SECONDS, new LinkedBlockingDeque<>(1024));

CountDownLatch countDownLatch = new CountDownLatch(2);

for (int i = 0; i < 10; i++) {
int finalI = i;
threadPool.submit(() -> {

try {

int sleepTime = new Random().nextInt(5);
TimeUnit.SECONDS.sleep(Math.max(sleepTime, 2));
} catch (InterruptedException e) {
System.err.println("InterruptedException err : " + e.getMessage());
}

System.out.println(gettime() + " task running : " + finalI);
countDownLatch.countDown();
});
}
try {
countDownLatch.await();
} catch (InterruptedException t) {
System.err.println("InterruptedException err : " + t.getMessage());
}

CountDownLatch 写法很优雅,且无需关闭线程池,但它的缺点是只能使用一次,CountDownLatch创建之后不能被重复使用,也就是说 CountDownLatch 可以理解为只能使用一次的计数器。

使用 CyclicBarrier

ThreadPoolExecutor threadPool = new ThreadPoolExecutor(10, 20,
0, TimeUnit.SECONDS, new LinkedBlockingDeque<>(1024));
final int taskCount = 5;

CyclicBarrier cyclicBarrier = new CyclicBarrier(taskCount, new Runnable() {
@Override
public void run() {

System.out.println();
System.out.println("线程池所有任务已执行完!");
}
});

for (int i = 0; i < taskCount; i++) {
final int finalI = i;
threadPool.submit(new Runnable() {
@Override
public void run() {
try {

int sleepTime = new Random().nextInt(5);
TimeUnit.SECONDS.sleep(sleepTime);
System.out.println(String.format("任务%d执行完成", finalI));

cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
});
}

执行结果:

**CyclicBarrier 构造方法核心参数: **

await 方法: 在 CyclicBarrier 上进行阻塞等待,当调用此方法时 CyclicBarrier 的内部计数器会 -1,直到发生以下情形之一: 在 CyclicBarrier 上等待的线程数量达到 parties,也就是计数器的声明数量时,则所有线程被释放,继续执行。 当前线程被中断,则抛出 InterruptedException 异常,并停止等待,继续执行。 其他等待的线程被中断,则当前线程抛出 BrokenBarrierException 异常,并停止等待,继续执行。 其他等待的线程超时,则当前线程抛出 BrokenBarrierException 异常,并停止等待,继续执行。 其他线程调用 CyclicBarrier.reset() 方法,则当前线程抛出 BrokenBarrierException 异常,并停止等待,继续执行。

reset 方法: 使得CyclicBarrier回归初始状态,直观来看它做了两件事: 如果有正在等待的线程,则会抛出 BrokenBarrierException 异常,且这些线程停止等待,继续执行。 将是否破损标志位 broken 置为 false。 优缺点分析 CyclicBarrier 从设计的复杂度到使用的复杂度都高于 CountDownLatch,相比于 CountDownLatch 来说它的优点是可以重复使用(只需调用 reset 就能恢复到初始状态),缺点是使用难度较高。

4 种方法总结:

使用 isTerminated 方法判断:通过判断线程池的完成状态来实现,需要关闭线程池,一般情况下不建议使用。

使用 getCompletedTaskCount 方法判断:通过计划执行总任务量和已经完成总任务量,来判断线程池的任务是否已经全部执行,如果相等则判定为全部执行完成。但因为线程个体和状态都会发生改变,所以得到的是一个大致的值,可能不准确。

使用 CountDownLatch 判断:相当于一个线程安全的单次计数器,使用比较简单,且不需要关闭线程池,是比较常用的判断方法。

使用 CyclicBarrier 判断:相当于一个线程安全的重复计数器,但使用较为复杂,所以日常项目中使用的较少。

线程池提交速度限制

对于线程池提交速率限制,我们可以通过信号量进行限制,这样可以避免触发线程池的拒绝策略,将逻辑处理能力限制功能前置,可以做到线程池的职责单一,负责提交而去做额外的事情。 下面是一个简单的例子:

// 创建线程池
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(10, 20,
0, TimeUnit.SECONDS, new LinkedBlockingDeque<>(1024));
final int taskCount = 5; // 任务总数
// 循环计数器
Semaphore semaphore = new Semaphore(2);
// 添加任务
for (int i = 0; i < taskCount; i++) {
final int finalI = i;
threadPool.submit(new Runnable() {
@Override
public void run() {
try {
// 获取信号量
semaphore.acquire();

// 随机休眠 4s
TimeUnit.SECONDS.sleep(4);
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String format1 = sdf.format(new Date());

System.out.println(String.format("%s 任务%d执行完成", format1, finalI));

} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release();
}
}
});
}

执行结果:

我们可以看到,我们同一个时间只能获取两个信号量,也就是两个线程能进行执行,如果新进入线程调用 ​semaphore.acquire();​方法进入线程阻塞,需要等待有信号量释放过后才能进入新的调用。释放方法 semaphore.release();.

延迟任务执行

Java 单机延迟任务我们可以通过, Delayed + DelayQueue� 实现:

实现 Delayed 接口

实现 Delayed 接口 ,自定义消息体/延迟任务对象

public class DelayMessage<T> implements Delayed {

private long activeTime;

private T data;

public DelayMessage(long activeTime, T data) {
// 将传入的时间转换为超时的时刻
this.activeTime = TimeUnit.NANOSECONDS.convert(activeTime, TimeUnit.MILLISECONDS)
+ System.nanoTime();
this.data = data;
}

public long getActiveTime() {
return activeTime;
}

public T getData() {
return data;
}


@Override
public int compareTo(Delayed o) {
// 订单剩余时间-当前传入的时间= 实际剩余时间(单位纳秒)
long d = getDelay(TimeUnit.NANOSECONDS) - o.getDelay(TimeUnit.NANOSECONDS);
// 根据剩余时间判断等于0 返回1 不等于0
// 有可能大于0 有可能小于0 大于0返回1 小于返回-1
return (d == 0) ? 0 : ((d > 0) ? 1 : -1);
}


@Override
public long getDelay(TimeUnit unit) {
// 剩余时间= 到期时间-当前系统时间,系统一般是纳秒级的,所以这里做一次转换
return unit.convert(activeTime - System.nanoTime(), TimeUnit.NANOSECONDS);
}
}

创建任务任务执行方法

创建任务执行线程、方法,去调用 delayQueue获取延迟任务对象。 通常情况下我们可以定义一个独立的线程去处理这些任务。


private void executeThread() {
while (true) {
DelayMessage<Object> task = null;
try {
task = delayQueue.take();
processTask(task);
} catch (Exception e) {
System.err.println("延时任务执行失败 task:" + task + " err:" + e);
}
}
}


private void processTask(DelayMessage<Object> task) {
if (task == null) {
return;
}
Object data = task.getData();
System.out.println("out:" + data + " run time:" + System.currentTimeMillis());
}

管理 DelayQueue� 任务

public class DelayQueueManager {

private DelayQueue<DelayMessage<Object>> delayQueue = new DelayQueue<>();

private static DelayQueueManager instance = new DelayQueueManager();

public static DelayQueueManager getInstance() {
return instance;
}

private DelayQueueManager() {
ExecutorService executorService = Executors.newSingleThreadExecutor();
executorService.execute(new Thread(this::executeThread));
}


public void put(DelayMessage<Object> task) {
System.out.println("加入延时任务 delay= " + task.getDelay(TimeUnit.MILLISECONDS) + "ms");
delayQueue.put(task);
}


private void executeThread() {
while (true) {
DelayMessage<Object> task = null;
try {
task = delayQueue.take();
processTask(task);
} catch (Exception e) {
System.err.println("延时任务执行失败 task:" + task + " err:" + e);
}
}
}


private void processTask(DelayMessage<Object> task) {
if (task == null) {
return;
}
Object data = task.getData();
System.out.println("out:" + data + " run time:" + System.currentTimeMillis());
}
}

测试程序

测试代码如下:

public static void main(String[] args) {
DelayQueueManager delayQueueManager = new DelayQueueManager();
System.out.println("start " + System.currentTimeMillis());
delayQueueManager.put(new DelayMessage<>(5000L, "Hello Tom"));
delayQueueManager.put(new DelayMessage<>(8000L, "Hello Ktm"));
delayQueueManager.put(new DelayMessage<>(2000L, "Hello Star"));
}

输出结果如下:

来源:运维开发故事内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯