Linux学习八(线程同步)

Linux学习八 : 线程同步

线程同步

临界资源:被多个线程共享的资源,如共享内存,共享文件,共享设备等。
临界区:在多线程程序中,访问共享内存的代码段。
线程同步是指多个线程在并发执行过程中,为了保持数据一致性,协调和合作的对数据进行访问。

  • 互斥锁:mutex:一个线程获得互斥锁,其他线程等待。
  • 条件变量:Condition Variable:线程发现条件不满足时,调用条件变量的等待操作,直到其他线程对条件进行通知。
  • 读写锁:ReadWrite Lock:读可以多线程,写只能有一个写入。
  • 自旋锁:Spin Lock:忙等待的锁机制,循环等待,直到获得锁为止。
  • 信号量:Semaphore:计数器,用于控制同时访问某个资源的线程数量。

死锁:两个或多个以上线程(或进程)互相等待对方持有的资源而无法继续执行的现象。
死锁产生的四个必要条件

  • 互斥条件:资源不能共享,只能被一个进程使用。
  • 请求和保持条件:一个进程至少占有一个资源,并等待获取其他进程占有的资源。
    • 一次申请所有资源:资源利用率低,进程饿死。
    • 提取新的请求时,释放原来占有的资源。
  • 不可剥夺条件:资源只能由进程释放,不可被剥夺。
    • 申请不到资源时,释放自己资源。
    • 申请不到资源时,抢占对方资源。
  • 循环等待条件:存在等待立案条件,每个进程都在等待下一个进程所占有的资源。
    • 按照资源编号顺序提出请求。

互斥锁

互斥锁:只有一个线程能访问被保护资源。需要初始化互斥量,加锁,解锁和销毁互斥量。

pthread_mutex_t mutex; 申请互斥锁。
int pthread_mutex_init(pthread_mutex_t *restrict mutex,const pthread_mutexattr_t *restrict attr); 初始化互斥锁
int pthread_mutex_lock(pthread_mutex_t *mutex); 上锁
int pthread_mutex_trylock(pthread_mutex_t *mutex); 尝试加锁,如果没锁,加锁成功;如果已经有锁,不会阻塞,直接返回失败。
int pthread_mutex_unlock(pthread_mutex_t *mutex);解锁,哪个线程加锁,就要哪个线程解锁。
int pthread_mutex_destroy(pthread_mutex_t *mutex); 销毁互斥锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
#include <unistd.h>
#include <string.h>
#include <iostream>
#include <pthread.h>

using std::cout;
using std::cerr;
using std::endl;

int count = 10000;
pthread_mutex_t mutex; //声明互斥锁

void *thread_proc1(void* arg){
while(1){
pthread_mutex_lock(&mutex);
if(count>0){
count--;
cout<<"t1 :"<<count<<endl;
}
pthread_mutex_unlock(&mutex);
}
return NULL;
}

void *thread_proc2(void* arg){
while(1){
pthread_mutex_lock(&mutex);
if(count>0){
count--;
cout<<"t2 :"<<count<<endl;
}
pthread_mutex_unlock(&mutex);
}
return NULL;
}

int main()
{
pthread_t t1,t2;

pthread_mutex_init(&mutex,NULL); //初始化互斥锁
pthread_create(&t1,NULL,thread_proc1,NULL);
pthread_create(&t2,NULL,thread_proc2,NULL);

pthread_join(t1,NULL);
pthread_join(t2,NULL);

pthread_mutex_destroy(&mutex); //线程销毁后,再去释放互斥锁
return 0;
}

C++互斥锁

std::mutex:独占的互斥锁,不能递归使用
std::timed_mutex:带超时的独占互斥锁,不能递归使用
std::recursive_mutex:递归互斥锁,不带超时功能,可以使一个线程多次获取锁。
std::recursive_timed_mutex:带超时的递归互斥锁

std::mutex的常用操作:

lock()
unlock()
trylock()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
#include<iostream>
#include<thread>
#include<mutex>

int a = 0;
std::mutex mtx;
// 互斥锁的个数和共享资源的个数相等,而与线程的个数无关。
void func(){
for(int i=0;i<1000;i++){
mtx.lock();
a+=1;
mtx.unlock();
}
}

int main(){
std::thread t1(func);
std::thread t2(func);

t1.join();
t2.join();

std::cout<<a<<std::endl;
return 0;
}

