Linux 线程同步

2015-04-23 linux language

线程的三个主要同步原语:互斥锁 (mutex)、信号量 (semaphore) 和条件变量 (cond)。其中 mutex 和 sem 都是对应 futex 进行了简单的封装,在不存在冲突的情况下就不用陷入到内核中进行仲裁;而且 pthread_join 也是借助 futex 来实现的。

简单介绍下 Linux 中与线程相关的编程。

简介

Linux 线程在核内是以轻量级进程的形式存在的,拥有独立的进程表项,而所有的创建、同步、删除等操作都在核外的 pthread 库中进行。

同步 VS. 互斥

当有多个线程的时候,经常需要去同步这些线程以访问同一个数据或资源。

例如,假设有一个程序,其中一个线程用于把文件读到内存,而另一个线程用于统计文件中的字符数。当然,在把整个文件调入内存之前,统计它的计数是没有意义的。但是,由于每个操作都有自己的线程,操作系统会把两个线程当作是互不相干的任务分别执行,这样就可能在没有把整个文件装入内存时统计字数。为解决此问题,你必须使两个线程同步工作。

所谓互斥,是指散布在不同进程之间的若干程序片断,当某个进程运行其中一个程序片段时,其它进程就不能运行它们之中的任一程序片段,只能等到该进程运行完这个程序片段后才可以运行。如果用对资源的访问来定义的话,互斥某一资源同时只允许一个访问者对其进行访问,具有唯一性和排它性。但互斥无法限制访问者对资源的访问顺序,即访问是无序的

所谓同步,是指散步在不同进程之间的若干程序片断,它们的运行必须严格按照规定的某种先后次序来运行,这种先后次序依赖于要完成的特定的任务。如果用对资源的访问来定义的话,同步是指在互斥的基础上(大多数情况),通过其它机制实现访问者对资源的有序访问。在大多数情况下,同步已经实现了互斥,特别是所有写入资源的情况必定是互斥的。少数情况是指可以允许多个访问者同时访问资源。

互斥锁

----- 初始化一个互斥锁,可以通过PTHREAD_MUTEX_INITIALIZER宏进行初始化
int pthread_mutex_init(pthread_mutex_t* mutex, const thread_mutexattr_t* mutexattr);

----- 获取互斥锁,如果被锁定,当前线程处于等待状态;否则,本线程获得互斥锁并进入临界区
int pthread_mutex_lock(pthread_mutex_t* mutex);

----- lock操作不同的是,在尝试获得互斥锁失败后,不会进入阻塞状态,而是返回线程继续执行
int pthread_mutex_trylock(pthread_mutex_t* mutex);

----- 释放互斥锁,被阻塞的线程会获得互斥锁然后执行
int pthread_mutex_unlock(pthread_mutex_t* mutex);

----- 用来释放互斥锁的资源
int pthread_mutex_destroy(pthread_mutex_t* mutex);

使用方式如下。

pthread_mutex_t mutex;
pthread_mutex_init(&mutex,NULL);

void pthread1(void* arg)
{
   pthread_mutex_lock(&mutex);
   ... ...  //临界区
   pthread_mutex_unlock(&mutex);
}

void pthread2(void* arg)
{
   pthread_mutex_lock(&mutex);
   ... ...  //临界区
   pthread_mutex_unlock(&mutex);
}

读写锁

读写锁与互斥量类似,不过读写锁允许更高的并行性,适用于读的次数大于写的次数的数据结构。一次只有一个线程可以占有写模式的读写锁,但是多个线程可以同时占有读模式的读写锁。

pthread_rwlock_t rwlock = PTHREAD_RWLOCK_INITIALIZER;
int pthread_rwlock_init(pthread_rwlock_t* rwlock, const pthread_rwlockattr_t* attr);

int pthread_rwlock_rdlock(pthread_rwlock_t* rwlock);
int pthread_rwlock_wrlock(pthread_rwlock_t* rwlock);

int pthread_rwlock_unlock(pthread_rwlock_t* rwlock);
int pthread_destroy(pthread_rwlock_t* rwlock);

自旋锁

互斥锁或者读写锁是靠阻塞线程的方式是使其进入休眠,自旋锁是让线程不断循判断自旋锁是否已经被解锁,而不会使其休眠,适用于占用自旋锁时间比较短的场景。

条件变量

