C++实现线程池二

C++实现线程池二

线程池

大型的软件项目需要处理非常多的任务,例如:对于大量数据的数据流处理,或者是包含复杂GUI界面的应用程序。如果将所有的任务都以串行的方式执行,则整个系统的效率将会非常低下,应用程序的用户体验会非常的差。

如果一个系统支持多个动作同时存在,那么这个系统就是一个并发系统。如果这个系统还支持多个动作(物理时间上)同时执行,那么这个系统就是一个并行系统。

C++ 实现生产者消费者模型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
#include <chrono>
#include <condition_variable>
#include <future>
#include <mutex>
#include <queue>

// 某些调用可能会抛出std::system_error
std::mutex mutex;
std::condition_variable condvar;

std::queue<int> msgQueue;

void producer(int start, int end)
{
for (int x = start; x < end; x++) {
std::this_thread::sleep_for(std::chrono::milliseconds(200));
{
std::lock_guard<std::mutex> guard(mutex);
msgQueue.push(x);
}
printf("Produce message %d\n", x);
condvar.notify_all();
}
}

void consumer(int demand)
{
while (true) {
std::unique_lock<std::mutex> ulock(mutex);
condvar.wait(ulock, []{ return msgQueue.size() > 0;});
// wait的第二个参数使得显式的double check不再必要
printf("Consume message %d\n", msgQueue.front());
msgQueue.pop();
--demand;
if (!demand) break;
}
}


int main()
{
std::thread producer1(producer, 0, 10);
std::thread producer2(producer, 10, 20);
std::thread producer3(producer, 20, 30);
std::thread consumer1(consumer, 20);
std::thread consumer2(consumer, 10);

producer1.join();
producer2.join();
producer3.join();
consumer1.join();
consumer2.join();
}

C++线程池

对上一篇文章中的线程池进行改写,将C语言为C++编写,创建线程池对象,包含队列,从队列中取出线程并运行线程程序

1
2
3
4
5
6
7
8
9
10
liuchang@DESKTOP-LIUCHANG:~/codetest/THreads/cppthreadpool$ tree
.
├── main.cpp
├── main.exe
├── Makefile
├── run.sh
├── TaskQueue.cpp
├── TaskQueue.h
├── ThreadPool.cpp
└── ThreadPool.h

TaskQueue.h

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
#pragma once
#include<queue>
#include<pthread.h>


using callback = void(*)(void* arg);
template<typename T>
struct Task
{
Task<T>(){
function = nullptr;
arg = nullptr;
}
Task<T>(callback f, void* arg)
{
function = f;
this->arg = (T*)arg;
}
callback function;
T *arg;
};

template<typename T>
class TaskQueue{
public:
TaskQueue();
~TaskQueue();

// 添加任务
void addTask(Task<T> &task);
void addTask(callback func, void* arg);
// 取出一个任务
Task<T> takeTask();

// 获取当前任务的个数
inline size_t taskNumber(){
return m_taskQ.size();
}

private:
pthread_mutex_t m_mutex;
std::queue<Task<T>> m_taskQ;
};

TaskQueue.cpp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
#include "TaskQueue.h"

template<typename T>
TaskQueue<T>::TaskQueue(){
pthread_mutex_init(&m_mutex, NULL);
}

template<typename T>
TaskQueue<T>::~TaskQueue(){
pthread_mutex_destroy(&m_mutex);
}

template<typename T>
void TaskQueue<T>::addTask(Task<T> &task){
pthread_mutex_lock(&m_mutex);
m_taskQ.push(task);
pthread_mutex_unlock(&m_mutex);
}

template<typename T>
void TaskQueue<T>::addTask(callback func, void* arg){
pthread_mutex_lock(&m_mutex);
m_taskQ.push(Task<T>(func,arg));
pthread_mutex_unlock(&m_mutex);
}

template<typename T>
Task<T> TaskQueue<T>::takeTask(){
Task<T> t;
pthread_mutex_lock(&m_mutex);
if(!m_taskQ.empty()){
t = m_taskQ.front();
m_taskQ.pop();
}
pthread_mutex_unlock(&m_mutex);
return t;
}

