文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

【面试心得】C++ 线程池总结

2023-09-12 07:11

关注

什么是线程池

线程池(Thread Pool)是一种多线程编程的设计模式,它用于管理和复用线程,以有效地执行并发任务。线程池由一组预创建的线程组成,这些线程在需要时被分配来执行任务。线程池的核心思想是将线程的创建、销毁和管理工作从任务执行中分离出来,从而提高性能、资源利用率和代码可维护性。

线程池的主要目标是解决以下问题:

  1. 减少线程创建和销毁的开销: 在多线程应用程序中,频繁地创建和销毁线程会导致额外的开销,包括内存分配、上下文切换等。线程池在应用程序启动时创建一组线程,并在任务完成后不立即销毁它们,而是将它们保持在池中以供重用。

  2. 控制并发度: 线程池允许您限制同时执行的线程数量,从而控制系统的并发度。这有助于避免过多的并发线程导致系统资源不足或性能下降的问题。

  3. 提高系统的稳定性和可靠性: 通过管理线程的生命周期,线程池可以提高系统的稳定性,避免因线程创建失败或线程泄漏而导致的问题。

  4. 提高代码可维护性: 使用线程池将任务的执行与线程管理分离,使代码更清晰和可维护。您可以专注于任务的逻辑,而不必担心线程的创建和销毁。

  5. 提高性能: 线程池可以提高多核处理器的利用率,充分利用可用的计算资源,从而加速任务的执行。

总之,线程池是一种有助于管理多线程应用程序的机制,它可以提高性能、资源利用率和代码可维护性,同时降低了多线程编程的复杂性。这就是为什么在许多多线程应用程序中使用线程池的原因。

线程池工作原理

  1. 初始化: 在线程池启动时,会创建一定数量的线程。这些线程通常被称为工作线程。线程池还会创建一个任务队列,用于存储待执行的任务。

  2. 任务提交: 当应用程序有任务需要执行时,它可以将任务提交到线程池。这些任务通常表示为可调用的函数对象(函数指针、Lambda表达式等)。任务提交后,线程池将任务放入任务队列中等待执行。

  3. 任务调度: 线程池中的工作线程会循环地从任务队列中获取任务。一旦工作线程获取到任务,它就会执行该任务。如果没有可用任务,工作线程会等待,直到有任务可执行。

  4. 任务执行: 工作线程执行从任务队列中获取的任务。任务可以并行执行,因为线程池中有多个工作线程,每个工作线程都可以执行不同的任务。

  5. 任务完成: 任务执行完成后,工作线程可以继续从任务队列中获取下一个任务,或者根据线程池的策略决定是否保持线程处于等待状态以等待更多任务。

  6. 线程复用: 工作线程在执行完任务后不会立即销毁,而是返回线程池中等待下一个任务。这样可以减少线程创建和销毁的开销,并提高性能。

  7. 线程池管理: 线程池通常具有管理功能,可以控制线程的数量、动态调整线程池大小、处理异常、记录日志等。这些管理功能有助于线程池的稳定性和性能优化。

  8. 任务完成通知: 通常,线程池允许应用程序检测任务是否完成,并获取任务的返回结果。这可以通过各种机制实现,如回调函数、Future/Promise等。

线程池的工作原理允许多个任务在一组工作线程上并发执行,从而提高了系统的性能和资源利用率。线程池还可以根据需要动态调整线程的数量,以适应不同的工作负载。这使得线程池成为处理并发任务的强大工具。

保障线程池的线程安全

  1. 任务队列的互斥锁: 任务队列是线程池中的关键资源,因此需要使用互斥锁(Mutex)来保护它,确保只有一个线程能够访问或修改任务队列。这样可以避免多个线程同时向队列添加或删除任务而导致的问题。

  2. 条件变量: 除了互斥锁,使用条件变量(Condition Variable)来通知等待任务的线程有新任务可执行。条件变量允许线程等待某个条件满足后再继续执行,这在任务队列为空时非常有用。

  3. 线程池状态管理: 维护线程池的状态,例如标识线程池是否处于运行状态、是否正在销毁等。在操作线程池状态时需要加锁,以确保线程安全。

  4. 资源的合理共享: 确保线程池中的线程在共享资源时进行适当的同步。避免数据竞争问题,如多个线程同时修改同一个变量。

  5. 异常处理: 考虑线程中可能出现的异常情况,特别是在任务执行时,确保异常不会导致线程池崩溃。可以使用 try-catch 块捕获异常,记录日志或采取适当的措施。

  6. 资源管理: 确保线程池中的线程在退出时释放资源,如内存、文件句柄等。防止资源泄漏。

  7. 使用原子操作: 在需要对共享变量进行增减等操作时,使用原子操作,以确保操作的原子性,避免竞态条件。