信号量只有锁和不锁两种状态,当条件变量和信号量一起使用时,允许线程以无竞争的方式等待特定的条件发生。也就是说,条件本身是由互斥量保护,线程在改变条件状态之前必须先锁住互斥量。

----- 动态初始化条件变量,也可以通过PTHREAD_COND_INITIALIZER宏进行初始化
int pthread_cond_init(pthread_cond_t* cond,const pthread_condattr_t* attr);

----- 阻塞等待条件变量为真,注意线程会被条件变量一直阻塞。
int pthread_cond_wait(pthread_cond_t* cond, pthread_mutex_t* mutex);

----- wait类似,但是可以设置超时时间,在经历tspr时间后,即使条件变量不满足,阻塞也被解除
int pthread_cond_timedwait(pthread_cond_t* cond, pthread_mutex_t* mutex,const struct timespec* tspr);

----- 唤醒因为条件变量阻塞的线程,至少会唤醒一个等待队列上的线程
int pthread_cond_signal(pthread_cond_t* cond);

----- 唤醒等待该条件的所有线程
int pthread_cond_broadcast(pthread_cond_t* cond);

----- 销毁条件变量
int pthread_cond_destroy(pthread_cond_t* cond);

注意,如果有多个线程在等待信号的发生,那么在通过 pthread_cond_signal() 发送信号时,会根据调度策略选择等待队列中的一个线程。

示例

比较典型的应用是生产者和消费者模型。如下的示例中,两个线程共享 xy 变量,条件变量用于表示 x > y  成立。

int x, y;
pthread_mutex_t mut = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;

//----- 等待x>y成立后执行
pthread_mutex_lock(&mut);
while (x <= y) {
        pthread_cond_wait(&cond, &mut);
}
/* operate on x and y */
pthread_mutex_unlock(&mut);

//----- 修改x,y,可能会导致x>y,如果是那么就触发条件变量
pthread_mutex_lock(&mut);
/* modify x and y */
if (x > y)
        pthread_cond_broadcast(&cond);
pthread_mutex_unlock(&mut);

这个处理逻辑看起来还比较简单,但是为什么在被唤醒后还要进行条件判断,也就是为什么要使用 while 循环来判断条件。实际上更多的是为了跨平台的兼容,由于对于调度策略没有明确的规定,那么不同的实现就可能会导致 “惊群效应”。

例如,多个线程在等待信号量,同时被唤醒,那么只有一个线程会进入临界区进行处理,那么此时就必须要再次条件判断。

#include <stdio.h>
#include <signal.h>
#include <errno.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>

#include <sys/time.h>
#include <sys/syscall.h>

#define gettid()    syscall(__NR_gettid)

#define log_info(fmt, args...)  do {                                    \
	printf("[%ld] %ld info : " fmt, gettid(), time(NULL), ## args); \
	putchar('\n');                                                  \
} while(0)
#define log_error(fmt, args...) do {                                    \
	printf("[%ld] %ld error: " fmt, gettid(), time(NULL), ## args); \
	putchar('\n');                                                  \
} while(0)

static int quit_now = 0;
static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t   cond = PTHREAD_COND_INITIALIZER;

void *thr1_fn(void *arg)
{
        (void) arg;
        int count = 0, rc;
        struct timeval now;
        struct timespec timeout;

        if (gettimeofday(&now, NULL) < 0) {
                log_error("gettimeofday() failed, %d:%s.", errno, strerror(errno));
                return NULL;
        }
        timeout.tv_nsec = now.tv_usec * 1000;

        while (quit_now == 0) {
#if 0
                pthread_mutex_lock(&mutex);
                while (quit_now == 0) {
                        timeout.tv_sec = time(NULL) + 1;
                        rc = pthread_cond_timedwait(&cond, &mutex, &timeout);
                        if (rc == ETIMEDOUT) {
                                pthread_mutex_unlock(&mutex);
                                log_info("timeout now.");
                                break;
                        } else if (rc != 0) {
                                log_error("condition wait failed, %d:%s.", errno, strerror(errno));
                                exit(1);
                        }
                        pthread_mutex_unlock(&mutex);
                        log_info("quit now.");
                        return NULL;
                }
                pthread_mutex_unlock(&mutex);
#else
                pthread_mutex_lock(&mutex);
                timeout.tv_sec = time(NULL) + 1;
                rc = pthread_cond_timedwait(&cond, &mutex, &timeout);
                if (rc == 0) {
                        pthread_mutex_unlock(&mutex);
                        log_info("maybe quit now.");
                        continue;
                } else {
                        if (rc != ETIMEDOUT) {
                                log_error("condition wait failed, %d:%s.", errno, strerror(errno));
                                exit(1);
                        }
                        pthread_mutex_unlock(&mutex);
                        log_info("timeout now.");
                }
#endif

                log_info("%d around, working.", ++count);
        }

        return NULL;
}