ThreadPool.h

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
#pragma once
#include "TaskQueue.h"
#include "TaskQueue.cpp"

template<typename T>
class ThreadPool
{
public:
// 创建线程池并初始化,min线程池最小线程数,max线程池最大线程数
ThreadPool(int min, int max);

// 销毁线程池
~ThreadPool();

// 给线程池加任务
void addTask(Task<T> task);

// 获取线程池中工作的线程的个数
int getBusyNum();

// 获取线程池中活着的线程的个数
int getAliveNum();

private:

static void *worker(void *arg);
static void *manager(void *arg);
void threadExit();

private:
// 任务队列
TaskQueue<T> *taskQ;

pthread_t managerID; // 管理者线程ID
pthread_t *threadIDs; // 工作者线程ID
int minNum; // 最小线程数目
int maxNum; // 最大线程数目
int busyNum; // 忙线程个数
int liveNum; // 存活线程个数
int exitNum; // 要杀死的线程个数
pthread_mutex_t mutexpool; // 锁整个线程池
pthread_cond_t notEmpty; // 任务队列是否空了
static const int NUMBER = 2;

bool shutdown=false; // 是不是要销毁线程池,销毁为true,不销毁为false
};

ThreadPool.cpp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
#include"ThreadPool.h"
#include<iostream>
#include<string.h>
#include<string>
#include<unistd.h>

using std::cout;
using std::endl;
using std::to_string;

template<typename T>
ThreadPool<T>::ThreadPool(int min, int max){
// 实例化任务队列
taskQ = new TaskQueue<T>;
do
{
// 初始化线程池
minNum = min;
maxNum = max;
busyNum = 0;
liveNum = min;

// 根据线程最大上限给线程数组分配内存
threadIDs = new pthread_t[max];
if(threadIDs == nullptr){
cout<<"malloc threadIDs failed..."<<endl;
break;
}
memset(threadIDs,0,sizeof(pthread_t) * maxNum);

exitNum = 0;

if (pthread_mutex_init(&mutexpool, NULL) != 0 || pthread_cond_init(&notEmpty, NULL) != 0 )
{
cout<<"mutex or cond init failed.."<<endl;
break;
}

shutdown = false;

pthread_create(&managerID, NULL, manager, this); // 管理线程调用管理函数
for (int i = 0; i < minNum; i++)
{
pthread_create(&threadIDs[i], NULL, worker, this); // 工作线程调用worker函数
cout<<"创建子线程,ID: "<<to_string(threadIDs[i]) << endl;
}
} while (0);
}

template<typename T>
ThreadPool<T>::~ThreadPool(){
shutdown = true;
// 销毁管理者线程
pthread_join(managerID, NULL);
// 唤醒所有消费者线程
for(int i =0;i<liveNum;i++){
pthread_cond_signal(&notEmpty);
}
if(taskQ) delete taskQ;
if(threadIDs) delete[]threadIDs;
pthread_mutex_destroy(&mutexpool);
pthread_cond_destroy(&notEmpty);
}

template<typename T>
void* ThreadPool<T>:: worker(void *arg){
ThreadPool* pool = static_cast<ThreadPool*>(arg);
while(true){
pthread_mutex_lock(&pool->mutexpool); //对线程池上锁
while(pool->taskQ->taskNumber()==0 && !pool->shutdown){
// 阻塞工作线程
cout << "thread " << to_string(pthread_self()) << " waiting..." << endl;
pthread_cond_wait(&pool->notEmpty,&pool->mutexpool);

// 解除阻塞后,判断是不是要销毁线程
if (pool->exitNum>0)
{
pool->exitNum--;
if (pool->liveNum > pool->minNum)
{
pool->liveNum--;
pthread_mutex_unlock(&pool->mutexpool);
pool->threadExit();
}
}
}

// 判断线程是否被关闭了
if(pool->shutdown){
pthread_mutex_unlock(&pool->mutexpool);
pool->threadExit();
}

// 从任务队列中取出一个任务
Task<T> task = pool->taskQ->takeTask();
// 工作的线程+1
pool->busyNum++;
pthread_mutex_unlock(&pool->mutexpool); // 用完以后解锁

cout << "thread " << to_string(pthread_self()) << " start working..." << endl;

task.function(task.arg);
delete task.arg;
task.arg= nullptr;
cout << "thread " << to_string(pthread_self()) << " end working..." << endl;

pthread_mutex_lock(&pool->mutexpool);
pool->busyNum--;
pthread_mutex_unlock(&pool->mutexpool);
}

return nullptr;
}