代码实现

版本一

xthread_pool.h

#pragma once#include #include #include #include #include #include class XTask{public:    virtual int Run() = 0;    std::function is_exit = nullptr;};class XThreadPool{public:    //    /// 初始化线程池    /// @para num 线程数量    void Init(int num);    //    /// 启动所有线程,必须先调用Init    void Start();    //    /// 线程池退出    void Stop();    void AddTask(XTask* task);    XTask* GetTask();    //线程池是否退出    bool is_exit() { return is_exit_; }    int task_run_count() { return task_run_count_; }private:    //线程池线程的入口函数    void Run() ;    int thread_num_ = 0;//线程数量    std::mutex mux_;    std::vector threads_;    std::list tasks_;    std::condition_variable cv_;    bool is_exit_ = false; //线程池退出    //正在运行的任务数量,线程安全    std::atomic task_run_count_ = {0};};

 xthread_pool.cpp

#include "xthread_pool.h"#include using namespace std;///// 初始化线程池/// @para num 线程数量void XThreadPool::Init(int num){    unique_lock lock(mux_);    this->thread_num_ = num;    cout << "Thread pool Init " << num << endl;}///// 启动所有线程,必须先调用Initvoid XThreadPool::Start(){    unique_lock lock(mux_);    if (thread_num_ <= 0)    {        cerr << "Please Init XThreadPool" << endl;        return;    }    if (!threads_.empty())    {        cerr << "Thread pool has start!" << endl;        return;    }    //启动线程    for (int i = 0; i < thread_num_; i++)    {        auto th = new thread(&XThreadPool::Run, this);        threads_.push_back(th);    }}///// 线程池退出void XThreadPool::Stop(){    is_exit_ = true;    cv_.notify_all();    for (auto& th : threads_)    {        th->join();    }    unique_lock lock(mux_);    threads_.clear();}//线程池线程的入口函数void XThreadPool::Run(){    cout << "begin XThreadPool Run " << this_thread::get_id() << endl;    while (!is_exit())    {        auto task = GetTask();        if (!task)continue;        ++task_run_count_;        try        {            task->Run();        }        catch (...)        {        }        --task_run_count_;    }    cout << "end XThreadPool Run " << this_thread::get_id() << endl;}void XThreadPool::AddTask(XTask* task){    unique_lock lock(mux_);    tasks_.push_back(task);    task->is_exit = [this] {return is_exit(); };    lock.unlock();    cv_.notify_one();}XTask* XThreadPool::GetTask(){    unique_lock lock(mux_);    if (tasks_.empty())    {        cv_.wait(lock);    }    if (is_exit())        return nullptr;    if (tasks_.empty())        return nullptr;    auto task = tasks_.front();    tasks_.pop_front();    return task;}

main.cpp 

#include "xthread_pool.h"#include using namespace std;class MyTask :public XTask{public:    int Run()    {        cout << "================================================" << endl;        cout << this_thread::get_id()<<" Mytask " << name << endl;        cout << "================================================" << endl;        for (int i = 0; i < 10; i++)        {            if (is_exit())break;            cout << "." << flush;            this_thread::sleep_for(500ms);        }        return 0;    }    std::string name = "";};int main(int argc, char* argv[]){      XThreadPool pool;    pool.Init(16);    pool.Start();    MyTask task1;    task1.name = "test name 001";    pool.AddTask(&task1);    MyTask task2;    task2.name = "test name 002";    pool.AddTask(&task2);    this_thread::sleep_for(100ms);    cout << "task run  count = " << pool.task_run_count() << endl;    this_thread::sleep_for(1s);    pool.Stop();    cout << "task run  count = " << pool.task_run_count() << endl;    getchar();    return 0;}

