开发环境:
涉及知识点:
项目链接,可以下载项目下来,边看边学效果更加!
码云,点击直达~
ThreadPool 线程池类型
线程池支持的类型:
如果是一开始设计,实际上线程池只需要
std::vector
来实现,后面改成了std::unordered_map
是因为我们需要给每一个线程一个自定义的id,因为ThreadPool需要动态删除线程的时候,需要找到对应的线程,此时如果遍历数组的效率低,我们后面改成哈希表。
总结:
如果是MODE_FIXED模式,这里用vector存储线程是可以的。
但是如果是MODE_CACHED,则需要使用哈希表。
// 线程支持的模式
enum class PoolMode
{MODE_FIXED,// 固定数量的线程 -- 不需要考虑线程安全问题MODE_CACHED, // 线程数量可动态增长 -- 需要考虑线程安全问题
};
注意:
std::queue> taskQue_;
如果存储任务,我们需要使用到多态,所以我们这里要用基类的指针,但是如果使用 std::queue 是有问题的
,因为用户若创建一个临时的任务对象,用户调用submitTask回来后,我们queue记录到任务,但是任务已经被销毁了。 但是如果用shared_ptr
后,其实这个任务的生命周期已经是threadFunc
线程的函数里面执行完,就会将任务进行析构。// std::vector> threads_; // 线程列表,避免手动释放线程列表
std::unordered_map> threads_;// 线程列表
int initThreadSize_; // 初始的线程数量
std::atomic_int curThreadSize_;// 记录当前线程池里面的线程总数量
int threadSizeThreshHold_; // 线程数量的上限阈值
// std::atomic_int idleThreadSize_;// 记录空闲线程的数量
int idleThreadSize_;// 记录空闲线程的数量std::queue> taskQue_;// 这里不用基类的裸指针,并且这里保证要基类,因为我们要用到多态
std::atomic_int taskSize_; // 任务的数量
int taskQueMaxThreshHold_; // 任务最大的上限,超过上线会阻塞生产者,直到队列有新的线程执行任务std::mutex taskQueMtx_; // 保证任务队列的原子性
std::condition_variable notFull_; // 表示任务队列不满
std::condition_variable nonEmpty_; // 表示任务队列不空
std::condition_variable exitCond_; // 等待线程资源全部回收PoolMode poolMode_; // 线程模式
std::atomic_bool isPoolRunning_;// 线程是否在运行
线程池需要提供的方法:
其实线程池就重要的就是submitTask和threadFunc函数,一个是放任务,一个是取任务。其他的都是设置一下相关的成员变量,方便使用。
由于这两个函数关系紧密,我们放到一起讲,就不分开了。
public:// 线程池构造ThreadPool();// 线程池析构~ThreadPool();// 设置线程池的工作模式void setMode(PoolMode mode);// 设置线程池cached模式下线程阈值void setThreadSizeThreshHold(int threshhold);// 设置task任务队列上线阈值void setTaskQueMaxThreshHold(int threshhold);// 给线程池提交任务Result submitTask(std::shared_ptr sp);// 开启线程池,指定线程里面的数量的多少,初始化这里给一次就可以了void start(int initThreadSize = 4);// 不希望线程池对象进行拷贝构造和赋值ThreadPool(const ThreadPool&) = delete;ThreadPool& operator=(const ThreadPool&) = delete;private:// 定义线程函数void threadFunc(int threadid);bool checkRunningState() const;
线程池初始化批量线程,将线程绑定了线程函数,后续Thread::start
就可以直接用到线程池当中的私有成员变量(如任务队列等等)
。
所以线程函数我们是放在线程池当中,而不是放到线程当中。
// 开启线程池
void ThreadPool::start(int initThreadSize)
{// 设置线程池的运行状态isPoolRunning_ = true; // 记录初始线程的个数initThreadSize_ = initThreadSize;curThreadSize_ = initThreadSize;// 创建线程对象for (int i = 0; i < initThreadSize_ ;++ i){// 这里把线程函数传递给线程auto ptr = std::make_unique(std::bind(&ThreadPool::threadFunc, this,std::placeholders::_1));int threadId = ptr->getId();threads_.emplace(threadId, std::move(ptr));/*threads_.emplace_back(std::move(ptr));*/}// 启动所有线程for (int i = 0; i < initThreadSize_; ++i){// 创建线程,启动线程,执行线程函数:即查看任务队列是否有任务,若有就拿走执行threads_[i]->start(); idleThreadSize_++; // 记录初始空闲线程的数量}
}
线程里面就存储函数对象
// 线程类型
class Thread
{
public:using ThreadFunc = std::function;//void threadFunc();Thread(ThreadFunc func);~Thread();// 启动线程void start();// 获取线程的idint getId() const;
private:ThreadFunc func_;static int generateId_;int threadId_; // 保存线程的id};
启动线程的时候就通过函数对象来跑就行了。
设置成分离线程,主线程退出的时候分离线程没执行完线程函数后也会退出。
// 启动线程,线程函数要访问的锁,条件变量都在ThreadPool里面,所以线程函数要放到ThreadPool里面
void Thread::start()
{// 创建一个线程函数,这里的thread的方法就是在构造函数里面执行的std::thread t(func_,threadId_); // C++11 来说,线程对象t 和线程函数func_ t.detach(); // 设置为分离线程,主线程和从线程func_
}
wait 就是阻塞的等待,wait_for 就是等待的时间长度会说,wait_util强调等待得时间结点会说。如果主线程放任务的过程超过1s,那么此时返回一个valid 为false 的Result,标识放任务失败。那么用户需要对返回值进行判断。
// 给线程池提交任务
Result ThreadPool::submitTask(std::shared_ptr sp)
{// 获取锁std::unique_lock lock(taskQueMtx_);// 线程的通信 --判断是否队列中的任务到达了上限// 在 nonempty 条件变量进行等待,Pred条件不满足才会跳出wait// 用户提交任务,最长不能超过1s,否则判断任务失败,返回// wait(与时间无关) wait_for(持续等待的时间) wait_until(等待的终点)// wait_for 返回值判断if (!notFull_.wait_for(lock, std::chrono::seconds(1), [&]()->bool {return taskQue_.size() < taskQueMaxThreshHold_; })){// 返回值为false,条件依旧没有满足,任务队列满的std::cerr << "task queue is full,submit task fail." << std::endl;return Result(sp,false); // Task Result}//taskQue_.push(std::move(sp));// bugtaskQue_.emplace(sp);taskSize_++;// 通知消费者消费nonEmpty_.notify_all();// cached模式 需要根据任务数量和空闲线程数量,判断是否需要创建新的线程// 应用场景: 小而快的任务if (poolMode_ == PoolMode::MODE_CACHED&& taskSize_ > idleThreadSize_&& curThreadSize_< threadSizeThreshHold_) {std::cout << ">>> create new thread" << std::endl;// 创建新的线程对象auto ptr = std::make_unique(std::bind(&ThreadPool::threadFunc, this,std::placeholders::_1));int threadId = ptr->getId();threads_.emplace(threadId, std::move(ptr));threads_[threadId]->start();// 修改线程个数相关的变量curThreadSize_++;idleThreadSize_++;}return Result(sp);
}
exec函数:
exec 函数的执行,是执行程序,并且将返回值放到Task当中的Result*当中。
exec封装了setVal,实际上就是提前判断result_是否存在,存在才设置。
void Task::exec()
{if (result_ != nullptr){result_->setVal(run()); // 这里发生多态调用}
}
设置结果在Result的构造函数中执行。
void Task::setResult(Result* res)
{result_ = res;
}
get函数:
isValid_ 当任务没放到任务队列的时候会被设置为false,此时用户即使是调用get方法获取返回值也是会失败的。
if (!notFull_.wait_for(lock, std::chrono::seconds(1), [&]()->bool {return taskQue_.size() < taskQueMaxThreshHold_; })){// 返回值为false,条件依旧没有满足,任务队列满的std::cerr << "task queue is full,submit task fail." << std::endl;return Result(sp,false); // Task Result}
// 用户调用
Any Result::get()
{if (!isValid_){return nullptr;}sem_.wait(); // 任务如果没有执行完,这里会阻塞用户return std::move(any_);
}
首先要构造函数要能接受任意类型,那么我们一定有一个构造函数是函数模板;
然后需要Any类是这个接受类型的父类,那么我们可以定义一个子类Derive,专门接受不同类型的,然后保存在子类当中;父类不需要模板,Any类放一个unique_ptr
,然后构造函数 Any(T data) :base_(std::make_unique
这样base_ 就指向了任意类型,并且是父类。若要使用的时候可以通过转化为子类(dynamic_cast)然后cast_操作就会将保存的值进行返回。
// Any 类型:可以接受任意数据的类型
class Any
{
public:Any() = default;~Any() = default;Any(const Any&) = delete;// 成员禁止左值拷贝和赋值Any& operator=(const Any&) = delete;Any(Any&&) = default;Any& operator=(Any&&) = default;// 模板收任意类型的数据来构造一个对象templateAny(T data) :base_(std::make_unique>(data)){}// 这个方法能把Any对象里面存储的data对象提取出来templateT cast_(){// 我们怎么从base_找到它所指向的Derive对象,从他里面取出data成员变量// 基类指针 =》 派生类指针// base.get_() 就是获取裸指针Derive* pd = dynamic_cast*>(base_.get());// 类型不匹配就会失败if (pd == nullptr){throw "type is unmatch !";}return pd->data_;}
private:class Base{public:virtual ~Base() = default;};// 派生类类型templateclass Derive : public Base{public:Derive(T data):data_(data){}public:T data_; // 保存了任意其他类型};
private:std::unique_ptr base_;// 基类指针
};
Result 就是提供给外部的,封装了Any,定义Any成员变量作为返回值,外部通过Result简介可以通过Any的cast_方法拿到想要的返回值。
但是如果cast_ 的类型不匹配,会返回nullptr。即外部定义MyTask的返回值sum的类型需要确定。
// 实现接受任务提交到线程池的task任务执行完成后的返回值类型Result
class Result
{
public:Result(std::shared_ptr task, bool isValid = true);~Result() = default;// 问题一:setVal 方法,获取任务执行完的返回值的void setVal(Any any);// 问题二: get方法,用户调用这个方法获取task的返回值Any get();
private:Any any_; // 存储任务返回值Semaphore sem_; // 线程通信信号量 ,默认二元std::shared_ptr task_;// 指向获取返回值的任务对象std::atomic_bool isValid_; // 返回值是否有效,任务提交失败那肯定时无效的
};
使用范例,由于调用方肯定知道返回值的类型,那么就可以拿到Result当中的Any对象存储的值提取成相应的返回值。
Result res1 = pool.submitTask(std::make_shared(1, 100000000));
pool.submitTask(std::make_shared(200000001, 300000000));
ull sum1 = res1.get().cast_();
信号量和mutex的区别:
二元信号量其实已经有了互斥的语义,但是信号量的P,V可以在不同线程,而mutex必须在一个线程。
C++20 提供了,但是我们自己实现。
由于拿返回值的时候可能存在线程池中的线程暂时还没处理完任务,所以得阻塞。
实现信号量可以用条件变量,用锁搭配条件变量实现通知,用resLimit_说明资源的数量。
class Semaphore
{
public:Semaphore(int limit = 0):resLimit_(limit){}~Semaphore() = default;// 获取一个信号量资源void wait(){std::unique_lock lock(mtx_);// 等待信号量有资源,没有资源的话,会阻塞当前线程cond_.wait(lock, [&]()->bool {return resLimit_ > 0; }); // 该条件满足就退出resLimit_--;}// 增加一个信号量资源void post(){std::unique_lock lock(mtx_);resLimit_++;cond_.notify_all();}
private:int resLimit_; // 资源的一个数量std::mutex mtx_; // 需要互斥std::condition_variable cond_; // 需要通知
};
submitTask 返回任务的结果,不能够通过往Task对象添加成员方法来实现,即task->getResult();这种是不行的。
但是Result(task)这种是没有问题的。 原因是:task的生命周期比Result的短。
Result的构造函数,会将对应的Task结构绑定起来。
其中std::shared_ptr
作为形参也是延续了Task的生命周期,至此task的生命周期由Result管控。
总结步骤
// Result 方法的实现
Result::Result(std::shared_ptr task, bool isValid):isValid_(isValid),task_(task)
{task_->setResult(this);
}// setResult 设置Task的返回结果
void Task::setResult(Result* res)
{result_ = res;
}// 用户调用,若是isValid_ 是false就表示任务都没传进来,其他情况就等待信号量。
Any Result::get()
{if (!isValid_){return nullptr;}sem_.wait(); // 任务如果没有执行完,这里会阻塞用户return std::move(any_);
}// 线程池内部线程调用
/// 设置返回值的调用
void Result::setVal(Any any)
{// 存储task的返回值this->any_ = std::move(any);sem_.post(); // 已经获取了任务的返回值
}/// 线程池内部调用exec函数会将用户传进来的run()方法跑了,返回值给到setVal方法,进行保存并且通知用户。
void Task::exec()
{if (result_ != nullptr){result_->setVal(run()); // 这里发生多态调用}
}
Result的生命周期什么时候结束。出了用户的生命周期才会析构。
所有的set方法都要在进程没被run起来之前被设置。
// 设置线程池的工作模式
void ThreadPool::setMode(PoolMode mode)
{// 启动之后不允许设置线程池状态if (checkRunningState())return;poolMode_ = mode;
}
若是处于Cache模式:
Thread的成员变量threadid_ 就是为这里考虑的。因为删除线程的时候需要找到通过id来找,这样会比较优雅。
由于需要判断是否需要增加或者删除线程,用idleThreadSize_来记录空闲线程的数量,curThreadSize_ 来记录已经创建了的线程总数(空闲+非空闲),taskSize_记录任务的任务队列中数量,后续可以通过空闲的线程数量和taskSize_进行比较,从而来增加/删除线程。
submitTask:
负责提交,提交的时候需要考虑已有的任务若是比线程多,那么就可以动态的创建适量线程来执行任务。因为提交任务可以对任务进行初步的判断,考虑是否需要增加线程处理。
增加的时间点,只要是cache,并且当前的线程数没有达到上限值,任务数量大于空闲线程数,就可以进行创建。
// cached模式 需要根据任务数量和空闲线程数量,判断是否需要创建新的线程
// 应用场景: 小而快的任务
if (poolMode_ == PoolMode::MODE_CACHED&& taskSize_ > idleThreadSize_&& curThreadSize_< threadSizeThreshHold_)
{std::cout << ">>> create new thread" << std::endl;// 创建新的线程对象auto ptr = std::make_unique(std::bind(&ThreadPool::threadFunc, this,std::placeholders::_1));int threadId = ptr->getId();threads_.emplace(threadId, std::move(ptr));threads_[threadId]->start();// 修改线程个数相关的变量curThreadSize_++;idleThreadSize_++;
}
threadFunc:
线程池中的线程在wait_for进行等待,当被唤醒的时候判断如果是timeout了,表示一定时间内没有任务了,此时若是线程的数量超过初始的,就可以让线程销毁了。
我们这里规定,就是从等待任务的时候,每1s timeout一次,直到时间已经超过60s,那么线程就不能留着了。
if (poolMode_ == PoolMode::MODE_CACHED)
{if (std::cv_status::timeout == nonEmpty_.wait_for(lock, std::chrono::seconds(1))){auto now = std::chrono::high_resolution_clock().now();auto dur = std::chrono::duration_cast(now - lastTime);if (dur.count() >= THREAD_MAX_IDLE_TIME&& curThreadSize_ > initThreadSize_){// 开始回收当前线程// 记录线程数量的相关变量的值修改// 把线程对象从线程列表容器中删除 到那时没有办法直到对应的thread的线程对象// threadid => thread对象 threads_.erase(threadid);// 删除id,不能用this_thread里面的idcurThreadSize_--;idleThreadSize_--;std::cout << "threadid:" << std::this_thread::get_id() <<"exit !" << std::endl;return;}}
}
setThreadSizeThreshHold:
若是要设置线程为Cache模式,需要判断线程是否有启动。
// 线程池析构
ThreadPool::~ThreadPool()
{isPoolRunning_ = false;//nonEmpty_.notify_all();// 唤醒所有的线程 -- 这里唤醒的话,会有一种情况唤醒不了// 等待线程池里面所有的线程返回,有两种状态: 阻塞 & 正在执行的任务std::unique_lock lock{ taskQueMtx_ };nonEmpty_.notify_all();// 唤醒所有的线程// 所有线程都释放,这里才会彻底退出exitCond_.wait(lock, [&]()->bool {return threads_.size() == 0; });
}
若是nonEmpty的唤醒放在notify前面,有可能还会造成死锁,这个问题是非闭环的,一定要逻辑清晰。
即下面这种特殊情况。那为什么nonEmpty的唤醒放下面就没有问题呢? 因为即使获取锁后唤醒,那么我们可以通过双重判断解决。
~ThreadPool() 如果是先唤醒,在获取锁,(此时可能)就有可能主线程和线程池的线程都在等待唤醒的情况。但是如果是先获取锁,那么线程池的线程在第二次判断isPoolRunning这里,就一定会被判断失败,然后去执行任务。 其实单纯的只是析构函数唤醒的时候线程切换,就有可能发生部分线程执行任务的时候进入了临界区执行任务,但是没任务又wait了。
下图的说法是错误的,是一开始总结的时候想的:其实这里不用考虑切换,都上锁了,这里肯定是串行的。愚蠢。
析构函数代码如上。
// 线程池析构
ThreadPool::~ThreadPool()
{isPoolRunning_ = false;//nonEmpty_.notify_all();// 唤醒所有的线程 -- 这里唤醒的话,会有一种情况唤醒不了// 等待线程池里面所有的线程返回,有两种状态: 阻塞 & 正在执行的任务std::unique_lock lock{ taskQueMtx_ };nonEmpty_.notify_all();// 唤醒所有的线程// 所有线程都释放,这里才会彻底退出exitCond_.wait(lock, [&]()->bool {return threads_.size() == 0; });
}
配合线程函数,可以彻底解决死锁问题。
这种线程执行可以保证,已经在任务队列的任务都能执行完再退出。
// 定义线程函数
void ThreadPool::threadFunc(int threadid) // 线程函数返回,相应的线程也就结束了
{// 获取一个高精度时间auto lastTime = std::chrono::high_resolution_clock().now();while (isPoolRunning_){std::shared_ptr task;{// 先获取锁std::unique_lock lock(taskQueMtx_);cout << "tid: " << std::this_thread::get_id() << " 尝试获取任务..." << endl;// cache模式下,有可能已经创建了很多的线程,但是空闲时间超过60s,应该把多余的线程给取消了// 当前时间 - 上一次线程执行时间 > 60s// 等待nonEmpty// 总结:也就是线程获取任务的过程中等待过长时间就可以进行删除// 每1s返回一次,区分 超时返回?有任务待执行while (isPoolRunning_ && taskSize_ <= 0){// 条件变量,超时返回if (poolMode_ == PoolMode::MODE_CACHED){if (std::cv_status::timeout == nonEmpty_.wait_for(lock, std::chrono::seconds(1))){auto now = std::chrono::high_resolution_clock().now();auto dur = std::chrono::duration_cast(now - lastTime);if (dur.count() >= THREAD_MAX_IDLE_TIME&& curThreadSize_ > initThreadSize_){// 开始回收当前线程// 记录线程数量的相关变量的值修改// 把线程对象从线程列表容器中删除 到那时没有办法直到对应的thread的线程对象// threadid => thread对象 threads_.erase(threadid);// 删除id,不能用this_thread里面的idcurThreadSize_--;idleThreadSize_--;std::cout << "threadid:" << std::this_thread::get_id() <<"exit !" << std::endl;return;}}}else{nonEmpty_.wait(lock);}// 线程等待被唤醒后,两种情况都检查是被谁唤醒// 线程池要结束,回收线程资源if (!isPoolRunning_){// erase就是调用了unique_ptr的析构函数,会把Thread删除,这里就已经删除线程了threads_.erase(threadid);// 删除id,不能用this_thread里面的idstd::cout << "threadid:" << std::this_thread::get_id() <<"exit !" << std::endl;exitCond_.notify_all();return;}}taskSize_--;}idleThreadSize_--;cout << "tid: " << std::this_thread::get_id() << " 获取任务成功" << endl;// 取任务task = taskQue_.front();taskQue_.pop();// 如果依然有剩余的任务,继续通知其他线程执行任务if (taskQue_.size() > 0){// 第一个吃螃蟹的通知其他人吃螃蟹nonEmpty_.notify_all();}// 取出一个任务,进行通知,通知可以继续提交生产任务notFull_.notify_all();// 释放锁,执行任务if (task != nullptr){// task->run();// 执行任务:把任务的返回值通过setVal方法给到Resulttask->exec();}// 返回值处理idleThreadSize_++;// 线程执行完了任务,空闲线程++// 更新线程调度执行完任务的时间lastTime = std::chrono::high_resolution_clock().now();}// isRunLooping_表示部分线程在线程池要退出的时候刚好在执行任务,执行后到这里threads_.erase(threadid);// 删除id,不能用this_thread里面的idstd::cout << "threadid:" << std::this_thread::get_id() <<"exit !" << std::endl;exitCond_.notify_all();
}
若项目移植到Linux下会发生错误,在Semaphore 提供一个变量isExit_可以解决。
具体调试步骤,发生死锁,查看进程id
1号是main线程,我们看看2号线程是在哪里阻塞了;
很明显,线程执行notify_all的时候进入睡眠了。
原因:此时的条件变量已经析构了。vs下没出问题是因为对条件变量的资源进行了删除,而Linux下没有对条件变量析构做处理。
那为啥Semaphore会被析构呢?
Semaphore 是在Result中被使用,是用来用户进行获取返回值的时候进行使用的。但是有一种场景:
如下图,解释也在图中。
解决办法,每次wait,post操作对isExit_进行判断。
// 实现一个信号量类
class Semaphore
{
public:Semaphore(int limit = 0):resLimit_(limit),isExit_(false){}~Semaphore(){isExit_ = true;}// 获取一个信号量资源void wait(){if(isExit_){return ;}std::unique_lock lock(mtx_);// 等待信号量有资源,没有资源的话,会阻塞当前线程cond_.wait(lock, [&]()->bool {return resLimit_ > 0; }); // 该条件满足就退出resLimit_--;}// 增加一个信号量资源void post(){// 如果已经不存在了,那么可以不做任何事情了if(isExit_){return ;}std::unique_lock lock(mtx_);resLimit_++;// 这里会有问题,因为Linuxcond_不释放资源,会发生无故阻塞cond_.notify_all();}
private:int resLimit_;// 资源的一个数量std::mutex mtx_;std::condition_variable cond_;std::atomic_bool isExit_;
};
用C++11,14 来进行代码重构,摒弃自己的Result,Task,Semaphore等等,用packaged_task 和 future类型来实现。
类似thread(func,12,3),我们希望给submitTask传参的时候也可以使用这种类似的方式,而不是自己手动创建任务,然后将任务传参。
简单的一些前置知识铺垫:
注意:packaged_task的operator()重载中若是执行失败,会返回future_error的异常,通常若是发生这个异常,可能是任务没有被执行。
submitTask改造,其中返回值类型用了auto,这是C++14提供的,std::future
// 给线程池提交任务
// 使用可变参模板变成,让submitTask接受任意任务函数和任意数量的参数
// pool.submitTask(sum1,10,20)
template
auto submitTask(Func&& func, Args&& ...args) -> std::future
{// 打包任务,放入任务队列// 返回值类型重命名using RType = decltype(func(args ...));auto task = std::make_shared>(std::bind(std::forward(func), std::forward(args)...));std::unique_lock lock(taskQueMtx_);// 线程的通信 --判断是否队列中的任务到达了上限// 在 nonempty 条件变量进行等待,Pred条件不满足才会跳出wait// 用户提交任务,最长不能超过1s,否则判断任务失败,返回// wait(与时间无关) wait_for(持续等待的时间) wait_until(等待的终点)// wait_for 返回值判断if (!notFull_.wait_for(lock, std::chrono::seconds(1), [&]()->bool {return taskQue_.size() < taskQueMaxThreshHold_; })){// 返回值为false,条件依旧没有满足,任务队列满的std::cerr << "task queue is full,submit task fail." << std::endl;auto task = std::make_shared>([]()->RType {return RType(); } // 返回一个空值的任务);(*task)(); // 相当于主线程来执行这个不成功的例子了return task->get_future(); // Task Result}//taskQue_.push(std::move(sp));// bug//taskQue_.emplace(sp);// using Task = std::function;taskQue_.emplace([task]() { // 以值形式拷贝,相当于shared_ptr的拷贝了一份,疑问,这个函数体返回值不是void吗// 去执行下面的任务(*task)(); // 执行任务,以及设置任务的返回值});taskSize_++;// 通知消费者消费nonEmpty_.notify_all();// cached模式 需要根据任务数量和空闲线程数量,判断是否需要创建新的线程// 应用场景: 小而快的任务if (poolMode_ == PoolMode::MODE_CACHED&& taskSize_ > idleThreadSize_&& curThreadSize_ < threadSizeThreshHold_){std::cout << ">>> create new thread" << std::endl;// 创建新的线程对象auto ptr = std::make_unique(std::bind(&ThreadPool::threadFunc, this, std::placeholders::_1));int threadId = ptr->getId();threads_.emplace(threadId, std::move(ptr));threads_[threadId]->start();// 修改线程个数相关的变量curThreadSize_++;idleThreadSize_++;}return task->get_future();
}
线程执行的函数基本上不用变,执行函数从task->exec() 换成 task() 即可,相当于执行了packaged_task的operator() 函数。
// 定义线程函数
void threadFunc(int threadid)
{// 获取一个高精度时间auto lastTime = std::chrono::high_resolution_clock().now();while (isPoolRunning_){Task task;{// 先获取锁std::unique_lock lock(taskQueMtx_);cout << "tid: " << std::this_thread::get_id() << " 尝试获取任务..." << endl;// cache模式下,有可能已经创建了很多的线程,但是空闲时间超过60s,应该把多余的线程给取消了// 当前时间 - 上一次线程执行时间 > 60s// 等待nonEmpty// 总结:也就是线程获取任务的过程中等待过长时间就可以进行删除// 每1s返回一次,区分 超时返回?有任务待执行while (taskSize_ <= 0){// 条件变量,超时返回if (poolMode_ == PoolMode::MODE_CACHED){if (std::cv_status::timeout == nonEmpty_.wait_for(lock, std::chrono::seconds(1))){auto now = std::chrono::high_resolution_clock().now();auto dur = std::chrono::duration_cast(now - lastTime);if (dur.count() >= THREAD_MAX_IDLE_TIME&& curThreadSize_ > initThreadSize_){// 开始回收当前线程// 记录线程数量的相关变量的值修改// 把线程对象从线程列表容器中删除 到那时没有办法直到对应的thread的线程对象// threadid => thread对象 threads_.erase(threadid);// 删除id,不能用this_thread里面的idcurThreadSize_--;idleThreadSize_--;std::cout << "threadid:" << std::this_thread::get_id() <<"exit !" << std::endl;return;}}}else{nonEmpty_.wait(lock);}// 线程等待被唤醒后,两种情况都检查是被谁唤醒// 线程池要结束,回收线程资源if (!isPoolRunning_){// erase就是调用了unique_ptr的析构函数,会把Thread删除,这里就已经删除线程了threads_.erase(threadid);// 删除id,不能用this_thread里面的idstd::cout << "threadid:" << std::this_thread::get_id() <<"exit !" << std::endl;exitCond_.notify_all();return;}}taskSize_--;}idleThreadSize_--;cout << "tid: " << std::this_thread::get_id() << " 获取任务成功" << endl;// 取任务task = taskQue_.front();taskQue_.pop();// 如果依然有剩余的任务,继续通知其他线程执行任务if (taskQue_.size() > 0){// 第一个吃螃蟹的通知其他人吃螃蟹nonEmpty_.notify_all();}// 取出一个任务,进行通知,通知可以继续提交生产任务notFull_.notify_all();// 释放锁,执行任务if (task != nullptr){// task->run();// 执行任务:把任务的返回值通过setVal方法给到Result//task->exec();task(); // 执行package_task封装的函数,执行完会post}// 返回值处理idleThreadSize_++;// 线程执行完了任务,空闲线程++// 更新线程调度执行完任务的时间lastTime = std::chrono::high_resolution_clock().now();}// isRunLooping_表示部分线程在线程池要退出的时候刚好在执行任务,执行后到这里threads_.erase(threadid);// 删除id,不能用this_thread里面的idstd::cout << "threadid:" << std::this_thread::get_id() <<"exit !" << std::endl;exitCond_.notify_all();
}
该项目可能会碰到两次死锁,并且会碰到Linux平台和windows平台运行结果不一致的情况,均解决了。
项目链接:
码云,点击直达~