// 管理者线程任务函数
template<typename T>
void* ThreadPool<T>::manager(void* arg){
ThreadPool* pool = static_cast<ThreadPool*>(arg);
// 如果线程池没有关闭, 就一直检测。
while(!pool->shutdown){
sleep(3);
pthread_mutex_lock(&pool->mutexpool);
int queueSize=pool->taskQ->taskNumber();
int liveNum = pool->liveNum;
int busyNum = pool->busyNum;
pthread_mutex_unlock(&pool->mutexpool);

// 创建线程
// 当前任务个数>存活的线程数 && 存活的线程数<最大线程个数
if (queueSize > liveNum && liveNum < pool->maxNum)
{
// 线程池加锁
pthread_mutex_lock(&pool->mutexpool);
int num = 0;
for (int i = 0; i < pool->maxNum && num < NUMBER && pool->liveNum < pool->maxNum; ++i)
{
if (pool->threadIDs[i] == 0)
{
pthread_create(&pool->threadIDs[i], NULL, worker, pool);
num++;
pool->liveNum++;
}
}
pthread_mutex_unlock(&pool->mutexpool);
}

//销毁线程
if(busyNum*2 < liveNum && liveNum> pool->minNum){
pthread_mutex_lock(&pool->mutexpool);
pool->exitNum = NUMBER;
pthread_mutex_unlock(&pool->mutexpool);
// 让工作的线程自杀
for(int i=0;i<NUMBER;++i){
pthread_cond_signal(&pool->notEmpty);
}
}
}
return nullptr;
}

template<typename T>
void ThreadPool<T>::threadExit(){
pthread_t tid = pthread_self();
for (int i = 0; i < maxNum; ++i)
{
if (threadIDs[i] == tid)
{
cout << "threadExit() function: thread "
<< to_string(pthread_self()) << " exiting..." << endl;
threadIDs[i] = 0;
break;
}
}
pthread_exit(NULL);
}

template<typename T>
void ThreadPool<T>::addTask(Task<T> task)
{
// 往任务队列加任务的时候,不需要自己加锁。内部已经做了同步
if (shutdown)
{
return;
}
// 添加任务,不需要加锁,任务队列中有锁
taskQ->addTask(task);
// 唤醒工作的线程
pthread_cond_signal(&notEmpty);
}

template<typename T>
int ThreadPool<T>::getAliveNum()
{
int threadNum = 0;
pthread_mutex_lock(&mutexpool);
threadNum = this->liveNum;
pthread_mutex_unlock(&mutexpool);
return threadNum;
}

template<typename T>
int ThreadPool<T>::getBusyNum()
{
int busyNumt = 0;
pthread_mutex_lock(&mutexpool);
busyNumt = this->busyNum;
pthread_mutex_unlock(&mutexpool);
return busyNumt;
}

main.cpp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
#include <iostream>
#include <unistd.h>
#include "ThreadPool.h"
#include "ThreadPool.cpp"

using std::cout;
using std::endl;

void taskFunc(void* arg){
int num = *(int*)arg;
cout<<"thread "<< pthread_self() <<" is working, number = "<<num<<endl;
sleep(1);
}


int main(){

// 创建线程池
ThreadPool<int> pool(3,10);// 做小运行线程数为3,最大运行线程数为10
for(int i=0;i<100;i++){
int* num = new int (i+100);
pool.addTask(Task<int>(taskFunc,num));
}
sleep(30);
return 0;
}

C++实现线程池二

C++11 实现
线程池需要维护的两个主要组成部分:

  • 任务队列:这里直接用一个队列queue来实现
  • 线程池:

ThreadPool类:在构造函数中创建了指定数目的线程。在每个线程中,不断地从任务队列中获取任务并执行,直到线程池被停止。在 enqueue() 函数中,将任务封装成一个 std::function 对象,并将它添加到任务队列中。在 ThreadPool 的析构函数中,我们等待所有线程执行完成后再停止所有线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
#include<iostream>
#include<thread>
#include<mutex>
#include<string>
#include<condition_variable>
#include<queue>
#include<functional>

using std::cout;
using std::endl;


class ThreadPool{
public:
ThreadPool(int numThreads):stop(false){
for (int i = 0; i < numThreads; i++)
{
threads.emplace_back([this]{
while(1){
std::unique_lock<std::mutex> lock(mtx);
condition.wait(lock,[this]{
return !tasks.empty()||stop;
});
if(stop && tasks.empty()){
return;
}

std::function<void()> task(std::move(tasks.front()));
tasks.pop();
lock.unlock();
task();
}
});
}
}

~ThreadPool(){
{
std::unique_lock<std::mutex> lock(mtx);
stop = true;
}
condition.notify_all();//通知所有线程,抓紧时间完成所有任务
for(auto& t: threads){
t.join();
}
}

template<class F, class... Args>
void enqueue(F &&f, Args&&... args){
std::function<void()> task = std::bind(std::forward<F>(f), std::forward<Args>(args)...);
//task(std::bind(std::forward<F>(f), std::forward<Args>(args)...));

{
std::unique_lock<std::mutex> lock(mtx);
tasks.emplace(std::move(task));
}
condition.notify_one();

}

private:
std::vector<std::thread> threads;
std::queue<std::function<void()>> tasks;

std::mutex mtx;
std::condition_variable condition;

bool stop;
};

int main(){
ThreadPool pool(4);
for (int i = 0; i < 8; ++i) {
pool.enqueue([i] {
std::cout << "Task " << i << " is running in thread " << std::this_thread::get_id() << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(1));
std::cout << "Task " << i << " is done" << std::endl;
});
}
return 0;
}

async future packaged_task promise

async是C++11引入的函数模板,用于异步执行一个函数,并返回std::future对象,表示异步操作的结果,使用std::async可以方便地进行异步变成,避免了,手动创建线程和管理线程的麻烦。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
#include<iostream>
#include<future>

using namespace std;

int func(){
int i = 0;
for(i=0;i<1000;i++){
i++;
}
return i;
}

int main(){
// 传入的func已经开辟新的线程在执行了,执行结果返回给future_result
// 实际用法,就是开辟多线程更加方便了
std::future<int> future_result = std::async(std::launch::async,func);
cout<< func() <<endl;
cout<<future_result.get()<<endl;
return 0;
}

packaged_task是一个类模板,用于将一个可调用对象(如函数,函数对象或lambda表达式)封装成一个异步操作,并返回一个std::future对象,表示异步操作的结果。packaged_task可以方便的将一个函数或可调用对象转换为一个异步操作,供其他线程使用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
#include<iostream>
#include<future>

using namespace std;

int func(){
int i = 0;
for(i=0;i<1000;i++){
i++;
}
return i;
}

int main(){

std::packaged_task<int()> task(func);
auto future_result = task.get_future();//此时只是封装一个异步操作,但是还没有真正执行异步操作。

//packaged_task是一个可移动对象,要把可移动对象放到线程中执行,要用move转移一下,把函数放到线程中执行。
std::thread t1(std::move(task));

cout<< func() <<endl;
t1.join();
cout<<future_result.get()<<endl;
return 0;
}

std::promise是一个协助线程赋值的类模板,用于在线程中产生一个值,并在另一个线程中获取这个值。promise通常与future和async一起使用,用于实现异步编程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
#include<iostream>
#include<future>

using namespace std;

void func(std::promise<int> &f){
f.set_value(1000);
}

int main(){
std::promise<int> f;
auto future_result = f.get_future();
std::thread t1(func,std::ref(f));
t1.join();
cout<<future_result.get()<<endl; //1000
return 0;
}

atomic 原子操作