版本二(智能指针)

xthread_pool.h

#pragma once#include #include #include #include #include #include class XTask{public:    virtual int Run() = 0;    std::function is_exit = nullptr;};class XThreadPool{public:    //    /// 初始化线程池    /// @para num 线程数量    void Init(int num);    //    /// 启动所有线程,必须先调用Init    void Start();    //    /// 线程池退出    void Stop();    //void AddTask(XTask* task);    void AddTask(std::shared_ptr task);    std::shared_ptr GetTask();    //线程池是否退出    bool is_exit() { return is_exit_; }    int task_run_count() { return task_run_count_; }private:    //线程池线程的入口函数    void Run() ;    int thread_num_ = 0;//线程数量    std::mutex mux_;    //std::vector threads_;    //线程列表 指针指针版本    std::vector< std::shared_ptr > threads_;    //std::list tasks_;    std::list > tasks_;        std::condition_variable cv_;    bool is_exit_ = false; //线程池退出    //正在运行的任务数量,线程安全    std::atomic task_run_count_ = {0};};

xthread_pool.cpp 

#include "xthread_pool.h"#include using namespace std;///// 初始化线程池/// @para num 线程数量void XThreadPool::Init(int num){    unique_lock lock(mux_);    this->thread_num_ = num;    cout << "Thread pool Init " << num << endl;}///// 启动所有线程,必须先调用Initvoid XThreadPool::Start(){    unique_lock lock(mux_);    if (thread_num_ <= 0)    {        cerr << "Please Init XThreadPool" << endl;        return;    }    if (!threads_.empty())    {        cerr << "Thread pool has start!" << endl;        return;    }    //启动线程    for (int i = 0; i < thread_num_; i++)    {        //auto th = new thread(&XThreadPool::Run, this);        auto th = make_shared(&XThreadPool::Run, this);        threads_.push_back(th);    }}///// 线程池退出void XThreadPool::Stop(){    is_exit_ = true;    cv_.notify_all();    for (auto& th : threads_)    {        th->join();    }    unique_lock lock(mux_);    threads_.clear();}//线程池线程的入口函数void XThreadPool::Run(){    cout << "begin XThreadPool Run " << this_thread::get_id() << endl;    while (!is_exit())    {        auto task = GetTask();        if (!task)continue;        ++task_run_count_;        try        {            task->Run();        }        catch (...)        {        }        --task_run_count_;    }    cout << "end XThreadPool Run " << this_thread::get_id() << endl;}void XThreadPool::AddTask(std::shared_ptr task){    unique_lock lock(mux_);    tasks_.push_back(task);    task->is_exit = [this] {return is_exit(); };    lock.unlock();    cv_.notify_one();}std::shared_ptr XThreadPool::GetTask(){    unique_lock lock(mux_);    if (tasks_.empty())    {        cv_.wait(lock);    }    if (is_exit())        return nullptr;    if (tasks_.empty())        return nullptr;    auto task = tasks_.front();    tasks_.pop_front();    return task;}

 main.cpp

#include "xthread_pool.h"#include using namespace std;class MyTask :public XTask{public:    int Run()    {        cout << "================================================" << endl;        cout << this_thread::get_id()<<" Mytask " << name << endl;        cout << "================================================" << endl;        for (int i = 0; i < 10; i++)        {            if (is_exit())break;            cout << "." << flush;            this_thread::sleep_for(500ms);        }        return 0;    }    std::string name = "";};int main(int argc, char* argv[]){      XThreadPool pool;    pool.Init(16);    pool.Start();    {        auto task3 = make_shared();        task3->name = "test shared 003";        pool.AddTask(task3);        auto task4 = make_shared();        task4->name = "test shared 004";        pool.AddTask(task4);    }    this_thread::sleep_for(100ms);    cout << "task run  count = " << pool.task_run_count() << endl;    this_thread::sleep_for(1s);    pool.Stop();    cout << "task run  count = " << pool.task_run_count() << endl;    getchar();    return 0;}

来源地址:https://blog.csdn.net/weixin_42809675/article/details/132802041

阅读原文内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