Linux知识点 – Linux多线程(三)
文章目录
一、线程同步
1.概念理解
持有锁的线程会频繁进入临界区申请临界资源,造成其他进程饥饿的问题;
这本身是没有错的,但是不合理;
线程同步:就是线程按照一定的顺序,进行临界资源的访问;主要就是为了解决访问临界资源和理性的问题;在保证数据安全的前提下,让线程能够按照某种特定的顺序访问临界资源,从而有效避免饥饿问题;
2.条件变量
-
当我们申请临界资源前,需要先做临界资源是否存在的检测,做检测的本质也是访问临界资源;
-
对临界资源的检测,也一定是要在加锁和解锁之间的;
-
常规的方式检测条件是否就绪,注定了我们必须要频繁申请和释放锁,我们可以使用条件变量来完成检测:
(1)资源未就绪的时候,不要让线程再频繁检测,让线程等待;
(2)当条件就绪时,通知对应的线程,让其进行资源的申请和访问; -
初始化条件变量:
-
条件不满足时,等待:
-
条件满足时,发通知:
broadcast是将等待的线程全部唤醒;
signal是将特定的线程唤醒;
注:pthread库返回值都是成功返回0,失败返回错误码;
3.使用条件变量进行线程同步
按照一定顺序控制线程:
#include #include #include #include #include using namespace std;#define TNUM 4//共四个线程typedef void (*func_t) (const string &name, pthread_mutex_t* pmtx, pthread_cond_t* pcond);//定义函数指针class ThreadData{public: ThreadData(const string& name, func_t func, pthread_mutex_t* pmtx, pthread_cond_t* pcond) : _name(name) , _func(func) , _pmtx(pmtx) , _pcond(pcond) {}public: string _name;//线程名 func_t _func;//线程回调的函数 pthread_mutex_t* _pmtx;//锁 pthread_cond_t* _pcond;//条件变量};void func1(const string &name, pthread_mutex_t* pmtx, pthread_cond_t* pcond){ while(true) { pthread_cond_wait(pcond, pmtx);//默认该线程执行的时候,wait代码被执行,当前线程会立即被阻塞 //阻塞就是将当前进程放进一个队列中去等待,并且再等待条件满足后被唤醒 cout << name << "running -- 播放" << endl; }}void func2(const string &name, pthread_mutex_t* pmtx, pthread_cond_t* pcond){ while(true) { pthread_cond_wait(pcond, pmtx); cout << name << "running -- 下载" << endl; }}void func3(const string &name, pthread_mutex_t* pmtx, pthread_cond_t* pcond){ while(true) { pthread_cond_wait(pcond, pmtx); cout << name << "running -- 刷新" << endl; }}void func4(const string &name, pthread_mutex_t* pmtx, pthread_cond_t* pcond){ while(true) { pthread_cond_wait(pcond, pmtx); cout << name << "running -- 扫描" << endl; }}//每一个线程都进入Entry接口,在entry接口内调用自己的函数void* Entry(void* args){ ThreadData* td = (ThreadData*)args;//td在每一个线程自己私有的栈空间中保存 td->_func(td->_name, td->_pmtx, td->_pcond);//这是一个函数,调用完就返回这里 delete td;//需要在td使用完后进行销毁 return nullptr;}int main(){ pthread_mutex_t mtx; //锁 pthread_cond_t cond; //条件变量 pthread_mutex_init(&mtx, nullptr); pthread_cond_init(&cond, nullptr); pthread_t tids[TNUM]; func_t funcs[TNUM] = {func1, func2, func3, func4}; for(int i = 0; i < TNUM; i++) { string name = "Thread "; name += to_string(i + 1); ThreadData* td = new ThreadData(name, funcs[i], &mtx, &cond); pthread_create(tids + i, nullptr, Entry, (void*)td); } sleep(5);//主线程sleep,新线程创建出来都在wait while(true) { cout << "resume thread run code ..." << endl; pthread_cond_signal(&cond);//唤醒在指定条件变量下等待的线程,不用指定线程,因为wait的时候线程已经在队列中排队了 sleep(1); } for(int i = 0; i < TNUM; i++) { pthread_join(tids[i], nullptr); cout << "thread: " << tids[i] << "quit" << endl; } pthread_mutex_destroy(&mtx); pthread_cond_destroy(&cond); return 0;}
上面的代码创建了局部的锁和条件变量,创建了四个新线程,将线程名、回调的函数地址、锁和条件变量的地址都放进了一个类对象中;
在创建线程的函数中,每个线程都调用的是一个Entry入口函数,在Entry接口内调用自己的函数;
在线程执行的函数中,默认该线程执行的时候,wait代码被执行,当前线程会立即被阻塞;阻塞就是将当前进程放进一个队列中去等待,并且再等待条件满足后被唤醒;
当主线程执行到pthread_cond_signal函数时,会唤醒在指定条件变量下等待的线程;不用指定线程,因为wait的时候线程已经在队列中排队了;
运行结果:
主线程在等待了5s后,开始调用新线程执行任务,并且新线程是按照一定的顺序被唤醒的;
如果使用pthread_cond_broadcast接口一次唤醒一批线程:
int main(){ pthread_mutex_t mtx; //锁 pthread_cond_t cond; //条件变量 pthread_mutex_init(&mtx, nullptr); pthread_cond_init(&cond, nullptr); pthread_t tids[TNUM]; func_t funcs[TNUM] = {func1, func2, func3, func4}; for(int i = 0; i < TNUM; i++) { string name = "Thread "; name += to_string(i + 1); ThreadData* td = new ThreadData(name, funcs[i], &mtx, &cond); pthread_create(tids + i, nullptr, Entry, (void*)td); } sleep(5);//主线程sleep,新线程创建出来都在wait while(true) { cout << "resume thread run code ..." << endl; //pthread_cond_signal(&cond);//唤醒在指定条件变量下等待的线程,不用指定线程,因为wait的时候线程已经在队列中排队了 pthread_cond_broadcast(&cond);//一次唤醒一批线程 sleep(1); } for(int i = 0; i < TNUM; i++) { pthread_join(tids[i], nullptr); cout << "thread: " << tids[i] << "quit" << endl; } pthread_mutex_destroy(&mtx); pthread_cond_destroy(&cond); return 0;}
运行结果:
等待队列中所有的线程被一次全部唤醒;
回调函数临界区加入加锁和解锁:
wait一定要在加锁和解锁之间进行;
加入了quit标志位,任务执行完后线程退出;
#include #include #include #include #include using namespace std;#define TNUM 4//共四个线程typedef void (*func_t) (const string &name, pthread_mutex_t* pmtx, pthread_cond_t* pcond);//定义函数指针volatile bool quit = false;//加入quit标志位class ThreadData{public: ThreadData(const string& name, func_t func, pthread_mutex_t* pmtx, pthread_cond_t* pcond) : _name(name) , _func(func) , _pmtx(pmtx) , _pcond(pcond) {}public: string _name;//线程名 func_t _func;//线程回调的函数 pthread_mutex_t* _pmtx;//锁 pthread_cond_t* _pcond;//条件变量};void func1(const string &name, pthread_mutex_t* pmtx, pthread_cond_t* pcond){ while(!quit)//加入退出判断 { //wait一定要在加锁和解锁之间进行 pthread_mutex_lock(pmtx); //if(临界资源未就绪) 等待 pthread_cond_wait(pcond, pmtx);//默认该线程执行的时候,wait代码被执行,当前线程会立即被阻塞 //阻塞就是将当前进程放进一个队列中去等待,并且再等待条件满足后被唤醒 cout << name << "running -- 播放" << endl; pthread_mutex_unlock(pmtx); }}void func2(const string &name, pthread_mutex_t* pmtx, pthread_cond_t* pcond){ while(!quit) { pthread_mutex_lock(pmtx); pthread_cond_wait(pcond, pmtx); cout << name << "running -- 下载" << endl; pthread_mutex_unlock(pmtx); }}void func3(const string &name, pthread_mutex_t* pmtx, pthread_cond_t* pcond){ while(!quit) { pthread_mutex_lock(pmtx); pthread_cond_wait(pcond, pmtx); cout << name << "running -- 刷新" << endl; pthread_mutex_unlock(pmtx); }}void func4(const string &name, pthread_mutex_t* pmtx, pthread_cond_t* pcond){ while(!quit) { pthread_mutex_lock(pmtx); pthread_cond_wait(pcond, pmtx); cout << name << "running -- 扫描" << endl; pthread_mutex_unlock(pmtx); }}//每一个线程都进入Entry接口,在entry接口内调用自己的函数void* Entry(void* args){ ThreadData* td = (ThreadData*)args;//td在每一个线程自己私有的栈空间中保存 td->_func(td->_name, td->_pmtx, td->_pcond);//这是一个函数,调用完就返回这里 delete td;//需要在td使用完后进行销毁 return nullptr;}int main(){ pthread_mutex_t mtx; //锁 pthread_cond_t cond; //条件变量 pthread_mutex_init(&mtx, nullptr); pthread_cond_init(&cond, nullptr); pthread_t tids[TNUM]; func_t funcs[TNUM] = {func1, func2, func3, func4}; for(int i = 0; i < TNUM; i++) { string name = "Thread "; name += to_string(i + 1); ThreadData* td = new ThreadData(name, funcs[i], &mtx, &cond); pthread_create(tids + i, nullptr, Entry, (void*)td); } sleep(5);//主线程sleep,新线程创建出来都在wait int cnt = 10; while(cnt) { cout << "resume thread run code ..." << cnt-- << endl; pthread_cond_signal(&cond);//唤醒在指定条件变量下等待的线程,不用指定线程,因为wait的时候线程已经在队列中排队了 //pthread_cond_broadcast(&cond);//一次唤醒一批线程 sleep(1); } cout << "control done" << endl; quit = true; pthread_cond_broadcast(&cond);//再唤醒一下线程,让其检测quit信号 for(int i = 0; i < TNUM; i++) { pthread_join(tids[i], nullptr); cout << "thread: " << tids[i] << "quit" << endl; } pthread_mutex_destroy(&mtx); pthread_cond_destroy(&cond); return 0;}
运行结果:
二、生产者消费者模型
1.概念
生产者消费者模型就是一种多线程运作的模型,就像超市一样,生产者生产了商品运送到超市售卖,而消费者从超市里购买商品;
其中,生产者和消费者都是给线程进行了角色化,不同的线程执行不同的职能,超市则是一个数据的缓冲区,商品就是数据;
- 3种关系:
生产者和生产者:竞争、互斥的关系;
消费者和消费者:竞争、互斥的关系;
生产者和消费者:互斥和同步的关系; - 2种角色:
生产者、消费者; - 一个交易场所:
超市;
这个模型能够让生产者和消费者线程之间实现解耦,提高效率;
当生产者生产了商品,就能够给消费者同步信息,唤醒消费者线程;
当消费者消费之后,就能给生产者同步信息,唤醒生产者线程,继续生产;
可以让生产者和消费者线程互相同步;
在逻辑层面上解耦消费者和生产者,能够提高效率
重点是给线程赋予了角色;
需要消除生产中的状态,避免数据不一致;
- 生产过程:
生产者将商品生产出来放到仓库,消费者从仓库取走商品;
生产和消费的过程不仅于此,生产者生产数据,消费者使用数据都需要花时间;
2.基于BlockingQueue的生产者消费者模型
- BlockingQueue:阻塞队列
当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;
当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出;
3.单生产者单消费者模型
BlockQueue.hpp:
#include #include #include #include using namespace std;const int gDefaultCap = 5;template <class T>class BlockQueue{private: bool isQueueEmpty() { return _bq.size() == 0; } bool isQueueFull() { return _bq.size() == _capacity; }public: BlockQueue(int capacity = gDefaultCap) : _capacity(capacity) { pthread_mutex_init(&_mtx, nullptr); pthread_cond_init(&_Empty, nullptr); pthread_cond_init(&_Full, nullptr); } void push(const T &in) // 生产者放数据 { pthread_mutex_lock(&_mtx); // 1.先检测当前的临界资源是否满足访问条件 // pthread_cond_wait是在临界区中的,此时进程是持有锁的,如果去等待了,锁怎么办? // pthread_cond_wait第二个参数是一个锁,当此进程成功挂起后,传入的锁,会被自动释放 // 当此进程被唤醒时,从哪里阻塞的,就从那里唤醒,被唤醒的时候,此进程还是在临界区内部的 // 当被唤醒的时候,pthread_cond_wait会帮助此线程获取锁 // pthread_cond_wait:只要是一个函数,就有可能调用失败,也有可能存在伪唤醒的情况 // 因此条件变量的使用规范:使用while循环持续进行条件检测 // 这样在访问临界资源时,就能100%确定资源是就绪的 while (isQueueFull()) { pthread_cond_wait(&_Full, &_mtx); } // 2.访问临界资源 _bq.push(in); // 加入控制策略:当队列中数据量过半后,才唤醒消费者线程 if (_bq.size() >= _capacity / 2) { pthread_cond_signal(&_Empty); // 生产者放了数据后,就唤起消费者线程,通知其消费 } pthread_mutex_unlock(&_mtx); // 发信号在解锁之前和之后都是可以的 } void pop(T *out) { pthread_mutex_lock(&_mtx); while (isQueueEmpty()) { pthread_cond_wait(&_Empty, &_mtx); } *out = _bq.front(); _bq.pop(); pthread_mutex_unlock(&_mtx); pthread_cond_signal(&_Full); // 消费者取走数据后,就唤起生产者线程,通知其生产 } ~BlockQueue() { pthread_mutex_destroy(&_mtx); pthread_cond_destroy(&_Empty); pthread_cond_destroy(&_Full); }private: queue<T> _bq; // 阻塞队列 int _capacity; // 容量上限 pthread_mutex_t _mtx; // 通过互斥锁保证队列安全 pthread_cond_t _Empty; // 同来表示bq 是否为空的条件 pthread_cond_t _Full; // 同来表示bq 是否为满的条件};
-
生产者生产数据的流程:
1.先检测当前的临界资源是否满足访问条件;
2.访问临界资源; -
pthread_cond_wait是在临界区中的,此时进程是持有锁的,如果去等待了,锁怎么办?
pthread_cond_wait第二个参数是一个锁,当此进程成功挂起后,传入的锁,会被自动释放; -
当此进程被唤醒时,从哪里阻塞的,就从哪里唤醒,被唤醒的时候,此进程还是在临界区内部的;
当被唤醒的时候,pthread_cond_wait会帮助此线程获取锁; -
pthread_cond_wait:只要是一个函数,就有可能调用失败,也有可能存在伪唤醒的情况;
因此条件变量的使用规范:使用while循环持续进行条件检测;
这样在访问临界资源时,就能100%确定资源是就绪的;
ConPod.cc:
#include"BlockQueue.hpp"using namespace std;void* consumer(void* args){ BlockQueue<int>* bqueue = (BlockQueue<int>*)args; while(true) { int a; bqueue->pop(&a); cout << "消费一个数据:" << a << endl; sleep(1); } return nullptr;}void* productor(void* args){ BlockQueue<int>* bqueue = (BlockQueue<int>*)args; int a = 1; while(true) { bqueue->push(a++); cout << "生产一个数据:" << a << endl; } return nullptr;}int main(){ BlockQueue<int>* bqueue = new BlockQueue<int>(); pthread_t c, p; pthread_create(&c, nullptr, consumer, bqueue); pthread_create(&p, nullptr, productor, bqueue); pthread_join(c, nullptr); pthread_join(p, nullptr); delete bqueue; return 0;}
运行结果:
- 注:效率高:在于利用缓冲区,提高了生产和消费线程的并发度;
4.多生产者多消费者模型
Task.hpp:
#pragma once#include #include typedef std::function<int(int, int)> func_t;class Task{public: Task(){} Task(int x, int y, func_t func):x_(x), y_(y), func_(func) {} int operator ()() { return func_(x_, y_); }public: int x_; int y_; func_t func_;};
封装一个Task类,队列中存储这个类,类中能够调用回调函数;
BlockQueue.hpp:(同上)
ConPod.cc:
#include"BlockQueue.hpp"#include"Task.hpp"#include using namespace std;int myAdd(int x, int y){ return x + y;}void* consumer(void* args){ BlockQueue<Task>* bqueue = (BlockQueue<Task>*)args; while(true) { //获取任务 Task t; bqueue->pop(&t); //完成任务 cout << pthread_self() << "consumer: " << t.x_ << "+" << t.y_ << "=" << t() << endl; sleep(1); } return nullptr;}void* productor(void* args){ BlockQueue<Task>* bqueue = (BlockQueue<Task>*)args; int a = 1; while(true) { //制作任务 int x = rand() % 10 + 1; usleep(rand()%1000); int y = rand() % 5 + 1; Task t(x, y, myAdd); //生产任务 bqueue->push(t); cout << pthread_self() << "productor: " << t.x_ << "+" << t.y_ << "=?" << endl; sleep(1); } return nullptr;}int main(){ srand((uint64_t)time(nullptr) ^ getpid()); BlockQueue<Task>* bqueue = new BlockQueue<Task>(); pthread_t c[2], p[2]; pthread_create(c, nullptr, consumer, bqueue); pthread_create(c + 1, nullptr, consumer, bqueue); pthread_create(p, nullptr, consumer, bqueue); pthread_create(p + 1, nullptr, productor, bqueue); pthread_join(c[0], nullptr); pthread_join(c[1], nullptr); pthread_join(p[0], nullptr); pthread_join(p[1], nullptr); delete bqueue; return 0;}
结果:
- 注:多生产和多消费模型中,生产者线程生产商品可以是并发的,但是向仓库中输送商品的行为是互斥的;
同理,消费者线程获取商品的行为是互斥的,但是处理任务的行为可以是并发的;
5.锁的封装
lockGuard.hpp
#pragma once#include #include class Mutex{public: Mutex(pthread_mutex_t *mtx) : _pmtx(mtx) { } void lock() { pthread_mutex_lock(_pmtx); } void unlock() { pthread_mutex_unlock(_pmtx); } ~Mutex() {}private: pthread_mutex_t *_pmtx;};class lockGuard{public: lockGuard(pthread_mutex_t *mtx) : _mtx(mtx) { _mtx.lock(); } ~lockGuard() { _mtx.unlock(); }private: Mutex _mtx;};
- RAII风格的加锁方式:
这里面的两个类成员中并没有真实的锁,只是传入锁的地址来进行对象的构造,进而在构造的时候就进行加锁操作,在对象析构的时候自动进行解锁;
BlockQueue.hpp:
#pragma once#include #include #include #include #include"lockGuard.hpp"using namespace std;const int gDefaultCap = 5;template <class T>class BlockQueue{private: bool isQueueEmpty() { return _bq.size() == 0; } bool isQueueFull() { return _bq.size() == _capacity; }public: BlockQueue(int capacity = gDefaultCap) : _capacity(capacity) { pthread_mutex_init(&_mtx, nullptr); pthread_cond_init(&_Empty, nullptr); pthread_cond_init(&_Full, nullptr); } void push(const T &in) // 生产者放数据 { lockGuard lockguard(&_mtx);//自动调用构造函数,加锁 while (isQueueFull()) { pthread_cond_wait(&_Full, &_mtx); } _bq.push(in); if (_bq.size() >= _capacity / 2) { pthread_cond_signal(&_Empty);费 } //自动调用析构函数,解锁 } void pop(T *out) { lockGuard lockguard(&_mtx);//自动调用构造函数,加锁 while (isQueueEmpty()) { pthread_cond_wait(&_Empty, &_mtx); } *out = _bq.front(); _bq.pop(); pthread_cond_signal(&_Full); //自动调用析构函数,解锁 } ~BlockQueue() { pthread_mutex_destroy(&_mtx); pthread_cond_destroy(&_Empty); pthread_cond_destroy(&_Full); }private: queue<T> _bq; // 阻塞队列 int _capacity; // 容量上限 pthread_mutex_t _mtx; // 通过互斥锁保证队列安全 pthread_cond_t _Empty; // 同来表示bq 是否为空的条件 pthread_cond_t _Full; // 同来表示bq 是否为满的条件};
构造lockGuard对象的时候,就已经加锁完成了;
析构的时候,自动解锁;
三、POSIX信号量
1.信号量的概念与使用
共享资源:任何一个时刻只有一个执行流在进行访问,共享资源是被当作整体使用的,执行流之间都是互斥的;
如果一个共享资源不被当做一个整体,而让不同的执行流访问不同的区域,就可以多执行流并发访问了,不同执行流只有在访问同一个区域的时候才需要进行互斥;
当前共享资源中还有多少份资源,特定的执行流使用可以是否可以得到一个共享资源,这些都可以通过信号量来实现;
- 信号量的本质,就是一个计数器;
访问临界资源的时候,必须先申请信号量资源(sem- -,预定资源,P操作),使用完毕信号量资源(sem++, 释放资源,V操作);
2.信号量的使用场景
(1)有共享资源;
(2)共享资源可以被局部性访问;
(3)需要对局部性资源的数量进行描述;
3.信号量接口
-
信号量初始化:
参数:
sem:信号量对象
pshared:是否共享
value:初始默认值(计数器的值) -
申请信号量(P操作)
wait是会默认阻塞一个进程,直到申请到信号量,将信号量–; -
释放信号量(V操作)
将信号量++;
4.基于环形队列的生产消费模型
- 使用数组实现环形队列:
(1)当下标走到数组尾部的时候,下一个下标是数组头部,为了实现这一点,每次下标变动时,下标值都需要 %= n;
(2)环形队列的逻辑结构是环形的,物理结构是数组;
(3)生产消费模型需要两个下标,一个生产者,一个消费者;
(4)两个下标重合的时候,队列既有可能是空的,也有可能是满的;
判空/判满的方法:1.计数器;2.专门浪费一个位置;
(5)当生产者和消费者指向同一个位置时,线程之间具有互斥同步的关系;
当生产者和消费者指向不同位置时,让他们并发执行;
(6)生产者不能将消费者套圈;
消费者不能超过生产者;
队列为空,一定要先让生产者运行;
队列为满,一定要先让消费者运行;
(7)生产者最关心的是空间资源 -> spaceSem 剩余空间信号量,初值为N;
消费者最关心的是数据资源 -> dataSem 剩余数据信号量,初值为0;
(8)当生产者生产了一个数据后,空间资源被占用,但是数据资源多了一个;
(9)生产者生产资源前,要先申请空间信号量(spaceSem - -),之后在特定位置生产资源,生产完成后,释放数据信号量(dataSem++);
消费者消费资源前,要先申请数据信号量(dataSem - -),之后消费特定位置的资源,消费完成后,释放空间信号量(spaceSem++);
(10)当生产线程申请信号量失败,证明空间已满,进程就会被挂起;
5.基于环形队列的生产消费模型实现
- 多生产多消费模型的意义:
并不是将任务或者数据放在交易场所或者取出就是生产和消费,生产数据或任务和拿到数据或任务之后的处理,才是生产和消费,这才是最耗时的;
多生产多消费模型的意义在于能够并发处理任务;
生产的本质:将私有的任务或数据,放到公共空间中;
消费的本质:将公共空间中的任务或数据,拿到并私有;
sem.hpp
信号量的封装,初始化对象时,就调用构造进行信号量的初始化;
对象销毁时,就自动调用析构,销毁信号量 ;
#ifndef _SEM_HPP_#define _SEM_HPP_#include #include class Sem{public: Sem(int val) { sem_init(&_sem, 0, val); } void p() { sem_wait(&_sem); } void v() { sem_post(&_sem); } ~Sem() { sem_destroy(&_sem); }private: sem_t _sem;};#endif
ringQueue.hpp
- 单生产者和单消费者在队列为空或为满的时候,需要进行信号量的申请,因此信号量自动就形成了两者的互斥关系,一定会有一方竞争失败;
- 如果改成多生产多消费模型,就会有生产者之间和消费者之间的关系,因此需要两把锁,生产和消费各一把;
- 多生产:当一个生产者线程访问一个下标时,加锁,其他线程来访问时就需要等待;
- 加锁和申请信号量的先后:信号量一定是安全的,具有原子性,资源是要配发给线程的,资源配发的越快,运行效率越高,因此先申请信号量,再加锁,加锁区域的粒度越小越好;
#ifndef _RING_QUEUE_HPP_#define _RING_QUEUE_HPP_#include #include #include #include #include #include #include #include "sem.hpp"const int g_default_num = 5;using namespace std;template <class T>class RingQueue{public: RingQueue(int default_num = g_default_num) : _ring_queue(default_num) , _num(default_num) , _c_step(0) , _p_step(0) , _space_sem(default_num) , _data_sem(0) { pthread_mutex_init(&_clock, nullptr); pthread_mutex_init(&_plock, nullptr); } ~RingQueue() { pthread_mutex_destroy(&_clock); pthread_mutex_destroy(&_plock); } // 生产者:空间资源,生产者们的临界资源是下标 // 加锁和申请信号量的先后:信号量一定是安全的,具有原子性, // 资源是要配发给线程的,资源配发的越快,运行效率越高,因此先申请信号量,再加锁 // 加锁的粒度越小越好 void push(const T &in) { // 先申请空间信号量 _space_sem.p(); // 多生产进程访问时,当一个生产者线程访问一个下标时,加锁,其他线程来访问时就需要等待 pthread_mutex_lock(&_plock); // 成功竞争到锁的线程继续执行下面操作 // 放入数据 _ring_queue[_p_step++] = in; _p_step %= _num; // 生产完后,解锁 pthread_mutex_unlock(&_plock); // 释放数据信号量 _data_sem.v(); } void pop(T *out) { _data_sem.p(); pthread_mutex_lock(&_clock); *out = _ring_queue[_c_step++]; _c_step %= _num; pthread_mutex_unlock(&_clock); _space_sem.v(); }private: vector<T> _ring_queue; int _num; int _c_step; // 消费下标 int _p_step; // 生产下标 Sem _space_sem; // 空间信号量 Sem _data_sem; // 数据信号量 pthread_mutex_t _clock; // 多消费者进程的锁 pthread_mutex_t _plock; // 多生产者进程的锁};#endif
ConPod.cc
#include"ringQueue.hpp"void* consumer(void* args){ RingQueue<int>* rq = (RingQueue<int>*)args; while(true) { sleep(1); int x = 0; //从环形队列中获取任务或数据 rq->pop(&x); //进行一定的处理 cout << "消费:" << x << "[" << pthread_self() << "]" << endl; }}void* procudtor(void* args){ RingQueue<int>* rq = (RingQueue<int>*)args; while(true) { //构建数据或任务对象 int x = rand() % 100 + 1; //放入环形队列 rq->push(x); cout << "生产:" << x << "[" << pthread_self() << "]" << endl; }}int main(){ srand((uint64_t)time(nullptr) ^ getpid()); RingQueue<int>* rq = new RingQueue<int>(); pthread_t c[3], p[2]; pthread_create(c, nullptr, consumer, (void*)rq); pthread_create(c + 1, nullptr, consumer, (void*)rq); pthread_create(c + 2, nullptr, consumer, (void*)rq); pthread_create(p, nullptr, procudtor, (void*)rq); pthread_create(p + 1, nullptr, procudtor, (void*)rq); for(int i = 0; i < 3; i++) { pthread_join(c[i], nullptr); } for(int i = 0; i < 2; i++) { pthread_join(p[i], nullptr); } return 0;}
运行结果:
6.信号量的意义
信号量的本质是一个计数器,它的意义在于可以不用进入临界区,就可以得知资源的情况,甚至可以减少临界区内部的判断;
申请锁和释放锁的过程,本质在于我们并不清楚临界资源的情况;
信号量要预设临界资源的情况,而且在pv变化过程中,我们在外部就能够知晓临界资源的情况;
来源地址:https://blog.csdn.net/kissland96166/article/details/132416026