C++11提供了一个原子类型std::atomic<T>,通过这个原子类型管理的内部变量就可以称之为原子变量,我们可以给原子类型指定bool、char、int、long、指针等类型作为模板参数(不支持浮点类型和复合类型)。提供了一种线程安全的方式来访问和修改共享变量,可以避免多线程环境中数据竞争问题。

原子指的是一系列不可被CPU上下文交换的机器指令,这些指令组合在一起就形成了原子操作,在多核CPU下,当某个CPU核心开始运行原子操作时,会暂停其他CPU内核对内存的操作,以保证原子操作不会被其他CPU内核干扰。

由于原子操作是通过指令提供的支持,因此它的性能相比锁和消息传递会好很多。相比较于锁而言,原子类型不需要开发者处理加锁和释放锁的问题,同时支持修改,读取等操作,还具备较高的并发性能,几乎所有的语言都支持原子类型。

在多线程操作中,使用原子变量之后就不需要再使用互斥量来保护该变量了,用起来更简洁。因为对原子变量进行的操作只能是一个原子操作(atomic operation),原子操作指的是不会被线程调度机制打断的操作,这种操作一旦开始,就一直运行到结束,中间不会有任何的上下文切换。

常用的原子操作:

load():将std::atomic变量值加载到当前线程的本地缓存中,并返回这个值。
store(val):将val值存储到atomic变量中,并保证这个操作是原子性的。
exchange(val):改变值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
#include<iostream>
#include<thread>
#include<mutex>
#include<atomic>

using namespace std;
std::atomic<int> shared_data(0);

void func(){
for(int i=0;i<1000;i++){
shared_data++;
}
}

int main(){
std::thread t1(func);
std::thread t2(func);
t1.join();
t2.join();
std::cout<<"shared_data = "<<shared_data<<std::endl;
return 0;
}

使用了原子变量之后,就不需要再定义互斥量了,在使用上更加简便,并且这两种方式都能保证在多线程操作过程中数据的正确性,不会出现数据的混乱。

C++实现线程池三

C++11实现线程池:一个简易的单任务队列线程池的实现思路:在线程池构造时初始化线程数,在析构时停止线程池。对外也只需要提供提交任务的接口就够了。

  • 任务队列:用mutex限制队列的并发访问。
  • 线程池:接收任何参数的任何函数(普通函数,lambda,成员函数…),立即返回任务结束的结果,避免阻塞主线程。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
#pragma once

#include <mutex>
#include <queue>
#include <functional>
#include <future>
#include <thread>
#include <utility>
#include <vector>

// 队列,同时用mutex限制并发
template <typename T>
class SafeQueue
{
private:
std::queue<T> m_queue; // 利用模板函数构造队列
std::mutex m_mutex; // 访问互斥信号量

public:
SafeQueue() {}
SafeQueue(SafeQueue &&other) {}
~SafeQueue() {}

bool empty() // 返回队列是否为空
{
std::unique_lock<std::mutex> lock(m_mutex); // 互斥信号变量加锁,防止m_queue被改变
return m_queue.empty();
}

int size()
{
std::unique_lock<std::mutex> lock(m_mutex); // 互斥信号变量加锁,防止m_queue被改变
return m_queue.size();
}

/* // 队列添加元素 在添加lambda表达式时会报错,因为enqueue 方法期望的是一个左值引用,lambda表达式是无名的右值
void enqueue(T &t)
{
std::unique_lock<std::mutex> lock(m_mutex);
m_queue.emplace(t);
} */

// 万能引用
void enqueue(T&& t)
{
std::unique_lock<std::mutex> lock(m_mutex);
m_queue.emplace(std::forward<T>(t));// 队列添加元素 完美转发
}

// 队列取出元素
bool dequeue(T &t)
{
std::unique_lock<std::mutex> lock(m_mutex); // 队列加锁
if (m_queue.empty())
return false;
t = std::move(m_queue.front()); // 取出队首元素,返回队首元素值,并进行右值引用
m_queue.pop(); // 弹出入队的第一个元素
return true;
}
};