std::lock_guard:C++11新增的模板类,可以简化互斥锁lock()和unlock()的写法,自动进行加锁和解锁,同时也更安全。

  • mutex需要自己加锁解锁。
  • lock_guard会自动锁定互斥量,而在退出作用域后进行析构时就会自动解锁,从而保证了互斥量的正确操作,避免忘记unlock()操作而导致线程死锁。一般在局部作用域中使用,使用完即使销毁。
  • lock_guard使用了RAII技术,就是在类构造函数中分配资源,在析构函数中释放资源,保证资源出了作用域就释放。
1
2
3
4
for(int i=0;i<1000;i++){
std::lock_guard<std::mutex> lg(mtx);
a+=1;
}

std::recursive_mutex:递归互斥锁,允许同一线程多次获得互斥锁。
std::timed_mutex:超时独占互斥锁,可以用做加超时的锁操作,mutex不支持超时锁设置。

std::unique_lock: C++ 标准库中提供的一个互斥量封装类,也可以自动进行加锁和解锁,用于在多线程程序中对互斥量进行加锁和解锁操作。它的主要特点是可以对互斥量进行更加灵活的管理,包括延迟加锁、条件变量、超时等。

lock():加锁
try_lock():尝试加锁,不阻塞
try_lock_for(const std::chrono::duration<Rep, Period>& rel_time):尝试对互斥量进行加锁操作,如果当前互斥量已经被其他线程持有,则当前线程会被阻塞,直到互斥量被成功加锁,或者超过了指定的时间。
try_lock_until(const std::chrono::time_point<Clock, Duration>& abs_time):指定时间节点,阻塞
unlock():对互斥量进行解锁操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
#include<iostream>
#include<thread>
#include<mutex>

int a = 0;
std::timed_mutex mtx;
void func(){
for(int i=0;i<2;i++){

std::unique_lock<std::timed_mutex> lg(mtx,std::defer_lock); //不加锁,需要自己加锁
if(lg.try_lock_for(std::chrono::seconds(2))){
std::this_thread::sleep_for(std::chrono::seconds(1));
a+=1;
}
}
}

int main(){
std::thread t1(func);
std::thread t2(func);

t1.join();
t2.join();

std::cout<<a<<std::endl; //3
return 0;
}

std::call_once

在某些特定情况下,某些函数只能在多线程环境下调用一次。单例设计模式是一种常见的设计模式,用于确保某个类只能创建一个实例。由于单例实例是全局唯一的,因此在多线程环境中使用单例模式时,需要考虑线程安全的问题。

std::call_once()来保证函数在多线程环境下只能被调用一次。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
#include<iostream>
#include<thread>
#include<mutex>

static std::once_flag once;

void print_once(){
std::cout<<__TIME__<<" once"<<std::endl;
}

void print_twice(){
call_once(once, print_once);
std::cout<<__TIME__<<" "<<std::endl;
}

int main(){

std::thread t1(print_twice); //线程不安全的情况下,可能会对log进行多次赋值,出现错误
std::thread t2(print_twice);
t1.join();
t2.join();
return 0;
}

条件变量

条件变量用来阻塞线程,直到某个特定条件得到满足。只使用条件变量无法实现线程的同步, 必须要配合互斥锁来使用。
当一个线程需要等待某个条件满足时,它可以通过条件变量将自己阻塞,同时释放互斥锁,使其他线程可以继续执行。条件变量提供了一个可以让多个线程间同步协作的功能。这对于生产者-消费者模型很有意义。

int pthread_cond_init(pthread_cond_t *restrict cond,const pthread_condattr_t *restrict attr); 条件变量初始化
也可以用静态初始化进行:pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
int pthread_cond_destroy(pthread_cond_t *cond); 条件变量销毁。
int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex); 等待阻塞,阻塞的过程中需要一个互斥锁参数,互斥锁让线程进入临界区,避免数据混乱。

  • 在阻塞线程时候,如果线程对互斥锁mutex上锁,那么会将这把锁打开,避免死锁。
  • 线程解除阻塞的时候,函数内部会将这个mutex互斥锁锁上,继续访问临界区。

int pthread_cond_timedwait(pthread_cond_t *restrict cond,pthread_mutex_t *restrict mutex, const struct timespec *restrict abstime); 等待超时等待。
int pthread_cond_signal(pthread_cond_t *cond); 唤醒单个线程。
int pthread_cond_broadcast(pthread_cond_t *cond); 唤醒全部线程。

