程序员的自我修养『二』:线程同步

使用多线程必须注意一个问题,如果有多个线程同时运行,而且它们试图访问相同的资源,就会遇到各种问题。如请求在代码在阅读时写入其他函数,即一个线程要读,一个线程要写,读线程占据着资源,写线程得不到资源,自然就会与理想的结果相违背。 本文更侧重理论解读,某些场景下,这些概念可同样用到进程中。

没有锁的保护

多线程中,变量都由所有线程共享。所以,任何一个变量都可以被任何一个线程修改。如修改一个变量需要多条语句,在执行这几条语句时,线程可能中断,很可能把内容给改乱了。如下所示的代码,创建一个为0的变量,创建两个线程,每个线程对变量执行+1的操作,得到的最后结果不一定是2。 t1, t2 为线程, x1, x2 为各自线程的临时变量:

// 初始值 g = 0
t1: x1 = g + 1   // x1 = 0 + 1
t2: x2 = g + 1   // x2 = 0 + 1
t1: g = x1       // g = x1 = 1
t2: g = x2       // g = x2 = 1
// 最终结果 g = 1 而不是 2

而执行顺序则完全取决于操作系统的调度方案和CPU的核心是否忙碌。如:操作系统是否允许任务长时间占用CPU,CPU在不忙碌的情况下是否会进行线程切换。如果一旦发生线程切换,且没有对临界资源添加保护锁,很容易写出危险的程序。

示例代码

实现一个上面所描述情况的代码:

#include <stdio.h>
#include <pthread.h>
#define NUM_THREADS 2000
int g = 0;
// 线程的运行函数
void* count(void* args){
g++;
}
int main(){
// 定义线程的 id 变量,多个变量使用数组
pthread_t tids[NUM_THREADS];
for (int j = 0; j < 10; j++){
for(int i = 0; i < NUM_THREADS; ++i)
{
//参数依次是:创建的线程id,线程参数,调用的函数,传入的函数参数
int ret = pthread_create(&tids[i], NULL, count, NULL);
if (ret != 0){
printf("pthread_create error: error_code = %d", ret);
}
}
for (int i = 0; i < NUM_THREADS; i++)
{
pthread_join(tids[i], NULL);
}
printf("g = %d\n", g);
g = 0;
}
return 0;
}

危险的程序自然得不到准确的结果:

临界区

临界资源:一个只允许一个进程使用的资源,也称互斥资源,独占资源,共享变量,如打印机,和上面程序中的变量 g 。涉及临界资源的代码段称为临界区。临界区是代码片段,包含了对临界资源的访问,每个进程可以有一个或多个临界区,临界区的设置由程序员设置。

为保证结果正确,需要各个进程互斥进入具有相同临界资源的临界区,实现对临界资源的互斥访问。有以下规则:

  • 互斥准则:某个进程在临界区执行,其他所有进程被排斥在临界区外。其他进程不能进入相同临界资源的临界区。所以只有相同临界资源的临界区需要互斥。
  • 有空让进:临界区无进程执行时,不能无限期延长下一个进入临界区进程的等待时间。离开临界区的进程要开放临界区,让其他进程加入。
  • 有限等待:每个进程进入临界区的等待时间有限,不能无限等待。临界区资源尽可能小,否则一个进程霸占资源,其他进程等待时间会很长。

按照上述原则,得到访问临界区的方法:

  • 进入区,实现互斥准则,保证一个进程进入
  • 临界区,有限等待准则,不能过大
  • 退出区,有空让进准则,放行其他进程的进入

互斥量

互斥量用于线程互斥访问临界资源,允许多个线程安全地共享一个关联的软件或者硬件资源。当一个线程想使用共享资源时,它必须先通过获取互斥量以获得专有的访问权限,成功加锁才能操作,操作结束解锁。如果该互斥量已被另一个线程锁定,请求线程可以等待该互斥量被解锁。

所以互斥信号量的取值范围只有0和1。在同一进程内使用,使用步骤为:

  • 初始化
  • 临界区前wait操作
  • 临界区后signal操作

因此同一时刻,只能有一个线程持有该锁。因为资源是共享的,线程间也还是竞争的,但通过锁就将资源的访问变成互斥操作,线程不能同时操作数据。虽然降低了并发性,但能保证结果正确。

所以在锁定互斥量后,线程可以长时间地安全地使用相关联的资源。但是单个线程持有互斥量时间应尽可能的短,避免其他线程一直处于等待状态。当线程不再需要使用资源时,它必须将互斥量解锁,使得其它线程可以使用该资源。

