文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

Java实现手写一个线程池的示例代码

2022-11-13 18:31

关注

概述

线程池技术想必大家都不陌生把,相信在平时的工作中没有少用,而且这也是面试频率非常高的一个知识点,那么大家知道它的实现原理和细节吗?如果直接去看jdk源码的话,可能有一定的难度,那么我们可以先通过手写一个简单的线程池框架,去掌握线程池的基本原理后,再去看jdk的线程池源码就会相对容易,而且不容易忘记。

线程池框架设计

我们都知道,线程资源的创建和销毁并不是没有代价的,甚至开销是非常高的。同时,线程也不是任意多创建的,因为活跃的线程会消耗系统资源,特别是内存,在一定的范围内,增加线程可以提高系统的吞吐率,如果超过了这个范围,反而会降低程序的执行速度。

因此,设计一个容纳多个线程的容器,容器中的线程可以重复使用,省去了频繁创建和销毁线程对象的操作, 达到下面的目标:

线程池的核心思想: 线程复用,同一个线程可以被重复使用,来处理多个任务。

为了实现线程池功能,需要考虑下面几个设计要点:

看了上面的设计目标和要点,是不是能立刻想到一个非常经典的设计模型——生产者消费者模型。

现在我们将我们的设计思路转换为代码。

代码实现

阻塞队列的实现


@Slf4j(topic = "c.BlockingQueue")
public class BlockingQueue<T> {
    // 容量
    private int capcity;
    // 双端任务队列容器
    private Deque<T> deque = new ArrayDeque<>();
    // 重入锁
    private ReentrantLock lock = new ReentrantLock();
    // 生产者条件变量
    private Condition fullWaitSet = lock.newCondition();
    // 生产者条件变量
    private Condition emptyWaitSet = lock.newCondition();

    public BlockingQueue(int capcity) {
        this.capcity = capcity;
    }