class ThreadPool
{
private:
class ThreadWorker // 内置线程工作类,执行真正的工作
{
private:
int m_id; // 工作id
ThreadPool *m_pool; // 所属线程池
public:
// 构造函数
ThreadWorker(ThreadPool *pool, const int id) : m_pool(pool), m_id(id){}
// 重载()操作,核心操作,不然函数运行不起来
void operator()(){
std::function<void()> func; //定义基础函数类func
bool dequeued; // 是否正在取出队列中元素
while (!m_pool->m_shutdown){
{
// 为线程环境加锁,互访问工作线程的休眠和唤醒
std::unique_lock<std::mutex> lock(m_pool->m_conditional_mutex);
// 如果任务队列为空,阻塞当前线程
if (m_pool->m_queue.empty())
{
m_pool->m_conditional_lock.wait(lock); // 等待条件变量通知,开启线程
}

// 取出任务队列中的元素
dequeued = m_pool->m_queue.dequeue(func);
}
// 如果成功取出,执行工作函数
if (dequeued)
func();
}
}
};
bool m_shutdown; // 线程池是否关闭

SafeQueue<std::function<void()>> m_queue; // 执行函数安全队列,即任务队列

std::vector<std::thread> m_threads; // 工作线程队列

std::mutex m_conditional_mutex; // 线程休眠锁互斥变量

std::condition_variable m_conditional_lock; // 线程环境锁,可以让线程处于休眠或者唤醒状态

public:
// 线程池构造函数
ThreadPool(const int n_threads = 4)
: m_threads(std::vector<std::thread>(n_threads)), m_shutdown(false)
{
}
// 保证资源的唯一性
ThreadPool(const ThreadPool &) = delete; // 禁用拷贝构造

ThreadPool(ThreadPool &&) = delete; // 禁用移动构造

ThreadPool &operator=(const ThreadPool &) = delete; // 禁用拷贝赋值运算符

ThreadPool &operator=(ThreadPool &&) = delete; // 禁用移动赋值运算符

// 初始化线程池
void init()
{
for (int i = 0; i < m_threads.size(); ++i)
{
m_threads[i] = std::thread(ThreadWorker(this, i)); // 分配工作线程
}
}

// 销毁线程池,等待线程池中的工作完成
void shutdown()
{
m_shutdown = true;
m_conditional_lock.notify_all(); // 通知,唤醒所有工作线程,抓紧时间处理工作

for (int i = 0; i < m_threads.size(); ++i)
{
if (m_threads[i].joinable()) // 判断线程是否在等待
{
m_threads[i].join(); // 将线程加入到等待队列
}
}
}

/**
* template <typename F, typename... Args> 可变参数模板。
* auto submit(F &&f, Args &&...args) -> std::future<decltype(f(args...))> {} 尾返回类型推导的技巧。
* 利用auto关键字,将返回类型后置,可以扩充函数的类型。
* F&& f和Args&&... args中的&&并非是右值引用意思,而是万能引用
*/

template <typename F, typename... Args>
auto submit(F &&f, Args &&...args) -> std::future<decltype(f(args...))>
{
// Create a function with bounded parameter ready to execute
std::function<decltype(f(args...))()> func = std::bind(std::forward<F>(f), std::forward<Args>(args)...); // 连接函数和参数定义,特殊函数类型,避免左右值错误

// Encapsulate it into a shared pointer in order to be able to copy construct
auto task_ptr = std::make_shared<std::packaged_task<decltype(f(args...))()>>(func);

/* // Warp packaged task into void function
std::function<void()>warpper_func = [task_ptr](){
(*task_ptr)();
};

// 队列通用安全封包函数,并压入安全队列
m_queue.enqueue(warpper_func); */

m_queue.enqueue([task_ptr](){
(*task_ptr)();
});

// 唤醒一个等待中的线程
m_conditional_lock.notify_one();

// 返回先前注册的任务指针
return task_ptr->get_future();
}
};

参考列表:

  1. https://www.bilibili.com/video/BV1d841117SH
  2. https://subingwen.cn/linux/threadpool-cpp/
  3. https://segmentfault.com/a/1190000002655852
  4. https://paul.pub/cpp-concurrency

C++实现线程池二
https://cauccliu.github.io/2024/03/26/C++线程池二/
Author
Liuchang
Posted on
March 26, 2024
Licensed under