#include <stdio.h>
#include <string.h>
#include <pthread.h>
// 初始化线程数量
#define NUM_THREADS 200
pthread_t tid[NUM_THREADS];
// 要计算的量
int counter = 0;
// 互斥量
pthread_mutex_t lock;
// 多个线程要执行的代码
void *thread_func(void *arg)
{
pthread_mutex_lock(&lock);
counter += 1;
pthread_mutex_unlock(&lock);
}
int main()
{
int i = 0;
int err;
// 初始换互斥量
if (pthread_mutex_init(&lock, NULL) != 0)
{
printf("\n mutex init failed\n");
return 1;
}
// 10次循环执行,多次执行 观察结果
for (int j = 0; j < 10; j++)
{
// 创建线程 分配任务
while (i < NUM_THREADS)
{
err = pthread_create(&(tid[i]), NULL, &thread_func, NULL);
if (err != 0)
printf("\ncan't create thread :[%s]", strerror(err));
i++;
}
// 等待线程执行完毕
i = 0;
while (i < NUM_THREADS)
{
pthread_join(tid[i], NULL);
i++;
}
// 打印变量
printf("%d\n", counter);
// 变量清空
counter = 0;
i = 0;
}
// 销毁互斥量
pthread_mutex_destroy(&lock);
return 0;
}

信号量

信号量用于线程的同步,如经典的生产者消费者。信号量是一个同步对象,用于保持在0至指定最大值(初始化指定)之间的一个计数值,同样只能被两个标准的原语 waitsignal 访问。使用步骤为:

  1. 进入关键代码前,进程必须获取一个信号量,否则不能运行
  2. 执行完关键代码,释放信号量
  3. 信号量有数值,正值表示空闲,负值表示忙碌。

只能通过两个不可分割的原子操作来访问信号量,一个是申请操作 wait ,一个是释放操作 signal

wait
signal

假设信号量的初始值为$S$:

  • $S>0$:有S个资源可用
  • $S=0$:无资源可用
  • $S<0$:有$|S|$个进程在等待

有m个进程共享同一临界资源,若使用信号量机制实现对这一临界资源的互斥访问,则信号量的变化范围是 [-(m-1),1]

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include <semaphore.h>   //导入 信号量 的包
#define NUM_THREADS 200
pthread_t tid[NUM_THREADS];
int count = 0;
// 声明信号量
// 其中 sem_t 可视作 unsigned int
sem_t count_sem;
// 多线程要执行的函数
void *thread_func(void *arg)
{
sem_wait(&count_sem);
count++;
sem_post(&count_sem);
}
int main(int argc, char *argv[])
{
// 初始换信号量
if (sem_init(&count_sem, 0, 1) == -1)
{
printf("sem_init: failed\n");
}
int i = 0;
int err;
// 10次执行 多次执行观察结果
for (int j = 0; j < 10; j++){
// 创建多线程的任务
while(i < NUM_THREADS)
{
err = pthread_create(&(tid[i]), NULL, &thread_func, NULL);
if (err != 0)
printf("\ncan't create thread :[%s]", strerror(err));
i++;
}
// 等待线程释放
i = 0;
while (i < NUM_THREADS)
{
pthread_join(tid[i], NULL);
i++;
}
printf("%d\n", count);
count = 0;
i = 0;
}
// 销毁信号量
sem_destroy(&count_sem);
return 0;
}

信号量与互斥量的区别

虽然互斥量和信号量在一定程度上可以互相替代,比如可以把值最大为1的信号量当互斥量用,也可以用互斥量+计数器当信号量。

但是对于设计理念上还是有不同的,互斥量管理的是资源的使用权,偏向锁的概念;而信号量的绝对值表示资源的数量,有那么一点微妙的小区别。

打个比方,在早餐餐厅,大家要喝咖啡。

  • 如果用互斥量的方式,同时只有一个人可以使用咖啡机,他获得了咖啡机的使用权后,开始做咖啡,其他人只能在旁边等着,直到他做好咖啡后,另外一个人才能获得咖啡机的使用权。
  • 如果用信号量的模式,服务员获取咖啡机的锁,开始做咖啡。把咖啡做好放到柜台上,谁想喝咖啡就拿走一杯,服务员会不断做咖啡,如果咖啡杯被拿光了,想喝咖啡的人就排队等着。互斥量管理的是咖啡机的使用权,而信号量管理的是做好的咖啡数量。

如果信号量只有二进制的0或1,称为二进制信号量。在linux系统中,二进制信号量又称互斥锁。所以互斥锁和信号量的使用方法较为相似。

