Linux C++ 线程同步

2015-04-23 linux language c/cpp

互斥

C++ 11 中的互斥量保存在 <mutex> 头文件中,

  • std::mutex 基本的互斥锁,成员函数包括了 lock() unlock() try_lock()
  • std::recursive_mutex 递归锁,可防止同一线程中多次调用发生死锁,也就是允许多次调用。
  • std::time_mutex 带有定时器,新增 try_lock_for() try_lock_until() 接口用于定时。
  • std::recursive_timed_mutex 定时递归。

相比最基础的互斥量,同时增加了递归、定时互斥量,

RAII

Resource Acquisition Is Initialization, RAII 由 C++ 之父 Bjarne Stroustrup 提出,也就是资源获取即初始化。在 C++ 中,实际上会利用栈的能力,对于锁来说,在初始化的时候获取锁,然后在退出时类似调用析构释放锁,从而可以降低发生死锁的风险。

C++11 提供了两种:A) std::lock_guard 简单使用;B) std::unique_lock 更高级的方法。

lock_guard

通过一局部变量,当函数退出的时候,变量会同时被销毁,也就是在销毁的时候同时会将锁释放。

#include <mutex>
#include <thread>
#include <iostream>
#include <stdexcept>

std::mutex mtx;

void dump_thread(void)
{
        try {
                // local lock_guard to guarante unlocking on exception.
                std::lock_guard<std::mutex> lock(mtx);
                throw(std::logic_error("some error"));
        } catch (std::logic_error &) {
                std::cout << "exception" << std::endl;
        }
}

int main(void)
{
        std::thread thd(dump_thread);
        thd.join();

        return 0;
}

unique_lock

通过互斥锁将原有的并发操作修改成了串行操作,为了保证正确性的前提下,同时提高并发,需要尽量降低加锁的粒度,而 lock_guard 只能在析构的时候解锁,不够灵活。

unique_lock 提供了更丰富的接口,但因为要维护内部的状态,如果在析构中锁已经释放,那么就不会再调用解锁,所以相比来说性能会有所损耗。

#include <mutex>
#include <thread>
#include <iostream>

std::mutex mtx;

void dump_thread(void)
{
        std::unique_lock<std::mutex> lock(mtx, std::defer_lock);

        std::cout << "Before lock it" << std::endl;
        lock.lock();
        std::cout << "Step #1" << std::endl;
        lock.unlock();

        std::cout << "Step #2" << std::endl;

        lock.lock();
        std::cout << "Step #3" << std::endl;
        lock.unlock();
}

int main(void)
{
        std::thread thd(dump_thread);
        thd.join();

        return 0;
}

如上的示例中,如果要使用 unique_lock 则需要有两个代码块完成。通过 std::defer_lock 声明在初始化的时候不加锁。

条件变量

其中 condition_variable 对象使用 unique_lock() 来锁住当前线程,并通过 wait() 函数阻塞等待,直到另外一个线程通过相同的变量唤醒改线程。

注意,默认 condition_variable 会搭配 std::unique_lock<std::mutex> 使用,如果要使用其它的 lockable 类型 (例如 time_mutex ),可以使用 condition_variable_any 类。

wait

在 C++ 11 中,提供了两个 wait() 函数接口,都会导致当前线程阻塞直到被唤醒,为了防止虚假唤醒发生,同时可以选择一个判断条件。

所谓的虚假唤醒 (Spurious Wakeup) 是指即使没有线程调用唤醒函数,原阻塞等待的函数也可能返回,也就是线程被唤醒了但条件不满足,此时如果继续执行就会导致错误。

在唤醒时是唤醒一个线程还是多个线程没有明确的要求,而且在多核、接收到信号时,都有可能会发生,所以一般会在唤醒之后再次检查条件是否满足。

两个函数的接口声明如下。

void wait(std::unique_lock<std::mutex> &lock);

template<class Predicate>
void wait(std::unique_lock<std::mutex> &lock, Predicate pred);

其中 Predicate 就是用来判断是否满足条件,防止虚假唤醒。

使用

与其它的互斥和条件变量的配合相似,在 wait() 函数中会自动调用 unlock() 释放锁,使其它线程可以继续获取锁;当前线程被唤醒的同时会对互斥量加锁,从而保证互斥量和条件变量的原子性。

#include <queue>
#include <mutex>
#include <thread>
#include <iostream>
#include <condition_variable>

#define QUEUE_MAX_SIZE   100

bool quit_now = false;

std::queue<int> queue;
std::mutex queue_lock;
std::condition_variable queue_cond;

void consumer(void)
{
        while (true) {
                std::unique_lock<std::mutex> lock(queue_lock);
                queue_cond.wait(lock, []{return quit_now || queue.size() > 0;});
                if (quit_now && queue.size() <= 0) // consume all data.
                //if (quit_now) // leave now.
                        break;
                std::cout << "Consumer " << queue.front() << std::endl;
                queue.pop();
        }
        std::cout << "Consumer quit." << std::endl;
}

