1.并发
并发,指两个或两个以上的独立活动同时发生。
计算机的并发,指在单个系统里同时执行多个独立的任务。
并发的两种方式:真正并行 vs 任务切换
四个任务在双核处理器上的任务切换,仍是将任务整齐地划分为同等大小子任务块的理想情况。实际上,许多因素会使得任务分割不均或调度不规则。
四个任务在两个核心之间的切换
并发的方式
- 多进程并发
- 多线程并发
多进程并发:将应用程序分为多个独立的进程同时运行,就像同时进行网页浏览和文字处理一样。
独立的进程可以通过进程间的通信渠道传递讯息(信号、套接字、文件、管道等等)。
缺点:
- 进程间的通信非常复杂,速度很慢,因为操作系统会对进程进行保护,以避免一个进程去修改另一个进程的数据。
- 运行多个进程的固定开销大:需要时间启动进程,操作系统需要资源来管理进程等等。
多线程并发:在单进程中运行多个线程。
线程很像轻量级的进程:每个线程相互独立运行,并且可以在不同的指令序列中运行。不过,进程中的所有线程都共享地址空间,并且能访问到大部分数据———全局变量仍然是全局的,指针、对象的引用或数据可以在线程之间传递。
一个进程中的两个线程,正在通过共享内存进行通信。
地址空间共享,以及缺少线程间的数据保护,使得操作系统记录的工作量减小,所以使用多线程的开销远远小于多进程。不过,共享内存的灵活性是有代价的:如果多个线程访问数据,那么必须确保每个线程所访问到的数据一致,这就需要对线程通信做大量的工作。
使用并发的原因
原因有二:分离关注点(SOC)、性能
分离关注点。通过将相关的代码与无关的代码分离,可以使程序更容易理解和测试,从而减少出错的可能。即使一些操作需要同时进行,依旧可以使用并发,分离不同的功能区域。
性能。两种利用并发提高性能的方式:
- 任务并行:将一个单个任务分成几部分并行运行,从而降低总运行时间
- 数据并行:一个线程执行算法的一部分,而另一个线程执行算法的另一个部分,每个线程在不同的数据块上执行相同的操作
什么时候不使用并发:
收益比不上成本。使用并发的代码在很多情况下难以理解,因此编写和维护多线程代码会产生脑力成本,而增加的复杂性也可能会引起更多的错误。
线程的资源有限。如果太多的线程同时运行,则会消耗很多操作系统资源,从而使得操作系统整体上运行得更加缓慢。不仅如此,因为每个线程都需要一个独立的堆栈,所以运行太多的线程也会耗尽进程的可用内存或地址空间。
运行越多的线程,操作系统就需要越多的上下文切换,每一次切换都需要耗费时间。所以在某些时候,增加线程实际上会降低应用的整体性能。
并发程序
C++多线程程序和其他C++程序差不多。唯一的区别在于某些函数可以并发运行,所以需要确保共享数据在并发访问时是安全的。
例子:打印“Hello World”的程序
单线程运行:
1 |
|
启动独立的线程显示信息:
1 |
|
2.线程管理
线程基本操作
启动线程
使用C++线程库启动线程,就是构造std::thread
对象:
1 | void do_some_work(); |
需要包含<thread>
头文件,std::thread
可以通过有函数操作符类型的实例进行构造:
1 | class background_task |
代码中,提供的函数对象会复制到新线程的存储空间中,函数对象的执行和调用都在线程的内存空间中进行。
Lambda表达式:
1 | std::thread my_thread([]{ |
线程启动后是要等待线程结束,还是让其自主运行。
当std::thread
对象销毁之前还没有做出决定,程序就会终止。
即便是有异常存在,也需要确保线程能够正确汇入(joined)或分离(detached)。
如果不等待线程汇入 ,就必须保证线程结束之前,访问数据的有效性。
代码2.1:函数已经返回,线程依旧访问局部变量
1 | struct func |
可能会出现新线程访问已经销毁的变量。过程如下:
主线程 | 新线程 |
---|---|
使用some_local_state构造my_func | |
开启新线程my_thread | |
启动 | |
调用func::operator() | |
将my_thread分离 | 执行func::operator();可能会在do_something中调用some_local_state的引用 |
销毁some_local_state | 持续运行 |
退出oops函数 | 持续执行func::operator();可能会在do_something中调用some_local_state的引用 –> 导致未定义行为 |
常规处理方法:
将数据复制到线程中。对于对象中包含的指针和引用需谨慎。使用访问局部变量的函数去创建线程是一个糟糕的主意。
可以通过join()函数来确保线程在主函数完成前结束。
等待线程完成
如需等待线程,需要使用**join()**。
将例2-1中的my_thread.detach()
替换为my_thread.join()
,就可以确保局部变量在线程完成后才销毁。
只能对一个线程使用一次join(),一旦使用过join(),std::thread
对象就不能再次汇入了。当对其使用joinable()时,将返回false。
特殊情况下的等待
避免应用被抛出的异常所终止。通常,在无异常的情况下使用join()时,需要在异常处理过程中调用join(),从而避免生命周期的问题。
代码2.2 等待线程完成
1 | struct func; // 定义在代码2.1中 |
使用“资源获取即初始化方式”(RAII,Resource Acquisition Is Initialization),提供一个类,在析构函数中使用join()。
代码2.3 使用RAII等待线程完成
1 | class thread_guard |
后台运行线程
detach()会让线程在后台运行,这就意味着与主线程不能直接交互。
守护线程(daemon threads):分离线程,UNIX中是指没有任何显示的接口,在后台运行的线程。特点是长时间运行。
分离线程只能确定线程什么时候结束,发后即忘的任务使用分离线程。
代码2-4 使用分离线程处理文档
1 | void edit_document(std::string const& filename) |
传递参数
向可调用对象或函数传递参数很简单,只需要将这些参数作为 std::thread
构造函数的附加参数即可。
即使函数中的参数是引用的形式,拷贝操作也会执行。
1 | void f(int i, std::string const& s); |
代码创建了一个调用f(3, “hello”)的线程。函数f需要一个std::string
对象作为第二个参数,但这里使用的是字符串的字面值,也就是char const *
类型,线程的上下文完成字面值向std::string
的转化。
1 | void f(int i,std::string const& s); |
内部代码会将拷贝的参数以右值的方式进行传递,这是为了那些只支持移动的类型
1 | void update_data_for_widget(widget_id w,widget_data& data); // 1 |
因为函数期望的是一个非常量引用作为参数(而非右值),所以会在编译时出错。可以使用std::ref
将参数转换成引用的形式。因此可将线程的调用改为以下形式:
1 | std::thread t(update_data_for_widget,w,std::ref(data)); |
update_data_for_widget就会收到data的引用,而非data的拷贝副本,这样代码就能顺利的通过编译了。
如果提供的参数支持移动(move),不能拷贝。
std::unique_ptr
(C++11中的智能指针),为动态分配的对象提供内存自动管理机制(类似垃圾回收机制)。同一时间内,只允许一个std::unique_ptr
实例指向一个对象,并且当这个实例销毁时,指向的对象也将被删除。
移动构造函数(move constructor)和移动赋值操作符(move assignment operator)允许一个对象的所有权在多个std::unique_ptr
实例中传递。使用“移动”转移对象所有权后,就会留下一个空指针。
std::move
转移动态对象的所有权到线程中:
1 | void process_big_object(std::unique_ptr<big_object>); |
线程的所有权可以在多个std::thread
实例中转移,这依赖于std::thread
实例的可移动且不可复制性。
可移动性:使得开发者可以自己决定,哪个实例拥有线程实际执行的所有权。
不可复制性:在某一时间点,一个std::thread
实例只能关联一个执行线程。
转移所有权
例子,创建两个执行进程,并在std::thread
实例之间(t1,t2和t3)转移所有权:
1 | void some_function(); |
最后的移动操作,将some_function线程的所有权转移给t1,但是t1已经有了一个关联的线程(执行some_other_function的线程),所以这里系统直接调用std::terminate()
终止程序继续运行。
std::terminate()
不抛出异常,是noexpect函数,保证与std::thread
的析构函数的行为一致。
线程对象析构前,显式的等待线程完成或分离它,即不能通过赋新值给std::thread
对象的方式”丢弃“一个线程。
std::thread
支持移动,线程的所有权可以在函数外进行转移。
代码2.5 函数返回std::thread
对象
1 | std::thread f() |
所有权可以在函数内部传递,允许std::thread
实例作为参数进行传递:
1 | void f(std::thread t); |
为了确保线程在程序退出前完成,定义了scoped_thread类。
代码2.6 scoped_thread的用法
1 | class scoped_thread |
代码2.7 joining_thread类的实现
1 | class joining_thread |
std::thread
中对移动语义的支持,也适用于使用std::thread
的移动敏感(move-aware)容器(比如,std::vector<>
)。了解这些后,就可以量产了一些线程,并且等待它们结束,代码如下所示。
代码2.8 量产线程,等待它们结束
1 | void do_work(unsigned id); |
代码2.8中每个线程所做的工作都是独立的,并且结果会受到共享数据的影响。
如果f()有返回值,这个返回值就依赖于线程得到的结果。写入返回值之前,程序会检查使用共享数据的线程是否终止。
确定线程数量
std::thread::hardware_concurrency()
在新版C++中非常有用,返回并发线程的数量。
代码2.9将整体工作拆分成小任务,交给每个线程去做,并设置最小任务数,避免产生太多的线程,程序会在操作数量为0时抛出异常。比如,std::thread
无法启动线程,就会抛出异常。
代码2.9 并行版的std::accumulate
1 | template<typename Iterator,typename T> |
注意:
T类型的加法不满足结合律(比如,对于float型或double型,在进行加法操作时,系统很可能会做截断操作),因为对范围中元素的分组,会导致parallel_accumulate得到的结果可能与
std::accumulate
的结果不同。迭代器必须是前向迭代器。
对于results容器,需要保证T有默认构造函数。
线程标识
std::thread::id
类型,获取方式:
- 调用
std::thread
对象的成员函数get_id()
来直接获取。如果std::thread
对象没有与任何执行线程相关联,get_id()
将返回std::thread::type
默认构造值,这个值表示“无线程”。 - 当前线程中调用
std::this_thread::get_id()
(这个函数定义在<thread>
头文件中)也可以获得线程标识。
std::thread::id
对象可以自由的拷贝和对比,因为标识符可以复用。
如果两个对象的std::thread::id
相等,那就是同一个线程,或者都“无线程”。
如果不等,那么就代表了两个不同线程,或者一个有线程,另一没有线程。
标准库提供std::hash<std::thread::id>
容器,std::thread::id
也可以作为无序容器的键值。
启动其他线程前,可以通过std::this_thread::get_id()
得到自己的线程ID。每个线程都要检查一下,其拥有的线程ID是否与初始线程的ID相同。
1 | std::thread::id master_thread; |
线程ID在容器中可作为键值.
可以使用输出流(std::cout
)来记录一个std::thread::id
对象的值。
1 | std::cout << std::this_thread::get_id(); |
3.共享数据
线程间的问题在于修改共享数据,会使不变量遭到破坏。
并行中常见错误:条件竞争(race condition)。
共享数据的问题
条件竞争
并发中的竞争条件,取决于一个以上线程的执行顺序,每个线程都抢着完成自己的任务。
当不变量遭到破坏时,才会产生条件竞争。
操作要访问两个独立的数据块,独立的指令会对数据块将进行修改,并且其中一个线程可能正在进行修改,另一个线程就对数据块进行了访问。因为出现的概率低,很难查找,也很难复现。
避免恶性条件竞争
最简单的办法:对数据结构采用某种保护机制,确保只有修改线程才能看到不变量的中间状态。
从其他访问线程的角度来看,修改不是已经完成了,就是还没开始。
对数据结构和不变量进行修改,修改完的结构必须能完成一系列不可分割的变化,也就保证了每个不变量的状态,这就是所谓的无锁编程。
使用事务的方式去处理数据结构的更新。一些数据和读取都存储在事务日志中,然后将之前的操作进行合并,再进行提交。当数据结构被另一个线程修改后,或处理已经重启的情况下,提交就会无法进行,这称作为“软件事务内存”(software transactional memory (STM))
使用互斥量
访问共享数据前,将数据锁住,在访问结束后,再将数据解锁。线程库需要保证,当线程使用互斥量锁住共享数据时,其他的线程都必须等到之前那个线程对数据进行解锁后,才能进行访问数据。
互斥量会造成死锁,或对数据保护的太多(或太少)
互斥量
实例化std::mutex
创建互斥量实例,成员函数lock()可对互斥量上锁,unlock()为解锁。
不推荐直接去调用成员函数,调用成员函数就意味着,必须在每个函数出口都要去调用unlock()(包括异常的情况)。
std::lock_guard
:在构造时就能提供已锁的互斥量,并在析构时进行解锁,从而保证了互斥量能被正确解锁。
代码3.1 使用互斥量保护列表
1 |
|
C++17中添加了一个新特性,称为模板类参数推导,类似std::lock_guard
这样简单的模板类型,其模板参数列表可以省略。③和④的代码可以简化成:
1 | std::lock_guard guard(some_mutex); |
C++17中的一种加强版数据保护机制——std::scoped_lock
,所以在C++17的环境下,上面的这行代码也可以写成:
1 | std::scoped_lock guard(some_mutex); |
大多数情况下,互斥量通常会与需要保护的数据放在同一类中,而不是定义成全局变量。
当其中一个成员函数返回的是保护数据的指针或引用时,也会破坏数据。具有访问能力的指针或引用可以访问(并可能修改)保护数据,而不会被互斥锁限制。这就需要对接口谨慎设计,要确保互斥量能锁住数据访问,并且不留后门。
保护共享数据
代码3.2 无意中传递了保护数据的引用
1 | class some_data |
例子中process_data看起来没有问题,std::lock_guard
对数据做了很好的保护,但调用用户提供的函数func①,就意味着foo能够绕过保护机制将函数malicious_function
传递进去②,可以在没有锁定互斥量的情况下调用do_something()
。
函数foo()
中调用unprotected->do_something()
的代码未能被标记为互斥。
切勿将受保护数据的指针或引用传递到互斥锁作用域之外。
接口间的条件竞争
代码3.3 std::stack
容器的实现
1 | template<typename T,typename Container=std::deque<T> > |
虽然empty()和size()可能在返回时是正确的,但结果不可靠。当返回后,其他线程就可以自由地访问栈,并且可能push()多个新元素到栈中,也可能pop()一些已在栈中的元素。这样的话,之前从empty()和size()得到的数值就有问题了。
非共享的栈对象,如果栈非空,使用empty()检查再调用top()访问栈顶部的元素是安全的。如下代码所示:
1 | stack<int> s; |
对于共享的栈对象,这样的调用顺序就不再安全,因为在调用empty()①和调用top()②之间,可能有来自另一个线程的pop()调用并删除了最后一个元素。这是一个经典的条件竞争,使用互斥量对栈内部数据进行保护,但依旧不能阻止条件竞争的发生,这就是接口固有的问题。
解决:改变接口的设计
先获取顶部元素(top()),然后从栈中移除(pop())。这样,在不能安全的将元素拷贝出去的情况下,栈中的这个数据还依旧存在,没有丢失。当问题是堆空间不足,应用可能会释放一些内存,然后再进行尝试。
不幸的是,这样的分割却制造了本想避免的条件竞争。
选项1:传入一个引用
将变量的引用作为参数,传入pop()函数中获取“弹出值”:
1 | std::vector<int> result; |
缺点:需要构造出一个栈中类型的实例,用于接收目标值。对于一些类型,这样做是不现实的,因为临时构造一个实例,从时间和资源的角度上来看都不划算。
选项2:无异常抛出的拷贝构造函数或移动构造函数
一个有用的选项可以限制对线程安全栈的使用,并且能让栈安全的返回所需的值,而不抛出异常。虽然安全,但非可靠。
那些有抛出异常的拷贝构造函数,但没有移动构造函数的类型往往更多。
选项3:返回指向弹出值的指针
指针的优势是自由拷贝,并且不会产生异常,这样就能避免Cargill提到的异常问题了。
缺点就是返回指针需要对对象的内存分配进行管理,对于简单数据类型(比如:int),内存管理的开销要远大于直接返回值。
使用std::shared_ptr
,不仅能避免内存泄露(因为当对象中指针销毁时,对象也会被销毁),而且标准库能够完全控制内存分配方案,就不需要new和delete操作。
例:定义线程安全的堆栈
代码3.4中是一个接口没有条件竞争的堆栈类定义,它实现了选项1和选项3:重载了pop(),使用局部引用去存储弹出值,并返回std::shared_ptr<>
对象。它有一个简单的接口,只有两个函数:push()和pop();
代码3.4 线程安全的堆栈类定义(概述)
1 |
|
当栈为空时,pop()函数会抛出一个empty_stack异常,所以在empty()函数被调用后,其他部件还能正常工作。
使用std::shared_ptr
可以避免内存分配管理的问题,并避免多次使用new和delete操作。堆栈中的五个操作,现在就剩下三个:push(), pop()和empty()(这里empty()都有些多余)。
简化接口更有利于数据控制,可以保证互斥量将操作完全锁住。
代码3.5 扩充(线程安全)堆栈
1 |
|
堆栈可以拷贝——拷贝构造函数对互斥量上锁,再拷贝堆栈。构造函数体中①的拷贝使用互斥量来确保复制结果的正确性,这样的方式比成员初始化列表好。
一个给定操作需要两个或两个以上的互斥量时,另一个潜在的问题将出现:死锁。与条件竞争完全相反——不同的两个线程会互相等待,从而什么都没做。
死锁
线程有对锁的竞争:一对线程需要对他们所有的互斥量做一些操作,其中每个线程都有一个互斥量,且等待另一个解锁。因为他们都在等待对方释放互斥量,没有线程能工作。
避免死锁:让两个互斥量以相同顺序上锁。某些情况下是可以这样用,因为不同的互斥量用于不同的地方。
选择一个固定的顺序(例如,实例提供的第一互斥量作为第一个参数,提供的第二个互斥量为第二个参数),可能会适得其反:在参数交换了之后,两个线程试图在相同的两个实例间进行数据交换时,程序又死锁了!
std::lock
——可以一次性锁住多个(两个以上)的互斥量,并且没有副作用(死锁风险)。
代码3.6 交换操作中使用std::lock()
和std::lock_guard
1 | // 这里的std::lock()需要包含<mutex>头文件 |
std::adopt_lock
参数除了表示std::lock_guard
可获取锁之外,还将锁交由std::lock_guard
管理,就不需要std::lock_guard
再去构建新的锁了。
std::lock
要么将两个锁都锁住,要不一个都不锁。
C++17中,std::scoped_lock<>
是一种新的RAII模板类型,与 std::lock_guard<>
的功能相同,这个新类型能接受不定数量的互斥量类型作为模板参数,以及相应的互斥量(数量和类型)作为构造参数。
互斥量支持构造时上锁,与std::lock
的用法相同,解锁在析构中进行。代码3.6中swap()操作可以重写如下:
1 | void swap(X& lhs, X& rhs) |
C++17新特性:自动推导模板参数。隐式参数模板类型推导机制。
std::scoped_lock
的好处在于,可以将所有std::lock
替换掉,从而减少错误的发生。
避免死锁
无锁的情况下,仅需要两个线程std::thread
对象互相调用join()就能产生死锁。这种情况下,没有线程可以继续运行,因为他们正在互相等待。
避免嵌套锁
线程获得一个锁时,就别再去获取第二个。每个线程只持有一个锁,就不会产生死锁。
当需要获取多个锁,使用std::lock
来做这件事(对获取锁的操作上锁),避免产生死锁。
避免在持有锁时调用外部代码
外部程序可能做任何事情,包括获取锁。在持有锁的情况下,如果用外部代码要获取一个锁,就会违反第一个指导意见,并造成死锁。
使用固定顺序获取锁
当硬性要求获取两个或两个以上的锁,并且不能使用std::lock
单独操作来获取它们时,最好在每个线程上,用固定的顺序获取它们(锁)。
例如,不同线程以相反顺序访问列表所造成的死锁。当节点A和B在列表中相邻,当前线程可能会同时尝试获取A和B上的锁。另一个线程可能已经获取了节点B上的锁,并试图获取节点A上的锁——经典的死锁场景。
线程1 | 线程2 |
---|---|
锁住主入口的互斥量 | |
读取头结点指针 | |
锁住头结点互斥量 | |
解锁主入口互斥量 | |
锁住主入口互斥量 | |
读取head->next指针 | 锁住尾结点互斥量 |
锁住next结点的互斥量 | 读取tail->prev指针 |
读取next->next指针 | 解锁尾结点的互斥量 |
… | … |
锁住A结点的互斥量 | 锁住C结点的互斥量 |
读取A->next指针(也就是B结点) | 读取C->next指针(也就是B结点) |
锁住B结点互斥量 | |
阻塞,尝试锁住B结点的互斥量 | 解锁C结点互斥量 |
读取B->prev指针(也就是A结点) | |
阻塞,尝试锁住A结点的互斥量 | |
死锁! |
当A、C节点中间的B节点删除时,有线程在已获取A和C上的锁后,还要获取B节点上的锁时,就可能发生死锁。
解决:定义遍历的顺序,一个线程必须先锁住A才能获取B的锁,在锁住B之后才能获取C的锁。这将消除死锁,不允许反向遍历链表。
使用层次锁结构
当代码试图对互斥量上锁,而低层已持有该层锁时,不允许锁定。可以通过每个互斥量对应的层数,以及每个线程使用的互斥量,在运行时检查锁定操作是否可以进行。
层级互斥量不可能死锁,因为互斥量本身会严格遵循约定进行上锁。当多个互斥量在是在同一级上时,不能同时持有多个锁。
代码3.7 使用层次锁来避免死锁
1 | hierarchical_mutex high_level_mutex(10000); // 1 3个hierarchical_mutex实例,逐渐递减的层级构造 |
代码3.8 简单的层级互斥量实现
1 | class hierarchical_mutex |
超越锁的延伸扩展
死锁不仅仅会发生在锁之间,也会发生在同步构造中(可能会产生一个等待循环)
如果去等待一个线程结束,应该确定这个线程的层级,这样一个线程只需要等待比其层级低的线程结束即可。
std::lock()
和std::lock_guard
可组成简单的锁,并覆盖大多数情况,但有时需要更多的灵活性,可以使用标准库提供的std::unique_lock
模板。
std::unique_lock
——灵活的锁
std::unique_lock
实例不带互斥量:信息已存储,且已更新。在创建时自动加锁,在销毁时自动解锁****
unique_lock是对lock_guard的扩展,允许在生命周期内再调用lock和unlock来加解锁以切换锁的状态。
但是内存会占用比较多的空间,并且比std::lock_guard
稍慢一些。
使用时std::lock_guard<std::mutex> lk(mtx);
直接替换成std::unique_lock<std::mutex> lk(mtx);
lock_guard和unique_lock第二参数的作用:
std::adopt_lock
: 假设调用一方已经拥有了互斥量的所有权(已经lock成功了);通知lock_guard不需要再构造函数中lock这个互斥量了。1
2
3
4
5{
std::lock_guard<std::mutex> lock(g_mtx, std::adopt_lock);
g_mtx.lock();
临界区或临界资源
}对于lock_guard第二参数类型只有一种,锁管理器构造的时候不会自动对可锁对象上锁;由可锁对象自己加锁;等锁管理器析构的时候自动解锁。
如果指定了第二参数,但是没有lock,锁管理器析构的时候解锁了无拥有权的可锁对象,导致异常。
多锁场景下,会调用
std::lock
避免死锁的出现,但是这个方法要求锁管理器不能拥有可锁对象,由std::lock
方法执行锁操作。如果没有提供第二参数构造函数,那么就无法使用该方法。注意:使用该参数类型构造的锁管理器必须只能通过可锁对象进行lock,不可通过锁管理器进行lock,误用会导致程序异常。
std::defer_lock
: 初始化了一个没有加锁的mutex。前提是自己不能先lock否则报异常。1
2
3
4
5{
std::unique_lock<std::mutex> lock(g_mtx, std::defer_lock);
lock.lock(); // 不能用g_mtx.lock(),第二次锁的时候会崩溃
临界区或临界资源
}锁管理器在构造的时候不主动lock且不拥有可锁对象;如果后续执行lock,锁管理器析构的时候自动解锁。
注意:该类型构造的锁管理器只能通过锁管理器执行lock且拥有可锁对象。如果直接调用可锁对象进行锁操作后,会导致程序异常。
1 | class some_big_object; |
std::unique_lock
支持lock(), try_lock()和unlock()成员函数
不同域中互斥量的传递
std::unique_lock
是可移动,但不可赋值的类型。
函数get_lock()锁住了互斥量,然后准备数据,返回锁的调用函数:
1 | std::unique_lock<std::mutex> get_lock() |
std::unique_lock
的灵活性同样也允许实例在销毁之前放弃拥有的锁。可以使用unlock()来做这件事,如同一个互斥量:std::unique_lock
的成员函数提供类似于锁定和解锁的功能。
锁的粒度
锁的粒度:用来描述通过一个锁保护着的数据量大小。一个细粒度锁(a fine-grained lock)能够保护较小的数据量,一个粗粒度锁(a coarse-grained lock)能够保护较多的数据量。
如果很多线程正在等待同一个资源(等待收银员对自己拿到的商品进行清点),当有线程持有锁的时间过长,这就会增加等待的时间(别等到结账的时候,才想起来蔓越莓酱没拿)。
std::unique_lock
在这种情况下工作正常,调用unlock()时,代码不需要再访问共享数据。当再次需要对共享数据进行访问时,再调用lock()就可以了。
1 | void get_and_process_data() |
不需要让锁住的互斥量越过对process()函数的调用,所以可以在函数调用①前对互斥量进行手动解锁,之后对其再次上锁②。
代码3.10 比较操作符中一次锁住一个互斥量
1 | class Y |
比较操作符首先通过调用get_detail()成员函数检索要比较的值②③,函数在索引时被锁保护着①。比较操作符会在之后比较索引出来的值④。
虽然锁只持有一次的操作能减少锁持有的时间(这样能消除死锁的可能性),但这里有一个微妙的语义操作同时对两个锁住的值进行比较。
当操作符返回true时,就意味着在这个时间点上的lhs.some_detail与另一个时间点的rhs.some_detail相同。
这两个值在读取之后,可能会以任意方式修改。两个值会在②和③处进行交换,这样就会失去了比较的意义。比较可能会返回true,表明这两个值是相等的,实际上这两个值相等的情况可能就发生在一瞬间。
当持有锁的时间没有达到整个操作时间,就会让自己处于条件竞争的状态。
保护共享数据的方式
保护共享数据的初始化过程
假设有一个共享源,构建代价很昂贵,它可能会打开一个数据库连接或分配出很多的内存。
延迟初始化(Lazy initialization)在单线程代码很常见————每一个操作都需要先对源进行检查,为了了解数据是否被初始化,然后在其使用前决定,数据是否需要初始化:
1 | std::shared_ptr<some_resource> resource_ptr; |
转为多线程代码时,只有①处需要保护,这样共享数据对于并发访问就是安全的。
代码3.11 使用延迟初始化(线程安全)的过程
1 | std::shared_ptr<some_resource> resource_ptr; |
代码3.12 使用std::call_once
作为类成员的延迟初始化(线程安全)
1 | class X |
例子中第一次调用send_data()①或receive_data()③的线程完成初始化过程。使用成员函数open_connection()去初始化数据,也需要将this指针传进去。
std::mutex
和std::once_flag
的实例不能拷贝和移动,需要通过显式定义相应的成员函数,对这些类成员进行操作。
保护不常更新的数据机构
互斥量“读者-作者锁”,其允许两种不同的使用方式:一个“作者”线程独占访问和共享访问,让多个“读者”线程并发访问。
c++17标准库提供了两种非常好的互斥量——std::shared_mutex
和std::shared_timed_mutex
。
对于更新操作,可以使用std::lock_guard<std::shared_mutex>
和std::unique_lock<std::shared_mutex>
上锁。
代码3.13 使用std::shared_mutex
对数据结构进行保护
1 |
|
多线程可以同时调用find_entry(),且不会出错。
update_or_add_entry()函数调用时,独占锁会阻止其他线程对数据结构进行修改,并且阻止线程调用find_entry()。
嵌套锁
线程对已经获取的std::mutex
(已经上锁)再次上锁是错误的,尝试这样做会导致未定义行为。
C++标准库提供了std::recursive_mutex
类。除了可以在同一线程的单个实例上多次上锁,其他功能与std::mutex
相同。
其他线程对互斥量上锁前,当前线程必须释放拥有的所有锁,所以如果你调用lock()三次,也必须调用unlock()三次。
使用std::lock_guard<std::recursive_mutex>
和std::unique_lock<std::recursive_mutex>
嵌套锁一般用在可并发访问的类上。每个公共成员函数都会对互斥量上锁,然后完成对应的操作后再解锁互斥量。不过,有时成员函数会调用另一个成员函数,这种情况下,第二个成员函数也会试图锁住互斥量,这就会导致未定义行为的发生。
解决方案为将互斥量转为嵌套锁,第二个成员函数就能成功的进行上锁,并且函数能继续执行。
缺点:对应类的不变量通常会被破坏。当不变量被破坏时,第二个成员函数还需要继续执行。
一个比较好的方式是,从中提取出一个函数作为类的私有成员,这个私有成员函数不会对互斥量进行上锁(调用前必须获得锁)。然后,需要仔细考虑一下,这种情况调用新函数时数据的状态。
4.同步操作
等待事件或条件
当一个线程等待另一个线程完成时,可以持续的检查共享数据标志(用于做保护工作的互斥量),直到另一线程完成工作时对这个标识进行重置。但是,这种方式会消耗线程的执行时间检查标识,并且当互斥量上锁后,其他线程就没有办法获取锁,就会持续等待。
在等待线程在检查间隙,使用std::this_thread::sleep_for()
进行周期性的间歇
1 | bool flag; |
当线程休眠时没有浪费执行时间,但很难确定正确的休眠时间。
条件变量:通过另一线程触发等待事件的机制是最基本的唤醒方式(例如:流水线上存在额外的任务时)
条件变量会与多个事件或其他条件相关,并且一个或多个线程会等待条件的达成。当某些线程被终止时,为了唤醒等待线程(允许等待线程继续执行),终止线程将会向等待着的线程广播“条件达成”的信息。
等待条件达成
c++标准库条件变量的实现:std::condition_variable
和std::condition_variable_any
,这两个实现都包含在<condition_variable>
头文件的声明中。
两者都需要与互斥量一起才能工作(互斥量是为了同步),前者仅能与std::mutex
一起工作,而后者可以和合适的互斥量一起工作,从而加上了_any
的后缀。
std::condition_variable_any
更加通用,不过在性能和系统资源的使用方面会有更多的开销,所以通常会将std::condition_variable
作为首选类型。
代码4.1 使用std::condition_variable
处理数据等待
1 | std::mutex mut; |
wait()会去检查这些条件(通过Lambda函数),当条件满足(Lambda函数返回true)时返回。如果条件不满足(Lambda函数返回false),wait()将解锁互斥量,并且将线程(处理数据的线程)置于阻塞或等待状态。当准备数据的线程调用notify_one()通知条件变量时,处理数据的线程从睡眠中苏醒,重新获取互斥锁,并且再次进行条件检查。在条件满足的情况下,从wait()返回并继续持有锁。当条件不满足时,线程将对互斥量解锁,并重新等待。
用std::unique_lock
而不使用std::lock_guard
的原因:等待中的线程必须在等待期间解锁互斥量,并对互斥量再次上锁,而std::lock_guard
没有这么灵活。
可以不使用lambda表达式,写一个检查函数传入到wait()。调用wait()的过程中,在互斥量锁定时,可能会去检查条件变量若干次,当提供测试条件的函数返回true就会立即返回。
伪唤醒:当等待线程重新获取互斥量并检查条件变量时,并非直接响应另一个线程的通知。
当多个线程同时在等待同一条件,此时条件满足发起唤醒则可能会唤醒多个线程,但是如果对应的资源不够所有唤醒线程使用,则剩余线程的唤醒就是无意义的,也就被称作虚假唤醒。
避免方法:在睡眠返回之后重新检查条件判断(while代替if)
std::condition_variable::wait
是“忙碌-等待”的优化。下面用简单的循环实现了一个“忙碌-等待”:
1 | template<typename Predicate> |
构建线程安全队列
代码4.3 线程安全队列的接口
1 |
|
裁剪了很多构造函数,并禁止简单赋值。需要提供两个版本的try_pop()和wait_for_pop()。
代码4.4 从代码4.1中提取push()和wait_and_pop()
1 |
|
线程队列中有互斥量和条件变量,所以独立的变量就不需要了①,并且push()不需要外部同步②。当然,wait_and_pop()还要兼顾条件变量的等待③。
代码4.5 使用条件变量的线程安全队列(完整版)
1 |
|
empty()是一个const成员函数,并且传入拷贝构造函数的other形参是一个const引用。因为其他线程可能有非const引用对象,并调用变种成员函数,所以这里有必要对互斥量上锁。又因为锁住互斥量是个可变操作,所以互斥量成员必须为mutable①才能在empty()和拷贝构造函数中进行上锁。
当数据准备完成时,调用notify_one()将会唤醒一个正在wait()的线程,检查条件和wait()函数的返回状态(因为仅是向data_queue添加了一个数据项)。
使用future
future:当线程需要等待特定事件时,某种程度上来说就需要知道期望的结果。之后,线程会周期性(较短的周期)的等待或检查事件是否触发(检查信息板),检查期间也会执行其他任务(品尝昂贵的咖啡)。另外,等待任务期间也可以先执行另外的任务,直到对应的任务触发,而后等待future的状态会变为就绪状态。
当事件发生时(状态为就绪),这个future就不能重置了。
c++标准库中,声明在<future>
头文件中: unique future(std::future<>
)和shared futures(std::shared_future<>
),与了std::unique_ptr
和std::shared_ptr
非常类似。
std::future
只能与指定事件相关联,而std::shared_future
就能关联多个事件。后者的实现中,所有实例会在同时变为就绪状态,并且可以访问与事件相关的数据。
当多个线程需要访问一个独立future对象时,必须使用互斥量或类似同步机制进行保护。
后台任务的返回值
当不着急让任务结果时,可以使用std::async
启动一个异步任务。与std::thread
对象等待的方式不同,std::async
会返回一个std::future
对象,这个对象持有最终计算出来的结果。当需要这个值时,只需要调用这个对象的get()成员函数,就会阻塞线程直到future为就绪为止,并返回计算结果。
代码4.6 std::future
从异步任务中获取返回值
1 |
|
std::async
允许通过添加额外的调用参数,向函数传递额外的参数。第一个参数是指向成员函数的指针,第二个参数提供这个函数成员类的具体对象(是通过指针,也可以包装在std::ref
中),剩余的参数可作为函数的参数传入。
代码4.7 使用std::async
向函数传递参数
1 |
|
future的等待取决于std::async
是否启动一个线程,或是否有任务在进行同步。
可以在函数调用之前向std::async
传递一个额外参数:
std::launch::async
,表明函数必须在其所在的独立线程上执行;
std::launch::defered
,表明函数调用延迟到wait()或get()函数调用时才执行。
std::launch::deferred | std::launch::async
表明实现可以选择这两种方式的一种(默认选项)。
1 | auto f6=std::async(std::launch::async,Y(),1.2); // 在新线程上执行 |
future与任务关联
std::packaged_task<>
会将future与函数或可调用对象进行绑定。当调用std::packaged_task<>
对象时,就会调用相关函数或可调用对象,当future状态为就绪时,会存储返回值。
构造std::packaged_task<>
实例时,就必须传入函数或可调用对象。这个函数或可调用的对象,需要能接收指定的参数和返回(可转换为指定返回类型的)值。类型可以不完全匹配,因为这里类型可以隐式转换,可以用int类型参数和返回float类型的函数,来构建std::packaged_task<double(double)>
实例。
函数签名的返回类型可以用来标识从get_future()返回的std::future<>
的类型,而函数签名的参数列表,可用来指定packaged_task的函数调用操作符。例如,模板偏特化
代码4.8 std::packaged_task<>
的偏特化
1 | template<> |
当std::packaged_task
作为函数调用时,实参将由函数调用操作符传递至底层函数,并且返回值作为异步结果存储在std::future
中,并且可通过get_future()获取。
std::packaged_task
对任务进行打包,并适时的取回future。当异步任务需要返回值时,可以等待future状态变为“就绪”。
线程间传递任务
代码4.9 使用std::packaged_task
执行一个图形界面线程
1 |
|
std::packaged_task
是一个类模板,顾名思义是用来打包的,将一个可调用对象封装起来,然后可以将其的返回值传给future。std::packaged_task<函数返回类型(参数类型)> 变量名(函数名)
。
1 |
|
使用std::promises
当线程处理多个连接事件,来自不同的端口连接的数据包基本上以乱序方式进行处理。同样的,数据包也将以乱序的方式进入队列。很多情况下,一些应用不是等待数据成功的发送,就是等待(新的)指定网络接口数据的接收成功。
std::promise
是一个类模板,它的作用是在不同的线程中实现数据的同步,与future结合使用,也间接实现了future在不同线程间的同步。
std::promise/std::future
对提供一种机制:future可以阻塞等待线程,提供数据的线程可以使用promise对相关值进行设置,并将future的状态置为“就绪”。
当promise设置完毕(使用set_value()成员函数)时,对应的future状态就变为“就绪”,并且可用于检索已存储的值。当设置值之前销毁std::promise
,将会存储一个异常。
代码4.10 使用promise解决单线程多连接问题
1 |
|
set_value()
则直接将future的状态设置为ready。需要注意的是在使用的过程中不能多次set_value()
,也不能多次get_future()
和多次get()
,因为一个promise对象只能和一个对象相关联,否则就会抛出异常。
将异常存于future中
函数作为std::async
的一部分时,当调用抛出一个异常时,这个异常就会存储到future中,之后future的状态置为“就绪”,之后调用get()会抛出已存储的异常(注意:标准级别没有指定重新抛出的这个异常是原始的异常对象,还是一个拷贝。不同的编译器和库将会在这方面做出不同的选择)。
将函数打包入std::packaged_task
任务包后,当任务调用时,同样的事情也会发生。打包函数抛出一个异常,这个异常将存储在future中,在get()调用时会再次抛出。
std::promise
也能提供同样的功能。当存入的是异常而非数值时,就需要调用set_exception()成员函数,而非set_value()。这通常是用在一个catch块中,并作为算法的一部分。为了捕获异常,这里使用异常填充promise:
1 | extern std::promise<double> some_promise; |
使用std::current_exception()
来检索抛出的异常,可用std::copy_exception()
作为替代方案,std::copy_exception()
会直接存储新的异常而不抛出:
1 | some_promise.set_exception(std::copy_exception(std::logic_error("foo "))); |
这比使用try/catch块更加清晰,当异常类型已知,就应该优先使用。
多个线程的等待
多线程在没有额外同步的情况下,访问独立std::future
对象时,就会有数据竞争和未定义行为。因为std::future
独享同步结果,并且通过调用get()函数,一次性的获取数据,这就让并发访问变的毫无意义。
std::shared_future
:其所有权可以在不同的实例中互相传递,但只有一个实例可以获得特定的同步结果,而std::shared_future
实例是可拷贝的,所以多个对象可以引用同一关联期望值的结果。
每一个std::shared_future
的独立对象上,成员函数调用返回的结果还是不同步的,所以为了在多个线程访问一个独立对象时避免数据竞争,必须使用锁来对访问进行保护。优先使用的办法:为了替代只有一个拷贝对象的情况,可以让每个线程都拥有自己对应的拷贝对象。这样,当每个线程都通过自己拥有的std::shared_future
对象获取结果,那么多个线程访问共享同步结果就是安全的。
std::shared_future
的实例同步std::future
实例的状态。当std::future
对象没有与其他对象共享同步状态所有权,那么所有权必须使用std::move
将所有权传递到std::shared_future
1 | std::promise<int> p; |
期望值f开始是合法的①,因为引用的是promise p的同步状态,但是在转移sf的状态后,f就不合法了②,而sf就是合法的了③。
如其他可移动对象一样,转移所有权是对右值的隐式操作,所以可以通过std::promise
对象的成员函数get_future()的返回值,直接构造一个std::shared_future
对象,例如:
1 | std::promise<std::string> p; |
std::future
有一个share()成员函数,可用来创建新的std::shared_future
,并且可以直接转移future的所有权。这样也就能保存很多类型,并且使得代码易于修改:
1 | std::promise< std::map< SomeIndexType, SomeDataType, SomeComparator, |
当比较器或分配器有所改动,只需要对promise的类型进行修改即可。future的类型会自动与promise的修改进行匹配。
限时等待
时钟
时钟和与时间相关的工具,都在<chrono>
库头文件中定义。
c++标准库中,时钟是一个类,提供了四种不同的信息:
当前时间可以通过静态成员函数now()从获取。例如,std::chrono::system_clock::now()
会返回系统的当前时间。返回类型为some_clock::time_point
通常情况下,因为std::chrono::system_clock
可调,所以是不稳定的。这可调可能造成首次调用now()返回的时间要早于上次调用now()所返回的时间,这就违反了节拍频率的均匀分布。
稳定时钟std::chrono::steady_clock
。
时间段
std::chrono::duration<>
函数模板能够对时间段进行处理(线程库使用到的所有C++时间处理工具,都在std::chrono
命名空间内)。第一个模板参数是一个类型表示(比如,int,long或double),第二个模板参数是定制部分,表示每一个单元所用秒数。
例如,std::chrono::duration<short, std::ratio<60, 1>>
,几分钟的时间要存在short类型中,因为60秒是才是1分钟,所以第二个参数写成std::ratio<60, 1>
。当需要将毫秒级计数存在double类型中时,可以写成std::chrono::duration<double, std::ratio<1, 1000>>
,因为1秒等于1000毫秒
标准库在std::chrono
命名空间内为时间段变量提供一系列预定义类型:nanoseconds[纳秒] , microseconds[微秒] , milliseconds[毫秒] , seconds[秒] , minutes[分]和hours[时]。
当不要求截断值的情况下(时转换成秒是没问题,但是秒转换成时就不行)时间段的转换是隐式的,显示转换可以由std::chrono::duration_cast<>
来完成。
1 | std::chrono::milliseconds ms(54802); |
这里的结果就是截断的,而不是进行了舍入,所以s最后的值为54。
时间值支持四则运算,所以能够对两个时间段进行加减,或者是对一个时间段乘除一个常数(模板的第一个参数)来获得一个新时间段变量。例如,5*seconds(1)与seconds(5)或minutes(1)-seconds(55)是一样。
在时间段中可以通过count()成员函数获得单位时间的数量。例如,std::chrono::milliseconds(1234).count()
就是1234。
例如:等待future状态变为就绪需要35毫秒:
1 | std::future<int> f=std::async(some_task); |
等待函数会返回状态值,表示是等待是超时,还是继续等待。等待future时,超时时会返回std::future_status::timeout
。当future状态改变,则会返回std::future_status::ready
。当与future相关的任务延迟了,则会返回std::future_status::deferred
。
系统调度的不确定性和不同操作系统的时钟精度意味着:线程调用和返回的实际时间间隔可能要比35毫秒长。
时间点
时间点可用std::chrono::time_point<>
来表示,第一个参数用来指定使用的时钟,第二个函数参数用来表示时间单位(特化的std::chrono::duration<>
)。
可以通过对std::chrono::time_point<>
实例进行加/减,来获得一个新的时间点,所以std::chrono::hight_resolution_clock::now() + std::chrono::nanoseconds(500)
将得到500纳秒后的时间,这对于计算绝对时间来说非常方便。
可以减去一个时间点(二者需要共享同一个时钟),结果是两个时间点的时间差:
1 | auto start=std::chrono::high_resolution_clock::now(); |
代码4.11 等待条件变量满足条件——有超时功能
1 |
|
当没有什么可以等待时,可在一定时限中等待条件变量。这种方式中,循环的整体长度有限。当循环中使用wait_for()时,可能在等待了足够长的时间后结束等待(在假唤醒之前),且下一次等待又开始了。这可能重复很多次,出现无限等待的情况。
使用超时
最简单方式,对特定线程添加延迟处理。
例如循环检查“done”标志,两个处理函数分别是std::this_thread::sleep_for()
和std::this_thread::sleep_until()
。当线程因为指定时长而进入睡眠时,可使用sleep_for()唤醒,可指定休眠的时间点,之后可使用sleep_until唤醒。
std::mutex
和std::recursive_mutex
都不支持超时,而std::timed_mutex
和std::recursive_timed_mutex
支持超时。这两种类型也有try_lock_for()和try_lock_until()成员函数,可以在一段时期内尝试获取锁,或在指定时间点前获取互斥锁。
表4.1 可接受超时的函数
类型/命名空间 | 函数 | 返回值 |
std::this_thread 命名空间 | sleep_for(duration) | N/A |
sleep_until(time_point) | ||
std::condition_variable 或 std::condition_variable_any | wait_for(lock, duration) | std::cv_status::time_out 或 std::cv_status::no_timeout |
wait_until(lock, time_point) | ||
wait_for(lock, duration, predicate) | bool —— 当唤醒时,返回谓词的结果 | |
wait_until(lock, duration, predicate) | ||
std::timed_mutex 或 std::recursive_timed_mutex | try_lock_for(duration) | bool —— 获取锁时返回true,否则返回fasle |
try_lock_until(time_point) | ||
std::unique_lock<TimedLockable> | unique_lock(lockable, duration) | N/A —— 对新构建的对象调用owns_lock(); |
unique_lock(lockable, time_point) | 当获取锁时返回true,否则返回false | |
try_lock_for(duration) | bool —— 当获取锁时返回true,否则返回false | |
try_lock_until(time_point) | ||
std::future<ValueType>或std::shared_future<ValueType> | wait_for(duration) | 当等待超时,返回std::future_status::timeout |
wait_until(time_point) | 当期望值准备就绪时,返回std::future_status::ready | |
当期望值持有一个为启动的延迟函数,返回std::future_status::deferred |
5.内存模型和原子操作
内存模型
对象和内存位置
图5.1 将一个struct分解为多个对象,展示每个对象的内存位置。
完整的struct是一个有多个子对象(每一个成员变量)组成的对象。
四个原则:
- 每个变量都是对象,包括其成员变量的对象。
- 每个对象至少占有一个内存位置。
- 基本类型都有确定的内存位置(无论类型大小如何,即使他们是相邻的,或是数组的一部分)。
- 相邻位域是相同内存中的一部分。
对象、内存位置和并发
当两个线程访问同一个内存位置,如果线程不更新数据,只读数据不需要保护或同步。当线程对内存位置上的数据进行修改,就可能会产生条件竞争。
为了避免条件竞争,线程就要以一定的顺序执行:
- 使用互斥量来确定访问的顺序。当同一互斥量在两个线程同时访问前锁住,那么在同一时间内就只有一个线程能够访问对应的内存位置。
- 使用原子操作决定两个线程的访问顺序,当多个线程访问同一个内存地址时,对每个访问者都需要设定顺序。
如果不规定对同一内存地址访问的顺序,那么访问就不是原子的。当两个线程都是“写入者”时,就会产生数据竞争和未定义行为。
使用原子操作避免未定义行为。
修改顺序
C++程序中的对象都有(由程序中的所有线程对象)在初始化开始阶段确定好修改顺序的。大多数情况下,这个顺序不同于执行中的顺序,但在给定的程序中,所有线程都需要遵守这个顺序。
如果使用原子操作,编译器就有责任去做同步。
原子操作和原子类型
标准原子类型
标准原子类型定义在头文件<atomic>
中。
原子操作可以替代互斥量,来完成同步操作。
表5.1 标准原子类型的备选名和与其相关的std::atomic<>
特化类
原子类型 | 相关特化类 |
---|---|
atomic_bool | std::atomic<bool> |
atomic_char | std::atomic<char> |
atomic_schar | std::atomic<signed char> |
atomic_uchar | std::atomic<unsigned char> |
atomic_int | std::atomic<int> |
atomic_uint | std::atomic<unsigned> |
atomic_short | std::atomic<short> |
atomic_ushort | std::atomic<unsigned short> |
atomic_long | std::atomic<long> |
atomic_ulong | std::atomic<unsigned long> |
atomic_llong | std::atomic<long long> |
atomic_ullong | std::atomic<unsigned long long> |
atomic_char16_t | std::atomic<char16_t> |
atomic_char32_t | std::atomic<char32_t> |
atomic_wchar_t | std::atomic<wchar_t> |
C++标准库不仅提供基本原子类型,还定义了与原子类型对应的非原子类型,就如同标准库中的std::size_t
。如表5.2所示这些类型:
表5.2 标准原子类型定义(typedefs)和对应的内置类型定义(typedefs)
原子类型定义 | 标准库中相关类型定义 |
---|---|
atomic_int_least8_t | int_least8_t |
atomic_uint_least8_t | uint_least8_t |
atomic_int_least16_t | int_least16_t |
atomic_uint_least16_t | uint_least16_t |
atomic_int_least32_t | int_least32_t |
atomic_uint_least32_t | uint_least32_t |
atomic_int_least64_t | int_least64_t |
atomic_uint_least64_t | uint_least64_t |
atomic_int_fast8_t | int_fast8_t |
atomic_uint_fast8_t | uint_fast8_t |
atomic_int_fast16_t | int_fast16_t |
atomic_uint_fast16_t | uint_fast16_t |
atomic_int_fast32_t | int_fast32_t |
atomic_uint_fast32_t | uint_fast32_t |
atomic_int_fast64_t | int_fast64_t |
atomic_uint_fast64_t | uint_fast64_t |
atomic_intptr_t | intptr_t |
atomic_uintptr_t | uintptr_t |
atomic_size_t | size_t |
atomic_ptrdiff_t | ptrdiff_t |
atomic_intmax_t | intmax_t |
atomic_uintmax_t | uintmax_t |
对于标准类型进行typedef T,相关的原子类型就在原来的类型名前加上atomic_的前缀:atomic_T。
通常,标准原子类型不能进行拷贝和赋值,它们没有拷贝构造函数和拷贝赋值操作符。但是,可以隐式转化成对应的内置类型,所以这些类型依旧支持赋值,可以使用load()
和store()
、exchange()
、compare_exchange_weak()
和compare_exchange_strong()
。
它们都支持复合赋值符:+=, -=, *=, |= 等等。并且使用整型和指针的特化类型还支持++和–操作。
每种函数类型的操作都有一个内存序参数,这个参数可以用来指定存储的顺序。操作分为三类:
- Store操作,可选如下内存序:
memory_order_relaxed
,memory_order_release
,memory_order_seq_cst
。 - Load操作,可选如下内存序:
memory_order_relaxed
,memory_order_consume
,memory_order_acquire
,memory_order_seq_cst
。 - Read-modify-write(读-改-写)操作,可选如下内存序:
memory_order_relaxed
,memory_order_consume
,memory_order_acquire
,memory_order_release
,memory_order_acq_rel
,memory_order_seq_cst
。
std::atomic_flag
std::atomic_flag
是最简单的原子类型,这个类型的对象可以在两个状态间切换:设置和清除。
std::atomic_flag
类型的对象必须被ATOMIC_FLAG_INIT初始化。初始化标志位是“清除”状态。这里没得选择,这个标志总是初始化为“清除”:
1 | std::atomic_flag f = ATOMIC_FLAG_INIT; |
这适用于任何对象的声明,是唯一需要以如此特殊的方式初始化的原子类型,但也是唯一保证无锁的类型。首次使用时,需要初始化。如果std::atomic_flag
是静态存储的,那么就的保证其是静态初始化的,也就意味着没有初始化顺序问题。
当标志对象已初始化,只能做三件事情:
- 销毁,clear()成员函数,是一个存储操作,所以不能有memory_order_acquire或memory_order_acq_rel语义
- 清除或设置(查询之前的值),test_and_set()成员函数,“读-改-写”操作,可以应用于任何内存顺序。
每一个原子操作,默认的内存序都是memory_order_seq_cst。例如:
1 | f.clear(std::memory_order_release); // 1 使用释放语义清除标志 |
不能拷贝构造std::atomic_flag
对象,不能将一个对象赋予另一个std::atomic_flag
对象。这不是std::atomic_flag
特有的属性,而是所有原子类型共有的属性。
原子类型的所有操作都是原子的,而赋值和拷贝调用了两个对象,这就就破坏了操作的原子性。
互斥锁与自旋锁
同一时刻只能有一个线程获取到锁,没有获取到锁的线程通常有两种处理方式:
- 没有获取到锁的线程就一直循环等待判断该资源是否已经释放锁,这种锁叫做自旋锁,它不用将线程阻塞起来(NON-BLOCKING)
- 把自己阻塞起来,等待重新调度请求,这种叫做互斥锁。
自旋锁的定义:当一个线程尝试去获取某一把锁的时候,如果这个锁此时已经被别人获取(占用),那么此线程就无法获取到这把锁,该线程将会等待,间隔一段时间后会再次尝试获取。这种采用循环加锁 -> 等待的机制被称为自旋锁(spinlock)。
因为自旋锁避免了操作系统进程调度和线程切换,所以自旋锁通常适用在时间比较短的情况下。由于这个原因,操作系统的内核经常使用自旋锁。但是,如果长时间上锁的话,自旋锁会非常耗费性能,它阻止了其他线程的运行和调度。线程持有锁的时间越长,则持有该锁的线程将被
OS(Operating System)
调度程序中断的风险越大。
代码5.1 使用std::atomic_flag
实现自旋锁
1 | class spinlock_mutex |
由于std::atomic_flag
的局限性太强,没有非修改查询操作,甚至不能像普通的布尔标志那样使用。所以,实际操作中最好使用std::atomic<bool>
std::atomic<bool>
最基本的原子整型类型就是std::atomic<bool>
,它有着比std::atomic_flag
更加齐全的布尔标志特性。
不能拷贝和拷贝赋值,但可以使用非原子的bool类型进行构造,所以可以初始化为true或false,并且可以从非原子bool变量赋值给std::atomic<bool>
:
1 | std::atomic<bool> b(true); |
store()是一个存储操作,而load()是一个加载操作,exchange()是一个“读-改-写”操作:
1 | std::atomic<bool> b; |
另一种存储方式:当前值与预期值一致时,存储新值的操作。存储一个新值(或旧值)取决于当前值
比较/交换:表现为compare_exchange_weak()和compare_exchange_strong()
比较原子变量的当前值和期望值,当两值相等时,存储所提供值。当两值不等,期望值就会被更新为原子变量中的值。
“比较/交换”函数值是一个bool变量,当返回true时执行存储操作,false则更新期望值。当存储完成(因为只相等),则操作是成功的,否则即为失败。操作成功是返回true,失败时返回false。
因为compare_exchange_weak()
可以伪失败,所以通常会配合一个循环使用:
1 | bool expected=false; |
循环中expected的值始终是false,表示compare_exchange_weak()会莫名的失败。
另一方面,当实际值与expected
不符,compare_exchange_strong()就能保证值返回false。这就能消除对循环的需要,就可以知道是否成功的改变了一个变量,或已让另一个线程完成。
“compare/exchange”另一点不同的是,它拥有对两个内存序的参数进行操作的能力,这就允许内存序语义在成功和失败的例子中有所不同。可能成功时使用memory_order_acq_rel,而失败时使用memory_order_relaxed。失败的“compare/exchange”将不会进行存储,所以“compare/exchange”操作不能拥有meory_order_release或memory_order_acq_rel。
下面对compare_exchange_weak()的两次调用是等价的:
1 | std::atomic<bool> b; |
std::atomic<bool>
和std::atomic_flag
的不同之处在于,std::atomic<bool>
可能不是无锁的。为了保证操作的原子性,其实现中可能需要内置的互斥量。特殊情况时,可以使用is_lock_free()成员函数,检查std::atomic<bool>
上的操作是否无锁。这是除了std::atomic_flag
之外,另一个所有原子类型都拥有的特征(is_lock_free)。
std::atomic<T*>
原子指针类型,可以使用内置类型或自定义类型T,通过特化std::atomic<T*>
进行定义,操作是针对于相关类型的指针。
不能拷贝构造,也不能拷贝赋值,但是可以通过合适的类型指针进行构造和赋值。std::atomic<T*>
也有load(), store(), exchange(), compare_exchange_weak()和compare_exchage_strong()成员函数,获取与返回的类型都是T*。
std::atomic<T*>
为指针运算提供新的操作。基本操作有fetch_add()和fetch_sub(),它们在存储地址上做原子加法和减法,为+=, -=, ++和–提供简易的封装。对于内置类型的操作,例如:如果x是std::atomic<Foo*>
类型的数组的首地址,然后x+=3让其偏移到第四个元素的地址,并返回一个普通的Foo*
类型值,这个指针值是指向数组中第四个元素。
fetch_add()和fetch_sub()的返回值略有不同(所以x.ftech_add(3)让x指向第四个元素,并且函数返回指向第一个元素的地址)。这种操作也被称为“交换-相加”,并且这是一个原子的“读-改-写”操作,如同exchange()和compare_exchange_weak()/compare_exchange_strong()一样。正像其他操作那样,返回值是一个普通的T*
值,而非是std::atomic<T*>
对象的引用,所以调用代码可以基于之前的值进行操作:
1 | class Foo{}; |
函数也允许内存序作为给定函数的参数:
1 | p.fetch_add(3,std::memory_order_release); |
因为fetch_add()和fetch_sub()都是“读-改-写”操作,可以使用任意的内存序,以及加入到一个释放序列中。因为没办法提供必要的信息(这些形式都具有memory_order_seq_cst语义),所以指定的语序不支持操作符形式。
标准原子整型的相关操作
如同普通的操作集合一样(load(), store(), exchange(), compare_exchange_weak(), 和compare_exchange_strong()),std::atomic<int>
和std::atomic<unsigned long long>
也是有一套完整的操作可以供使用:fetch_add(), fetch_sub(), fetch_and(), fetch_or(), fetch_xor(),还有复合赋值方式((+=, -=, &=, |=和^=),以及++和–(++x, x++, –x和x–)。
虽然对于普通的整型来说,这些复合赋值方式还不完全:除法、乘法和移位操作不在其中。因为,整型原子值通常用来作计数器,或者是掩码,所以以上操作的缺失显得不是那么重要。如果需要,可以使用compare_exchange_weak()完成。
对于std::atomic<T*>
类型,紧密相关的两个函数就是fetch_add()和fetch_sub()。函数原子化操作,并且返回旧值,而符合赋值运算会返回新值。前缀加减和后缀加减与普通用法一样:++x对变量进行自加,并且返回新值;而x++对变量自加,返回旧值。
std::atomic<>
类模板
模板允许用户使用自定义类型创建一个原子变量(除了标准原子类型之外),需要满足一定的标准才可以使用std::atomic<>
。
为了使用std::atomic<UDT>
(UDT是用户定义类型),这个类型必须有拷贝赋值运算符。这就意味着这个类型不能有任何虚函数或虚基类,以及必须使用编译器创建的拷贝赋值操作。自定义类型中所有的基类和非静态数据成员也都需要支持拷贝赋值操作。这(基本上)就允许编译器使用memcpy()或赋值操作的等价操作,因为实现中没有用户代码。
不要将锁定区域内的数据以引用或指针的形式,作为参数传递给用户提供的函数。
创建一个std::atomic<std::vector<int>>
类型。不能使用包含有计数器,标志指针和简单数组的类型,作为特化类型。虽然这不会导致任何问题,但是越是复杂的数据结构,就有越多的操作,而非只有赋值和比较。如果这种情况发生了,最好使用std::mutex
保护数据。
当使用用户定义类型T进行实例化时,std::atomic<T>
的可用接口就只有: load(), store(), exchange(), compare_exchange_weak(), compare_exchange_strong()和赋值操作,以及向类型T转换的操作。
表5.3 每一个原子类型所能使用的操作
原子操作的非成员函数
大多数非成员函数的命名与对应成员函数有关,需要atomic_
作为前缀(比如,std::atomic_load()
)。这些函数都会重载不同的原子类型,指定内存序时会分成两种:一种没有标签,另一种以_explicit
为后缀,并且需要额外的参数,或将内存序作为标签,亦或只有标签(例如,std::atomic_store(&atomic_var,new_value)
与std::atomic_store_explicit(&atomic_var,new_value,std::memory_order_release
)。
成员函数隐式引用原子对象,所有非成员函数都持有一个指向原子对象的指针(作为第一个参数)。
例如,std::atomic_is_lock_free()
只有一种类型(虽然会被其他类型所重载),并且对于同一个对象a,std::atomic_is_lock_free(&a)
返回值与a.is_lock_free()相同。
同样的,std::atomic_load(&a)
和a.load()的作用一样。需要注意的是,a.load(std::memory_order_acquire)
与std::atomic_load_explicit(&a, std::memory_order_acquire)
的操作相同。
非成员函数的设计是为了与C语言兼容,C语言中没有引用。
C++标准库也对原子类型中的std::shared_ptr<>
智能指针类型提供非成员函数,这打破了“只有原子类型,才能提供原子操作”的原则。std::shared_ptr<>
不是原子类型,但是C++标准委员会认为这很重要。可使用的原子操作有:load, store, exchange和compare/exchange,这些操作重载了标准原子类型的操作,并且可获取std::shared_ptr<>*
作为第一个参数:
1 | std::shared_ptr<my_data> p; |
标准原子类型不仅仅是为了避免数据竞争所造成的未定义行为,还允许用户对不同线程上的操作进行强制排序。这种强制排序是数据保护和同步操作的基础,例如:std::mutex
和std::future
。
同步操作和强制排序
假设两个线程,一个向数据结构中填充数据,另一个读取数据结构中的数据。为了避免恶性条件竞争,第一个线程设置一个标志,用来表明数据已经准备就绪,从而第二个线程在这个标志设置前不能读取数据。
代码5.2 不同线程对数据的读写
1 |
|
访问顺序通过对std::atomic<bool>
类型的data_ready变量进行操作完成,这些操作通过*先行(happens-before)和同发*(synchronizes-with)确定顺序。写入数据③在写入data_ready④前发生,读取①发生在读取数据②之前。
强制了顺序:写入数据先行于读取数据。
当data_ready①为true,写操作就会与读操作同步,建立一个“先行”的关系。因为“先行”关系是可传递的,所以写入③先行于写入④,这两个行为又先行于读取操作①,之前的操作都先行于读取数据②。
图5.2 对非原子操作,使用原子操作对操作进行强制排序
同步发生
“同发”只在原子类型之间进行。例如:操作一个数据结构(对互斥量上锁),如果数据结构包含有原子类型,并且操作内部执行了一定的原子操作,那这些操作就是“同发”关系。
“同发”的基本想法:原子写操作W对变量x进行标记,同步与对x进行原子读操作,读取的是W操作写入的内容,或是W之后,同一线程上的原子写操作对x写入的值,亦或是任意线程对x的一系列原子读-改-写操作(例如,fetch_add()或compare_exchange_weak())。
因为对原子类型的操作默认都有“适当的标记”,如果线程A存储了一个值,并且线程B读取了这个值,线程A的存储操作与线程B的载入操作就是同步发生关系。
所有细微的差别都在“适当的标记”中,C++内存模型允许为原子类型提供各种约束顺序。
先行发生
“先行”关系是一个程序中基本构建块的操作顺序:指定了某个操作去影响另一个操作。
对于单线程来说:一个操作排在另一个之后,那这个操作就先执行。
如果源码中操作A发生在操作B之前,那A就先行于B。可以回看代码5.2:对data的写入③先于对data_ready④的写入。如果操作在同时发生,因为操作间无序执行,通常情况下就没有先行关系了。
代码5.3 对于参数中的函数调用顺序未指定顺序
1 |
|
程序会输出“1,2”或“2,1”,因为两个get_num()的执行顺序未指定。
这种情况下,操作在单一声明中可测序,例如:逗号操作符的使用或是表达式的结果作为参数传给另一个表达式。通常情况下,操作在单一声明中不可排序,所以无法先行安排顺序(也就没有先行发生了)。
线程间先行可以与排序先行相结合:如果操作A排序先行于操作B,并且操作B线程间先行于操作C,那么A线程间先行于C。同样的,如果A同步于B,并且B排序先于C,那么A线程间先行于C。当对数据进行一系列修改(单线程)时,只需要对数据进行一次同步即可。
如果操作A与操作B同步,或操作A的顺序在操作B之前,那么A就是强先行于B。也适用于顺序传递:如果A强先行于B,并且B强先行于C,那么A就肯定强先行于C。
原子操作的内存序
有六个内存序列选项可应用于对原子类型的操作:
- memory_order_relaxed
- memory_order_consume
- memory_order_acquire
- memory_order_release
- memory_order_acq_rel
- memory_order_seq_cst
除非为特定的操作指定一个序列选项,要不内存序列默认都是memory_order_seq_cst。
三种内存模型:
- 顺序一致性(sequentially consistent)
- 获取-释放序(memory_order_consume, memory_order_acquire, memory_order_release和memory_order_acq_rel)
- 自由序(memory_order_relaxed)。
不同的内存序在不同的CPU架构下功耗不同。不同种类的内存序,允许使用其提升相关操作的性能。使用顺序一致序(相较于其他序列,它是最简单的)时,对于在通常情况来说就够用了。
顺序一致性
默认序命名为顺序一致性,因为程序中的行为从任意角度去看,序列都保持一定顺序。
如果原子实例的所有操作都是序列一致的,那么多线程就会如单线程那样以某种特殊的排序执行
不同的操作也要遵守相同的顺序。因为行为简单,可以使用原子变量进行编写。通过不同的线程,可以写出所有可能的操作消除那些不一致,以及确认代码的行为是否与预期相符。所以,操作都不能重排;如果代码在一个线程中,将一个操作放在另一个操作前面,那其他线程也需要了解这个顺序。
缺点:因为整个序列中的操作都必须在多个处理器上保持一致,可能需要对处理器间的同步操作进行扩展(代价很昂贵!)
代码5.4 全序——序列一致性
#include
#include
#include <assert.h>
std::atomic
std::atomic
1 |
|
assert⑤语句是永远不会触发的,因为不是存储x的操作①发生,就是存储y的操作②发生。
图5.3 序列一致与先行关系
序列一致性是最简单、直观的序列,因为需要对所有线程进行全局同步,所以也是开销最大的内存序。多处理器设备上需要在处理期间,在信息交换上耗费大量的时间。
为了避免这种消耗,就需考虑使用其他内存序。
非顺序一致性内存
不同线程看到相同操作,不一定有着相同的顺序。
不仅是考虑事情同时发生的问题,还有线程没办法保证一致性。
自由序
原子类型上的操作以自由序执行。同一线程中对于同一变量的操作还是遵从先行关系,但不同线程不需要规定顺序。唯一的要求是在访问同一线程中的单个原子变量不能重排序,当给定线程看到原子变量的值时,随后线程的读操作就不会去检索较早的那个值。当使用memory_order_relaxed时,不需要任何额外的同步,对于每个变量的修改顺序只存在于线程间共享。
代码5.5 非限制操作只有非常少的顺序要求
1 |
|
这次assert⑤可能会触发,因为加载x的操作④可能读取到false,即使加载y的操作③读取到true,并且存储x的操作①先发与存储y的操作②。x和y是两个不同的变量,所以没有顺序去保证每个操作产生相关值的可见性。
非限制操作对于不同变量可以重排序,只要服从任意的先行关系即可(比如,在同一线程中)。
图5.4 非限制原子操作与先发执行
代码5.6 非限制操作——多线程版
1 |
|
三个全局原子变量①和五个线程。每一个线程循环10次,使用时memory_order_relaxed读取三个原子变量的值,并且将它们存储在一个数组上。其中三个线程每次通过循环④来更新其中一个原子变量,这时剩下的两个线程就负责读取。当线程都汇入主线程,就能打印出来每个线程存到数组上的值了。
原子变量go②用来确保线程同时退出。启动线程是昂贵的操作,并且没有明确的延迟,第一个线程可能在最后一个线程开始前结束。每个线程都在go变为true前,都在循环③⑤。并且当go设置为true时,所有线程都会开始运行⑥。
程序一种可能的输出为:
1 | (0,0,0),(1,0,0),(2,0,0),(3,0,0),(4,0,0),(5,7,0),(6,7,8),(7,9,8),(8,9,8),(9,9,10) |
前三行中线程都做了更新,后两行线程只是做读取。每三个值都是一组x,y和z,并按照这样的顺序依次循环。对于输出,需要注意的是:
- 第一组值中x增1,第二组值中y增1,第三组中z增1。
- x元素只在给定集中增加,y和z也一样,但是是不均匀增加,并且每个线程中的相对顺序都不同。
- 线程3看不到x或y的任何更新,它能看到的只有z的更新。这并不妨碍别的线程观察z的更新,并同时观察x和y的更新。
对于非限制操作,这个结果没毛病(但是不是唯一合法的输出)。任意组都用三个变量保持一致,从0到10依次递增,并且线程对相应变量进行递增操作,所以打印出的值在0到10的范围内都合理。
要想获取额外的同步,且不使用全局排序一致,可以使用获取-释放序(acquire-release ordering)。
获取-释放序
自由序(relaxed ordering)的加强版,虽然操作依旧没有统一顺序,但引入了同步。
原子加载就是获取(acquire)操作(memory_order_acquire),原子存储就是释放(memory_order_release)操作,原子读-改-写操作(例如fetch_add()或exchange())在这里,不是“获取”就是“释放”,或者两者兼有的操作(memory_order_acq_rel),同步在线程释放和获取间是成对的(pairwise),释放操作与获取操作同步就能读取已写入的值。
代码5.7 获取-释放不意味着统一操作顺序 使用获取-释放序(而非序列一致方式),对代码5.4的一次重写。
1 |
|
断言③可能会触发(就如同自由排序那样),因为在加载x②和y①时,可能读取到false。因为x和y是由不同线程写入,所以序列中的每一次释放和获取都不会影响到其他线程的操作。
图5.6 获取-释放,以及先行过程
对于读取的结果,两个(读取)线程看到的是两个完全不同的世界。
代码5.8 获取-释放序操作会影响释放操作
1 |
|
读取y③时会得到true,和存储时写入的一样②。存储使用的是memory_order_release,读取使用的是memory_order_acquire,存储与读取就同步了。因为这两个操作是由同一个线程串行完成的,所以存储x①的操作先行于存储y②的操作。
对y的存储同步与对y的加载,存储x也就先行于对y的加载,并且扩展先行于x的读取。因此,加载x的值必为true,并且断言⑤不会触发。如果对于y的加载不是在while循环中,情况可能就会有所不同。加载y的时候可能会读取到false,这种情况下对于读取到的x是什么值没有要求了。为了保证同步,加载和释放操作必须成对。
释放操作存储的值必须要让获取操作看到。当存储②或加载③都是一个释放操作时,对x的访问就无序了,也就无法保证④处读到的是true,并且还会触发断言。
获取-释放序传递同步
为了考虑传递顺序,至少需要三个线程。第一个线程用来修改共享变量,第二个线程使用“加载-获取”读取由“存储-释放”操作过的变量,并且再对第二个变量进行“存储-释放”操作。最后,由第三个线程通过“加载-获取”读取第二个共享变量,并提供“加载-获取”操作来读取被“存储-释放”操作写入的值。为了保证同步关系,即便是中间线程没有对共享变量做任何操作,第三个线程也可以读取第一个线程操作过的变量。
代码5.9 使用获取和释放序传递同步
1 | std::atomic<int> data[5]; |
thread_1将数据存储到data中先行于存储sync1①(它们在同一个线程内)。因为加载sync1①的是一个while循环,它最终会看到thread_1存储的值。因此,对于sync1的存储先行于最终对于sync1的加载(在while循环中)。thread_3的加载操作④,位于存储sync2③操作的前面。存储sync2③因此先行于thread_3的加载④,加载又先行于存储sync2③,存储sync2又先行于加载sync2④,加载syn2又先行于加载data。因此,thread_1存储数据到data的操作先行于thread_3中对data的加载,并且保证断言都不会触发。
将sync1和sync2通过在thread_2中使用“读-改-写”操作(memory_order_acq_rel)合并成一个独立的变量。其中会使用compare_exchange_strong()来保证thread_1对变量只进行一次更新:
1 | std::atomic<int> sync(0); |
使用memory_order_acq_rel语义的“读-改-写”操作,每一个动作都包含获取和释放操作,所以可以和之前的存储操作进行同步,并且可以对随后的加载操作进行同步,就像上面例子一样。
释放队列与同步
通过线程在存储和加载操作之间有(有序的)多个“读-改-写”操作(所有操作都已经做了适当的标记),所以可以获取原子变量存储与加载的同步关系。存储操作标记为memory_order_release,memory_order_acq_rel或memory_order_seq_cst,加载标记为memory_order_consum,memory_order_acquire或memory_order_sqy_cst,并且操作链上的每一加载操作都会读取之前操作写入的值,因此链上的操作构成了一个释放序列(release sequence),并且初始化存储同步(对应memory_order_acquire或memory_order_seq_cst)或是前序依赖(对应memory_order_consume)的最终加载,操作链上的任何原子“读-改-写”操作可以拥有任意个内存序(甚至是memory_order_relaxed)。
假设使用atomic<int>
对共享队列的元素进行计数:
代码5.11 使用原子操作从队列中读取数据
1 |
|
只有一个消费者线程时还好,fetch_sub()带有memory_order_acquire的读取操作,并且存储操作是带有memory_order_release语义,所以存储与加载同步,线程可以从缓存中读取元素。当有两个读取线程时,第二个fetch_sub()操作将看到第一个线程修改的值,且没有值通过store写入其中。先不管释放序列的规则,第二个线程与第一个线程不存在先行关系,并且对共享内存中值的读取也不安全。除非第一个fetch_sub()是带有memory_order_release语义,为两个消费者线程建立了不必要的同步。无论是释放序列的规则,还是具有memory_order_release语义的fetch_sub操作,第二个消费者看到的是一个空的queue_data,无法从其获取任何数据,并且还会产生条件竞争。
不过,第一个fetch_sub()对释放顺序做了一些事情,所以store()能同步与第二个fetch_sub()操作。两个消费者线程间不需要同步关系。过程在图5.7中展示,其中虚线表示的就是释放顺序,实线表示的是先行关系。
图5.7 代码5.11中对队列操作的释放顺序
操作链中可以有任意数量的链接,提供的都是“读-改-写”操作,比如fetch_sub(),store(),每一个都会与使用memory_order_acquire语义的操作进行同步。
栅栏
栅栏操作会对内存序列进行约束,使其无法对任何数据进行修改,典型的做法是与使用memory_order_relaxed约束序的原子操作一起使用。
栅栏属于全局操作,执行栅栏操作可以影响到在线程中的其他原子操作。因为这类操作就像画了一条任何代码都无法跨越的线一样,所以栅栏操作通常也被称为内存栅栏(memory barriers)。
代码5.12 栅栏可以让自由操作变的有序
1 |
|
因为加载y的操作④读取③处存储的值,所以释放栅栏②与获取栅栏⑤同步。①处存储x先行于⑥处加载x,最后x读取出来必为true,并且不会触发断言⑦。
这两个栅栏都是必要的:需要在一个线程中进行释放,然后在另一个线程中进行获取,这样才能构建同步关系。
当获取操作能看到释放栅栏操作后的存储结果,那么这个栅栏就与获取操作同步。并且,当加载操作在获取栅栏操作前,看到一个释放操作的结果,那么这个释放操作同步于获取栅栏。
虽然,栅栏同步依赖于读取/写入的操作发生于栅栏之前/后,但是这里有一点很重要:同步点,就是栅栏本身。当执行代码5.12中的write_x_then_y,并且在栅栏操作之后对x进行写入,就像下面的代码一样。触发断言的条件就不保证一定为true了,尽管写入x的操作在写入y的操作之前发生。
1 | void write_x_then_y() |
栅栏不会分开这里的两个操作,并且也不再有序。只有当栅栏出现在存储x和存储y操作之间时,顺序才是硬性的。当然,栅栏是否存在不会影响任何拥有先行关系的执行序列。
原子操作对非原子的操作排序
使用普通的非原子bool类型来替换代码5.12中的x,行为和替换前完全一样。
代码5.13 使用非原子操作执行序列
1 |
|
栅栏仍然为存储x①和存储y②,还为加载y③和加载x④提供一个执行序,并且这里存储x和加载x之间仍然有一个先行关系,所以不会触发断言⑤。②中的存储和③中对y的加载必须是原子操作,否则会在y上产生条件竞争。当读取线程看到存储到y的操作,栅栏将会对x执行有序的操作,这个执行序意味着x上不存在条件竞争。
不仅是栅栏可对非原子操作排序,memory_order_release/memory_order_consume也为非原子访问排序,可以动态分配对象,并且本章中的许多例子都可以使用普通的非原子操作,去替代memory_order_relaxed的操作。
非原子操作排序
某一内存位置上,lock()作为一个获取操作,在同样的位置上unlock()作为一个释放操作。
以下的工具都可以提供同步:
std::thread
- std::thread构造新线程时,构造函数与调用函数或新线程的可调用对象间的同步。
- 对std::thread对象调用join,可以和对应的线程进行同步。
std::mutex, std::timed_mutex, std::recursive_mutex, std::recursibe_timed_mutex
- 对给定互斥量对象调用lock和unlock,以及对try_lock,try_lock_for或try_lock_until,会形成该互斥量的锁序。
- 对给定的互斥量调用unlock,需要在调用lock或成功调用try_lock,try_lock_for或try_lock_until之后,这样才符合互斥量的锁序。
- 对try_lock,try_lock_for或try_lock_until失败的调用,不具有任何同步关系。
std::shared_mutex , std::shared_timed_mutex
- 对给定互斥量对象调用lock、unlock、lock_shared和unlock_shared,以及对 try_lock , try_lock_for , try_lock_until , try_lock_shared , try_lock_shared_for或 try_lock_shared_until的成功调用,会形成该互斥量的锁序。
- 对给定的互斥量调用unlock,需要在调用lock或shared_lock,亦或是成功调用try_lock , try_lock_for, try_lock_until, try_lock_shared, try_lock_shared_for或try_lock_shared_until之后,才符合互斥量的锁序。
- 对try_lock,try_lock_for,try_lock_until,try_lock_shared,try_lock_shared_for或try_lock_shared_until 失败的调用,不具有任何同步关系。
std::shared_mutex和std::shared_timed_mutex
- 成功的调用std::promise对象的set_value或set_exception与成功的调用wait或get之间同步,或是调用wait_for或wait_until的返回例如future状态std::future_status::ready与promise共享同步状态。
- 给定std::promise对象的析构函数,该对象存储了一个std::future_error异常,成功的调用wait或get后,共享同步状态与promise之间的同步,或是调用wait_for或wait_until返回的future状态std::future_status::ready时,与promise共享同步状态。
std::packaged_task , std::future和std::shared_future
- 成功的调用std::packaged_task对象的函数操作符与成功的调用wait或get之间同步,或是调用wait_for或wait_until的返回future状态std::future_status::ready与打包任务共享同步状态。
- std::packaged_task对象的析构函数,该对象存储了一个std::future_error异常,其共享同步状态与打包任务之间的同步在于成功的调用wait或get,或是调用wait_for或wait_until返回的future状态std::future_status::ready与打包任务共享同步状态。
std::async , std::future和std::shared_future
- 使用std::launch::async策略性的通过std::async启动线程执行任务与成功的调用wait和get之间是同步的,或调用wait_for或wait_until返回的future状态std::future_status::ready与产生的任务共享同步状态。
- 使用std::launch::deferred策略性的通过std::async启动任务与成功的调用wait和get之间是同步的,或调用wait_for或wait_until返回的future状态std::future_status::ready与promise共享同步状态。
std::experimental::future , std::experimental::shared_future和持续性
- 异步共享状态变为就绪的事件与该共享状态上调度延续函数的调用同步。
- 持续性函数的完成与成功调用wait或get的返回同步,或调用wait_for或wait_until返回的期望值状态std::future_status::ready与调用then构建的持续性返回的future同步,或是与在调度用使用这个future的操作同步。
std::experimental::latch
- 对std::experimental::latch实例调用count_down或count_down_and_wait与在该对象上成功的调用wait或count_down_and_wait之间是同步的。
std::experimental::barrier
- 对std::experimental::barrier实例调用arrive_and_wait或arrive_and_drop与在该对象上随后成功完成的arrive_and_wait之间是同步的。
std::experimental::flex_barrier
- 对std::experimental::flex_barrier实例调用arrive_and_wait或arrive_and_drop与在该对象上随后成功完成的arrive_and_wait之间是同步的。
- 对std::experimental::flex_barrier实例调用arrive_and_wait或arrive_and_drop与在该对象上随后完成的给定函数之间是同步的。
- 对std::experimental::flex_barrier实例的给定函数的返回与每次对arrive_and_wait的调用同步,当调用给定函数线程会在栅栏处阻塞等待。
std::condition_variable和std::condition_variable_any
- 条件变量不提供任何同步关系,它们是对忙等待的优化,所有同步都由互斥量提供。