实际上目前大部分的语言都提供了原子操作的能力,包括了 GoLang、JAVA、C/C++、Rust 等等,当然这些能力大部分都需要硬件平台的支撑。
这里简单介绍 GCC 以及 C/C++ 中提供的一些原子操作指令。
基本介绍
在多线程编程时,通常需要对一些常见的基本类型进行操作,如 int
float
等等,一般为了解决竞态条件,通常是通过 mutex、spinlock 等进行保护。
如果不进行保护,那么实际得到的值是什么?可以从如下程序进行测试。
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <pthread.h>
#include <sys/syscall.h>
#define INC_TO 1000000 // one million...
int global_int = 0;
pid_t gettid( void )
{
return syscall( __NR_gettid );
}
void *thread_routine(void *arg)
{
int i;
int proc_num = (int)(long)arg;
cpu_set_t set;
CPU_ZERO(&set);
CPU_SET(proc_num, &set);
if (sched_setaffinity(gettid(), sizeof(cpu_set_t), &set)) {
perror( "sched_setaffinity" );
return NULL;
}
for (i = 0; i < INC_TO; i++) {
global_int++;
}
return NULL;
}
int main(void)
{
int procs = 0;
int i;
pthread_t *thrs;
/* Getting number of CPUs */
procs = (int)sysconf(_SC_NPROCESSORS_ONLN);
if (procs < 0) {
perror("sysconf");
return -1;
}
thrs = (pthread_t *)malloc(sizeof(pthread_t) * procs);
if (thrs == NULL) {
perror( "malloc" );
return -1;
}
printf("Starting %d threads...\n", procs);
for (i = 0; i < procs; i++) {
if (pthread_create(&thrs[i], NULL, thread_routine, (void *)(long)i)) {
perror( "pthread_create" );
procs = i;
break;
}
}
for (i = 0; i < procs; i++)
pthread_join(thrs[i], NULL);
free(thrs);
printf("After doing all the math, global_int value is: %d\n", global_int);
printf("Expected value is: %d\n", INC_TO * procs);
return 0;
}
如上程序中,每个 CPU 会绑定一个线程,并对一个线程累加,不同的平台可能会有所区别。
一般输出的内容如下,当然不同的平台也可能会输出 4000000
。
$ ./atomics
Starting 4 threads...
After doing all the math, global_int value is: 2933043
Expected value is: 4000000
在编译使用 -O2
参数时会输出 4000000
,实际上这是编译器优化的效果,将原来的循环直接替换成了加 1000000 。
对于 CPU 操作,每次读写、累加都是原子操作,但是几个操作的组合将不再是原子操作。
原理
原子操作对于 CPU 来说很简单,在访问内存时,可以通过特定的指令可以锁定 Front Serial Bus, FSB 。FSB 就是 CPU 与内存通讯的总线,当锁 FSB 时,CPU 就无法访问内存,从而达到原子操作。
内核中很早就在使用原子操作了,而 gcc 在 4.1.2 才支持用户模式下的原子操作。
假设,有如下的伪代码,看看当两个线程同时操作时会发生什么问题。
decrement_atomic_value();
if (atomic_value() == 0)
fire_a_gun();
假设其执行顺序如下。
对于上述的执行顺序,实际上 line3 不会执行。
原子操作
Windows、C++ 实际上都提供了一些原子操作指令,这里简单介绍的是 GCC 4.1.2
版本之后提供的内置原子操作,可以直接对 1
2
4
8
字节的数值或指针类型,进行原子 加
减
与
或
异或
等操作。
其中接口示例如下。
//----- 比较*ptr与oldval的值,如果相等则将newval更新到*ptr并返回true
bool __sync_bool_compare_and_swap (type *ptr, type oldval, type newval, ...)
//----- 比较*ptr与oldval的值,如果相等则将newval更新到*ptr并返回操作之前*ptr的值
type __sync_val_compare_and_swap (type *ptr, type oldval, type newval, ...)
//----- 将value加、减、或、与、异或到*ptr上,结果更新到*ptr,并返回操作前*ptr的值
type __sync_fetch_and_add (type *ptr, type value, ...)
type __sync_fetch_and_sub (type *ptr, type value, ...)
type __sync_fetch_and_or (type *ptr, type value, ...)
type __sync_fetch_and_and (type *ptr, type value, ...)
type __sync_fetch_and_xor (type *ptr, type value, ...)
type __sync_fetch_and_nand (type *ptr, type value, ...)
示例
注意,原子操作并没有解决竞争,只是将竞争放到了硬件层面,冲突高仍会导致低效。如下,直接利用 Linux 中的多线程,对一个全局的变量进行累加操作。
#include <stdio.h>
#include <pthread.h>
#define COUNT_PER_WORKER 5000000
#define WORKER_NUM 10
int sum = 0;
pthread_t workers[WORKER_NUM];
void *worker_func(void *arg)
{
(void) arg;
int i;
// printf("Worker thread %08lx startup.\n", pthread_self());
for (i = 0; i < COUNT_PER_WORKER; ++i) {
#ifdef USE_ATOMIC
__sync_fetch_and_add(&sum, 1);
#else
sum++;
#endif
}
return NULL;
}
int main(void)
{
int i;
for (i = 0; i < WORKER_NUM; ++i)
pthread_create(&workers[i], NULL, worker_func, NULL);
for (i = 0; i < WORKER_NUM; ++i)
pthread_join(workers[i], NULL);
printf("Sum expect %d, got %d.\n", COUNT_PER_WORKER * WORKER_NUM, sum);
return 0;
}
可以通过如下方式进行编译,其中前者没有使用原子操作,会导致最后累加的值远小于预期。
----- 使用线程操作
$ gcc -o atomic -lpthread atomic.c
$ time ./atomic
Sum expect 50000000, got 12635555.
real 0m0.240s
user 0m0.794s
sys 0m0.001s
----- 使用原子操作
$ gcc -DUSE_ATOMIC -o atomic -lpthread atomic.c
$ time ./atomic
Sum expect 50000000, got 50000000.
real 0m1.237s
user 0m4.164s
sys 0m0.010s
从上述的结果可以看出,虽然通过原子操作保证了逻辑的正确性,但同时成本也非常高,这也就是为什么架构设计如此重要。或者简单来说,原子操作并没有解决竞争,只是将竞争放到了硬件层面,冲突高仍会导致低效。