文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

Linux高性能编程-线程池

2024-11-29 18:52

关注

1.线程池简介

1.1 什么是线程池?

    线程池就是提前创建好一批线程,通过一个池子来管理所有的线程。当有任务时,从池子中取出一个线程去执行该任务,执行结束后,线程并不会死亡,而是再次返回线程池中成为空闲状态,等待执行下一个任务。

    我们通过几个图来讲解一下线程池的作用。

1)常规多线程

图片

    常规多线程方式,每个任务都会创建一个新的线程来处理,任务处理完后线程会销毁。

    优点:在任务量比较少的情况下,任务执行效率比较高。

    缺点:1.随着任务量增多,线程数量会越来越多,线程开销(CPU,内存开销)很大,如果不控制线程数量,系统会出现异常。2.缺乏任务管理机制。3.缺乏多线程管理机制。

2)简单的线程池

图片

    简单线程池通过任务队列和线程池完成。

    新任务产生后存储在任务队列,线程池中的空闲线程从任务队列获取任务并执行。

    优点:1.线程数量可控,能够保证系统安全。2.任务和线程统一管理,方便程序设计。

    缺点:1.没有任务拒绝机制,任务会堆积在任务队列。2.线程池线程数量固定,无法动态调节线程数量,导致任务处理效率不高。

3)完善的线程池

图片

    完善的线程池需要具备一下几个优点:

    线程池被分为核心线程和动态线程,核心线程会一直运行保证基础业务,当任务越来越多的时候,核心线程无法保证任务快速响应,此时需要通过创建动态线程提高线程响应速度。

    当动态线程到达极限时,系统的处理能力到达瓶颈,此时需要启动安全保护机制,通过任务拒绝,保证系统安全。

1.2 线程池优点

通过上述分析,我们了解到线程池有以下优点:

2.线程池设计

2.1 整体设计

    设计一个完善的线程池,我们需要设计几个核心模块:状态管理、线程管理,任务管理。

2.2 详细设计

    线程池定义(struct thread_pool):

typedef struct thread_pool {
    struct list_head head;   //任务队列
    int task_max; //任务队列最大长度
    int task_num; //任务队列实际长度
    int core_num; //核心线程数量
    int dyn_max; //动态线程最大数量
    atomic_int dyn_num; //动态线程实际数量
    pthread_t *core_th; //核心线程pthread_t数组
    pthread_t *dyn_th; //动态线程pthread_t数组
    int status; //线程池状态
    pthread_mutex_t mutex; //互斥锁
    pthread_cond_t cond; //条件变量
} thread_pool_t;

    线程池通过struct thread_pool结构体定义,每个成员的作用已在代码中注释。

    创建线程池:

thread_pool_t* thread_pool_create(int core_num, int dyn_max) {
    thread_pool_t *tp = (thread_pool_t *)malloc(sizeof(thread_pool_t));
    if (!tp) return NULL;

    pthread_mutex_init(&tp->mutex, NULL);
    pthread_cond_init(&tp->cond, NULL);
    list_head_init(&tp->head);
    tp->status = INIT_STATUS;

    tp->task_max = TASK_MAX;
    tp->task_num = 0;
    tp->dyn_max = dyn_max;
    atomic_init(&tp->dyn_num, 0);
    tp->core_num = core_num;
    tp->core_th = malloc(core_num * sizeof(pthread_t));
    for (int i = 0; i < tp->core_num; i++) { //创建核心线程
        thread_arg *th_arg = (thread_arg *)malloc(sizeof(thread_arg));
        *th_arg = (thread_arg) {.tp = tp, .type = CORE_THREAD, .task = NULL};
        pthread_create(&tp->core_th[i], NULL, thread_proc, (void *)th_arg);
    }

    tp->status = RUNNING_STATUS; //设置线程池为running状态

    return tp;
}

    线程池创建时会指定核心线程数量以及最大动态线程数量,核心线程跟着线程池一起创建。

    动态线程根据实际任务量动态创建,为了防止创建过多的动态线程,需限制动态线程最大数量。

    销毁线程池:

void thread_pool_destroy_p(thread_pool_t **ptp) {
    if (!ptp) return;
    thread_pool_t *tp = *ptp;

    while(atomic_load(&tp->dyn_num)) { //等待动态线程全部退出
        usleep(10 * 1000);
    }

    for (int i = 0; i < tp->core_num; i++) { //等待核心线程全部退出
        pthread_join(tp->core_th[i], NULL);
    }

    free(tp->core_th);
    pthread_mutex_destroy(&tp->mutex);
    pthread_cond_destroy(&tp->cond);
    tp->status = STOP_STATUS; //设置线程池为stop状态

    free(tp);
    ptp = NULL;
    return;
}

    线程池销毁需要回收核心线程和动态线程,核心线程采用pthread_join方式回收。

    动态线程设置分离属性,线程池成员dyn_num(原子变量)用于记录仍在工作的动态线程数量,dyn_num等于0时表示当前所有的动态线程都已经退出。

2.2.1 状态管理

    线程池整个生命周期可分为4个状态:

#define INIT_STATUS (1<<0) //init状态
#define RUNNING_STATUS (1<<1) //running状态
#define SHUTDOWN_STATUS (1<<2) //shutdown状态
#define STOP_STATUS (1<<3) //stop状态

2.2.2 线程管理

    核心线程和动态线程处理函数:

void *thread_proc(void *arg) {
    thread_arg *th_arg = (thread_arg *)arg;
    thread_pool_t *tp = th_arg->tp;
    pid_t tid = gettid();
     //动态线程设置成分离模式,方便管理 
     if (th_arg->type == DYN_THREAD) pthread_detach(pthread_self());

    int count = 0;
    task_t *task = NULL;
    while(1) {
        if (th_arg->task) { //创建线程并执行首次任务
            printf("tid:%d first process task\n", tid);
            task = th_arg->task;
            th_arg->task = NULL;
        } else {
            task = thread_get_task(tp); //从任务队列中获取任务并执行
            if (!task) {
                if (tp->status != RUNNING_STATUS) break; //线程池被关闭,线程退出
                count++;
                if ((count >= 10) && (th_arg->type == DYN_THREAD)) {
                    printf("dyn thread 10s break\n");
                    break; //动态线程空闲10秒自动退出
                }
                continue;
            }
            count = 0;
        }
        task->cb(task->arg);
        task_freep(&task);
    }

    free(th_arg);
    if (th_arg->type == DYN_THREAD) {
        atomic_fetch_sub(&tp->dyn_num, 1);
     }
    return NULL;
}

1)核心线程

    核心线程数量固定,在线程池创建时会创建所有的核心线程,线程池退出时会销毁所有的核心线程。

2)动态线程

    动态线程的管理比较复杂,需要根据任务数量做动态调整,任务量大时,动态线程会被创建,提高线程池任务响应速率,任务量小时,空闲的动态线程会被回收,从而减少线程开销。

2.2.3 任务管理

    任务定义(struct task):

typedef enum TASK_TYPE {
    FREE_TYPE,
    NOFREE_TYPE,
}TASK_TYPE;

typedef void (*func)(void *arg);
typedef struct task {
    struct list_head list; //队列节点
    func cb; //回调函数
    void *arg; //回调函数参数
    TASK_TYPE type; //任务参数是否需要释放
}task_t;

    任务通过struct task定义,任务主要成员:

    任务申请流程如下图,创建一个新的任务后,需要做一些检测才能将任务加入线程池,检测不通过则执行任务拒绝,从而保证线程池始终处于安全高效运行状态。

    任务执行完毕后,需释放任务,回收资源。

图片

    线程池一定要做任务管理,任务管理的目的有两个:

3.线程池测试

    测试环境:树莓派4B,4核,4GB。

    分别采用多线程和线程池方式测试CPU密集型和IO密集型任务,对比两种方式性能和效率的差异。每毫秒产生1个任务,总共测试10000个任务。

    通过time命令执行测试程序,记录测试程序执行情况。

    测试代码如下:

    (完整代码请联系博主获取)

#define CORE_THREAD_NUM (4) //核心线程数量
#define DYN_THREAD_MAX (32) //动态线程最大数量
#define TASK_MAX (128) //任务队列任务最大数量