int main(void)
{
        int rc;
        pthread_t t1;

        log_info("Main thread pid %lu", gettid());

        rc = pthread_create(&t1, NULL, thr1_fn, NULL);
        if (rc != 0) {
                log_error("Create thread failed, %s.", strerror(rc));
                exit(1);
        }
        sleep(5);

        quit_now++;
        pthread_mutex_lock(&mutex);
        pthread_cond_signal(&cond);
        pthread_mutex_unlock(&mutex);

        pthread_join(t1, NULL);

        return 0;
}

原理

首先重点解释下 pthread_cond_wait() 函数,该函数功能可以直接通过 man 3p pthread_cond_wait 查看;简单来说,其实现的功能是,释放 mutex 并阻塞到 cond ,更重要的是,这两步是 原子操作

在解释之前,为了防止与 NTPL 中的实现混淆,暂时去掉 pthread 头,仅为方便说明。

1. 基本条件变量

----- 如上,在等待事件时,通常会通过while()进行条件检测
while(pass == 0)
    cond_wait(...);
 
----- 当条件被改变时,通过signal()函数通知到其它线程
pass = 1;
cond_signal(...)

2. 互斥量保护

注意,如上的两个线程中都涉及到了共享变量 pass ,那么为了防止竞态条件的发生,需要通过加锁进行保护,也就是如下的内容:

mutex_lock(mtx);
while(pass == 0)
    cond_wait(...);
/* more operations on pass */
mutex_unlock(mtx);

mutex_lock(mtx);
pass = 1;
/* more operations on pass */
cond_signal(...);
mutex_unlock(mtx); // 必须要在cond_signal之后,否则同样存在竞态条件

3. 条件变量语义

实际在上述的示例中,隐含了 cond_wait() 必须要自动释放锁,也就是说如果只是等待条件变量,实际的内容如下:

mutex_lock(mtx);
while(pass == 0) {
 mutex_unlock(mtx);
 cond_just_wait(cv);
 mutex_lock(mtx);
}
mutex_unlock(mtx);

mutex_lock(mtx);
pass = 1;
cond_signal(...);
mutex_unlock(mtx);

信号量

----- 初始化信号量,pshared为0只能在当前进程的线程间共享,否则可以进程间共享,value给出了信号量的初始值
int sem_init(sem_t* sem, int pshared,unsigned int value);
----- 阻塞线程,直到信号量的值大于0,解除阻塞后同时会将sem的值减一
sem_wait(sem_t* sem);
----- 是wait的非阻塞版本
sem_trywait(sem_t* sem);
----- 增加信号量的值,会利用线程的调度策略选择一个已经被阻塞的线程
sem_post(sem_t* sem);
----- 释放信号量所占用的资源
sem_destroy(sem_t* sem);

取消点

一个线程可以通过 pthread_cancel() 向另外一个线程发送结束请求,接收此请求的线程可以通过线程的两个属性来决定是否取消,以及是同步 (延时) 取消还是异步 (立即) 取消;而且即使该函数调用成功返回,也不代表线程就结束了。

可以通过如下的两个函数设置线程的属性:

int pthread_setcancelstate(int state, int *oldstate);
int pthread_setcanceltype(int type, int *oldtype);

PTHREAD_CANCEL_ENABLE
  可取消,不过有两种取消方式,一种是同步取消,另外一种是异步取消。
PTHREAD_CANCEL_DISABLE
  不可取消,那么接收取消请求后,它依然继续执行。
PTHREAD_CANCEL_DEFERRED
  可取消,会在下一个取消点退出;简单来说,取消点就是程序在运行的时候检测是否收到取消请求,常见的有如下函数:
    pthread_join() pthread_cond_wait() pthread_cond_timedwait() pthread_testcancel() sem_wait() sigwait()
PTHREAD_CANCEL_ASYNCHRONOUS
  异步取消,也就是说线程在收到取消请求之后,立即取消退出。

