Linux学习八 : 线程同步 线程同步 临界资源
:被多个线程共享的资源,如共享内存,共享文件,共享设备等。临界区
:在多线程程序中,访问共享内存的代码段。 线程同步是指多个线程在并发执行过程中,为了保持数据一致性,协调和合作的对数据进行访问。
互斥锁:mutex:一个线程获得互斥锁,其他线程等待。
条件变量:Condition Variable:线程发现条件不满足时,调用条件变量的等待操作,直到其他线程对条件进行通知。
读写锁:ReadWrite Lock:读可以多线程,写只能有一个写入。
自旋锁:Spin Lock:忙等待的锁机制,循环等待,直到获得锁为止。
信号量:Semaphore:计数器,用于控制同时访问某个资源的线程数量。
死锁
:两个或多个以上线程(或进程)互相等待对方持有的资源而无法继续执行的现象。死锁产生的四个必要条件
:
互斥条件:资源不能共享,只能被一个进程使用。
请求和保持条件:一个进程至少占有一个资源,并等待获取其他进程占有的资源。
一次申请所有资源:资源利用率低,进程饿死。
提取新的请求时,释放原来占有的资源。
不可剥夺条件:资源只能由进程释放,不可被剥夺。
申请不到资源时,释放自己资源。
申请不到资源时,抢占对方资源。
循环等待条件:存在等待立案条件,每个进程都在等待下一个进程所占有的资源。
互斥锁 互斥锁:只有一个线程能访问被保护资源。需要初始化互斥量,加锁,解锁和销毁互斥量。
pthread_mutex_t mutex; 申请互斥锁。 int pthread_mutex_init(pthread_mutex_t *restrict mutex,const pthread_mutexattr_t *restrict attr); 初始化互斥锁 int pthread_mutex_lock(pthread_mutex_t *mutex); 上锁 int pthread_mutex_trylock(pthread_mutex_t *mutex); 尝试加锁,如果没锁,加锁成功;如果已经有锁,不会阻塞,直接返回失败。 int pthread_mutex_unlock(pthread_mutex_t *mutex);解锁,哪个线程加锁,就要哪个线程解锁。 int pthread_mutex_destroy(pthread_mutex_t *mutex); 销毁互斥锁。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 #include <unistd.h> #include <string.h> #include <iostream> #include <pthread.h> using std::cout;using std::cerr;using std::endl;int count = 10000 ;pthread_mutex_t mutex; void *thread_proc1 (void * arg) { while (1 ){ pthread_mutex_lock (&mutex); if (count>0 ){ count--; cout<<"t1 :" <<count<<endl; } pthread_mutex_unlock (&mutex); } return NULL ; }void *thread_proc2 (void * arg) { while (1 ){ pthread_mutex_lock (&mutex); if (count>0 ){ count--; cout<<"t2 :" <<count<<endl; } pthread_mutex_unlock (&mutex); } return NULL ; }int main () { pthread_t t1,t2; pthread_mutex_init (&mutex,NULL ); pthread_create (&t1,NULL ,thread_proc1,NULL ); pthread_create (&t2,NULL ,thread_proc2,NULL ); pthread_join (t1,NULL ); pthread_join (t2,NULL ); pthread_mutex_destroy (&mutex); return 0 ; }
C++互斥锁
std::mutex:独占的互斥锁,不能递归使用 std::timed_mutex:带超时的独占互斥锁,不能递归使用 std::recursive_mutex:递归互斥锁,不带超时功能,可以使一个线程多次获取锁。 std::recursive_timed_mutex:带超时的递归互斥锁
std::mutex
的常用操作:
lock() unlock() trylock()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 #include <iostream> #include <thread> #include <mutex> int a = 0 ; std::mutex mtx;void func () { for (int i=0 ;i<1000 ;i++){ mtx.lock (); a+=1 ; mtx.unlock (); } }int main () { std::thread t1 (func) ; std::thread t2 (func) ; t1.join (); t2.join (); std::cout<<a<<std::endl; return 0 ; }
std::lock_guard:
C++11新增的模板类,可以简化互斥锁lock()和unlock()的写法,自动进行加锁和解锁,同时也更安全。
mutex需要自己加锁解锁。
lock_guard会自动锁定互斥量,而在退出作用域后进行析构时就会自动解锁,从而保证了互斥量的正确操作,避免忘记unlock()操作而导致线程死锁。一般在局部作用域中使用 ,使用完即使销毁。
lock_guard使用了RAII技术,就是在类构造函数中分配资源,在析构函数中释放资源,保证资源出了作用域就释放。
1 2 3 4 for (int i=0 ;i<1000 ;i++){ std::lock_guard<std::mutex> lg (mtx) ; a+=1 ; }
std::recursive_mutex
:递归互斥锁,允许同一线程多次获得互斥锁。std::timed_mutex
:超时独占互斥锁,可以用做加超时的锁操作,mutex不支持超时锁设置。
std::unique_lock
: C++ 标准库中提供的一个互斥量封装类,也可以自动进行加锁和解锁,用于在多线程程序中对互斥量进行加锁和解锁操作。它的主要特点是可以对互斥量进行更加灵活的管理,包括延迟加锁、条件变量、超时等。
lock():加锁 try_lock():尝试加锁,不阻塞 try_lock_for(const std::chrono::duration<Rep, Period>& rel_time):尝试对互斥量进行加锁操作,如果当前互斥量已经被其他线程持有,则当前线程会被阻塞,直到互斥量被成功加锁,或者超过了指定的时间。 try_lock_until(const std::chrono::time_point<Clock, Duration>& abs_time):指定时间节点,阻塞 unlock():对互斥量进行解锁操作。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 #include <iostream> #include <thread> #include <mutex> int a = 0 ; std::timed_mutex mtx;void func () { for (int i=0 ;i<2 ;i++){ std::unique_lock<std::timed_mutex> lg (mtx,std::defer_lock) ; if (lg.try_lock_for (std::chrono::seconds (2 ))){ std::this_thread::sleep_for (std::chrono::seconds (1 )); a+=1 ; } } }int main () { std::thread t1 (func) ; std::thread t2 (func) ; t1.join (); t2.join (); std::cout<<a<<std::endl; return 0 ; }
std::call_once 在某些特定情况下,某些函数只能在多线程环境下调用一次。单例设计模式是一种常见的设计模式,用于确保某个类只能创建一个实例。由于单例实例是全局唯一的,因此在多线程环境中使用单例模式时,需要考虑线程安全的问题。
std::call_once()来保证函数在多线程环境下只能被调用一次。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 #include <iostream> #include <thread> #include <mutex> static std::once_flag once;void print_once () { std::cout<<__TIME__<<" once" <<std::endl; }void print_twice () { call_once (once, print_once); std::cout<<__TIME__<<" " <<std::endl; }int main () { std::thread t1 (print_twice) ; std::thread t2 (print_twice) ; t1.join (); t2.join (); return 0 ; }
条件变量 条件变量用来阻塞线程,直到某个特定条件得到满足。
只使用条件变量无法实现线程的同步, 必须要配合互斥锁来使用。当一个线程需要等待某个条件满足时,它可以通过条件变量将自己阻塞,同时释放互斥锁,使其他线程可以继续执行。条件变量提供了一个可以让多个线程间同步协作的功能。这对于生产者-消费者模型很有意义。
int pthread_cond_init(pthread_cond_t *restrict cond,const pthread_condattr_t *restrict attr); 条件变量初始化 也可以用静态初始化进行:pthread_cond_t cond = PTHREAD_COND_INITIALIZER; int pthread_cond_destroy(pthread_cond_t *cond); 条件变量销毁。 int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex); 等待阻塞,阻塞的过程中需要一个互斥锁参数,互斥锁让线程进入临界区,避免数据混乱。
在阻塞线程时候,如果线程对互斥锁mutex上锁,那么会将这把锁打开,避免死锁。
线程解除阻塞的时候,函数内部会将这个mutex互斥锁锁上,继续访问临界区。
int pthread_cond_timedwait(pthread_cond_t *restrict cond,pthread_mutex_t *restrict mutex, const struct timespec *restrict abstime); 等待超时等待。 int pthread_cond_signal(pthread_cond_t *cond); 唤醒单个线程。 int pthread_cond_broadcast(pthread_cond_t *cond); 唤醒全部线程。
有了锁以后为什么还需要条件变量?
锁和条件变量解决的不是同一个问题,锁解决的是访问临界区资源的竞争。
条件变量是为了不让线程重复性的加锁解锁。比如有两个线程,线程A和线程B都在竞争资源,但是线程B在满足一定条件的时候才需要执行,如果没有条件变量,就会出现线程B也需要不断地加锁,解锁,这样给系统带来了不必要的开销,浪费了资源。如何让线程B不再不断地加锁解锁,就需要用到条件变量。1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 while (true ) { pthread_mutex_lock (&mutex); iCount++; pthread_mutex_unlock (&mutex); }while (true ) { pthread_mutex_lock (&mutex); if (iCount >= 100 ) { iCount = 0 ; } pthread_mutex_unlock (&mutex); }
让线程B不再不断地加锁解锁,用到条件变量。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 while (true ) { pthread_mutex_lock (&mutex); iCount++; pthread_mutex_unlock (&mutex); pthread_mutex_lock (&mutex); if (iCount >= 100 ) { pthread_cond_signal (&cond); } pthread_mutex_unlock (&mutex); }while (1 ) { pthread_mutex_lock (&mutex); while (iCount < 100 ) { pthread_cond_wait (&cond, &mutex); } printf ("iCount >= 100\r\n" ); iCount = 0 ; pthread_mutex_unlock (&mutex); }
C++条件变量 C++中的条件变量有两种:
condition_variable:需要配合std::unique_lock< std::mutex>进行wait操作,也就是阻塞线程的操作。
condition_variable_any:可以和任意带有lock()、unlock()语义的mutex搭配使用。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 #include <iostream> #include <thread> #include <mutex> #include <string> #include <condition_variable> #include <queue> std::queue<int > g_queue; std::condition_variable g_cv; std::mutex mtx;void Producer () { for (int i=0 ;i<10 ;i++){ std::unique_lock<std::mutex> lock (mtx) ; g_queue.push (i); g_cv.notify_one (); std::cout<<"task: " <<i<<std::endl; std::this_thread::sleep_for (std::chrono::microseconds (10 )); } }void Customer () { while (1 ){ std::unique_lock<std::mutex> lock (mtx) ; g_cv.wait (lock, [](){ return !g_queue.empty (); }); int value = g_queue.front (); g_queue.pop (); std::cout<<"Customer " <<value<<std::endl; } }int main () { std::thread t1 (Producer) ; std::thread t2 (Customer) ; t1.join (); t2.join (); return 0 ; }
读写锁 读写锁是一种并发控制机制,允许多个线程同时读取共享资源,但只允许一个线程进行写操作。
读写锁一般分为两种类型:读锁和写锁。
请求读锁时,如果没有其他线程持有写锁,则可以读,否则需要等待写锁。
获取写锁时,如果没有其他线程在持有读锁或写锁,可以写,否则等待写锁和读锁都释放。
读写锁的优点:
多个线程可以同时读共享资源,提高了并发性能。
写操作独占资源,保证了数据的完整性和一致性。
pthread_rwlock_init :读写锁初始化 pthread_rwlock_rdlock :读加锁 pthread_rwlock_tryrdlock :尝试读加锁 pthread_rwlock_timerdlock :超时读加锁 pthread_rwlock_wrlock :写加锁 pthread_rwlock_trywrlock :尝试写加锁 pthread_rwlock_timewrlock :超时写加锁 pthread_rwlock_unlock :解锁(读写共用) pthread_rwlock_destroy :读写锁销毁 还有一些属性操作的函数:比如读优先/写优先。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 #include <unistd.h> #include <string.h> #include <iostream> #include <pthread.h> using std::cout;using std::cerr;using std::endl;int count = 10 ;pthread_rwlock_t rwlock; void *thread_proc1 (void * arg) { while (1 ){ pthread_rwlock_rdlock (&rwlock); cout<<"t1 :" <<count<<endl; pthread_rwlock_unlock (&rwlock); } return NULL ; }void *thread_proc2 (void * arg) { while (1 ){ pthread_rwlock_wrlock (&rwlock); count++; cout<<"t2 :" <<count<<endl; pthread_rwlock_unlock (&rwlock); } return NULL ; }int main () { pthread_rwlock_init (&rwlock,NULL ); pthread_t t1,t2; pthread_create (&t1,NULL ,thread_proc1,NULL ); pthread_create (&t2,NULL ,thread_proc2,NULL ); pthread_join (t1,NULL ); pthread_join (t2,NULL ); pthread_rwlock_destroy (&rwlock); return 0 ; }
自旋锁 自旋锁是一种忙等待的锁,当线程尝试获取锁时,如果发现锁被其他线程持有,它会一直不断地进行忙等待,直到获取锁为止。
优点:避免了进程的切换,线程会一直在一个循环中进行忙等待,直到锁被释放为止。
缺点:如果获取锁的线程长时间不释放锁,其他线程会一直忙等待消耗CPU资源。
适用场景:临界区代码执行时间短,并发冲突的概率较低,线程在获取到锁之间的等待时间较短。
pthread_spin_init : 自旋锁初始化 pthread_spin_lock : 自旋锁加锁 pthread_spin_trylock : 尝试加锁 pthread_spin_unlock : 解锁 pthread_spin_destroy : 自旋锁销毁
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 #include <unistd.h> #include <string.h> #include <iostream> #include <pthread.h> using std::cout;using std::cerr;using std::endl;int count = 10000 ;pthread_spinlock_t spin; void *thread_proc1 (void * arg) { while (1 ){ pthread_spin_lock (&spin); if (count>0 ){ count--; cout<<"t1 :" <<count<<endl; } pthread_spin_unlock (&spin); } return NULL ; }void *thread_proc2 (void * arg) { while (1 ){ pthread_spin_lock (&spin); sleep (1 ); if (count>0 ){ count--; cout<<"t2 :" <<count<<endl; } pthread_spin_unlock (&spin); } return NULL ; }int main () { pthread_t t1,t2; pthread_spin_init (&spin,0 ); pthread_create (&t1,NULL ,thread_proc1,NULL ); pthread_create (&t2,NULL ,thread_proc2,NULL ); pthread_join (t1,NULL ); pthread_join (t2,NULL ); pthread_spin_destroy (&spin); return 0 ; }
看到线程的CPU使用率非常高,自旋锁一直处于自旋的状态。相比于mutex来说,非常高,所以需要根据同步类型使用锁。
信号量 信号量维护了一个内部计数器,用于表示可用资源的数量。
int sem_init(sem_t *sem, int pshared, unsigned int value);:初始化无名信号量,用于线程通信。 int sem_destroy(sem_t *sem); 销毁信号量 int sem_wait(sem_t *sem); 信号量等待,如果信号量<0会阻塞。 int sem_trywait(sem_t *sem);信号量尝试等待,如果信号量<0不会阻塞。 int sem_timedwait(sem_t *sem, const struct timespec *abs_timeout); 信号量超时等待。 int sem_post(sem_t *sem);信号量增加。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 #include <unistd.h> #include <string.h> #include <iostream> #include <pthread.h> #include <semaphore.h> using std::cout;using std::cerr;using std::endl;int count = 10000 ;sem_t sem;void *thread_proc1 (void * arg) { while (1 ){ sleep (1 ); count++; cout<<"t1 :" <<count<<endl; sem_post (&sem); } return NULL ; }void *thread_proc2 (void * arg) { while (1 ){ sem_wait (&sem); cout<<"t2 :" <<count<<endl; } return NULL ; }int main () { pthread_t t1,t2; sem_init (&sem,0 ,1 ); pthread_create (&t1,NULL ,thread_proc1,NULL ); pthread_create (&t2,NULL ,thread_proc2,NULL ); pthread_join (t1,NULL ); pthread_join (t2,NULL ); sem_destroy (&sem); return 0 ; }
参考列表https://www.bilibili.com/video/BV14Q4y187xp/ https://subingwen.cn/linux/thread-sync/ https://www.bilibili.com/video/BV1d841117SH