条件变量

以生产者消费者模型为例说明条件变量的使用条件。为节省开销,希望生产者制造出 100 个产品后才通知消费者。如果直接使用 mutex 互斥锁,需要在某个线程上不断地轮询:100 个产品是否生产完毕。相当于进行大量无效的询问,才能知道条件是否已经满足,并且每次询问均需要加锁和释放锁,这无疑会带来额外的开销。

而条件变量则高效地解决了这个问题。使用条件变量的情况下,我们可以直接等待某个条件的发生,而不需要主动轮询。条件变量是一种线程同步机制。核心思想是:一个线程等待『条件变量的条件成立』而挂起;另一个线程使『条件成立』。为了防止竞争,条件的检测是在互斥锁的保护下进行的,线程在改变条件状态前先要锁住互斥量。如果一个条件为假,则一个线程自动阻塞,该线程处于等待状态,并释放相关变量的互斥锁。如果另一个线程改变了条件,它将信号发送给关联的条件变量,唤醒一个或多个处于等待中的线程,使其重新获得互斥锁,重新评价条件。

但由于系统实现,会存在虚假唤醒等情况。即:线程并没有发出唤醒信号,处于等待中的线程仍然会自己醒来,这是一种能保证执行效率的方法。假设此时有10个线程处于等待中,在收到一个唤醒信号后,操作系统尝试去唤醒所有的线程,这会打破发送信号与唤醒之间一对一的关系。所以此时只能唤醒一个线程,而其余九个线程处于等待阶段。为了更灵活的处理这种情况,所以无论条件是否满足,操作系统允许等待中的线程自己醒来,称为虚假唤醒。

为了避免虚假唤醒对条件带来的影响,条件判断需要使用 while 而不是 if 。这是由于存在虚假唤醒等情况,导致唤醒后发现条件依旧不成立。因此需要使用 while 语句来循环地进行等待,直到条件成立为止。根据原理解析中生产者与消费者的模型,创建100个线程,且每当 count \% 20 == 0 时叫醒等待中的线程,防止过度轮循。试验结果如下图所示。

在上图中,可以观察到,在 count 取值为20,40,60,80时,由于叫醒速度过快,等待线程没有及时获取 mutex 锁来响应,因此等待线程中没有任何输出。而当等待线程因为虚假唤醒判断条件是否满足时,因为99小于100,不满足条件,所以等待线程继续等待。当最后 count 的取值为100时,等待线程被唤醒并继续向后执行,打印 [thread main] wake - cond was signalled. 语句。打印出最后的输出语句来判断 count 的值是否正确,结果 [thread main] count == 100 so everyone is count 显示答案正确。

#include <pthread.h>
#include <stdio.h>
#include <unistd.h>
#include <assert.h>
// 创建 100 个线程
int NUMTHREADS = 100;
int count = 0;
// 互斥锁
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
// 条件变量
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
// 信号量满足条件时执行的任务
void *ThreadEntry()
{
// 获取锁
pthread_mutex_lock(&mutex);
count++;
// 叫 5 次,每个20回叫一次
if (count % 20 == 0 && count != 0){
printf("count is now %d. Signalling cond.\n", count);
// 每次完成加法后发送信号
pthread_cond_broadcast(&cond);
}
// 最后释放锁
pthread_mutex_unlock(&mutex);
}
int main(int argc, char **argv)
{
pthread_t threads[NUMTHREADS];
// 初始化线程
for (int t = 0; t < NUMTHREADS; t++)
pthread_create(&threads[t], NULL, ThreadEntry, NULL);
// 获取锁
pthread_mutex_lock(&mutex);
// 相加到 100 才会满足条件
while (count != NUMTHREADS)
{
printf("[thread main] count is %d which is < %d so waiting on cond\n", count, NUMTHREADS);
pthread_cond_wait(&cond, &mutex);
}
puts("[thread main] wake - cond was signalled.");
// 在 pthread_cond_wait 返回之前,会锁住 mutex
pthread_mutex_unlock(&mutex);
// 打印最后结果
printf("[thread main] count == %d so everyone is count\n", count);
// 销毁条件变量和互斥量
pthread_cond_destroy(&cond);
pthread_mutex_destroy(&mutex);
return 0;
}
Just for Life.
我还没有学会写个人说明!
上一篇

连Python都不熟也能跑通AI人脸识别?“隐藏Boss”竟是它!

下一篇

每一位程序员都应该学习的优秀代码

你也可能喜欢

评论已经被关闭。

插入图片