线程退出

一般来说,POSIX 的线程终止有两种情况:A) 正常退出,线程主动调用 pthread_exit() 或者从线程函数中 return 都将使线程正常退出,这是可预见的退出方式;B) 非正常终止,是指线程在其它线程的干预下,或者由于自身运行出错 (如访问非法地址) 而退出,这种退出方式是不可预见的。

无论是那种方式,都会存在资源释放的问题,在不考虑因运行出错而退出的前提下,如何保证线程终止时能顺利的释放掉自己所占用的资源,特别是锁资源,就是一个必须考虑解决的问题。

最常见的场景是,线程为了访问临界资源而为其加锁,但在访问过程中被外界取消,如果允许异步取消,或者在打开独占锁之前的运行路径上存在取消点,则该临界资源将永远处于锁定状态得不到释放。

清理函数

默认情况下,一个线程是可取消的并且是同步取消的。对于可以取消的线程,需要考虑退出时该线程后续的资源清理,例如,线程执行到一个锁内时,已经获得锁了,这时异常退出了,那么此时就是一个死锁的问题。

POSIX 提供了如下的两个函数用于注册自动释放资源,也就是在 push 和 pop 之间的终止动作,包括 pthread_exit() 和取消点的取消。

#include <pthread.h>
void pthread_cleanup_push(void (*routine)(void *), void *arg);
void pthread_cleanup_pop(int execute);

可多次调用 push 函数,将采用 FIFO 方式管理,这两个函数实际上是宏定义,定义如下:

#define pthread_cleanup_push(routine,arg)                   \
	{                                                   \
		struct _pthread_cleanup_buffer _buffer;     \
		_pthread_cleanup_push (&_buffer, (routine), (arg));

#define pthread_cleanup_pop(execute)                        \
		_pthread_cleanup_pop (&_buffer, (execute)); \
	}

上述使用了两个 { } ,导致这两个函数必须成对出现,而且必须位于程序的同一级别的代码段中才能通过编译,如下是常见的调用方式。

如果函数没有异常退出,那么可以通过 phread_cleanup_pop() 弹出这些栈里的函数,此时的参数要为 0,如果非 0 的话,弹出的同时也会执行这些函数的。例如,对于死锁问题可以做如下处理:

pthread_cleanup_push(pthread_mutex_unlock, (void *) &mutex);
pthread_mutex_lock(&mutex);
/* do some work */
pthread_mutex_unlock(&mutex);
pthread_cleanup_pop(0);

死锁实例

下面是一段在 Linux 平台下能引起线程死锁的小例子。

#include <unistd.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <pthread.h>

pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;

void *thread1(void *arg)
{
	(void) arg;
	printf("Threads #1 enter\n");
	pthread_mutex_lock(&mutex);
	printf("Threads #1 111\n");
	pthread_cond_wait(&cond, &mutex);
	printf("Threads #1 222\n");
	pthread_mutex_unlock(&mutex);
	printf("Threads #1 quit\n");
	pthread_exit(NULL);
}

void *thread2(void *arg)
{
	(void) arg;
	printf("Threads #2 enter\n");
	sleep(10);
	printf("Threads #2 mutex\n");
	pthread_mutex_lock(&mutex);
	printf("Threads #2 111\n");
	pthread_cond_broadcast(&cond);
	printf("Threads #2 222\n");
	pthread_mutex_unlock(&mutex);
	printf("Threads #2 quit\n");
	pthread_exit(NULL);
}

int main(void)
{
	pthread_t tid1, tid2;

	pthread_create(&tid1, NULL, thread1, NULL);
	pthread_create(&tid2, NULL, thread2, NULL);
	printf("Create 2 threads\n");

	sleep(5);
	printf("Cancel thread #1\n");
	pthread_cancel(tid1);

	printf("Join threads\n");
	pthread_join(tid1, NULL);
	pthread_join(tid2, NULL);

	return 0;
}

这个实例程序仅仅是使用了条件变量和互斥量进行一个简单的线程同步:thread1 等待条件变量,thread2 启动 10s 后发送广播信号尝试唤醒 thread1 。

如果将 pthread_cancel() 注释掉,实际上程序可以按照预期执行,问题是,具体是什么原因导致的?

参考

实际上 Mutex 锁的性能很好,可以参考 Mutex锁的性能