目录
线程池的设计流程
1.设计线程池类
2.线程池的初始化
3.启动线程池
4.线程池运行线程
5.向线程池抛入任务
6.停止所有线程。
7.等待当前任务队列中, 所有工作全部结束
完整代码
zero_threadpool.h
zero_threadpool.cpp
threadpool.cpp
C/C++Linux服务器开发/后台架构师【零声教育】-学习视频教程-腾讯课堂
本文使用C++11实现实现线程池,涉及的技术如下:
可变参数
std::future
decltype
packaged_task
bind
支持可变参数列表
支持获取任务返回值
线程池类,采用c++11来实现了
class ZERO_ThreadPool
{
protected:struct TaskFunc{TaskFunc(uint64_t expireTime) : _expireTime(expireTime){ }std::function _func;int64_t _expireTime = 0; //超时的绝对时间};typedef shared_ptr TaskFuncPtr;/*** @brief 获取任务* @return TaskFuncPtr*/bool get(TaskFuncPtr&task);/*** @brief 线程池是否退出*/bool isTerminate() { return bTerminate_; }/*** @brief 线程运行态*/void run();public:/*** @brief 构造函数*/ZERO_ThreadPool();/*** @brief 析构, 会停止所有线程*/virtual ~ZERO_ThreadPool();/*** @brief 初始化.* @param num 工作线程个数*/bool init(size_t num);/*** @brief 获取线程个数.* @return size_t 线程个数*/size_t getThreadNum(){std::unique_lock lock(mutex_);return threads_.size();}/*** @brief 获取当前线程池的任务数* @return size_t 线程池的任务数*/size_t getJobNum(){std::unique_lock lock(mutex_);return tasks_.size();}/*** @brief 停止所有线程, 会等待所有线程结束*/void stop();/*** @brief 启动所有线程*/bool start(); // 创建线程/*** @brief 等待当前任务队列中, 所有工作全部结束(队列无任务).** @param millsecond 等待的时间(ms), -1:永远等待* @return true, 所有工作都处理完毕* false,超时退出*/bool waitForAllDone(int millsecond = -1);/*** @brief 用线程池启用任务(F是function, Args是参数)** @param ParentFunctor* @param tf* @return 返回任务的future对象, 可以通过这个对象来获取返回值*/template auto exec(F&& f, Args&&... args) -> std::future{return exec(0,f,args...);}/*** @brief 用线程池启用任务(F是function, Args是参数)** @param 超时时间 ,单位ms (为0时不做超时控制) ;若任务超时,此任务将被丢弃* @param bind function* @return 返回任务的future对象, 可以通过这个对象来获取返回值*//*template 它是c++里新增的最强大的特性之一,它对参数进行了高度泛化,它能表示0到任意个数、任意类型的参数auto exec(F &&f, Args &&... args) -> std::futurestd::future:返回future,调用者可以通过future获取返回值返回值后置*/template auto exec(int64_t timeoutMs, F&& f, Args&&... args) -> std::future{int64_t expireTime = (timeoutMs == 0 ? 0 : TNOWMS + timeoutMs); // 获取现在时间//定义返回值类型using RetType = decltype(f(args...)); // 推导返回值// 封装任务auto task = std::make_shared>(std::bind(std::forward(f), std::forward(args)...));TaskFuncPtr fPtr = std::make_shared(expireTime); // 封装任务指针,设置过期时间fPtr->_func = [task]() { // 具体执行的函数(*task)();};std::unique_lock lock(mutex_);tasks_.push(fPtr); // 插入任务condition_.notify_one(); // 唤醒阻塞的线程,可以考虑只有任务队列为空的情况再去notifyreturn task->get_future();;}
protected:queue tasks_; //任务队列std::vector threads_; //工作线程,vector 大致可以认为是一个可变数组std::mutex mutex_; //互斥锁std::condition_variable condition_; //条件变量size_t threadNum_; //线程数量bool bTerminate_; //判定是否终止线程池std::atomic atomic_{ 0 }; //原子变量
};
使用说明
/*** @file zero_thread_pool.h* @brief 线程池类,采用c++11来实现了,* 使用说明:* ZERO_ThreadPool tpool;* tpool.init(5); //初始化线程池线程数* //启动线程方式* tpool.start();* //将任务丢到线程池中* tpool.exec(testFunction, 10); //参数和start相同* //等待线程池结束* tpool.waitForAllDone(1000); //参数<0时, 表示无限等待(注意有人调用stop也会推出)* //此时: 外部需要结束线程池是调用* tpool.stop();
*//* 注意:* ZERO_ThreadPool::exec执行任务返回的是个future, 因此可以通过future异步获取结果, 比如:* int testInt(int i)* {* return i;* }* auto f = tpool.exec(testInt, 5);* cout << f.get() << endl; //当testInt在线程池中执行后, f.get()会返回数值5** class Test* {* public:* int test(int i);* };* Test t;* auto f = tpool.exec(std::bind(&Test::test, &t, std::placeholders::_1), 10);* //返回的future对象, 可以检查是否执行* cout << f.get() << endl;
*/
线程池初始化,首先加锁对资源进行保护,判断线程池为空,然后记录线程池线程的数量。
bool ZERO_ThreadPool::init(size_t num)
{std::unique_lock lock(mutex_);if (!threads_.empty()) //空则返回ture{return false;}threadNum_ = num; // 设置线程数量return true;
}
启动线程池,首先加锁对资源进行保护,判断线程池为空。创建线程,并放入vector数组中。
bool ZERO_ThreadPool::start()
{std::unique_lock lock(mutex_);if (!threads_.empty()){return false;}for (size_t i = 0; i < threadNum_; i++){threads_.push_back(new thread(&ZERO_ThreadPool::run, this));}return true;
}
判定是否终止线程池
从任务队列获取任务,判断是否任务存在。如果任务队列为空,睡眠等待条件变量唤醒。如果任务队列不为空,从队列中获取任务,释放队列中的任务。
返回的任务抛入线程池执行。
void ZERO_ThreadPool::run() // 执行任务的线程
{//调用处理部分while (!isTerminate()) // 判断是不是要停止{TaskFuncPtr task;bool ok = get(task); // 1. 读取任务if (ok){++atomic_;try{if (task->_expireTime != 0 && task->_expireTime < TNOWMS ){//超时任务,是否需要处理?}else{task->_func(); // 2. 执行任务}}catch (...){}--atomic_;//任务都执行完毕了std::unique_lock lock(mutex_);if (atomic_ == 0 && tasks_.empty()) // 3.检测是否所有任务都运行完毕{condition_.notify_all(); // 这里只是为了通知waitForAllDone}}}
}
bool ZERO_ThreadPool::get(TaskFuncPtr& task)
{std::unique_lock lock(mutex_); // 也要等锁if (tasks_.empty()) // 判断是否任务存在{condition_.wait(lock, [this] { return bTerminate_ // 要终止线程池 bTerminate_设置为true,外部notify后|| !tasks_.empty(); // 任务队列不为空}); // notify -> 1. 退出线程池; 2.任务队列不为空}if (bTerminate_)return false;if (!tasks_.empty()) //不为空{task = std::move(tasks_.front()); // 使用了移动语义tasks_.pop(); // 释放一个任务return true;}return false;
}
使用了可变模块参数、智能指针、bind、function、捕获列表的相关技术知识。
返回任务的future对象, 可以通过这个对象来获取返回值。
超时时间 ,单位ms (为0时不做超时控制) ;若任务超时,此任务将被丢弃。
可变模块参数对参数进行了高度泛化,它能表示0到任意个数、任意类型的参数。)
/*** @brief 用线程池启用任务(F是function, Args是参数)** @param ParentFunctor* @param tf* @return 返回任务的future对象, 可以通过这个对象来获取返回值*/template auto exec(F&& f, Args&&... args) -> std::future{return exec(0,f,args...);}/*** @brief 用线程池启用任务(F是function, Args是参数)** @param 超时时间 ,单位ms (为0时不做超时控制) ;若任务超时,此任务将被丢弃* @param bind function* @return 返回任务的future对象, 可以通过这个对象来获取返回值*//*template 它是c++里新增的最强大的特性之一,它对参数进行了高度泛化,它能表示0到任意个数、任意类型的参数auto exec(F &&f, Args &&... args) -> std::futurestd::future:返回future,调用者可以通过future获取返回值返回值后置*/template auto exec(int64_t timeoutMs, F&& f, Args&&... args) -> std::future{int64_t expireTime = (timeoutMs == 0 ? 0 : TNOWMS + timeoutMs); // 获取现在时间//定义返回值类型using RetType = decltype(f(args...)); // 推导返回值// 封装任务auto task = std::make_shared>(std::bind(std::forward(f), std::forward(args)...));TaskFuncPtr fPtr = std::make_shared(expireTime); // 封装任务指针,设置过期时间fPtr->_func = [task]() { // 具体执行的函数(*task)();};std::unique_lock lock(mutex_);tasks_.push(fPtr); // 插入任务condition_.notify_one(); // 唤醒阻塞的线程,可以考虑只有任务队列为空的情况再去notifyreturn task->get_future();;}
停止线程池,数组线程池停止标志,唤醒所有条件变量等待线程退出,清理线程vector数组。
void ZERO_ThreadPool::stop()
{{std::unique_lock lock(mutex_); //加锁bTerminate_ = true; // 触发退出condition_.notify_all(); //唤醒所有等待线程}for (size_t i = 0; i < threads_.size(); i++){//joinable判断线程是否可以加入等待if(threads_[i]->joinable()){threads_[i]->join(); // 等线程推出}delete threads_[i];threads_[i] = NULL;}std::unique_lock lock(mutex_);threads_.clear();
}
如果当前任务队列为空,直接退出。
当前队列不为空,等待条件变量,线程睡眠。满足条件后退出。
bool ZERO_ThreadPool::waitForAllDone(int millsecond)
{std::unique_lock lock(mutex_);if (tasks_.empty())return true;if (millsecond < 0){//当被唤醒后 && 第二的参数为falsecondition_.wait(lock, [this] { return tasks_.empty(); });return true;}else{//在线程收到唤醒通知或者时间超时之前,该线程都会 处于阻塞状态,如果收到唤醒通知或者时间超时,wait_for返回,剩下操作和wait类似return condition_.wait_for(lock, std::chrono::milliseconds(millsecond), [this] { return tasks_.empty(); });}
}
//zero_threadpool.h
#ifndef ZERO_THREADPOOL_H
#define ZERO_THREADPOOL_H#include
#include
#include
#include
#include
#include
//#include #ifdef WIN32
#include
#else
#include
#endifusing namespace std;void getNow(timeval *tv);
int64_t getNowMs();#define TNOW getNow()
#define TNOWMS getNowMs()/
/*** @file zero_thread_pool.h* @brief 线程池类,采用c++11来实现了,* 使用说明:* ZERO_ThreadPool tpool;* tpool.init(5); //初始化线程池线程数* //启动线程方式* tpool.start();* //将任务丢到线程池中* tpool.exec(testFunction, 10); //参数和start相同* //等待线程池结束* tpool.waitForAllDone(1000); //参数<0时, 表示无限等待(注意有人调用stop也会推出)* //此时: 外部需要结束线程池是调用* tpool.stop();* 注意:* ZERO_ThreadPool::exec执行任务返回的是个future, 因此可以通过future异步获取结果, 比如:* int testInt(int i)* {* return i;* }* auto f = tpool.exec(testInt, 5);* cout << f.get() << endl; //当testInt在线程池中执行后, f.get()会返回数值5** class Test* {* public:* int test(int i);* };* Test t;* auto f = tpool.exec(std::bind(&Test::test, &t, std::placeholders::_1), 10);* //返回的future对象, 可以检查是否执行* cout << f.get() << endl;
*/class ZERO_ThreadPool
{
protected:struct TaskFunc{TaskFunc(uint64_t expireTime) : _expireTime(expireTime){ }std::function _func;int64_t _expireTime = 0; //超时的绝对时间};typedef shared_ptr TaskFuncPtr;/*** @brief 获取任务* @return TaskFuncPtr*/bool get(TaskFuncPtr&task);/*** @brief 线程池是否退出*/bool isTerminate() { return bTerminate_; }/*** @brief 线程运行态*/void run();public:/*** @brief 构造函数*/ZERO_ThreadPool();/*** @brief 析构, 会停止所有线程*/virtual ~ZERO_ThreadPool();/*** @brief 初始化.* @param num 工作线程个数*/bool init(size_t num);/*** @brief 获取线程个数.* @return size_t 线程个数*/size_t getThreadNum(){std::unique_lock lock(mutex_);return threads_.size();}/*** @brief 获取当前线程池的任务数* @return size_t 线程池的任务数*/size_t getJobNum(){std::unique_lock lock(mutex_);return tasks_.size();}/*** @brief 停止所有线程, 会等待所有线程结束*/void stop();/*** @brief 启动所有线程*/bool start(); // 创建线程/*** @brief 等待当前任务队列中, 所有工作全部结束(队列无任务).** @param millsecond 等待的时间(ms), -1:永远等待* @return true, 所有工作都处理完毕* false,超时退出*/bool waitForAllDone(int millsecond = -1);/*** @brief 用线程池启用任务(F是function, Args是参数)** @param ParentFunctor* @param tf* @return 返回任务的future对象, 可以通过这个对象来获取返回值*/template auto exec(F&& f, Args&&... args) -> std::future{return exec(0,f,args...);}/*** @brief 用线程池启用任务(F是function, Args是参数)** @param 超时时间 ,单位ms (为0时不做超时控制) ;若任务超时,此任务将被丢弃* @param bind function* @return 返回任务的future对象, 可以通过这个对象来获取返回值*//*template 它是c++里新增的最强大的特性之一,它对参数进行了高度泛化,它能表示0到任意个数、任意类型的参数auto exec(F &&f, Args &&... args) -> std::futurestd::future:返回future,调用者可以通过future获取返回值返回值后置*/template auto exec(int64_t timeoutMs, F&& f, Args&&... args) -> std::future{int64_t expireTime = (timeoutMs == 0 ? 0 : TNOWMS + timeoutMs); // 获取现在时间//定义返回值类型using RetType = decltype(f(args...)); // 推导返回值// 封装任务auto task = std::make_shared>(std::bind(std::forward(f), std::forward(args)...));TaskFuncPtr fPtr = std::make_shared(expireTime); // 封装任务指针,设置过期时间fPtr->_func = [task]() { // 具体执行的函数(*task)();};std::unique_lock lock(mutex_);tasks_.push(fPtr); // 插入任务condition_.notify_one(); // 唤醒阻塞的线程,可以考虑只有任务队列为空的情况再去notifyreturn task->get_future();;}protected:queue tasks_; //任务队列std::vector threads_; //工作线程,vector 大致可以认为是一个可变数组std::mutex mutex_; //互斥锁std::condition_variable condition_; //条件变量size_t threadNum_; //线程数量bool bTerminate_; //判定是否终止线程池std::atomic atomic_{ 0 }; //原子变量
};#endif // ZERO_THREADPOOL_H
#include "zero_threadpool.h"ZERO_ThreadPool::ZERO_ThreadPool(): threadNum_(1), bTerminate_(false)
{
}ZERO_ThreadPool::~ZERO_ThreadPool()
{stop();
}bool ZERO_ThreadPool::init(size_t num)
{std::unique_lock lock(mutex_);if (!threads_.empty()) //空则返回ture{return false;}threadNum_ = num; // 设置线程数量return true;
}void ZERO_ThreadPool::stop()
{{std::unique_lock lock(mutex_); //加锁bTerminate_ = true; // 触发退出condition_.notify_all(); //唤醒所有等待线程}for (size_t i = 0; i < threads_.size(); i++){//joinable判断线程是否可以加入等待if(threads_[i]->joinable()){threads_[i]->join(); // 等线程推出}delete threads_[i];threads_[i] = NULL;}std::unique_lock lock(mutex_);threads_.clear();
}bool ZERO_ThreadPool::start()
{std::unique_lock lock(mutex_);if (!threads_.empty()){return false;}for (size_t i = 0; i < threadNum_; i++){threads_.push_back(new thread(&ZERO_ThreadPool::run, this));}return true;
}bool ZERO_ThreadPool::get(TaskFuncPtr& task)
{std::unique_lock lock(mutex_); // 也要等锁if (tasks_.empty()) // 判断是否任务存在{condition_.wait(lock, [this] { return bTerminate_ // 要终止线程池 bTerminate_设置为true,外部notify后|| !tasks_.empty(); // 任务队列不为空}); // notify -> 1. 退出线程池; 2.任务队列不为空}if (bTerminate_)return false;if (!tasks_.empty()) //不为空{task = std::move(tasks_.front()); // 使用了移动语义tasks_.pop(); // 释放一个任务return true;}return false;
}void ZERO_ThreadPool::run() // 执行任务的线程
{//调用处理部分while (!isTerminate()) // 判断是不是要停止{TaskFuncPtr task;bool ok = get(task); // 1. 读取任务if (ok){++atomic_;try{if (task->_expireTime != 0 && task->_expireTime < TNOWMS ){//超时任务,是否需要处理?}else{task->_func(); // 2. 执行任务}}catch (...){}--atomic_;//任务都执行完毕了std::unique_lock lock(mutex_);if (atomic_ == 0 && tasks_.empty()) // 3.检测是否所有任务都运行完毕{condition_.notify_all(); // 这里只是为了通知waitForAllDone}}}
}
// 1000msbool ZERO_ThreadPool::waitForAllDone(int millsecond)
{std::unique_lock lock(mutex_);if (tasks_.empty())return true;if (millsecond < 0){//当被唤醒后 && 第二的参数为falsecondition_.wait(lock, [this] { return tasks_.empty(); });return true;}else{//在线程收到唤醒通知或者时间超时之前,该线程都会 处于阻塞状态,如果收到唤醒通知或者时间超时,wait_for返回,剩下操作和wait类似return condition_.wait_for(lock, std::chrono::milliseconds(millsecond), [this] { return tasks_.empty(); });}
}int gettimeofday(struct timeval &tv)
{
#if WIN32time_t clock;struct tm tm;SYSTEMTIME wtm;GetLocalTime(&wtm);tm.tm_year = wtm.wYear - 1900;tm.tm_mon = wtm.wMonth - 1;tm.tm_mday = wtm.wDay;tm.tm_hour = wtm.wHour;tm.tm_min = wtm.wMinute;tm.tm_sec = wtm.wSecond;tm. tm_isdst = -1;clock = mktime(&tm);tv.tv_sec = clock;tv.tv_usec = wtm.wMilliseconds * 1000;return 0;
#elsereturn ::gettimeofday(&tv, 0);
#endif
}void getNow(timeval *tv)
{
#if TARGET_PLATFORM_IOS || TARGET_PLATFORM_LINUXint idx = _buf_idx;*tv = _t[idx];if(fabs(_cpu_cycle - 0) < 0.0001 && _use_tsc){addTimeOffset(*tv, idx);}else{TC_Common::gettimeofday(*tv);}
#elsegettimeofday(*tv);
#endif
}int64_t getNowMs()
{struct timeval tv;getNow(&tv);return tv.tv_sec * (int64_t)1000 + tv.tv_usec / 1000;
}
#include
#include "zero_threadpool.h"
using namespace std;void func0()
{cout << "func0()" << endl;
}void func1(int a)
{cout << "func1 int =" << a << endl;
}//void func1(string a)
//{
// cout << "func1 string =" << a << endl;
//}void func2(int a, string b)
{cout << "func2() a=" << a << ", b=" << b<< endl;
}void test1() // 简单测试线程池
{ZERO_ThreadPool threadpool; // 封装一个线程池threadpool.init(1); // 设置线程的数量threadpool.start(); // 启动线程池,创建线程, 线程没有start,创建完毕后被调度// 假如要执行的任务
// threadpool.exec(1000,func0); // 1000是超时1000的意思,目前没有起作用threadpool.exec(func1, 10);threadpool.exec((void(*)(int))func1, 10); // 插入任务
// threadpool.exec((void(*)(string))func1, "king");threadpool.exec(func2, 20, "darren"); // 插入任务threadpool.waitForAllDone(); // 等待都执行完退出 运行函数 插入1000个任务, 等1000个任务执行完毕才退出threadpool.stop(); // 这里才是真正执行退出
}int func1_future(int a)
{cout << "func1() a=" << a << endl;return a;
}string func2_future(int a, string b)
{cout << "func1() a=" << a << ", b=" << b<< endl;return b;
}void test2() // 测试任务函数返回值
{ZERO_ThreadPool threadpool; //封装一个线程池threadpool.init(1); //设置线程数量threadpool.start(); // 启动线程池,创建线程// 假如要执行的任务std::future result1 = threadpool.exec(func1_future, 10);std::future result2 = threadpool.exec(func2_future, 20, "darren");
// auto result2 = threadpool.exec(func2_future, 20, "darren");std::cout << "result1: " << result1.get() << std::endl;std::cout << "result2: " << result2.get() << std::endl;threadpool.waitForAllDone();threadpool.stop();
}class Test
{
public:int test(int i){cout << _name << ", i = " << i << endl;return i;}
// int test(string str) {
// cout << _name << ", str = " << str << endl;
// }void setName(string name){_name = name;}string _name;
};void test3() // 测试类对象函数的绑定
{ZERO_ThreadPool threadpool;threadpool.init(1);threadpool.start(); // 启动线程池Test t1;Test t2;t1.setName("Test1");t2.setName("Test2");auto f1 = threadpool.exec(std::bind(&Test::test, &t1, std::placeholders::_1), 10);auto f2 = threadpool.exec(std::bind(&Test::test, &t2, std::placeholders::_1), 20);threadpool.waitForAllDone();cout << "t1 " << f1.get() << endl;cout << "t2 " << f2.get() << endl;
}void func2_1(int a, int b)
{cout << "func2_1 a + b = " << a+b << endl;
}int func2_1(string a, string b)
{cout << "func2_1 a + b = " << a << b<< endl;return 0;
}
void test4() // 简单测试线程池
{ZERO_ThreadPool threadpool; // 封装一个线程池threadpool.init(1); // 设置线程的数量threadpool.start(); // 启动线程池,创建线程, 线程没有start,创建完毕后被调度// 假如要执行的任务threadpool.exec((void(*)(int, int))func2_1, 10, 20); // 插入任务threadpool.exec((int(*)(string, string))func2_1, "king", " and darren");threadpool.waitForAllDone(); // 等待都执行完退出 运行函数 插入1000个任务, 等1000个任务执行完毕才退出threadpool.stop(); // 这里才是真正执行退出
}int main()
{
// test1(); // 简单测试线程池test2(); // 测试任务函数返回值
// test3(); // 测试类对象函数的绑定
// test4();cout << "main finish!" << endl;return 0;
}