C++实现线程池二 线程池 大型的软件项目需要处理非常多的任务,例如:对于大量数据的数据流处理,或者是包含复杂GUI界面的应用程序。如果将所有的任务都以串行的方式执行,则整个系统的效率将会非常低下,应用程序的用户体验会非常的差。
如果一个系统支持多个动作同时存在,那么这个系统就是一个并发系统。如果这个系统还支持多个动作(物理时间上)同时执行,那么这个系统就是一个并行系统。
C++ 实现生产者消费者模型 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 #include <chrono> #include <condition_variable> #include <future> #include <mutex> #include <queue> std::mutex mutex; std::condition_variable condvar; std::queue<int > msgQueue;void producer (int start, int end) { for (int x = start; x < end; x++) { std::this_thread::sleep_for (std::chrono::milliseconds (200 )); { std::lock_guard<std::mutex> guard (mutex) ; msgQueue.push (x); } printf ("Produce message %d\n" , x); condvar.notify_all (); } }void consumer (int demand) { while (true ) { std::unique_lock<std::mutex> ulock (mutex) ; condvar.wait (ulock, []{ return msgQueue.size () > 0 ;}); printf ("Consume message %d\n" , msgQueue.front ()); msgQueue.pop (); --demand; if (!demand) break ; } }int main () { std::thread producer1 (producer, 0 , 10 ) ; std::thread producer2 (producer, 10 , 20 ) ; std::thread producer3 (producer, 20 , 30 ) ; std::thread consumer1 (consumer, 20 ) ; std::thread consumer2 (consumer, 10 ) ; producer1.join (); producer2.join (); producer3.join (); consumer1.join (); consumer2.join (); }
C++线程池 对上一篇文章中的线程池进行改写,将C语言为C++编写,创建线程池对象,包含队列,从队列中取出线程并运行线程程序
1 2 3 4 5 6 7 8 9 10 liuchang@DESKTOP-LIUCHANG:~/codetest/THreads/cppthreadpool$ tree . ├── main.cpp ├── main.exe ├── Makefile ├── run.sh ├── TaskQueue.cpp ├── TaskQueue.h ├── ThreadPool.cpp └── ThreadPool.h
TaskQueue.h
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 #pragma once #include <queue> #include <pthread.h> using callback = void (*)(void * arg);template <typename T>struct Task { Task <T>(){ function = nullptr ; arg = nullptr ; } Task <T>(callback f, void * arg) { function = f; this ->arg = (T*)arg; } callback function; T *arg; };template <typename T>class TaskQueue {public : TaskQueue (); ~TaskQueue (); void addTask (Task<T> &task) ; void addTask (callback func, void * arg) ; Task<T> takeTask () ; inline size_t taskNumber () { return m_taskQ.size (); }private : pthread_mutex_t m_mutex; std::queue<Task<T>> m_taskQ; };
TaskQueue.cpp
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 #include "TaskQueue.h" template <typename T> TaskQueue<T>::TaskQueue (){ pthread_mutex_init (&m_mutex, NULL ); }template <typename T> TaskQueue<T>::~TaskQueue (){ pthread_mutex_destroy (&m_mutex); }template <typename T>void TaskQueue<T>::addTask (Task<T> &task){ pthread_mutex_lock (&m_mutex); m_taskQ.push (task); pthread_mutex_unlock (&m_mutex); }template <typename T>void TaskQueue<T>::addTask (callback func, void * arg){ pthread_mutex_lock (&m_mutex); m_taskQ.push (Task <T>(func,arg)); pthread_mutex_unlock (&m_mutex); }template <typename T> Task<T> TaskQueue<T>::takeTask (){ Task<T> t; pthread_mutex_lock (&m_mutex); if (!m_taskQ.empty ()){ t = m_taskQ.front (); m_taskQ.pop (); } pthread_mutex_unlock (&m_mutex); return t; }
ThreadPool.h
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 #pragma once #include "TaskQueue.h" #include "TaskQueue.cpp" template <typename T>class ThreadPool {public : ThreadPool (int min, int max); ~ThreadPool (); void addTask (Task<T> task) ; int getBusyNum () ; int getAliveNum () ;private : static void *worker (void *arg) ; static void *manager (void *arg) ; void threadExit () ;private : TaskQueue<T> *taskQ; pthread_t managerID; pthread_t *threadIDs; int minNum; int maxNum; int busyNum; int liveNum; int exitNum; pthread_mutex_t mutexpool; pthread_cond_t notEmpty; static const int NUMBER = 2 ; bool shutdown=false ; };
ThreadPool.cpp
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 #include "ThreadPool.h" #include <iostream> #include <string.h> #include <string> #include <unistd.h> using std::cout;using std::endl;using std::to_string;template <typename T> ThreadPool<T>::ThreadPool (int min, int max){ taskQ = new TaskQueue<T>; do { minNum = min; maxNum = max; busyNum = 0 ; liveNum = min; threadIDs = new pthread_t [max]; if (threadIDs == nullptr ){ cout<<"malloc threadIDs failed..." <<endl; break ; } memset (threadIDs,0 ,sizeof (pthread_t ) * maxNum); exitNum = 0 ; if (pthread_mutex_init (&mutexpool, NULL ) != 0 || pthread_cond_init (¬Empty, NULL ) != 0 ) { cout<<"mutex or cond init failed.." <<endl; break ; } shutdown = false ; pthread_create (&managerID, NULL , manager, this ); for (int i = 0 ; i < minNum; i++) { pthread_create (&threadIDs[i], NULL , worker, this ); cout<<"创建子线程,ID: " <<to_string (threadIDs[i]) << endl; } } while (0 ); }template <typename T> ThreadPool<T>::~ThreadPool (){ shutdown = true ; pthread_join (managerID, NULL ); for (int i =0 ;i<liveNum;i++){ pthread_cond_signal (¬Empty); } if (taskQ) delete taskQ; if (threadIDs) delete []threadIDs; pthread_mutex_destroy (&mutexpool); pthread_cond_destroy (¬Empty); }template <typename T>void * ThreadPool<T>:: worker (void *arg){ ThreadPool* pool = static_cast <ThreadPool*>(arg); while (true ){ pthread_mutex_lock (&pool->mutexpool); while (pool->taskQ->taskNumber ()==0 && !pool->shutdown){ cout << "thread " << to_string (pthread_self ()) << " waiting..." << endl; pthread_cond_wait (&pool->notEmpty,&pool->mutexpool); if (pool->exitNum>0 ) { pool->exitNum--; if (pool->liveNum > pool->minNum) { pool->liveNum--; pthread_mutex_unlock (&pool->mutexpool); pool->threadExit (); } } } if (pool->shutdown){ pthread_mutex_unlock (&pool->mutexpool); pool->threadExit (); } Task<T> task = pool->taskQ->takeTask (); pool->busyNum++; pthread_mutex_unlock (&pool->mutexpool); cout << "thread " << to_string (pthread_self ()) << " start working..." << endl; task.function (task.arg); delete task.arg; task.arg= nullptr ; cout << "thread " << to_string (pthread_self ()) << " end working..." << endl; pthread_mutex_lock (&pool->mutexpool); pool->busyNum--; pthread_mutex_unlock (&pool->mutexpool); } return nullptr ; }template <typename T>void * ThreadPool<T>::manager (void * arg){ ThreadPool* pool = static_cast <ThreadPool*>(arg); while (!pool->shutdown){ sleep (3 ); pthread_mutex_lock (&pool->mutexpool); int queueSize=pool->taskQ->taskNumber (); int liveNum = pool->liveNum; int busyNum = pool->busyNum; pthread_mutex_unlock (&pool->mutexpool); if (queueSize > liveNum && liveNum < pool->maxNum) { pthread_mutex_lock (&pool->mutexpool); int num = 0 ; for (int i = 0 ; i < pool->maxNum && num < NUMBER && pool->liveNum < pool->maxNum; ++i) { if (pool->threadIDs[i] == 0 ) { pthread_create (&pool->threadIDs[i], NULL , worker, pool); num++; pool->liveNum++; } } pthread_mutex_unlock (&pool->mutexpool); } if (busyNum*2 < liveNum && liveNum> pool->minNum){ pthread_mutex_lock (&pool->mutexpool); pool->exitNum = NUMBER; pthread_mutex_unlock (&pool->mutexpool); for (int i=0 ;i<NUMBER;++i){ pthread_cond_signal (&pool->notEmpty); } } } return nullptr ; }template <typename T>void ThreadPool<T>::threadExit (){ pthread_t tid = pthread_self (); for (int i = 0 ; i < maxNum; ++i) { if (threadIDs[i] == tid) { cout << "threadExit() function: thread " << to_string (pthread_self ()) << " exiting..." << endl; threadIDs[i] = 0 ; break ; } } pthread_exit (NULL ); }template <typename T>void ThreadPool<T>::addTask (Task<T> task) { if (shutdown) { return ; } taskQ->addTask (task); pthread_cond_signal (¬Empty); }template <typename T>int ThreadPool<T>::getAliveNum () { int threadNum = 0 ; pthread_mutex_lock (&mutexpool); threadNum = this ->liveNum; pthread_mutex_unlock (&mutexpool); return threadNum; }template <typename T>int ThreadPool<T>::getBusyNum () { int busyNumt = 0 ; pthread_mutex_lock (&mutexpool); busyNumt = this ->busyNum; pthread_mutex_unlock (&mutexpool); return busyNumt; }
main.cpp
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 #include <iostream> #include <unistd.h> #include "ThreadPool.h" #include "ThreadPool.cpp" using std::cout;using std::endl;void taskFunc (void * arg) { int num = *(int *)arg; cout<<"thread " << pthread_self () <<" is working, number = " <<num<<endl; sleep (1 ); }int main () { ThreadPool<int > pool (3 ,10 ) ; for (int i=0 ;i<100 ;i++){ int * num = new int (i+100 ); pool.addTask (Task <int >(taskFunc,num)); } sleep (30 ); return 0 ; }
C++实现线程池二 C++11 实现 线程池需要维护的两个主要组成部分:
任务队列:这里直接用一个队列queue来实现
线程池:
ThreadPool类:在构造函数中创建了指定数目的线程。在每个线程中,不断地从任务队列中获取任务并执行,直到线程池被停止。在 enqueue() 函数中,将任务封装成一个 std::function 对象,并将它添加到任务队列中。在 ThreadPool 的析构函数中,我们等待所有线程执行完成后再停止所有线程。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 #include <iostream> #include <thread> #include <mutex> #include <string> #include <condition_variable> #include <queue> #include <functional> using std::cout;using std::endl;class ThreadPool {public : ThreadPool (int numThreads):stop (false ){ for (int i = 0 ; i < numThreads; i++) { threads.emplace_back ([this ]{ while (1 ){ std::unique_lock<std::mutex> lock (mtx); condition.wait (lock,[this ]{ return !tasks.empty ()||stop; }); if (stop && tasks.empty ()){ return ; } std::function<void ()> task (std::move (tasks.front ())); tasks.pop (); lock.unlock (); task (); } }); } } ~ThreadPool (){ { std::unique_lock<std::mutex> lock (mtx) ; stop = true ; } condition.notify_all (); for (auto & t: threads){ t.join (); } } template <class F, class ... Args> void enqueue (F &&f, Args&&... args) { std::function<void ()> task = std::bind (std::forward<F>(f), std::forward<Args>(args)...); { std::unique_lock<std::mutex> lock (mtx) ; tasks.emplace (std::move (task)); } condition.notify_one (); }private : std::vector<std::thread> threads; std::queue<std::function<void ()>> tasks; std::mutex mtx; std::condition_variable condition; bool stop; };int main () { ThreadPool pool (4 ) ; for (int i = 0 ; i < 8 ; ++i) { pool.enqueue ([i] { std::cout << "Task " << i << " is running in thread " << std::this_thread::get_id () << std::endl; std::this_thread::sleep_for (std::chrono::seconds (1 )); std::cout << "Task " << i << " is done" << std::endl; }); } return 0 ; }
async future packaged_task promise async是C++11引入的函数模板,用于异步执行一个函数,并返回std::future对象,表示异步操作的结果,使用std::async可以方便地进行异步变成,避免了,手动创建线程和管理线程的麻烦。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 #include <iostream> #include <future> using namespace std;int func () { int i = 0 ; for (i=0 ;i<1000 ;i++){ i++; } return i; }int main () { std::future<int > future_result = std::async (std::launch::async,func); cout<< func () <<endl; cout<<future_result.get ()<<endl; return 0 ; }
packaged_task是一个类模板,用于将一个可调用对象(如函数,函数对象或lambda表达式)封装成一个异步操作,并返回一个std::future对象,表示异步操作的结果。packaged_task可以方便的将一个函数或可调用对象转换为一个异步操作,供其他线程使用。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 #include <iostream> #include <future> using namespace std;int func () { int i = 0 ; for (i=0 ;i<1000 ;i++){ i++; } return i; }int main () { std::packaged_task<int () > task (func) ; auto future_result = task.get_future (); std::thread t1 (std::move(task)) ; cout<< func () <<endl; t1.join (); cout<<future_result.get ()<<endl; return 0 ; }
std::promise是一个协助线程赋值的类模板,用于在线程中产生一个值,并在另一个线程中获取这个值。promise通常与future和async一起使用,用于实现异步编程。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 #include <iostream> #include <future> using namespace std;void func (std::promise<int > &f) { f.set_value (1000 ); }int main () { std::promise<int > f; auto future_result = f.get_future (); std::thread t1 (func,std::ref(f)) ; t1.join (); cout<<future_result.get ()<<endl; return 0 ; }
atomic 原子操作 C++11提供了一个原子类型std::atomic<T>,通过这个原子类型管理的内部变量就可以称之为原子变量,我们可以给原子类型指定bool、char、int、long、指针等类型作为模板参数(不支持浮点类型和复合类型)。提供了一种线程安全的方式来访问和修改共享变量,可以避免多线程环境中数据竞争问题。
原子指的是一系列不可被CPU上下文交换的机器指令,这些指令组合在一起就形成了原子操作,在多核CPU下,当某个CPU核心开始运行原子操作时,会暂停其他CPU内核对内存的操作,以保证原子操作不会被其他CPU内核干扰。
由于原子操作是通过指令提供的支持,因此它的性能相比锁和消息传递会好很多 。相比较于锁而言,原子类型不需要开发者处理加锁和释放锁的问题,同时支持修改,读取等操作,还具备较高的并发性能 ,几乎所有的语言都支持原子类型。
在多线程操作中,使用原子变量之后就不需要再使用互斥量来保护该变量了,用起来更简洁。因为对原子变量进行的操作只能是一个原子操作(atomic operation),原子操作指的是不会被线程调度机制打断的操作,这种操作一旦开始,就一直运行到结束,中间不会有任何的上下文切换。
常用的原子操作:
load():将std::atomic变量值加载到当前线程的本地缓存中,并返回这个值。 store(val):将val值存储到atomic变量中,并保证这个操作是原子性的。 exchange(val):改变值
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 #include <iostream> #include <thread> #include <mutex> #include <atomic> using namespace std;std::atomic<int > shared_data (0 ) ;void func () { for (int i=0 ;i<1000 ;i++){ shared_data++; } }int main () { std::thread t1 (func) ; std::thread t2 (func) ; t1.join (); t2.join (); std::cout<<"shared_data = " <<shared_data<<std::endl; return 0 ; }
使用了原子变量之后,就不需要再定义互斥量了,在使用上更加简便,并且这两种方式都能保证在多线程操作过程中数据的正确性,不会出现数据的混乱。
C++实现线程池三 C++11实现线程池:一个简易的单任务队列
线程池的实现思路:在线程池构造时初始化线程数,在析构时停止线程池。对外也只需要提供提交任务的接口就够了。
任务队列:用mutex限制队列的并发访问。
线程池:接收任何参数的任何函数(普通函数,lambda,成员函数…),立即返回任务结束的结果,避免阻塞主线程。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 #pragma once #include <mutex> #include <queue> #include <functional> #include <future> #include <thread> #include <utility> #include <vector> template <typename T>class SafeQueue {private : std::queue<T> m_queue; std::mutex m_mutex; public : SafeQueue () {} SafeQueue (SafeQueue &&other) {} ~SafeQueue () {} bool empty () { std::unique_lock<std::mutex> lock (m_mutex) ; return m_queue.empty (); } int size () { std::unique_lock<std::mutex> lock (m_mutex) ; return m_queue.size (); } void enqueue (T&& t) { std::unique_lock<std::mutex> lock (m_mutex) ; m_queue.emplace (std::forward<T>(t)); } bool dequeue (T &t) { std::unique_lock<std::mutex> lock (m_mutex) ; if (m_queue.empty ()) return false ; t = std::move (m_queue.front ()); m_queue.pop (); return true ; } };class ThreadPool {private : class ThreadWorker { private : int m_id; ThreadPool *m_pool; public : ThreadWorker (ThreadPool *pool, const int id) : m_pool (pool), m_id (id){} void operator () () { std::function<void ()> func; bool dequeued; while (!m_pool->m_shutdown){ { std::unique_lock<std::mutex> lock (m_pool->m_conditional_mutex) ; if (m_pool->m_queue.empty ()) { m_pool->m_conditional_lock.wait (lock); } dequeued = m_pool->m_queue.dequeue (func); } if (dequeued) func (); } } }; bool m_shutdown; SafeQueue<std::function<void ()>> m_queue; std::vector<std::thread> m_threads; std::mutex m_conditional_mutex; std::condition_variable m_conditional_lock; public : ThreadPool (const int n_threads = 4 ) : m_threads (std::vector <std::thread>(n_threads)), m_shutdown (false ) { } ThreadPool (const ThreadPool &) = delete ; ThreadPool (ThreadPool &&) = delete ; ThreadPool &operator =(const ThreadPool &) = delete ; ThreadPool &operator =(ThreadPool &&) = delete ; void init () { for (int i = 0 ; i < m_threads.size (); ++i) { m_threads[i] = std::thread (ThreadWorker (this , i)); } } void shutdown () { m_shutdown = true ; m_conditional_lock.notify_all (); for (int i = 0 ; i < m_threads.size (); ++i) { if (m_threads[i].joinable ()) { m_threads[i].join (); } } } template <typename F, typename ... Args> auto submit (F &&f, Args &&...args) -> std::future<decltype (f(args...)) > { std::function<decltype (f(args...))()> func = std::bind (std::forward<F>(f), std::forward<Args>(args)...); auto task_ptr = std::make_shared<std::packaged_task<decltype (f (args...))()>>(func); m_queue.enqueue ([task_ptr](){ (*task_ptr)(); }); m_conditional_lock.notify_one (); return task_ptr->get_future (); } };
参考列表:
https://www.bilibili.com/video/BV1d841117SH
https://subingwen.cn/linux/threadpool-cpp/
https://segmentfault.com/a/1190000002655852
https://paul.pub/cpp-concurrency