#define ENABLE_THREAD_POOL (0) //是否开启线程池,0:关闭 1:开启
#define TEST_TASK_NUM (10000) //测试任务数量
#define TASK_INTERVAL (1000) //任务产生间隔时间,单位:毫秒

#define TEST_TASK_TYPE (0) //任务类型,0:CPU密集型 1:IO密集型
#define NOP_TIMES (10000000) //CPU密集型任务执行空指令次数
#define RAND_RANGE (1 << 18) //IO密集型任务休眠时间随机范围,单位:毫秒

void cpu_stress() {
    for (int i = 0; i < NOP_TIMES; i++) {
        ;
    }
}

void rand_sleep() {
    srand(time(0));
    int ms = rand() & (RAND_RANGE - 1);
    usleep(ms);
}

//任务处理函数
void task_cb(void *arg) { 
#if !TEST_TASK_TYPE
    cpu_stress();
#else
    rand_sleep();
#endif
    return;
}

atomic_int running_threads;
void *thread_proc1(void *arg) {
    atomic_fetch_add(&running_threads, 1);
    pthread_detach(pthread_self());
    task_t *task = (task_t *)arg;
    task->cb(task->arg);
    free(task);
    atomic_fetch_sub(&running_threads, 1);
    return NULL;
}

int main(int argc, char *argv[]) {

#if ENABLE_THREAD_POOL
    thread_pool_t *tp = thread_pool_create(CORE_THREAD_NUM, DYN_THREAD_MAX);
    if (!tp) {
        printf("thread_pool_create error");
        return -1;
    }

    int seq = 0;
    while(1) {
        usleep(TASK_INTERVAL);
        task_t *task = task_create(task_cb, NULL, FREE_TYPE);
        if (!task) {
            printf("task create error\n");
            usleep(10 * 1000);
            continue;
        }
        int ret = thread_add_task(tp, task);
        if (ret == -1) { //任务拒绝
            task_freep(&task);
            usleep(10 * 1000);
            continue;
        }
        if (seq++ >= TEST_TASK_NUM) {
            thread_pool_exit(tp);
            thread_pool_destroy_p(&tp);
            break;
        }
    }
    printf("thread pool test done------\n");

#else
    atomic_init(&running_threads, 0);
    int seq = 0;
    int old_num = 0;
    while(1) {
        usleep(TASK_INTERVAL);
        task_t *task = task_create(task_cb, NULL, FREE_TYPE);
        if (!task) {
            printf("task create error\n");
            usleep(10 * 1000);
            continue;
        }
        pthread_t th;
        int ret = pthread_create(&th, NULL, thread_proc1, (void *)task);
        if (ret != 0) {
            free(task);
            usleep(10 * 1000);
            continue;
        }

        int num = atomic_load(&running_threads);
        if (old_num < num) {
            old_num = num;
            printf("running_threads:%d\n", num);
        }
        if (seq++ >= TEST_TASK_NUM) break;
    }
    printf("threads test done------\n");
#endif

    return 0;
}

1)CPU密集型场景测试

    测试参数如下:

图片

    多线程测试结果-->:

图片

    CPU使用率397%(已使用完),最高同时创建913个线程,完成测试时间2分钟,用户时间7分51秒,系统时间0.5秒。

    线程池测试结果-->:

图片

    CPU使用率398%(已使用完),核心线程4个,动态线程32个,共36各个线程,完成测试时间1分56秒,用户时间7分41秒,系统时间0.04秒。

    小节:CPU密集场景多线程和线程池方式处理效率相差不大,多线程方式最多同时创建900多个线程,会消耗大量系统资源。线程池方式线程始终控制在36个,比较安全。

2)IO密集型场景测试

    测试参数如下:

图片

    多线程测试结果-->:

图片

    CPU使用率10.2%(已使用完),最高同时创建212个线程,完成测试时间11秒,用户时间0.15秒,系统时间为1秒。

    线程池测试结果-->:

图片

图片

    CPU使用率3.6%(已使用完),核心线程4个,动态线程32个,共36各个线程,完成测试时间20秒,用户时间0.25秒,系统时间为0.32秒。

    小节:IO密集型场景线程处于IO阻塞状态,CPU使用率并不高,此时可以适当增加线程数量来提高CPU利用率。

总结:

来源:物联网心球内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