    // 阻塞的方式添加任务
    public void put(T task) {
        lock.lock();
        try {
            // 通过while的方式
            while (deque.size() >= capcity) {
                log.debug("wait to add queue");
                try {
                    fullWaitSet.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            deque.offer(task);
            log.debug("task add successfully");
            emptyWaitSet.signal();
        }  finally {
            lock.unlock();
        }
    }

    // 阻塞获取任务
    public T take() {
        lock.lock();
        try {
            // 通过while的方式
            while (deque.isEmpty()) {
                try {
                    log.debug("wait to take task");
                    emptyWaitSet.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            fullWaitSet.signal();
            T task = deque.poll();
            log.debug("take task successfully");
            // 从队列中获取元素
            return task;
        } finally {
            lock.unlock();
        }
    }
}

线程池消费端实现

1.定义执行器接口


public interface Executor {

    
    void execute(Runnable task);
}

2.定义线程池类实现该接口

@Slf4j(topic = "c.ThreadPool")
public class ThreadPool implements Executor {

    
    private BlockingQueue<Runnable> taskQueue;

    
    private int coreSize;

    
    private Set<Worker> workers = new HashSet<>();

    
    public ThreadPool(int coreSize, int capcity) {
        this.coreSize = coreSize;
        this.taskQueue = new BlockingQueue<>(capcity);
    }

    
    @Override
    public void execute(Runnable task) {
        synchronized (workers) {
            // 如果工作线程数小于阈值,直接开始任务执行
            if(workers.size() < coreSize) {
                Worker worker = new Worker(task);
                workers.add(worker);
                worker.start();
            } else {
                // 如果超过了阈值,加入到队列中
                taskQueue.put(task);
            }
        }
    }

    
    class Worker extends Thread {
        private Runnable task;

        public Worker(Runnable task) {
            this.task = task;
        }

        @Override
        public void run() {
            // 如果任务不为空,或者可以从队列中获取任务
            while (task != null || (task = taskQueue.take()) != null) {
                try {
                    task.run();
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    // 执行完后,设置任务为空
                    task = null;
                }
            }

              // 移除工作线程
            synchronized (workers){
                log.debug("remove worker successfully");
                workers.remove(this);
            }
        }
    }
}

3.演示

@Test
    public void testThreadPool1() throws InterruptedException {
        Executor executor = new ThreadPool(2, 4);
        // 提交任务
        for (int i = 0; i < 6; i++) {
            final  int j = i;
            executor.execute(() -> {
                try {
                    Thread.sleep(10);
                    log.info("run task {}", j);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
            Thread.sleep(10);
        }

        Thread.sleep(10000);
    }

运行结果:

获取任务超时设计

目前从队列中获取任务是永久阻塞等待的,可以改成阻塞一段时间没有获取任务,丢弃的策略。

@Slf4j(topic = "c.TimeoutBlockingQueue")
public class TimeoutBlockingQueue<T> {
    // 容量
    private int capcity;
    // 双端任务队列容器
    private Deque<T> deque = new ArrayDeque<>();
    // 重入锁
    private ReentrantLock lock = new ReentrantLock();
    // 生产者条件变量
    private Condition fullWaitSet = lock.newCondition();
    // 生产者条件变量
    private Condition emptyWaitSet = lock.newCondition();

    public TimeoutBlockingQueue(int capcity) {
        this.capcity = capcity;
    }

    // 带超时时间的获取
    public T poll(long timeout, TimeUnit unit){
        lock.lock();
        try{
            // 将 timeout 统一转换为 纳秒
            long nanos = unit.toNanos(timeout);
            while (deque.isEmpty()){
                try {
                    if (nanos<=0){
                        return null;
                    }
                    // 返回的是剩余的等待时间,更改navos的值,使虚假唤醒的时候可以继续等待
                    nanos = emptyWaitSet.awaitNanos(nanos);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            fullWaitSet.signal();
            return deque.getFirst();
        }finally {
            lock.unlock();
        }
    }

    // 带超时时间的增加
    public boolean offer(T task , long timeout , TimeUnit unit){
        lock.lock();
        try{
            // 将 timeout 统一转换为 纳秒
            long nanos = unit.toNanos(timeout);
            while (deque.size() == capcity){
                try {
                    if (nanos<=0){
                        return false;
                    }
                    // 更新剩余需要等待的时间
                    nanos = fullWaitSet.awaitNanos(nanos);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            log.debug("加入任务队列 {}", task);
            deque.addLast(task);
            emptyWaitSet.signal();
            return true;
        }finally {
            lock.unlock();
        }
    }
}

新加TimeoutBlockingQueue类,添加offer和poll待超时的添加和获取任务的方法。

拒绝策略设计

目前的实现还是有个漏洞,无法自定义任务超出阈值的一个拒绝策略,我们可以通过利用函数式编程+策略模式去实现。

1.定义策略模式的函数式接口


@FunctionalInterface
public interface RejectPolicy<T> {

    
    void reject(BlockingQueue<T> queue, T task);
}

2.添加函数式接口的调用入口

我们可以在阻塞队列添加任务新加一个api, 添加任务如果超过容量,调用函数式接口。

@Slf4j(topic = "c.BlockingQueue")
public class BlockingQueue<T> {
    ........

    
    public void tryPut(RejectPolicy<T> rejectPolicy, T task) {
        lock.lock();
        try{
            // 如果队列超过容量
            if (deque.size()> capcity){
                log.debug("task too much, do reject");
                rejectPolicy.reject(this, task);
            }else {
                deque.offer(task);
                emptyWaitSet.signal();
            }
        }finally {
            lock.unlock();
        }
    }
}

3.修改ThreadPool类

@Slf4j(topic = "c.ThreadPool")
public class ThreadPool implements Executor {
    .....

    
    private RejectPolicy rejectPolicy;

    // 通过构造方法传入执行的拒绝策略
    public ThreadPool(int coreSize, int capcity, RejectPolicy rejectPolicy) {
        this.coreSize = coreSize;
        this.taskQueue = new BlockingQueue<>(capcity);
        this.rejectPolicy = rejectPolicy;
    }

    
    @Override
    public void execute(Runnable task) {
        synchronized (workers) {
            // 如果工作线程数小于阈值,直接开始任务执行
            if(workers.size() < coreSize) {
                Worker worker = new Worker(task);
                workers.add(worker);
                worker.start();
            } else {
                // 如果超过了阈值,加入到队列中
                //taskQueue.put(task);
                
                // 调用tryPut的方式
                taskQueue.tryPut(rejectPolicy, task);
            }
        }
    }

   ....
}

通过构造方法的方式传入要执行的拒绝策略

调用tryPut方法添加任务

4.演示

以上就是Java实现手写一个线程池的示例代码的详细内容,更多关于Java线程池的资料请关注编程网其它相关文章!

阅读原文内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