有了锁以后为什么还需要条件变量?

  • 锁和条件变量解决的不是同一个问题,锁解决的是访问临界区资源的竞争。
  • 条件变量是为了不让线程重复性的加锁解锁。比如有两个线程,线程A和线程B都在竞争资源,但是线程B在满足一定条件的时候才需要执行,如果没有条件变量,就会出现线程B也需要不断地加锁,解锁,这样给系统带来了不必要的开销,浪费了资源。如何让线程B不再不断地加锁解锁,就需要用到条件变量。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    while(true)
    {
    pthread_mutex_lock(&mutex);
    iCount++;
    pthread_mutex_unlock(&mutex);
    }


    //thread 2:
    while(true)
    {
    pthread_mutex_lock(&mutex);
    if(iCount >= 100)
    {
    iCount = 0;
    }
    pthread_mutex_unlock(&mutex);

    }

让线程B不再不断地加锁解锁,用到条件变量。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
//thread1 :
while(true)
{
pthread_mutex_lock(&mutex);
iCount++;
pthread_mutex_unlock(&mutex);

pthread_mutex_lock(&mutex);
if(iCount >= 100)
{
pthread_cond_signal(&cond);
}
pthread_mutex_unlock(&mutex);
}

//thread2:
while(1)
{
pthread_mutex_lock(&mutex);
while(iCount < 100)
{
pthread_cond_wait(&cond, &mutex);
}
printf("iCount >= 100\r\n");
iCount = 0;
pthread_mutex_unlock(&mutex);
}

C++条件变量

C++中的条件变量有两种:

  • condition_variable:需要配合std::unique_lock< std::mutex>进行wait操作,也就是阻塞线程的操作。
  • condition_variable_any:可以和任意带有lock()、unlock()语义的mutex搭配使用。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
#include<iostream>
#include<thread>
#include<mutex>
#include<string>
#include<condition_variable>
#include<queue>

std::queue<int> g_queue;
std::condition_variable g_cv;
std::mutex mtx;

void Producer(){
for(int i=0;i<10;i++){
std::unique_lock<std::mutex> lock(mtx);
g_queue.push(i);
g_cv.notify_one();
std::cout<<"task: "<<i<<std::endl;
std::this_thread::sleep_for(std::chrono::microseconds(10));

}

}

void Customer(){
while(1){
std::unique_lock<std::mutex> lock(mtx);
g_cv.wait(lock, [](){
return !g_queue.empty();
});

int value = g_queue.front();
g_queue.pop();
std::cout<<"Customer "<<value<<std::endl;

}

}

int main(){
std::thread t1(Producer); //线程不安全的情况下,可能会对log进行多次赋值,出现错误
std::thread t2(Customer);
t1.join();
t2.join();
return 0;
}

读写锁

读写锁是一种并发控制机制,允许多个线程同时读取共享资源,但只允许一个线程进行写操作。

  • 读操作之间不会互斥。
  • 写操作之间会互斥。

读写锁一般分为两种类型:读锁和写锁。

  • 请求读锁时,如果没有其他线程持有写锁,则可以读,否则需要等待写锁。
  • 获取写锁时,如果没有其他线程在持有读锁或写锁,可以写,否则等待写锁和读锁都释放。

读写锁的优点:

  1. 多个线程可以同时读共享资源,提高了并发性能。
  2. 写操作独占资源,保证了数据的完整性和一致性。

pthread_rwlock_init :读写锁初始化
pthread_rwlock_rdlock :读加锁
pthread_rwlock_tryrdlock :尝试读加锁
pthread_rwlock_timerdlock :超时读加锁
pthread_rwlock_wrlock :写加锁
pthread_rwlock_trywrlock :尝试写加锁
pthread_rwlock_timewrlock :超时写加锁
pthread_rwlock_unlock :解锁(读写共用)
pthread_rwlock_destroy :读写锁销毁
还有一些属性操作的函数:比如读优先/写优先。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
#include <unistd.h>
#include <string.h>
#include <iostream>
#include <pthread.h>

using std::cout;
using std::cerr;
using std::endl;

int count = 10;
pthread_rwlock_t rwlock; //读写锁声明

void *thread_proc1(void* arg){
while(1){
// 读锁
pthread_rwlock_rdlock(&rwlock);
cout<<"t1 :"<<count<<endl;
pthread_rwlock_unlock(&rwlock);
}
return NULL;
}

void *thread_proc2(void* arg){
while(1){
// 写锁
pthread_rwlock_wrlock(&rwlock);
count++;
cout<<"t2 :"<<count<<endl;
pthread_rwlock_unlock(&rwlock);
}
return NULL;
}