void producer(void)
{
        int count = 0;

        while (count < 5) {
                std::this_thread::sleep_for(std::chrono::milliseconds(500));
                std::unique_lock<std::mutex> lock(queue_lock);
                if (queue.size() >= QUEUE_MAX_SIZE) {
                        std::cout << "Queue is full now, try later." << std::endl;
                        continue;
                }

                std::cout << "Producer " << count << std::endl;
                queue.push(count++);

                queue_cond.notify_all();
        }
        std::cout << "Producer quit." << std::endl;
        quit_now = true;
        queue_cond.notify_all();
}

int main(void)
{
        std::thread consumers[2], producers[1];

        for (int i = 0; i < sizeof(consumers)/sizeof(consumers[0]); ++i)
                consumers[i] = std::thread(consumer);
        for (int i = 0; i < sizeof(producers)/sizeof(producers[0]); ++i)
                producers[i] = std::thread(producer);

        for (int i = 0; i < sizeof(producers)/sizeof(producers[0]); ++i)
                producers[i].join();
        for (int i = 0; i < sizeof(consumers)/sizeof(consumers[0]); ++i)
                consumers[i].join();

        return 0;
}

另外,还有 wait_for() 等待一段时间后超时,或者通过 wait_until() 等待到某个时间点。在唤醒时,可以通过 notify_one() 以及 notify_all() 分别唤醒一个或者多个线程。

同时还提供了 notify_all_at_thread_exit() 方便在线程退出的时候进行通知,不过 libstdc++ 要在 GCC 5.0 之后才支持。

Promise Future

异步编程的关键是如何进行通讯,一般有两种方式:A) 能够知道任务有没有完成,例如信号量、条件变量等;B) 在任务完成后执行一段特定的逻辑,也就是回调函数。

对于回调函数,如果一个任务分成多个异步阶段完成,那么需要在每个阶段的回调函数中加入下阶段的代码,这就会导致多次循环。

Future 和 Promise 源于函数式语言,其目的是分离一个值和产生值的方法,从而简化异步代码的处理。两者相互协作完成,其中 Promise 用来承诺某段时间后可以获取对象,然后再通过 Future 来获取这个结果。

Future

C++ 中提供的 std::future 简单来说是提供了一种访问异步操作结果的机制。

对于异步操作一般很难马上获取操作结果,可以同步等待或者通过 future 异步获取结果,通常有三种状态:A) deferred 未开始;B) ready 已完成;C) timeout 超时。

获取 future 结果有三种方式:A) get() 等待异步操作结束并返回结果;B) wait() 只等待异步操作完成,没有返回值;C) wait_for() 超时等待返回结果。

可调用对象

在 C++11 之前,可调用对象包括了函数、函数指针以及重载 operator() 运算符的对象;而在 C++11 之后,新增了 lambda 表达式以及 bind() 返回值,同时,引入了新的 function 类型,用来统一可调用对象。

bind() 函数可以把已有的变量绑定到指定函数的参数,从而产生一个新的函数。

如下是一个使用可调用对象的示例。

#include <string>
#include <iostream>
#include <functional>

class Hey {
public:
        void operator() (const std::string &msg) {
                std::cout << "Hey " << msg << "!" << std::endl;
        }
};

void Hi(const std::string &msg)
{
        std::cout << "Hi " << msg << "!" << std::endl;
}

void Message(const std::string &pre, const std::string &msg)
{
        std::cout << pre << msg << "!" << std::endl;
}

int main(void)
{
        Hey hey;
        void (*Hello)(const std::string &);

        /* before C++11 */
        Hi("Andy");                // function
        Hello = Hi, Hello("Andy"); // function pointer
        hey("Andy");               // override operator()

        /* after C++11 */
        std::function<void(const std::string &)> callit;

        callit = Hi;
        callit = hey;
        callit = [](const std::string &msg) {
                std::cout << "Hi " << msg << "!" << std::endl;
        };
        callit = std::bind(Message, "Hello ", std::placeholders::_1);
        callit("Bruce");

        return 0;
}

ref cref

在 C++ 语法中,已经存在了引用,那么为什么在 C++11 中还要引入 std::ref 呢?

其实,主要是考虑到函数式编程 (例如 std::bind() ) ,默认是对参数的直接拷贝而非引用,通过 std::refstd::cref 可以分别表示引用以及 const 引用。

#include <iostream>
#include <functional>

void foobar(int &a, int &b, const int &c)
{
        std::cout << "foobar " << a << ", " << b << ", " << c << std::endl;
        ++a; // increments the copy of a stored in the function object
        ++b; // increments the main()'s variable.
        // ++c; // compile error
}

int main(void)
{
        int a = 1, b = 2, c = 3;
        std::function<void()> func = std::bind(foobar, a, std::ref(b), std::cref(c));
        func();
        std::cout << " After " << a << ", " << b << ", " << c << std::endl;

        return 0;
}

其输出的结果如下。

foobar 1, 2, 3
 After 1, 3, 3

也就是说,只有变量 b 在被调用的函数中被修改了。