int main()
{
pthread_rwlock_init(&rwlock,NULL); // 读写锁初始化。

pthread_t t1,t2;
pthread_create(&t1,NULL,thread_proc1,NULL);
pthread_create(&t2,NULL,thread_proc2,NULL);

pthread_join(t1,NULL);
pthread_join(t2,NULL);

pthread_rwlock_destroy(&rwlock);
return 0;
}

自旋锁

自旋锁是一种忙等待的锁,当线程尝试获取锁时,如果发现锁被其他线程持有,它会一直不断地进行忙等待,直到获取锁为止。

  • 优点:避免了进程的切换,线程会一直在一个循环中进行忙等待,直到锁被释放为止。
  • 缺点:如果获取锁的线程长时间不释放锁,其他线程会一直忙等待消耗CPU资源。

适用场景:临界区代码执行时间短,并发冲突的概率较低,线程在获取到锁之间的等待时间较短。

pthread_spin_init : 自旋锁初始化
pthread_spin_lock : 自旋锁加锁
pthread_spin_trylock : 尝试加锁
pthread_spin_unlock : 解锁
pthread_spin_destroy : 自旋锁销毁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
#include <unistd.h>
#include <string.h>
#include <iostream>
#include <pthread.h>

using std::cout;
using std::cerr;
using std::endl;

int count = 10000;
pthread_spinlock_t spin; //声明自旋锁

void *thread_proc1(void* arg){
while(1){
pthread_spin_lock(&spin);
if(count>0){
count--;
cout<<"t1 :"<<count<<endl;
}
pthread_spin_unlock(&spin);
}
return NULL;
}

void *thread_proc2(void* arg){
while(1){
pthread_spin_lock(&spin);
sleep(1);
if(count>0){
count--;
cout<<"t2 :"<<count<<endl;
}
pthread_spin_unlock(&spin);
}
return NULL;
}

int main()
{
pthread_t t1,t2;

pthread_spin_init(&spin,0); //初始化自旋锁
pthread_create(&t1,NULL,thread_proc1,NULL);
pthread_create(&t2,NULL,thread_proc2,NULL);

pthread_join(t1,NULL);
pthread_join(t2,NULL);

pthread_spin_destroy(&spin); //线程销毁后,再去释放自旋锁
return 0;
}

看到线程的CPU使用率非常高,自旋锁一直处于自旋的状态。相比于mutex来说,非常高,所以需要根据同步类型使用锁。
在这里插入图片描述

信号量

信号量维护了一个内部计数器,用于表示可用资源的数量。

int sem_init(sem_t *sem, int pshared, unsigned int value);:初始化无名信号量,用于线程通信。
int sem_destroy(sem_t *sem); 销毁信号量
int sem_wait(sem_t *sem); 信号量等待,如果信号量<0会阻塞。
int sem_trywait(sem_t *sem);信号量尝试等待,如果信号量<0不会阻塞。
int sem_timedwait(sem_t *sem, const struct timespec *abs_timeout); 信号量超时等待。
int sem_post(sem_t *sem);信号量增加。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
#include <unistd.h>
#include <string.h>
#include <iostream>
#include <pthread.h>
#include <semaphore.h>

using std::cout;
using std::cerr;
using std::endl;

int count = 10000;
sem_t sem;

void *thread_proc1(void* arg){
while(1){
sleep(1);
count++;
cout<<"t1 :"<<count<<endl;
sem_post(&sem);// V操作,信号量+1
}
return NULL;
}

void *thread_proc2(void* arg){
while(1){
sem_wait(&sem);
// sem_trywait(&sem);
cout<<"t2 :"<<count<<endl;
}
return NULL;
}

int main()
{
pthread_t t1,t2;

sem_init(&sem,0,1);

pthread_create(&t1,NULL,thread_proc1,NULL);
pthread_create(&t2,NULL,thread_proc2,NULL);

pthread_join(t1,NULL);
pthread_join(t2,NULL);

sem_destroy(&sem);
return 0;
}

参考列表
https://www.bilibili.com/video/BV14Q4y187xp/
https://subingwen.cn/linux/thread-sync/
https://www.bilibili.com/video/BV1d841117SH


Linux学习八(线程同步)
https://cauccliu.github.io/2024/03/26/Linux学习八(线程同步)/
Author
Liuchang
Posted on
March 26, 2024
Licensed under