0%

C++并发编程笔记


1.并发

并发,指两个或两个以上的独立活动同时发生。

计算机的并发,指在单个系统里同时执行多个独立的任务。

并发的两种方式:真正并行 vs 任务切换

1-1

四个任务在双核处理器上的任务切换,仍是将任务整齐地划分为同等大小子任务块的理想情况。实际上,许多因素会使得任务分割不均或调度不规则。

1-2

四个任务在两个核心之间的切换

并发的方式

  • 多进程并发
  • 多线程并发

多进程并发:将应用程序分为多个独立的进程同时运行,就像同时进行网页浏览和文字处理一样。

独立的进程可以通过进程间的通信渠道传递讯息(信号、套接字、文件、管道等等)。

1-3

缺点:

  • 进程间的通信非常复杂,速度很慢,因为操作系统会对进程进行保护,以避免一个进程去修改另一个进程的数据。
  • 运行多个进程的固定开销大:需要时间启动进程,操作系统需要资源来管理进程等等。

多线程并发:在单进程中运行多个线程。

线程很像轻量级的进程:每个线程相互独立运行,并且可以在不同的指令序列中运行。不过,进程中的所有线程都共享地址空间,并且能访问到大部分数据———全局变量仍然是全局的,指针、对象的引用或数据可以在线程之间传递。

1-4

一个进程中的两个线程,正在通过共享内存进行通信。

地址空间共享,以及缺少线程间的数据保护,使得操作系统记录的工作量减小,所以使用多线程的开销远远小于多进程。不过,共享内存的灵活性是有代价的:如果多个线程访问数据,那么必须确保每个线程所访问到的数据一致,这就需要对线程通信做大量的工作。

使用并发的原因

原因有二:分离关注点(SOC)、性能

  • 分离关注点。通过将相关的代码与无关的代码分离,可以使程序更容易理解和测试,从而减少出错的可能。即使一些操作需要同时进行,依旧可以使用并发,分离不同的功能区域。

  • 性能。两种利用并发提高性能的方式:

    • 任务并行:将一个单个任务分成几部分并行运行,从而降低总运行时间
    • 数据并行:一个线程执行算法的一部分,而另一个线程执行算法的另一个部分,每个线程在不同的数据块上执行相同的操作

什么时候不使用并发:

收益比不上成本。使用并发的代码在很多情况下难以理解,因此编写和维护多线程代码会产生脑力成本,而增加的复杂性也可能会引起更多的错误。

线程的资源有限。如果太多的线程同时运行,则会消耗很多操作系统资源,从而使得操作系统整体上运行得更加缓慢。不仅如此,因为每个线程都需要一个独立的堆栈,所以运行太多的线程也会耗尽进程的可用内存或地址空间。

运行越多的线程,操作系统就需要越多的上下文切换,每一次切换都需要耗费时间。所以在某些时候,增加线程实际上会降低应用的整体性能。

并发程序

C++多线程程序和其他C++程序差不多。唯一的区别在于某些函数可以并发运行,所以需要确保共享数据在并发访问时是安全的。

例子:打印“Hello World”的程序

单线程运行:

1
2
3
4
5
#include <iostream>
int main()
{
std::cout << "Hello World\n";
}

启动独立的线程显示信息:

1
2
3
4
5
6
7
8
9
10
11
#include <iostream>
#include <thread> // 1 标准库中对多线程支持的声明
void hello() // 2 独立的函数打印信息
{
std::cout << "Hello Concurrent World\n";
}
int main()
{
std::thread t(hello); // 3 构造名为t的std::thread对象拥有新函数hello()作为其执行函数
t.join(); // 4 当前线程等待t线程终止之后才从t.join()返回。创建线程等待std::thread对象创建的线程
}

2.线程管理

线程基本操作

启动线程

使用C++线程库启动线程,就是构造std::thread对象:

1
2
void do_some_work();
std::thread my_thread(do_some_work);

需要包含<thread>头文件,std::thread可以通过有函数操作符类型的实例进行构造:

1
2
3
4
5
6
7
8
9
10
11
12
class background_task
{
public:
void operator()() const
{
do_something();
do_something_else();
}
};

background_task f;
std::thread my_thread(f);

代码中,提供的函数对象会复制到新线程的存储空间中,函数对象的执行和调用都在线程的内存空间中进行。

Lambda表达式:

1
2
3
4
std::thread my_thread([]{
do_something();
do_something_else();
});

线程启动后是要等待线程结束,还是让其自主运行。

std::thread对象销毁之前还没有做出决定,程序就会终止。

即便是有异常存在,也需要确保线程能够正确汇入(joined)或分离(detached)。

如果不等待线程汇入 ,就必须保证线程结束之前,访问数据的有效性。

代码2.1:函数已经返回,线程依旧访问局部变量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
struct func
{
int& i;
func(int& i_) : i(i_) {}
void operator() ()
{
for (unsigned j=0 ; j<1000000 ; ++j)
{
do_something(i); // 1 潜在访问隐患:空引用
}
}
};

void oops()
{
int some_local_state=0;
func my_func(some_local_state);
std::thread my_thread(my_func);
my_thread.detach(); // 2 不等待线程结束
} // 3 新线程可能还在运行

可能会出现新线程访问已经销毁的变量。过程如下:

主线程 新线程
使用some_local_state构造my_func
开启新线程my_thread
启动
调用func::operator()
将my_thread分离 执行func::operator();可能会在do_something中调用some_local_state的引用
销毁some_local_state 持续运行
退出oops函数 持续执行func::operator();可能会在do_something中调用some_local_state的引用 –> 导致未定义行为

常规处理方法:

  • 将数据复制到线程中。对于对象中包含的指针和引用需谨慎。使用访问局部变量的函数去创建线程是一个糟糕的主意。

  • 可以通过join()函数来确保线程在主函数完成前结束。

等待线程完成

如需等待线程,需要使用**join()**。

将例2-1中的my_thread.detach()替换为my_thread.join(),就可以确保局部变量在线程完成后才销毁。

只能对一个线程使用一次join(),一旦使用过join(),std::thread对象就不能再次汇入了。当对其使用joinable()时,将返回false。

特殊情况下的等待

避免应用被抛出的异常所终止。通常,在无异常的情况下使用join()时,需要在异常处理过程中调用join(),从而避免生命周期的问题。

代码2.2 等待线程完成

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
struct func; // 定义在代码2.1中
void f()
{
int some_local_state=0;
func my_func(some_local_state);
std::thread t(my_func);
try
{
do_something_in_current_thread();
}
catch(...)
{
t.join(); // 1 抛出异常
throw;
}
t.join(); // 2 正常退出
}

使用“资源获取即初始化方式”(RAII,Resource Acquisition Is Initialization),提供一个类,在析构函数中使用join()。

代码2.3 使用RAII等待线程完成

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
class thread_guard
{
std::thread& t;
public:
explicit thread_guard(std::thread& t_):
t(t_)
{}
~thread_guard()
{
if(t.joinable()) // 1 判断线程是否可以汇入
{
t.join(); // 2 汇入
}
}
thread_guard(thread_guard const&)=delete; // 3 不让编译器自动生成拷贝或赋值
thread_guard& operator=(thread_guard const&)=delete;
};

struct func; // 定义在代码2.1中

void f()
{
int some_local_state=0;
func my_func(some_local_state);
std::thread t(my_func);
thread_guard g(t);
do_something_in_current_thread();
} // 4 thread_guard对象g是第一个被销毁的,
// 即使do_something_in_current_thread抛出一个异常,这个销毁依旧会发生。

后台运行线程

detach()会让线程在后台运行,这就意味着与主线程不能直接交互。

守护线程(daemon threads):分离线程,UNIX中是指没有任何显示的接口,在后台运行的线程。特点是长时间运行。

分离线程只能确定线程什么时候结束,发后即忘的任务使用分离线程。

代码2-4 使用分离线程处理文档

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
void edit_document(std::string const& filename)
{
open_document_and_display_gui(filename);
while(!done_editing())
{
user_command cmd=get_user_input();
if(cmd.type==open_new_document)
{
std::string const new_name=get_filename_from_user();
std::thread t(edit_document,new_name); // 1 打开新文档,可以传入函数名和函数所需参数
t.detach(); // 2 分离线程,新线程打开另一个文件,edit_document函数可以复用,并通过传参的形式打开新的文件
}
else
{
process_user_input(cmd);
}
}
}

传递参数

向可调用对象或函数传递参数很简单,只需要将这些参数作为 std::thread构造函数的附加参数即可。

即使函数中的参数是引用的形式,拷贝操作也会执行。

1
2
void f(int i, std::string const& s);
std::thread t(f, 3, "hello");

代码创建了一个调用f(3, “hello”)的线程。函数f需要一个std::string对象作为第二个参数,但这里使用的是字符串的字面值,也就是char const *类型,线程的上下文完成字面值向std::string的转化。

1
2
3
4
5
6
7
8
void f(int i,std::string const& s);
void not_oops(int some_param)
{
char buffer[1024]; // buffer是一个指针变量,指向局部变量
sprintf(buffer,"%i",some_param);
std::thread t(f,3,std::string(buffer)); // 使用std::string,将字面值转化为std::string避免悬空指针
t.detach();
}

内部代码会将拷贝的参数以右值的方式进行传递,这是为了那些只支持移动的类型

1
2
3
4
5
6
7
8
9
void update_data_for_widget(widget_id w,widget_data& data); // 1
void oops_again(widget_id w)
{
widget_data data;
std::thread t(update_data_for_widget,w,data); // 2
display_status();
t.join();
process_widget_data(data);
}

因为函数期望的是一个非常量引用作为参数(而非右值),所以会在编译时出错。可以使用std::ref将参数转换成引用的形式。因此可将线程的调用改为以下形式:

1
std::thread t(update_data_for_widget,w,std::ref(data));

update_data_for_widget就会收到data的引用,而非data的拷贝副本,这样代码就能顺利的通过编译了。

如果提供的参数支持移动(move),不能拷贝。

std::unique_ptr(C++11中的智能指针),为动态分配的对象提供内存自动管理机制(类似垃圾回收机制)。同一时间内,只允许一个std::unique_ptr实例指向一个对象,并且当这个实例销毁时,指向的对象也将被删除。

移动构造函数(move constructor)和移动赋值操作符(move assignment operator)允许一个对象的所有权在多个std::unique_ptr实例中传递。使用“移动”转移对象所有权后,就会留下一个空指针。

std::move转移动态对象的所有权到线程中:

1
2
3
4
5
6
void process_big_object(std::unique_ptr<big_object>);

std::unique_ptr<big_object> p(new big_object);
p->prepare_data(42);
std::thread t(process_big_object,std::move(p));
// big_object 对象的所有权首先被转移到新创建线程的的内部存储中,之后再传递给process_big_object函数。

线程的所有权可以在多个std::thread实例中转移,这依赖于std::thread实例的可移动不可复制性。

可移动性:使得开发者可以自己决定,哪个实例拥有线程实际执行的所有权。

不可复制性:在某一时间点,一个std::thread实例只能关联一个执行线程。

转移所有权

例子,创建两个执行进程,并在std::thread实例之间(t1,t2和t3)转移所有权:

1
2
3
4
5
6
7
8
void some_function();
void some_other_function();
std::thread t1(some_function); // 1 新线程与t1相关联
std::thread t2 = std::move(t1); // 2 t1所有权转移给t2,执行some_function的函数线程与t2关联。
t1=std::thread(some_other_function); // 3 临时std::thread对象相关的线程启动
std::thread t3; // 4 默认构造方式创建,没有与任何线程进行关联
t3=std::move(t2); // 5 移动操作后,t1与执行some_other_function的线程相关联,t2与任何线程都无关联,t3与执行some_function的线程相关联。
t1=std::move(t3); // 6 赋值操作将使程序崩溃

最后的移动操作,将some_function线程的所有权转移给t1,但是t1已经有了一个关联的线程(执行some_other_function的线程),所以这里系统直接调用std::terminate()终止程序继续运行。

std::terminate()不抛出异常,是noexpect函数,保证与std::thread的析构函数的行为一致。

线程对象析构前,显式的等待线程完成或分离它,即不能通过赋新值给std::thread对象的方式”丢弃“一个线程。

std::thread支持移动,线程的所有权可以在函数外进行转移。

代码2.5 函数返回std::thread对象

1
2
3
4
5
6
7
8
9
10
11
12
std::thread f()
{
void some_function();
return std::thread(some_function);
}

std::thread g()
{
void some_other_function(int);
std::thread t(some_other_function,42);
return t;
}

所有权可以在函数内部传递,允许std::thread实例作为参数进行传递:

1
2
3
4
5
6
7
8
void f(std::thread t);
void g()
{
void some_function();
f(std::thread(some_function));
std::thread t(some_function);
f(std::move(t));
}

为了确保线程在程序退出前完成,定义了scoped_thread类。

代码2.6 scoped_thread的用法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
class scoped_thread
{
std::thread t;
public:
explicit scoped_thread(std::thread t_): // 1
t(std::move(t_))
{
if(!t.joinable()) // 2 检测线程是否可汇入放在构造函数中,不可汇入则抛出异常
throw std::logic_error(“No thread”);
}
~scoped_thread()
{
t.join(); // 3
}
scoped_thread(scoped_thread const&)=delete;
scoped_thread& operator=(scoped_thread const&)=delete;
};

struct func; // 定义在代码2.1中

void f()
{
int some_local_state;
scoped_thread t(std::thread(func(some_local_state))); // 4 新线程会直接传递到scoped_thread中
do_something_in_current_thread();
} // 5 scoped_thread对象就会销毁,然后在析构函数中完成汇入3

代码2.7 joining_thread类的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
class joining_thread
{
std::thread t;
public:
joining_thread() noexcept=default;
template<typename Callable,typename ... Args>
explicit joining_thread(Callable&& func,Args&& ... args):
t(std::forward<Callable>(func),std::forward<Args>(args)...)
{}
explicit joining_thread(std::thread t_) noexcept:
t(std::move(t_))
{}
joining_thread(joining_thread&& other) noexcept:
t(std::move(other.t))
{}
joining_thread& operator=(joining_thread&& other) noexcept
{
ifjoinable()){
join();
}
t = std::move(other.t);
return *this;
}
joining_thread& operator=(std::thread other) noexcept
{
if(joinable())
join();
t=std::move(other);
return *this;
}
~joining_thread() noexcept
{
if(joinable())
join();
}
void swap(joining_thread& other) noexcept
{
t.swap(other.t);
}
std::thread::id get_id() const noexcept{
return t.get_id();
}
bool joinable() const noexcept
{
return t.joinable();
}
void join()
{
t.join();
}
void detach()
{
t.detach();
}
std::thread& as_thread() noexcept
{
return t;
}
const std::thread& as_thread() const noexcept
{
return t;
}
};

std::thread中对移动语义的支持,也适用于使用std::thread移动敏感(move-aware)容器(比如,std::vector<>)。了解这些后,就可以量产了一些线程,并且等待它们结束,代码如下所示。

代码2.8 量产线程,等待它们结束

1
2
3
4
5
6
7
8
9
10
11
12
void do_work(unsigned id);

void f()
{
std::vector<std::thread> threads;
for (unsigned i = 0; i < 20; ++i)
{
threads.emplace_back(do_work,i); // 产生线程
}
for (auto& entry : threads) // 对每个线程调用 join()
entry.join();
}

代码2.8中每个线程所做的工作都是独立的,并且结果会受到共享数据的影响。

如果f()有返回值,这个返回值就依赖于线程得到的结果。写入返回值之前,程序会检查使用共享数据的线程是否终止。

确定线程数量

std::thread::hardware_concurrency()在新版C++中非常有用,返回并发线程的数量。

代码2.9将整体工作拆分成小任务,交给每个线程去做,并设置最小任务数,避免产生太多的线程,程序会在操作数量为0时抛出异常。比如,std::thread无法启动线程,就会抛出异常。

代码2.9 并行版的std::accumulate

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
template<typename Iterator,typename T>
struct accumulate_block
{
void operator()(Iterator first,Iterator last,T& result)
{
result=std::accumulate(first,last,result);
}
};

template<typename Iterator,typename T>
T parallel_accumulate(Iterator first,Iterator last,T init)
{
unsigned long const length=std::distance(first,last);

if(!length) // 1 输入范围为空,返回init的值
return init;

unsigned long const min_per_thread=25; // 线程中最小任务数
unsigned long const max_threads=
(length+min_per_thread-1)/min_per_thread; // 2 启动线程最大数量

unsigned long const hardware_threads=
std::thread::hardware_concurrency();

unsigned long const num_threads= // 3 std::thread::hardware_concurrency()返回0时,选择一个合适数字
std::min(hardware_threads != 0 ? hardware_threads : 2, max_threads);

unsigned long const block_size=length/num_threads; // 4 每个线程中处理的元素数量

std::vector<T> results(num_threads); // 存放中间结果
std::vector<std::thread> threads(num_threads-1); // 5 在启动之前已经有了一个线程(主线程),所以启动的线程数必须比num_threads少1

Iterator block_start=first;
for(unsigned long i=0; i < (num_threads-1); ++i)
{
Iterator block_end=block_start;
std::advance(block_end,block_size); // 6 block_end迭代器指向当前块的末尾,把一个迭代器移动 n 个位置
threads[i]=std::thread( // 7 启动一个新线程为当前块累加结果
accumulate_block<Iterator,T>(),
block_start,block_end,std::ref(results[i]));
block_start=block_end; // 8 当迭代器指向当前块的末尾时,启动下一个块
}
accumulate_block<Iterator,T>()(
block_start,last,results[num_threads-1]); // 9 处理最终块的结果

for (auto& entry : threads)
entry.join(); // 10 创建线程

return std::accumulate(results.begin(),results.end(),init); // 11 累加所有结果
}

注意:

  • T类型的加法不满足结合律(比如,对于float型或double型,在进行加法操作时,系统很可能会做截断操作),因为对范围中元素的分组,会导致parallel_accumulate得到的结果可能与std::accumulate的结果不同。

  • 迭代器必须是前向迭代器。

  • 对于results容器,需要保证T有默认构造函数。

线程标识

std::thread::id类型,获取方式:

  • 调用std::thread对象的成员函数get_id()来直接获取。如果std::thread对象没有与任何执行线程相关联,get_id()将返回std::thread::type默认构造值,这个值表示“无线程”。
  • 当前线程中调用std::this_thread::get_id()(这个函数定义在<thread>头文件中)也可以获得线程标识。

std::thread::id对象可以自由的拷贝和对比,因为标识符可以复用

如果两个对象的std::thread::id相等,那就是同一个线程,或者都“无线程”。

如果不等,那么就代表了两个不同线程,或者一个有线程,另一没有线程。

标准库提供std::hash<std::thread::id>容器,std::thread::id也可以作为无序容器的键值。

启动其他线程前,可以通过std::this_thread::get_id()得到自己的线程ID。每个线程都要检查一下,其拥有的线程ID是否与初始线程的ID相同。

1
2
3
4
5
6
7
8
9
std::thread::id master_thread;
void some_core_part_of_algorithm()
{
if(std::this_thread::get_id()==master_thread)
{
do_master_thread_work();
}
do_common_work();
}

线程ID在容器中可作为键值.

可以使用输出流(std::cout)来记录一个std::thread::id对象的值。

1
std::cout << std::this_thread::get_id();

3.共享数据

线程间的问题在于修改共享数据,会使不变量遭到破坏。

并行中常见错误:条件竞争(race condition)。

共享数据的问题

条件竞争

并发中的竞争条件,取决于一个以上线程的执行顺序,每个线程都抢着完成自己的任务。

当不变量遭到破坏时,才会产生条件竞争。

操作要访问两个独立的数据块,独立的指令会对数据块将进行修改,并且其中一个线程可能正在进行修改,另一个线程就对数据块进行了访问。因为出现的概率低,很难查找,也很难复现。

避免恶性条件竞争

  • 最简单的办法:对数据结构采用某种保护机制,确保只有修改线程才能看到不变量的中间状态。

    从其他访问线程的角度来看,修改不是已经完成了,就是还没开始。

  • 对数据结构和不变量进行修改,修改完的结构必须能完成一系列不可分割的变化,也就保证了每个不变量的状态,这就是所谓的无锁编程

  • 使用事务的方式去处理数据结构的更新。一些数据和读取都存储在事务日志中,然后将之前的操作进行合并,再进行提交。当数据结构被另一个线程修改后,或处理已经重启的情况下,提交就会无法进行,这称作为“软件事务内存”(software transactional memory (STM))

使用互斥量

访问共享数据前,将数据锁住,在访问结束后,再将数据解锁。线程库需要保证,当线程使用互斥量锁住共享数据时,其他的线程都必须等到之前那个线程对数据进行解锁后,才能进行访问数据。

互斥量会造成死锁,或对数据保护的太多(或太少)

互斥量

实例化std::mutex创建互斥量实例,成员函数lock()可对互斥量上锁,unlock()为解锁。

不推荐直接去调用成员函数,调用成员函数就意味着,必须在每个函数出口都要去调用unlock()(包括异常的情况)。

std::lock_guard:在构造时就能提供已锁的互斥量,并在析构时进行解锁,从而保证了互斥量能被正确解锁。

代码3.1 使用互斥量保护列表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
#include <list>
#include <mutex>
#include <algorithm>

std::list<int> some_list; // 1 全局变量
std::mutex some_mutex; // 2 全局的互斥量

void add_to_list(int new_value)
{
std::lock_guard<std::mutex> guard(some_mutex); // 3 加锁,对数据访问互斥的
some_list.push_back(new_value);
}

bool list_contains(int value_to_find)
{
std::lock_guard<std::mutex> guard(some_mutex); // 4 加锁后,看不到add_to_list()修改的列表
return std::find(some_list.begin(),some_list.end(),value_to_find) != some_list.end();
}

C++17中添加了一个新特性,称为模板类参数推导,类似std::lock_guard这样简单的模板类型,其模板参数列表可以省略。③和④的代码可以简化成:

1
std::lock_guard guard(some_mutex);

C++17中的一种加强版数据保护机制——std::scoped_lock,所以在C++17的环境下,上面的这行代码也可以写成:

1
std::scoped_lock guard(some_mutex);

大多数情况下,互斥量通常会与需要保护的数据放在同一类中,而不是定义成全局变量。

当其中一个成员函数返回的是保护数据的指针或引用时,也会破坏数据。具有访问能力的指针或引用可以访问(并可能修改)保护数据,而不会被互斥锁限制。这就需要对接口谨慎设计,要确保互斥量能锁住数据访问,并且不留后门。

保护共享数据

代码3.2 无意中传递了保护数据的引用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
class some_data
{
int a;
std::string b;
public:
void do_something();
};

class data_wrapper
{
private:
some_data data;
std::mutex m;
public:
template<typename Function>
void process_data(Function func)
{
std::lock_guard<std::mutex> l(m);
func(data); // 1 传递“保护”数据给用户函数
}
};

some_data* unprotected;

void malicious_function(some_data& protected_data)
{
unprotected=&protected_data;
}

data_wrapper x;
void foo()
{
x.process_data(malicious_function); // 2 传递一个恶意函数
unprotected->do_something(); // 3 在无保护的情况下访问保护数据
}

例子中process_data看起来没有问题,std::lock_guard对数据做了很好的保护,但调用用户提供的函数func①,就意味着foo能够绕过保护机制将函数malicious_function传递进去②,可以在没有锁定互斥量的情况下调用do_something()

函数foo()中调用unprotected->do_something()的代码未能被标记为互斥。

切勿将受保护数据的指针或引用传递到互斥锁作用域之外。

接口间的条件竞争

代码3.3 std::stack容器的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
template<typename T,typename Container=std::deque<T> >
class stack
{
public:
explicit stack(const Container&);
explicit stack(Container&& = Container());
template <class Alloc> explicit stack(const Alloc&);
template <class Alloc> stack(const Container&, const Alloc&);
template <class Alloc> stack(Container&&, const Alloc&);
template <class Alloc> stack(stack&&, const Alloc&);

bool empty() const;
size_t size() const;
T& top();
T const& top() const;
void push(T const&);
void push(T&&);
void pop();
void swap(stack&&);
template <class... Args> void emplace(Args&&... args); // C++14的新特性
};

虽然empty()和size()可能在返回时是正确的,但结果不可靠。当返回后,其他线程就可以自由地访问栈,并且可能push()多个新元素到栈中,也可能pop()一些已在栈中的元素。这样的话,之前从empty()和size()得到的数值就有问题了。

非共享的栈对象,如果栈非空,使用empty()检查再调用top()访问栈顶部的元素是安全的。如下代码所示:

1
2
3
4
5
6
stack<int> s;
if (! s.empty()){ // 1 判断栈是否为空
int const value = s.top(); // 2 非空调用栈顶
s.pop(); // 3 弹出栈顶元素
do_something(value);
}

对于共享的栈对象,这样的调用顺序就不再安全,因为在调用empty()①和调用top()②之间,可能有来自另一个线程的pop()调用并删除了最后一个元素。这是一个经典的条件竞争,使用互斥量对栈内部数据进行保护,但依旧不能阻止条件竞争的发生,这就是接口固有的问题。

解决:改变接口的设计

先获取顶部元素(top()),然后从栈中移除(pop())。这样,在不能安全的将元素拷贝出去的情况下,栈中的这个数据还依旧存在,没有丢失。当问题是堆空间不足,应用可能会释放一些内存,然后再进行尝试。

不幸的是,这样的分割却制造了本想避免的条件竞争。

选项1:传入一个引用

将变量的引用作为参数,传入pop()函数中获取“弹出值”:

1
2
std::vector<int> result;
some_stack.pop(result);

缺点:需要构造出一个栈中类型的实例,用于接收目标值。对于一些类型,这样做是不现实的,因为临时构造一个实例,从时间和资源的角度上来看都不划算。

选项2:无异常抛出的拷贝构造函数或移动构造函数

一个有用的选项可以限制对线程安全栈的使用,并且能让栈安全的返回所需的值,而不抛出异常。虽然安全,但非可靠。

那些有抛出异常的拷贝构造函数,但没有移动构造函数的类型往往更多。

选项3:返回指向弹出值的指针

指针的优势是自由拷贝,并且不会产生异常,这样就能避免Cargill提到的异常问题了。

缺点就是返回指针需要对对象的内存分配进行管理,对于简单数据类型(比如:int),内存管理的开销要远大于直接返回值。

使用std::shared_ptr,不仅能避免内存泄露(因为当对象中指针销毁时,对象也会被销毁),而且标准库能够完全控制内存分配方案,就不需要new和delete操作。

例:定义线程安全的堆栈

代码3.4中是一个接口没有条件竞争的堆栈类定义,它实现了选项1和选项3:重载了pop(),使用局部引用去存储弹出值,并返回std::shared_ptr<>对象。它有一个简单的接口,只有两个函数:push()和pop();

代码3.4 线程安全的堆栈类定义(概述)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
#include <exception>
#include <memory> // For std::shared_ptr<>

struct empty_stack: std::exception
{
const char* what() const throw();
};

template<typename T>
class threadsafe_stack
{
public:
threadsafe_stack();
threadsafe_stack(const threadsafe_stack&);
threadsafe_stack& operator=(const threadsafe_stack&) = delete; // 1 赋值操作被删除

void push(T new_value);
std::shared_ptr<T> pop();
void pop(T& value);
bool empty() const;
};

当栈为空时,pop()函数会抛出一个empty_stack异常,所以在empty()函数被调用后,其他部件还能正常工作。

使用std::shared_ptr可以避免内存分配管理的问题,并避免多次使用new和delete操作。堆栈中的五个操作,现在就剩下三个:push(), pop()和empty()(这里empty()都有些多余)。

简化接口更有利于数据控制,可以保证互斥量将操作完全锁住。

代码3.5 扩充(线程安全)堆栈

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
#include <exception>
#include <memory>
#include <mutex>
#include <stack>

struct empty_stack: std::exception
{
const char* what() const throw() {
return "empty stack!";
};
};

template<typename T>
class threadsafe_stack
{
private:
std::stack<T> data;
mutable std::mutex m;

public:
threadsafe_stack()
: data(std::stack<T>()){}

threadsafe_stack(const threadsafe_stack& other)
{
std::lock_guard<std::mutex> lock(other.m);
data = other.data; // 1 在构造函数体中的执行拷贝
}

threadsafe_stack& operator=(const threadsafe_stack&) = delete;

void push(T new_value)
{
std::lock_guard<std::mutex> lock(m);
data.push(new_value);
}

std::shared_ptr<T> pop()
{
std::lock_guard<std::mutex> lock(m);
if(data.empty()) throw empty_stack(); // 在调用pop前,检查栈是否为空

std::shared_ptr<T> const res(std::make_shared<T>(data.top())); // 在修改堆栈前,分配出返回值
data.pop();
return res;
}

void pop(T& value)
{
std::lock_guard<std::mutex> lock(m);
if(data.empty()) throw empty_stack();

value=data.top();
data.pop();
}

bool empty() const
{
std::lock_guard<std::mutex> lock(m);
return data.empty();
}
};

堆栈可以拷贝——拷贝构造函数对互斥量上锁,再拷贝堆栈。构造函数体中①的拷贝使用互斥量来确保复制结果的正确性,这样的方式比成员初始化列表好。

一个给定操作需要两个或两个以上的互斥量时,另一个潜在的问题将出现:死锁。与条件竞争完全相反——不同的两个线程会互相等待,从而什么都没做。

死锁

线程有对锁的竞争:一对线程需要对他们所有的互斥量做一些操作,其中每个线程都有一个互斥量,且等待另一个解锁。因为他们都在等待对方释放互斥量,没有线程能工作。

避免死锁:让两个互斥量以相同顺序上锁。某些情况下是可以这样用,因为不同的互斥量用于不同的地方。

选择一个固定的顺序(例如,实例提供的第一互斥量作为第一个参数,提供的第二个互斥量为第二个参数),可能会适得其反:在参数交换了之后,两个线程试图在相同的两个实例间进行数据交换时,程序又死锁了!

std::lock——可以一次性锁住多个(两个以上)的互斥量,并且没有副作用(死锁风险)。

代码3.6 交换操作中使用std::lock()std::lock_guard

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// 这里的std::lock()需要包含<mutex>头文件
class some_big_object;
void swap(some_big_object& lhs,some_big_object& rhs);
class X
{
private:
some_big_object some_detail;
std::mutex m;
public:
X(some_big_object const& sd):some_detail(sd){}

friend void swap(X& lhs, X& rhs)
{
if(&lhs==&rhs)
return;
std::lock(lhs.m,rhs.m); // 1 锁住两个互斥量
std::lock_guard<std::mutex> lock_a(lhs.m,std::adopt_lock); // 2 创建实例
std::lock_guard<std::mutex> lock_b(rhs.m,std::adopt_lock); // 3
swap(lhs.some_detail,rhs.some_detail);
}
};

std::adopt_lock参数除了表示std::lock_guard可获取锁之外,还将锁交由std::lock_guard管理,就不需要std::lock_guard再去构建新的锁了。

std::lock要么将两个锁都锁住,要不一个都不锁。

C++17中,std::scoped_lock<>是一种新的RAII模板类型,与 std::lock_guard<>的功能相同,这个新类型能接受不定数量的互斥量类型作为模板参数,以及相应的互斥量(数量和类型)作为构造参数。

互斥量支持构造时上锁,与std::lock的用法相同,解锁在析构中进行。代码3.6中swap()操作可以重写如下:

1
2
3
4
5
6
7
8
void swap(X& lhs, X& rhs)
{
if(&lhs==&rhs)
return;
std::scoped_lock guard(lhs.m,rhs.m); // 1 通过传递的对象类型来构造实例,等价于
//std::scoped_lock<std::mutex,std::mutex> guard(lhs.m,rhs.m);
swap(lhs.some_detail,rhs.some_detail);
}

C++17新特性:自动推导模板参数。隐式参数模板类型推导机制。

std::scoped_lock的好处在于,可以将所有std::lock替换掉,从而减少错误的发生。

避免死锁

无锁的情况下,仅需要两个线程std::thread对象互相调用join()就能产生死锁。这种情况下,没有线程可以继续运行,因为他们正在互相等待。

避免嵌套锁

线程获得一个锁时,就别再去获取第二个。每个线程只持有一个锁,就不会产生死锁。

当需要获取多个锁,使用std::lock来做这件事(对获取锁的操作上锁),避免产生死锁。

避免在持有锁时调用外部代码

外部程序可能做任何事情,包括获取锁。在持有锁的情况下,如果用外部代码要获取一个锁,就会违反第一个指导意见,并造成死锁。

使用固定顺序获取锁

当硬性要求获取两个或两个以上的锁,并且不能使用std::lock单独操作来获取它们时,最好在每个线程上,用固定的顺序获取它们(锁)。

例如,不同线程以相反顺序访问列表所造成的死锁。当节点A和B在列表中相邻,当前线程可能会同时尝试获取A和B上的锁。另一个线程可能已经获取了节点B上的锁,并试图获取节点A上的锁——经典的死锁场景。

线程1 线程2
锁住主入口的互斥量
读取头结点指针
锁住头结点互斥量
解锁主入口互斥量
锁住主入口互斥量
读取head->next指针 锁住尾结点互斥量
锁住next结点的互斥量 读取tail->prev指针
读取next->next指针 解锁尾结点的互斥量
锁住A结点的互斥量 锁住C结点的互斥量
读取A->next指针(也就是B结点) 读取C->next指针(也就是B结点)
锁住B结点互斥量
阻塞,尝试锁住B结点的互斥量 解锁C结点互斥量
读取B->prev指针(也就是A结点)
阻塞,尝试锁住A结点的互斥量
死锁!

当A、C节点中间的B节点删除时,有线程在已获取A和C上的锁后,还要获取B节点上的锁时,就可能发生死锁。

解决:定义遍历的顺序,一个线程必须先锁住A才能获取B的锁,在锁住B之后才能获取C的锁。这将消除死锁,不允许反向遍历链表。

使用层次锁结构

当代码试图对互斥量上锁,而低层已持有该层锁时,不允许锁定。可以通过每个互斥量对应的层数,以及每个线程使用的互斥量,在运行时检查锁定操作是否可以进行。

层级互斥量不可能死锁,因为互斥量本身会严格遵循约定进行上锁。当多个互斥量在是在同一级上时,不能同时持有多个锁。

代码3.7 使用层次锁来避免死锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
hierarchical_mutex high_level_mutex(10000); // 1 3个hierarchical_mutex实例,逐渐递减的层级构造
hierarchical_mutex low_level_mutex(5000); // 2
hierarchical_mutex other_mutex(6000); // 3

int do_low_level_stuff(); // 假设do_low_level_stuff不会对任何互斥量进行上锁

int low_level_func()
{
std::lock_guard<hierarchical_mutex> lk(low_level_mutex); // 4 low_level_func为层级最低的函数,并且会对low_level_mutex进行上锁
return do_low_level_stuff();
}

void high_level_stuff(int some_param);

void high_level_func()
{
std::lock_guard<hierarchical_mutex> lk(high_level_mutex); // 6 锁的层级更高
high_level_stuff(low_level_func()); // 5
}

void thread_a() // 7 遵守规则,运行成功
{
high_level_func();
}

void do_other_stuff();

void other_stuff()
{
high_level_func(); // 10
do_other_stuff();
}

void thread_b() // 8 other_stuff()调用high_level_func()时,就违反了层级结构,运行失败
{
std::lock_guard<hierarchical_mutex> lk(other_mutex); // 9 中层级的数据已被保护
other_stuff();
}

代码3.8 简单的层级互斥量实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
class hierarchical_mutex
{
std::mutex internal_mutex;

unsigned long const hierarchy_value;
unsigned long previous_hierarchy_value;

static thread_local unsigned long this_thread_hierarchy_value; // 1 当前线程的层级值

void check_for_hierarchy_violation()
{
if(this_thread_hierarchy_value <= hierarchy_value) // 2 判断层级抛出异常
{
throw std::logic_error(“mutex hierarchy violated”);
}
}

void update_hierarchy_value()
{
previous_hierarchy_value=this_thread_hierarchy_value; // 3
this_thread_hierarchy_value=hierarchy_value;
}

public:
explicit hierarchical_mutex(unsigned long value):
hierarchy_value(value),
previous_hierarchy_value(0)
{}

void lock()
{
check_for_hierarchy_violation();
internal_mutex.lock(); // 4 lock()代表内部互斥锁已锁住
update_hierarchy_value(); // 5 一旦成功锁住,更新层级值
}

void unlock()
{
if(this_thread_hierarchy_value!=hierarchy_value)
throw std::logic_error(“mutex hierarchy violated”); // 9
this_thread_hierarchy_value=previous_hierarchy_value; // 6 对层级值进行保存
internal_mutex.unlock();
}

bool try_lock()
{
check_for_hierarchy_violation();
if(!internal_mutex.try_lock()) // 7 try_lock()与lock()的功能相似,除了在调用internal_mutex的try_lock()失败时,不能持有对应锁,所以不必更新层级值,并直接返回false。
return false;
update_hierarchy_value();
return true;
}
};
thread_local unsigned long
hierarchical_mutex::this_thread_hierarchy_value(ULONG_MAX); // 8 初始化为最大值

超越锁的延伸扩展

死锁不仅仅会发生在锁之间,也会发生在同步构造中(可能会产生一个等待循环)

如果去等待一个线程结束,应该确定这个线程的层级,这样一个线程只需要等待比其层级低的线程结束即可。

std::lock()std::lock_guard可组成简单的锁,并覆盖大多数情况,但有时需要更多的灵活性,可以使用标准库提供的std::unique_lock模板。

std::unique_lock——灵活的锁

std::unique_lock实例不带互斥量:信息已存储,且已更新。在创建时自动加锁,在销毁时自动解锁****

unique_lock是对lock_guard的扩展,允许在生命周期内再调用lock和unlock来加解锁以切换锁的状态。

但是内存会占用比较多的空间,并且比std::lock_guard稍慢一些。

使用时std::lock_guard<std::mutex> lk(mtx);直接替换成std::unique_lock<std::mutex> lk(mtx);

lock_guard和unique_lock第二参数的作用:

  • std::adopt_lock: 假设调用一方已经拥有了互斥量的所有权(已经lock成功了);通知lock_guard不需要再构造函数中lock这个互斥量了。

    1
    2
    3
    4
    5
    {
    std::lock_guard<std::mutex> lock(g_mtx, std::adopt_lock);
    g_mtx.lock();
    临界区或临界资源
    }

    对于lock_guard第二参数类型只有一种,锁管理器构造的时候不会自动对可锁对象上锁;由可锁对象自己加锁;等锁管理器析构的时候自动解锁。

    如果指定了第二参数,但是没有lock,锁管理器析构的时候解锁了无拥有权的可锁对象,导致异常。

    多锁场景下,会调用std::lock避免死锁的出现,但是这个方法要求锁管理器不能拥有可锁对象,由std::lock方法执行锁操作。如果没有提供第二参数构造函数,那么就无法使用该方法。

    注意:使用该参数类型构造的锁管理器必须只能通过可锁对象进行lock,不可通过锁管理器进行lock,误用会导致程序异常。

  • std::defer_lock: 初始化了一个没有加锁的mutex。前提是自己不能先lock否则报异常。

    1
    2
    3
    4
    5
    {
    std::unique_lock<std::mutex> lock(g_mtx, std::defer_lock);
    lock.lock(); // 不能用g_mtx.lock(),第二次锁的时候会崩溃
    临界区或临界资源
    }

    锁管理器在构造的时候不主动lock且不拥有可锁对象;如果后续执行lock,锁管理器析构的时候自动解锁。

    注意:该类型构造的锁管理器只能通过锁管理器执行lock且拥有可锁对象。如果直接调用可锁对象进行锁操作后,会导致程序异常。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
class some_big_object;
void swap(some_big_object& lhs,some_big_object& rhs);
class X
{
private:
some_big_object some_detail;
std::mutex m;
public:
X(some_big_object const& sd):some_detail(sd){}
friend void swap(X& lhs, X& rhs)
{
if(&lhs==&rhs)
return;
std::unique_lock<std::mutex> lock_a(lhs.m,std::defer_lock); // 1
std::unique_lock<std::mutex> lock_b(rhs.m,std::defer_lock); // 1 std::defer_lock 留下未上锁的互斥量
std::lock(lock_a,lock_b); // 2 互斥量在这里上锁
swap(lhs.some_detail,rhs.some_detail);
}
};

std::unique_lock支持lock(), try_lock()和unlock()成员函数

不同域中互斥量的传递

std::unique_lock是可移动,但不可赋值的类型。

函数get_lock()锁住了互斥量,然后准备数据,返回锁的调用函数:

1
2
3
4
5
6
7
8
9
10
11
12
std::unique_lock<std::mutex> get_lock()
{
extern std::mutex some_mutex;
std::unique_lock<std::mutex> lk(some_mutex);
prepare_data();
return lk; // 1 lk在函数中被声明为自动变量,它不需要调用std::move(),可以直接返回
}
void process_data()
{
std::unique_lock<std::mutex> lk(get_lock()); // 2 process_data()函数直接转移std::unique_lock实例的所有权
do_something();
}

std::unique_lock的灵活性同样也允许实例在销毁之前放弃拥有的锁。可以使用unlock()来做这件事,如同一个互斥量:std::unique_lock的成员函数提供类似于锁定和解锁的功能。

锁的粒度

锁的粒度:用来描述通过一个锁保护着的数据量大小。一个细粒度锁(a fine-grained lock)能够保护较小的数据量,一个粗粒度锁(a coarse-grained lock)能够保护较多的数据量。

如果很多线程正在等待同一个资源(等待收银员对自己拿到的商品进行清点),当有线程持有锁的时间过长,这就会增加等待的时间(别等到结账的时候,才想起来蔓越莓酱没拿)。

std::unique_lock在这种情况下工作正常,调用unlock()时,代码不需要再访问共享数据。当再次需要对共享数据进行访问时,再调用lock()就可以了。

1
2
3
4
5
6
7
8
9
void get_and_process_data()
{
std::unique_lock<std::mutex> my_lock(the_mutex);
some_class data_to_process=get_next_data_chunk();
my_lock.unlock(); // 1 不要让锁住的互斥量越过process()函数的调用
result_type result=process(data_to_process);
my_lock.lock(); // 2 为了写入数据,对互斥量再次上锁
write_result(data_to_process,result);
}

不需要让锁住的互斥量越过对process()函数的调用,所以可以在函数调用①前对互斥量进行手动解锁,之后对其再次上锁②。

代码3.10 比较操作符中一次锁住一个互斥量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
class Y
{
private:
int some_detail;
mutable std::mutex m;
int get_detail() const
{
std::lock_guard<std::mutex> lock_a(m); // 1
return some_detail;
}
public:
Y(int sd):some_detail(sd){}

friend bool operator==(Y const& lhs, Y const& rhs)
{
if(&lhs==&rhs)
return true;
int const lhs_value=lhs.get_detail(); // 2
int const rhs_value=rhs.get_detail(); // 3
return lhs_value==rhs_value; // 4
}
};

比较操作符首先通过调用get_detail()成员函数检索要比较的值②③,函数在索引时被锁保护着①。比较操作符会在之后比较索引出来的值④。

虽然锁只持有一次的操作能减少锁持有的时间(这样能消除死锁的可能性),但这里有一个微妙的语义操作同时对两个锁住的值进行比较。

当操作符返回true时,就意味着在这个时间点上的lhs.some_detail与另一个时间点的rhs.some_detail相同。

这两个值在读取之后,可能会以任意方式修改。两个值会在②和③处进行交换,这样就会失去了比较的意义。比较可能会返回true,表明这两个值是相等的,实际上这两个值相等的情况可能就发生在一瞬间。

当持有锁的时间没有达到整个操作时间,就会让自己处于条件竞争的状态。

保护共享数据的方式

保护共享数据的初始化过程

假设有一个共享源,构建代价很昂贵,它可能会打开一个数据库连接或分配出很多的内存。

延迟初始化(Lazy initialization)在单线程代码很常见————每一个操作都需要先对源进行检查,为了了解数据是否被初始化,然后在其使用前决定,数据是否需要初始化:

1
2
3
4
5
6
7
8
9
std::shared_ptr<some_resource> resource_ptr;
void foo()
{
if(!resource_ptr)
{
resource_ptr.reset(new some_resource); // 1
}
resource_ptr->do_something();
}

转为多线程代码时,只有①处需要保护,这样共享数据对于并发访问就是安全的。

代码3.11 使用延迟初始化(线程安全)的过程

1
2
3
4
5
6
7
8
9
10
11
12
13
std::shared_ptr<some_resource> resource_ptr;
std::mutex resource_mutex;

void foo()
{
std::unique_lock<std::mutex> lk(resource_mutex); // 所有线程在此序列化
if(!resource_ptr)
{
resource_ptr.reset(new some_resource); // 只有初始化过程需要保护
}
lk.unlock();
resource_ptr->do_something();
}

代码3.12 使用std::call_once作为类成员的延迟初始化(线程安全)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
class X
{
private:
connection_info connection_details;
connection_handle connection;
std::once_flag connection_init_flag;

void open_connection()
{
connection=connection_manager.open(connection_details);
}
public:
X(connection_info const& connection_details_):
connection_details(connection_details_)
{}
void send_data(data_packet const& data) // 1
{
std::call_once(connection_init_flag,&X::open_connection,this); // 2
connection.send_data(data);
}
data_packet receive_data() // 3
{
std::call_once(connection_init_flag,&X::open_connection,this); // 2
return connection.receive_data();
}
};

例子中第一次调用send_data()①或receive_data()③的线程完成初始化过程。使用成员函数open_connection()去初始化数据,也需要将this指针传进去。

std::mutexstd::once_flag的实例不能拷贝和移动,需要通过显式定义相应的成员函数,对这些类成员进行操作。

保护不常更新的数据机构

互斥量“读者-作者锁”,其允许两种不同的使用方式:一个“作者”线程独占访问和共享访问,让多个“读者”线程并发访问。

c++17标准库提供了两种非常好的互斥量——std::shared_mutexstd::shared_timed_mutex

对于更新操作,可以使用std::lock_guard<std::shared_mutex>std::unique_lock<std::shared_mutex>上锁。

代码3.13 使用std::shared_mutex对数据结构进行保护

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
#include <map>
#include <string>
#include <mutex>
#include <shared_mutex>

class dns_entry;

class dns_cache
{
std::map<std::string,dns_entry> entries;
mutable std::shared_mutex entry_mutex;
public:
dns_entry find_entry(std::string const& domain) const
{
std::shared_lock<std::shared_mutex> lk(entry_mutex); // 1 使用std::shared_lock<>来保护共享和只读权限
std::map<std::string,dns_entry>::const_iterator const it=
entries.find(domain);
return (it==entries.end())?dns_entry():it->second;
}
void update_or_add_entry(std::string const& domain,
dns_entry const& dns_details)
{
std::lock_guard<std::shared_mutex> lk(entry_mutex); // 2 使用std::lock_guard<>实例,当表格需要更新时②,为其提供独占访问权限
entries[domain]=dns_details;
}
};

多线程可以同时调用find_entry(),且不会出错。

update_or_add_entry()函数调用时,独占锁会阻止其他线程对数据结构进行修改,并且阻止线程调用find_entry()。

嵌套锁

线程对已经获取的std::mutex(已经上锁)再次上锁是错误的,尝试这样做会导致未定义行为。

C++标准库提供了std::recursive_mutex类。除了可以在同一线程的单个实例上多次上锁,其他功能与std::mutex相同。

其他线程对互斥量上锁前,当前线程必须释放拥有的所有锁,所以如果你调用lock()三次,也必须调用unlock()三次。

使用std::lock_guard<std::recursive_mutex>std::unique_lock<std::recursive_mutex>

嵌套锁一般用在可并发访问的类上。每个公共成员函数都会对互斥量上锁,然后完成对应的操作后再解锁互斥量。不过,有时成员函数会调用另一个成员函数,这种情况下,第二个成员函数也会试图锁住互斥量,这就会导致未定义行为的发生。

解决方案为将互斥量转为嵌套锁,第二个成员函数就能成功的进行上锁,并且函数能继续执行。

缺点:对应类的不变量通常会被破坏。当不变量被破坏时,第二个成员函数还需要继续执行。

一个比较好的方式是,从中提取出一个函数作为类的私有成员,这个私有成员函数不会对互斥量进行上锁(调用前必须获得锁)。然后,需要仔细考虑一下,这种情况调用新函数时数据的状态。

4.同步操作

等待事件或条件

当一个线程等待另一个线程完成时,可以持续的检查共享数据标志(用于做保护工作的互斥量),直到另一线程完成工作时对这个标识进行重置。但是,这种方式会消耗线程的执行时间检查标识,并且当互斥量上锁后,其他线程就没有办法获取锁,就会持续等待。

在等待线程在检查间隙,使用std::this_thread::sleep_for()进行周期性的间歇

1
2
3
4
5
6
7
8
9
10
11
12
13
bool flag;
std::mutex m;

void wait_for_flag()
{
std::unique_lock<std::mutex> lk(m);
while(!flag)
{
lk.unlock(); // 1 解锁互斥量
std::this_thread::sleep_for(std::chrono::milliseconds(100)); // 2 休眠100ms
lk.lock(); // 3 再锁互斥量
}
}

当线程休眠时没有浪费执行时间,但很难确定正确的休眠时间。

条件变量:通过另一线程触发等待事件的机制是最基本的唤醒方式(例如:流水线上存在额外的任务时)

条件变量会与多个事件或其他条件相关,并且一个或多个线程会等待条件的达成。当某些线程被终止时,为了唤醒等待线程(允许等待线程继续执行),终止线程将会向等待着的线程广播“条件达成”的信息。

等待条件达成

c++标准库条件变量的实现:std::condition_variablestd::condition_variable_any,这两个实现都包含在<condition_variable>头文件的声明中。

两者都需要与互斥量一起才能工作(互斥量是为了同步),前者仅能与std::mutex一起工作,而后者可以和合适的互斥量一起工作,从而加上了_any的后缀。

std::condition_variable_any更加通用,不过在性能和系统资源的使用方面会有更多的开销,所以通常会将std::condition_variable作为首选类型。

代码4.1 使用std::condition_variable处理数据等待

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
std::mutex mut;
std::queue<data_chunk> data_queue; // 1 队列中中有两个线程,两个线程之间会对数据进行传递
std::condition_variable data_cond;

void data_preparation_thread()
{
while(more_data_to_prepare())
{
data_chunk const data=prepare_data();
std::lock_guard<std::mutex> lk(mut);
data_queue.push(data); // 2 数据压入队列
data_cond.notify_one(); // 3 对等待的线程(如果有等待线程)进行通知
}
}

void data_processing_thread()
{
while(true)
{
std::unique_lock<std::mutex> lk(mut); // 4 对互斥量上锁
data_cond.wait(
lk,[]{return !data_queue.empty();}); // 5 传递一个锁和一个Lambda表达式(作为等待的条件)
data_chunk data=data_queue.front();
data_queue.pop();
lk.unlock(); // 6
process(data);
if(is_last_chunk(data))
break;
}
}

wait()会去检查这些条件(通过Lambda函数),当条件满足(Lambda函数返回true)时返回。如果条件不满足(Lambda函数返回false),wait()将解锁互斥量,并且将线程(处理数据的线程)置于阻塞或等待状态。当准备数据的线程调用notify_one()通知条件变量时,处理数据的线程从睡眠中苏醒,重新获取互斥锁,并且再次进行条件检查。在条件满足的情况下,从wait()返回并继续持有锁。当条件不满足时,线程将对互斥量解锁,并重新等待。

std::unique_lock而不使用std::lock_guard的原因:等待中的线程必须在等待期间解锁互斥量,并对互斥量再次上锁,而std::lock_guard没有这么灵活。

可以不使用lambda表达式,写一个检查函数传入到wait()。调用wait()的过程中,在互斥量锁定时,可能会去检查条件变量若干次,当提供测试条件的函数返回true就会立即返回。

伪唤醒:当等待线程重新获取互斥量并检查条件变量时,并非直接响应另一个线程的通知。

当多个线程同时在等待同一条件,此时条件满足发起唤醒则可能会唤醒多个线程,但是如果对应的资源不够所有唤醒线程使用,则剩余线程的唤醒就是无意义的,也就被称作虚假唤醒。

避免方法:在睡眠返回之后重新检查条件判断(while代替if)

std::condition_variable::wait是“忙碌-等待”的优化。下面用简单的循环实现了一个“忙碌-等待”:

1
2
3
4
5
6
7
template<typename Predicate>
void minimal_wait(std::unique_lock<std::mutex>& lk, Predicate pred){
while(!pred()){
lk.unlock();
lk.lock();
}
}

构建线程安全队列

代码4.3 线程安全队列的接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
#include <memory> // 为了使用std::shared_ptr

template<typename T>
class threadsafe_queue
{
public:
threadsafe_queue();
threadsafe_queue(const threadsafe_queue&);
threadsafe_queue& operator=(
const threadsafe_queue&) = delete; // 不允许简单的赋值

void push(T new_value);

bool try_pop(T& value); // 1 返回队列中值的状态
std::shared_ptr<T> try_pop(); // 2 直接返回检索值,当没有值可检索时,这个函数返回NULL。

void wait_and_pop(T& value);
std::shared_ptr<T> wait_and_pop();

bool empty() const;
};

裁剪了很多构造函数,并禁止简单赋值。需要提供两个版本的try_pop()和wait_for_pop()。

代码4.4 从代码4.1中提取push()和wait_and_pop()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
#include <queue>
#include <mutex>
#include <condition_variable>

template<typename T>
class threadsafe_queue
{
private:
std::mutex mut;
std::queue<T> data_queue;
std::condition_variable data_cond;
public:
void push(T new_value)
{
std::lock_guard<std::mutex> lk(mut);
data_queue.push(new_value);
data_cond.notify_one();
}

void wait_and_pop(T& value)
{
std::unique_lock<std::mutex> lk(mut);
data_cond.wait(lk,[this]{return !data_queue.empty();});
value=data_queue.front();
data_queue.pop();
}
};
threadsafe_queue<data_chunk> data_queue; // 1

void data_preparation_thread()
{
while(more_data_to_prepare())
{
data_chunk const data=prepare_data();
data_queue.push(data); // 2
}
}

void data_processing_thread()
{
while(true)
{
data_chunk data;
data_queue.wait_and_pop(data); // 3
process(data);
if(is_last_chunk(data))
break;
}
}

线程队列中有互斥量和条件变量,所以独立的变量就不需要了①,并且push()不需要外部同步②。当然,wait_and_pop()还要兼顾条件变量的等待③。

代码4.5 使用条件变量的线程安全队列(完整版)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
#include <queue>
#include <memory>
#include <mutex>
#include <condition_variable>

template<typename T>
class threadsafe_queue
{
private:
mutable std::mutex mut; // 1 互斥量必须是可变的
std::queue<T> data_queue;
std::condition_variable data_cond;
public:
threadsafe_queue()
{}
threadsafe_queue(threadsafe_queue const& other)
{
std::lock_guard<std::mutex> lk(other.mut);
data_queue=other.data_queue;
}

void push(T new_value)
{
std::lock_guard<std::mutex> lk(mut);
data_queue.push(new_value);
data_cond.notify_one();
}

void wait_and_pop(T& value)
{
std::unique_lock<std::mutex> lk(mut);
data_cond.wait(lk,[this]{return !data_queue.empty();});
value=data_queue.front();
data_queue.pop();
}

std::shared_ptr<T> wait_and_pop()
{
std::unique_lock<std::mutex> lk(mut);
data_cond.wait(lk,[this]{return !data_queue.empty();});
std::shared_ptr<T> res(std::make_shared<T>(data_queue.front()));
data_queue.pop();
return res;
}

bool try_pop(T& value)
{
std::lock_guard<std::mutex> lk(mut);
if(data_queue.empty())
return false;
value=data_queue.front();
data_queue.pop();
return true;
}

std::shared_ptr<T> try_pop()
{
std::lock_guard<std::mutex> lk(mut);
if(data_queue.empty())
return std::shared_ptr<T>();
std::shared_ptr<T> res(std::make_shared<T>(data_queue.front()));
data_queue.pop();
return res;
}

bool empty() const
{
std::lock_guard<std::mutex> lk(mut);
return data_queue.empty();
}
};

empty()是一个const成员函数,并且传入拷贝构造函数的other形参是一个const引用。因为其他线程可能有非const引用对象,并调用变种成员函数,所以这里有必要对互斥量上锁。又因为锁住互斥量是个可变操作,所以互斥量成员必须为mutable①才能在empty()和拷贝构造函数中进行上锁。

当数据准备完成时,调用notify_one()将会唤醒一个正在wait()的线程,检查条件和wait()函数的返回状态(因为仅是向data_queue添加了一个数据项)。

使用future

future:当线程需要等待特定事件时,某种程度上来说就需要知道期望的结果。之后,线程会周期性(较短的周期)的等待或检查事件是否触发(检查信息板),检查期间也会执行其他任务(品尝昂贵的咖啡)。另外,等待任务期间也可以先执行另外的任务,直到对应的任务触发,而后等待future的状态会变为就绪状态。

当事件发生时(状态为就绪),这个future就不能重置了。

c++标准库中,声明在<future>头文件中: unique future(std::future<>)和shared futures(std::shared_future<>),与了std::unique_ptrstd::shared_ptr非常类似。

std::future只能与指定事件相关联,而std::shared_future就能关联多个事件。后者的实现中,所有实例会在同时变为就绪状态,并且可以访问与事件相关的数据。

当多个线程需要访问一个独立future对象时,必须使用互斥量或类似同步机制进行保护。

后台任务的返回值

当不着急让任务结果时,可以使用std::async启动一个异步任务。与std::thread对象等待的方式不同,std::async会返回一个std::future对象,这个对象持有最终计算出来的结果。当需要这个值时,只需要调用这个对象的get()成员函数,就会阻塞线程直到future为就绪为止,并返回计算结果。

代码4.6 std::future从异步任务中获取返回值

1
2
3
4
5
6
7
8
9
10
11
#include <future>
#include <iostream>

int find_the_answer_to_ltuae();
void do_other_stuff();
int main()
{
std::future<int> the_answer=std::async(find_the_answer_to_ltuae);
do_other_stuff();
std::cout<<"The answer is "<<the_answer.get()<<std::endl;
}

std::async允许通过添加额外的调用参数,向函数传递额外的参数。第一个参数是指向成员函数的指针,第二个参数提供这个函数成员类的具体对象(是通过指针,也可以包装在std::ref中),剩余的参数可作为函数的参数传入。

代码4.7 使用std::async向函数传递参数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
#include <string>
#include <future>
struct X
{
void foo(int,std::string const&);
std::string bar(std::string const&);
};
X x;
auto f1=std::async(&X::foo,&x,42,"hello"); // 调用p->foo(42, "hello"),p是指向x的指针
auto f2=std::async(&X::bar,x,"goodbye"); // 调用tmpx.bar("goodbye"), tmpx是x的拷贝副本
struct Y
{
double operator()(double);
};
Y y;
auto f3=std::async(Y(),3.141); // 调用tmpy(3.141),tmpy通过Y的移动构造函数得到
auto f4=std::async(std::ref(y),2.718); // 调用y(2.718)
X baz(X&);
std::async(baz,std::ref(x)); // 调用baz(x)
class move_only
{
public:
move_only();
move_only(move_only&&)
move_only(move_only const&) = delete;
move_only& operator=(move_only&&);
move_only& operator=(move_only const&) = delete;

void operator()();
};
auto f5=std::async(move_only()); // 调用tmp(),tmp是通过std::move(move_only())构造得到

future的等待取决于std::async是否启动一个线程,或是否有任务在进行同步。

可以在函数调用之前向std::async传递一个额外参数:

std::launch::async,表明函数必须在其所在的独立线程上执行;

std::launch::defered,表明函数调用延迟到wait()或get()函数调用时才执行。

std::launch::deferred | std::launch::async表明实现可以选择这两种方式的一种(默认选项)。

1
2
3
4
5
6
7
auto f6=std::async(std::launch::async,Y(),1.2);  // 在新线程上执行
auto f7=std::async(std::launch::deferred,baz,std::ref(x)); // 在wait()或get()调用时执行
auto f8=std::async(
std::launch::deferred | std::launch::async,
baz,std::ref(x)); // 实现选择执行方式
auto f9=std::async(baz,std::ref(x));
f7.wait(); // 调用延迟函数

future与任务关联

std::packaged_task<>会将future与函数或可调用对象进行绑定。当调用std::packaged_task<>对象时,就会调用相关函数或可调用对象,当future状态为就绪时,会存储返回值。

构造std::packaged_task<>实例时,就必须传入函数或可调用对象。这个函数或可调用的对象,需要能接收指定的参数和返回(可转换为指定返回类型的)值。类型可以不完全匹配,因为这里类型可以隐式转换,可以用int类型参数和返回float类型的函数,来构建std::packaged_task<double(double)>实例。

函数签名的返回类型可以用来标识从get_future()返回的std::future<>的类型,而函数签名的参数列表,可用来指定packaged_task的函数调用操作符。例如,模板偏特化

代码4.8 std::packaged_task<>的偏特化

1
2
3
4
5
6
7
8
9
template<>
class packaged_task<std::string(std::vector<char>*,int)>
{
public:
template<typename Callable>
explicit packaged_task(Callable&& f);
std::future<std::string> get_future();
void operator()(std::vector<char>*,int);
};

std::packaged_task作为函数调用时,实参将由函数调用操作符传递至底层函数,并且返回值作为异步结果存储在std::future中,并且可通过get_future()获取。

std::packaged_task对任务进行打包,并适时的取回future。当异步任务需要返回值时,可以等待future状态变为“就绪”。

线程间传递任务

代码4.9 使用std::packaged_task执行一个图形界面线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
#include <deque>
#include <mutex>
#include <future>
#include <thread>
#include <utility>

std::mutex m;
std::deque<std::packaged_task<void()> > tasks;

bool gui_shutdown_message_received();
void get_and_process_gui_message();

void gui_thread() // 1
{
while(!gui_shutdown_message_received()) // 2 关闭图形界面的信息
{
get_and_process_gui_message(); // 3 关闭界面前,进行轮询界面消息处理
std::packaged_task<void()> task;
{
std::lock_guard<std::mutex> lk(m);
if(tasks.empty()) // 4 当队列中没有任务时,循环将继续
continue;
task=std::move(tasks.front()); // 5 能在队列中提取出一个任务
tasks.pop_front();
}
task(); // 6 释放队列上的锁,并且执行任务
}
}

std::thread gui_bg_thread(gui_thread);

template<typename Func>
std::future<void> post_task_for_gui_thread(Func f)
{
std::packaged_task<void()> task(f); // 7 提供一个打包好的任务
std::future<void> res=task.get_future(); // 8 用get_future()成员函数获取future对象
std::lock_guard<std::mutex> lk(m);
tasks.push_back(std::move(task)); // 9 任务推入列表
return res; // 10
}

std::packaged_task是一个类模板,顾名思义是用来打包的,将一个可调用对象封装起来,然后可以将其的返回值传给future。std::packaged_task<函数返回类型(参数类型)> 变量名(函数名)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
#include <iostream>
#include <future>
#include <thread>

int fun(int x) {
x++;
x *= 10;
std::cout << std::this_thread::get_id() << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(5));
return x;
}


int main()
{
std::packaged_task<int(int)> pt(fun); // 将函数打包起来
std::future<int> fu = pt.get_future(); // 并将结果返回给future
std::thread t(std::ref(pt), 1);
std::cout << fu.get() << std::endl;
std::cout << std::this_thread::get_id() << std::endl;
t.join();
return 0;
}

使用std::promises

当线程处理多个连接事件,来自不同的端口连接的数据包基本上以乱序方式进行处理。同样的,数据包也将以乱序的方式进入队列。很多情况下,一些应用不是等待数据成功的发送,就是等待(新的)指定网络接口数据的接收成功。

std::promise是一个类模板,它的作用是在不同的线程中实现数据的同步,与future结合使用,也间接实现了future在不同线程间的同步。

std::promise/std::future对提供一种机制:future可以阻塞等待线程,提供数据的线程可以使用promise对相关值进行设置,并将future的状态置为“就绪”。

当promise设置完毕(使用set_value()成员函数)时,对应的future状态就变为“就绪”,并且可用于检索已存储的值。当设置值之前销毁std::promise,将会存储一个异常。

代码4.10 使用promise解决单线程多连接问题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
#include <future>

void process_connections(connection_set& connections)
{
while(!done(connections)) // 1 循环直到done()返回true为止
{
for(connection_iterator // 2
connection=connections.begin(),end=connections.end();
connection!=end;
++connection)
{
if(connection->has_incoming_data()) // 3 检索是否有数据
{
data_packet data=connection->incoming();
std::promise<payload_type>& p=
connection->get_promise(data.id); // 4
p.set_value(data.payload);
}
if(connection->has_outgoing_data()) // 5 检索是否正在发送已入队的传出数据
{
outgoing_packet data=
connection->top_of_outgoing_queue();
connection->send(data.payload);
data.promise.set_value(true); // 6 当发送完成,传出数据相关的promise将置为true,来表明传输成功
}
}
}
}

set_value()则直接将future的状态设置为ready。需要注意的是在使用的过程中不能多次set_value(),也不能多次get_future()和多次get(),因为一个promise对象只能和一个对象相关联,否则就会抛出异常。

将异常存于future中

函数作为std::async的一部分时,当调用抛出一个异常时,这个异常就会存储到future中,之后future的状态置为“就绪”,之后调用get()会抛出已存储的异常(注意:标准级别没有指定重新抛出的这个异常是原始的异常对象,还是一个拷贝。不同的编译器和库将会在这方面做出不同的选择)。

将函数打包入std::packaged_task任务包后,当任务调用时,同样的事情也会发生。打包函数抛出一个异常,这个异常将存储在future中,在get()调用时会再次抛出。

std::promise也能提供同样的功能。当存入的是异常而非数值时,就需要调用set_exception()成员函数,而非set_value()。这通常是用在一个catch块中,并作为算法的一部分。为了捕获异常,这里使用异常填充promise:

1
2
3
4
5
6
7
8
9
extern std::promise<double> some_promise;
try
{
some_promise.set_value(calculate_value());
}
catch(...)
{
some_promise.set_exception(std::current_exception());
}

使用std::current_exception()来检索抛出的异常,可用std::copy_exception()作为替代方案,std::copy_exception()会直接存储新的异常而不抛出:

1
some_promise.set_exception(std::copy_exception(std::logic_error("foo ")));

这比使用try/catch块更加清晰,当异常类型已知,就应该优先使用。

多个线程的等待

多线程在没有额外同步的情况下,访问独立std::future对象时,就会有数据竞争和未定义行为。因为std::future独享同步结果,并且通过调用get()函数,一次性的获取数据,这就让并发访问变的毫无意义。

std::shared_future:其所有权可以在不同的实例中互相传递,但只有一个实例可以获得特定的同步结果,而std::shared_future实例是可拷贝的,所以多个对象可以引用同一关联期望值的结果。

每一个std::shared_future的独立对象上,成员函数调用返回的结果还是不同步的,所以为了在多个线程访问一个独立对象时避免数据竞争,必须使用锁来对访问进行保护。优先使用的办法:为了替代只有一个拷贝对象的情况,可以让每个线程都拥有自己对应的拷贝对象。这样,当每个线程都通过自己拥有的std::shared_future对象获取结果,那么多个线程访问共享同步结果就是安全的。

std::shared_future的实例同步std::future实例的状态。当std::future对象没有与其他对象共享同步状态所有权,那么所有权必须使用std::move将所有权传递到std::shared_future

1
2
3
4
5
6
std::promise<int> p;
std::future<int> f(p.get_future());
assert(f.valid()); // 1 期望值 f 是合法的
std::shared_future<int> sf(std::move(f));
assert(!f.valid()); // 2 期望值 f 现在是不合法的
assert(sf.valid()); // 3 sf 现在是合法的

期望值f开始是合法的①,因为引用的是promise p的同步状态,但是在转移sf的状态后,f就不合法了②,而sf就是合法的了③。

如其他可移动对象一样,转移所有权是对右值的隐式操作,所以可以通过std::promise对象的成员函数get_future()的返回值,直接构造一个std::shared_future对象,例如:

1
2
std::promise<std::string> p;
std::shared_future<std::string> sf(p.get_future()); // 1 隐式转移所有权

std::future有一个share()成员函数,可用来创建新的std::shared_future ,并且可以直接转移future的所有权。这样也就能保存很多类型,并且使得代码易于修改:

1
2
3
std::promise< std::map< SomeIndexType, SomeDataType, SomeComparator,
SomeAllocator>::iterator> p;
auto sf=p.get_future().share();

当比较器或分配器有所改动,只需要对promise的类型进行修改即可。future的类型会自动与promise的修改进行匹配。

限时等待

时钟

时钟和与时间相关的工具,都在<chrono>库头文件中定义。

c++标准库中,时钟是一个类,提供了四种不同的信息:

当前时间可以通过静态成员函数now()从获取。例如,std::chrono::system_clock::now()会返回系统的当前时间。返回类型为some_clock::time_point

通常情况下,因为std::chrono::system_clock可调,所以是不稳定的。这可调可能造成首次调用now()返回的时间要早于上次调用now()所返回的时间,这就违反了节拍频率的均匀分布。

稳定时钟std::chrono::steady_clock

时间段

std::chrono::duration<>函数模板能够对时间段进行处理(线程库使用到的所有C++时间处理工具,都在std::chrono命名空间内)。第一个模板参数是一个类型表示(比如,int,long或double),第二个模板参数是定制部分,表示每一个单元所用秒数。

例如,std::chrono::duration<short, std::ratio<60, 1>>,几分钟的时间要存在short类型中,因为60秒是才是1分钟,所以第二个参数写成std::ratio<60, 1>。当需要将毫秒级计数存在double类型中时,可以写成std::chrono::duration<double, std::ratio<1, 1000>>,因为1秒等于1000毫秒

标准库在std::chrono命名空间内为时间段变量提供一系列预定义类型:nanoseconds[纳秒] , microseconds[微秒] , milliseconds[毫秒] , seconds[秒] , minutes[分]和hours[时]。

当不要求截断值的情况下(时转换成秒是没问题,但是秒转换成时就不行)时间段的转换是隐式的,显示转换可以由std::chrono::duration_cast<>来完成。

1
2
3
std::chrono::milliseconds ms(54802);
std::chrono::seconds s=
std::chrono::duration_cast<std::chrono::seconds>(ms);

这里的结果就是截断的,而不是进行了舍入,所以s最后的值为54。

时间值支持四则运算,所以能够对两个时间段进行加减,或者是对一个时间段乘除一个常数(模板的第一个参数)来获得一个新时间段变量。例如,5*seconds(1)与seconds(5)或minutes(1)-seconds(55)是一样。

在时间段中可以通过count()成员函数获得单位时间的数量。例如,std::chrono::milliseconds(1234).count()就是1234。

例如:等待future状态变为就绪需要35毫秒:

1
2
3
std::future<int> f=std::async(some_task);
if(f.wait_for(std::chrono::milliseconds(35))==std::future_status::ready)
do_something_with(f.get());

等待函数会返回状态值,表示是等待是超时,还是继续等待。等待future时,超时时会返回std::future_status::timeout。当future状态改变,则会返回std::future_status::ready。当与future相关的任务延迟了,则会返回std::future_status::deferred

系统调度的不确定性和不同操作系统的时钟精度意味着:线程调用和返回的实际时间间隔可能要比35毫秒长。

时间点

时间点可用std::chrono::time_point<>来表示,第一个参数用来指定使用的时钟,第二个函数参数用来表示时间单位(特化的std::chrono::duration<>)。

可以通过对std::chrono::time_point<>实例进行加/减,来获得一个新的时间点,所以std::chrono::hight_resolution_clock::now() + std::chrono::nanoseconds(500)将得到500纳秒后的时间,这对于计算绝对时间来说非常方便。

可以减去一个时间点(二者需要共享同一个时钟),结果是两个时间点的时间差:

1
2
3
4
5
6
auto start=std::chrono::high_resolution_clock::now();
do_something();
auto stop=std::chrono::high_resolution_clock::now();
std::cout<<”do_something() took “
<<std::chrono::duration<double,std::chrono::seconds>(stop-start).count()
<<” seconds”<<std::endl;

代码4.11 等待条件变量满足条件——有超时功能

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
#include <condition_variable>
#include <mutex>
#include <chrono>

std::condition_variable cv;
bool done;
std::mutex m;

bool wait_loop()
{
auto const timeout= std::chrono::steady_clock::now()+
std::chrono::milliseconds(500);
std::unique_lock<std::mutex> lk(m);
while(!done) // while循环处理假唤醒
{
if(cv.wait_until(lk,timeout)==std::cv_status::timeout) // 等待函数wait_until()传入一个时间点
break;
}
return done;
}

当没有什么可以等待时,可在一定时限中等待条件变量。这种方式中,循环的整体长度有限。当循环中使用wait_for()时,可能在等待了足够长的时间后结束等待(在假唤醒之前),且下一次等待又开始了。这可能重复很多次,出现无限等待的情况。

使用超时

最简单方式,对特定线程添加延迟处理。

例如循环检查“done”标志,两个处理函数分别是std::this_thread::sleep_for()std::this_thread::sleep_until()。当线程因为指定时长而进入睡眠时,可使用sleep_for()唤醒,可指定休眠的时间点,之后可使用sleep_until唤醒。

std::mutexstd::recursive_mutex都不支持超时,而std::timed_mutexstd::recursive_timed_mutex支持超时。这两种类型也有try_lock_for()和try_lock_until()成员函数,可以在一段时期内尝试获取锁,或在指定时间点前获取互斥锁。

表4.1 可接受超时的函数

类型/命名空间 函数 返回值
std::this_thread 命名空间 sleep_for(duration) N/A
sleep_until(time_point)
std::condition_variable 或 std::condition_variable_any wait_for(lock, duration) std::cv_status::time_out 或 std::cv_status::no_timeout
wait_until(lock, time_point)
wait_for(lock, duration, predicate) bool —— 当唤醒时,返回谓词的结果
wait_until(lock, duration, predicate)
std::timed_mutex 或 std::recursive_timed_mutex try_lock_for(duration) bool —— 获取锁时返回true,否则返回fasle
try_lock_until(time_point)
std::unique_lock<TimedLockable> unique_lock(lockable, duration) N/A —— 对新构建的对象调用owns_lock();
unique_lock(lockable, time_point) 当获取锁时返回true,否则返回false
try_lock_for(duration) bool —— 当获取锁时返回true,否则返回false
try_lock_until(time_point)
std::future<ValueType>或std::shared_future<ValueType> wait_for(duration) 当等待超时,返回std::future_status::timeout
wait_until(time_point) 当期望值准备就绪时,返回std::future_status::ready
当期望值持有一个为启动的延迟函数,返回std::future_status::deferred

5.内存模型和原子操作

内存模型

对象和内存位置

图5.1 将一个struct分解为多个对象,展示每个对象的内存位置。

5-1

完整的struct是一个有多个子对象(每一个成员变量)组成的对象。

四个原则:

  1. 每个变量都是对象,包括其成员变量的对象。
  2. 每个对象至少占有一个内存位置。
  3. 基本类型都有确定的内存位置(无论类型大小如何,即使他们是相邻的,或是数组的一部分)。
  4. 相邻位域是相同内存中的一部分。

对象、内存位置和并发

当两个线程访问同一个内存位置,如果线程不更新数据,只读数据不需要保护或同步。当线程对内存位置上的数据进行修改,就可能会产生条件竞争

为了避免条件竞争,线程就要以一定的顺序执行:

  • 使用互斥量来确定访问的顺序。当同一互斥量在两个线程同时访问前锁住,那么在同一时间内就只有一个线程能够访问对应的内存位置。
  • 使用原子操作决定两个线程的访问顺序,当多个线程访问同一个内存地址时,对每个访问者都需要设定顺序。

如果不规定对同一内存地址访问的顺序,那么访问就不是原子的。当两个线程都是“写入者”时,就会产生数据竞争和未定义行为。

使用原子操作避免未定义行为。

修改顺序

C++程序中的对象都有(由程序中的所有线程对象)在初始化开始阶段确定好修改顺序的。大多数情况下,这个顺序不同于执行中的顺序,但在给定的程序中,所有线程都需要遵守这个顺序。

如果使用原子操作,编译器就有责任去做同步。

原子操作和原子类型

标准原子类型

标准原子类型定义在头文件<atomic>中。

原子操作可以替代互斥量,来完成同步操作。

表5.1 标准原子类型的备选名和与其相关的std::atomic<>特化类

原子类型 相关特化类
atomic_bool std::atomic<bool>
atomic_char std::atomic<char>
atomic_schar std::atomic<signed char>
atomic_uchar std::atomic<unsigned char>
atomic_int std::atomic<int>
atomic_uint std::atomic<unsigned>
atomic_short std::atomic<short>
atomic_ushort std::atomic<unsigned short>
atomic_long std::atomic<long>
atomic_ulong std::atomic<unsigned long>
atomic_llong std::atomic<long long>
atomic_ullong std::atomic<unsigned long long>
atomic_char16_t std::atomic<char16_t>
atomic_char32_t std::atomic<char32_t>
atomic_wchar_t std::atomic<wchar_t>

C++标准库不仅提供基本原子类型,还定义了与原子类型对应的非原子类型,就如同标准库中的std::size_t。如表5.2所示这些类型:

表5.2 标准原子类型定义(typedefs)和对应的内置类型定义(typedefs)

原子类型定义 标准库中相关类型定义
atomic_int_least8_t int_least8_t
atomic_uint_least8_t uint_least8_t
atomic_int_least16_t int_least16_t
atomic_uint_least16_t uint_least16_t
atomic_int_least32_t int_least32_t
atomic_uint_least32_t uint_least32_t
atomic_int_least64_t int_least64_t
atomic_uint_least64_t uint_least64_t
atomic_int_fast8_t int_fast8_t
atomic_uint_fast8_t uint_fast8_t
atomic_int_fast16_t int_fast16_t
atomic_uint_fast16_t uint_fast16_t
atomic_int_fast32_t int_fast32_t
atomic_uint_fast32_t uint_fast32_t
atomic_int_fast64_t int_fast64_t
atomic_uint_fast64_t uint_fast64_t
atomic_intptr_t intptr_t
atomic_uintptr_t uintptr_t
atomic_size_t size_t
atomic_ptrdiff_t ptrdiff_t
atomic_intmax_t intmax_t
atomic_uintmax_t uintmax_t

对于标准类型进行typedef T,相关的原子类型就在原来的类型名前加上atomic_的前缀:atomic_T。

通常,标准原子类型不能进行拷贝和赋值,它们没有拷贝构造函数和拷贝赋值操作符。但是,可以隐式转化成对应的内置类型,所以这些类型依旧支持赋值,可以使用load()store()exchange()compare_exchange_weak()compare_exchange_strong()

它们都支持复合赋值符:+=, -=, *=, |= 等等。并且使用整型和指针的特化类型还支持++和–操作。

每种函数类型的操作都有一个内存序参数,这个参数可以用来指定存储的顺序。操作分为三类:

  1. Store操作,可选如下内存序:memory_order_relaxed, memory_order_release, memory_order_seq_cst
  2. Load操作,可选如下内存序:memory_order_relaxed, memory_order_consume, memory_order_acquire, memory_order_seq_cst
  3. Read-modify-write(读-改-写)操作,可选如下内存序:memory_order_relaxed, memory_order_consume, memory_order_acquire, memory_order_release, memory_order_acq_rel, memory_order_seq_cst

std::atomic_flag

std::atomic_flag是最简单的原子类型,这个类型的对象可以在两个状态间切换:设置和清除。

std::atomic_flag类型的对象必须被ATOMIC_FLAG_INIT初始化。初始化标志位是“清除”状态。这里没得选择,这个标志总是初始化为“清除”:

1
std::atomic_flag f = ATOMIC_FLAG_INIT;

这适用于任何对象的声明,是唯一需要以如此特殊的方式初始化的原子类型,但也是唯一保证无锁的类型。首次使用时,需要初始化。如果std::atomic_flag是静态存储的,那么就的保证其是静态初始化的,也就意味着没有初始化顺序问题。

当标志对象已初始化,只能做三件事情:

  • 销毁,clear()成员函数,是一个存储操作,所以不能有memory_order_acquire或memory_order_acq_rel语义
  • 清除或设置(查询之前的值),test_and_set()成员函数,“读-改-写”操作,可以应用于任何内存顺序。

每一个原子操作,默认的内存序都是memory_order_seq_cst。例如:

1
2
f.clear(std::memory_order_release);  // 1 使用释放语义清除标志
bool x=f.test_and_set(); // 2 使用默认内存序设置表示,并且检索旧值

不能拷贝构造std::atomic_flag对象,不能将一个对象赋予另一个std::atomic_flag对象。这不是std::atomic_flag特有的属性,而是所有原子类型共有的属性。

原子类型的所有操作都是原子的,而赋值和拷贝调用了两个对象,这就就破坏了操作的原子性。

互斥锁与自旋锁

同一时刻只能有一个线程获取到锁,没有获取到锁的线程通常有两种处理方式:

  • 没有获取到锁的线程就一直循环等待判断该资源是否已经释放锁,这种锁叫做自旋锁,它不用将线程阻塞起来(NON-BLOCKING)
  • 把自己阻塞起来,等待重新调度请求,这种叫做互斥锁。

自旋锁的定义:当一个线程尝试去获取某一把锁的时候,如果这个锁此时已经被别人获取(占用),那么此线程就无法获取到这把锁,该线程将会等待,间隔一段时间后会再次尝试获取。这种采用循环加锁 -> 等待的机制被称为自旋锁(spinlock)。

因为自旋锁避免了操作系统进程调度和线程切换,所以自旋锁通常适用在时间比较短的情况下。由于这个原因,操作系统的内核经常使用自旋锁。但是,如果长时间上锁的话,自旋锁会非常耗费性能,它阻止了其他线程的运行和调度。线程持有锁的时间越长,则持有该锁的线程将被 OS(Operating System) 调度程序中断的风险越大。

代码5.1 使用std::atomic_flag实现自旋锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
class spinlock_mutex
{
std::atomic_flag flag;
public:
spinlock_mutex():
flag(ATOMIC_FLAG_INIT)
{}
void lock()
{
while(flag.test_and_set(std::memory_order_acquire));
}
void unlock()
{
flag.clear(std::memory_order_release);
}
};

由于std::atomic_flag的局限性太强,没有非修改查询操作,甚至不能像普通的布尔标志那样使用。所以,实际操作中最好使用std::atomic<bool>

std::atomic<bool>

最基本的原子整型类型就是std::atomic<bool>,它有着比std::atomic_flag更加齐全的布尔标志特性。

不能拷贝和拷贝赋值,但可以使用非原子的bool类型进行构造,所以可以初始化为true或false,并且可以从非原子bool变量赋值给std::atomic<bool>

1
2
std::atomic<bool> b(true);
b=false;

store()是一个存储操作,而load()是一个加载操作,exchange()是一个“读-改-写”操作:

1
2
3
4
std::atomic<bool> b;
bool x=b.load(std::memory_order_acquire);
b.store(true); // 使用store()写入(true或false)
x=b.exchange(false, std::memory_order_acq_rel); // exchange()允许使用新选的值替换已存储的值,并且会自动检索原始值

另一种存储方式:当前值与预期值一致时,存储新值的操作。存储一个新值(或旧值)取决于当前值

比较/交换:表现为compare_exchange_weak()和compare_exchange_strong()

比较原子变量的当前值和期望值,当两值相等时,存储所提供值。当两值不等,期望值就会被更新为原子变量中的值。

“比较/交换”函数值是一个bool变量,当返回true时执行存储操作,false则更新期望值。当存储完成(因为只相等),则操作是成功的,否则即为失败。操作成功是返回true,失败时返回false。

因为compare_exchange_weak()可以伪失败,所以通常会配合一个循环使用:

1
2
3
bool expected=false;
extern atomic<bool> b; // 设置些什么
while(!b.compare_exchange_weak(expected,true) && !expected);

循环中expected的值始终是false,表示compare_exchange_weak()会莫名的失败。

另一方面,当实际值与expected不符,compare_exchange_strong()就能保证值返回false。这就能消除对循环的需要,就可以知道是否成功的改变了一个变量,或已让另一个线程完成。

“compare/exchange”另一点不同的是,它拥有对两个内存序的参数进行操作的能力,这就允许内存序语义在成功和失败的例子中有所不同。可能成功时使用memory_order_acq_rel,而失败时使用memory_order_relaxed。失败的“compare/exchange”将不会进行存储,所以“compare/exchange”操作不能拥有meory_order_release或memory_order_acq_rel。

下面对compare_exchange_weak()的两次调用是等价的:

1
2
3
4
5
std::atomic<bool> b;
bool expected;
b.compare_exchange_weak(expected,true,
memory_order_acq_rel,memory_order_acquire);
b.compare_exchange_weak(expected,true,memory_order_acq_rel);

std::atomic<bool>std::atomic_flag的不同之处在于,std::atomic<bool>可能不是无锁的。为了保证操作的原子性,其实现中可能需要内置的互斥量。特殊情况时,可以使用is_lock_free()成员函数,检查std::atomic<bool>上的操作是否无锁。这是除了std::atomic_flag之外,另一个所有原子类型都拥有的特征(is_lock_free)。

std::atomic<T*>

原子指针类型,可以使用内置类型或自定义类型T,通过特化std::atomic<T*>进行定义,操作是针对于相关类型的指针。

不能拷贝构造,也不能拷贝赋值,但是可以通过合适的类型指针进行构造和赋值。std::atomic<T*>也有load(), store(), exchange(), compare_exchange_weak()和compare_exchage_strong()成员函数,获取与返回的类型都是T*。

std::atomic<T*>为指针运算提供新的操作。基本操作有fetch_add()和fetch_sub(),它们在存储地址上做原子加法和减法,为+=, -=, ++和–提供简易的封装。对于内置类型的操作,例如:如果x是std::atomic<Foo*>类型的数组的首地址,然后x+=3让其偏移到第四个元素的地址,并返回一个普通的Foo*类型值,这个指针值是指向数组中第四个元素。

fetch_add()和fetch_sub()的返回值略有不同(所以x.ftech_add(3)让x指向第四个元素,并且函数返回指向第一个元素的地址)。这种操作也被称为“交换-相加”,并且这是一个原子的“读-改-写”操作,如同exchange()和compare_exchange_weak()/compare_exchange_strong()一样。正像其他操作那样,返回值是一个普通的T*值,而非是std::atomic<T*>对象的引用,所以调用代码可以基于之前的值进行操作:

1
2
3
4
5
6
7
8
9
class Foo{};
Foo some_array[5];
std::atomic<Foo*> p(some_array);
Foo* x=p.fetch_add(2); // p加2,并返回原始值
assert(x==some_array);
assert(p.load()==&some_array[2]);
x=(p-=1); // p减1,并返回原始值
assert(x==&some_array[1]);
assert(p.load()==&some_array[1]);

函数也允许内存序作为给定函数的参数:

1
p.fetch_add(3,std::memory_order_release);

因为fetch_add()和fetch_sub()都是“读-改-写”操作,可以使用任意的内存序,以及加入到一个释放序列中。因为没办法提供必要的信息(这些形式都具有memory_order_seq_cst语义),所以指定的语序不支持操作符形式。

标准原子整型的相关操作

如同普通的操作集合一样(load(), store(), exchange(), compare_exchange_weak(), 和compare_exchange_strong()),std::atomic<int>std::atomic<unsigned long long>也是有一套完整的操作可以供使用:fetch_add(), fetch_sub(), fetch_and(), fetch_or(), fetch_xor(),还有复合赋值方式((+=, -=, &=, |=和^=),以及++和–(++x, x++, –x和x–)。

虽然对于普通的整型来说,这些复合赋值方式还不完全:除法、乘法和移位操作不在其中。因为,整型原子值通常用来作计数器,或者是掩码,所以以上操作的缺失显得不是那么重要。如果需要,可以使用compare_exchange_weak()完成。

对于std::atomic<T*>类型,紧密相关的两个函数就是fetch_add()和fetch_sub()。函数原子化操作,并且返回旧值,而符合赋值运算会返回新值。前缀加减和后缀加减与普通用法一样:++x对变量进行自加,并且返回新值;而x++对变量自加,返回旧值。

std::atomic<>类模板

模板允许用户使用自定义类型创建一个原子变量(除了标准原子类型之外),需要满足一定的标准才可以使用std::atomic<>

为了使用std::atomic<UDT>(UDT是用户定义类型),这个类型必须有拷贝赋值运算符。这就意味着这个类型不能有任何虚函数或虚基类,以及必须使用编译器创建的拷贝赋值操作。自定义类型中所有的基类和非静态数据成员也都需要支持拷贝赋值操作。这(基本上)就允许编译器使用memcpy()或赋值操作的等价操作,因为实现中没有用户代码。

不要将锁定区域内的数据以引用或指针的形式,作为参数传递给用户提供的函数。

创建一个std::atomic<std::vector<int>>类型。不能使用包含有计数器,标志指针和简单数组的类型,作为特化类型。虽然这不会导致任何问题,但是越是复杂的数据结构,就有越多的操作,而非只有赋值和比较。如果这种情况发生了,最好使用std::mutex保护数据。

当使用用户定义类型T进行实例化时,std::atomic<T>的可用接口就只有: load(), store(), exchange(), compare_exchange_weak(), compare_exchange_strong()和赋值操作,以及向类型T转换的操作。

表5.3 每一个原子类型所能使用的操作

5-3-table

原子操作的非成员函数

大多数非成员函数的命名与对应成员函数有关,需要atomic_作为前缀(比如,std::atomic_load())。这些函数都会重载不同的原子类型,指定内存序时会分成两种:一种没有标签,另一种以_explicit为后缀,并且需要额外的参数,或将内存序作为标签,亦或只有标签(例如,std::atomic_store(&atomic_var,new_value)std::atomic_store_explicit(&atomic_var,new_value,std::memory_order_release)。

成员函数隐式引用原子对象,所有非成员函数都持有一个指向原子对象的指针(作为第一个参数)。

例如,std::atomic_is_lock_free()只有一种类型(虽然会被其他类型所重载),并且对于同一个对象a,std::atomic_is_lock_free(&a)返回值与a.is_lock_free()相同。

同样的,std::atomic_load(&a)和a.load()的作用一样。需要注意的是,a.load(std::memory_order_acquire)std::atomic_load_explicit(&a, std::memory_order_acquire)的操作相同。

非成员函数的设计是为了与C语言兼容,C语言中没有引用。

C++标准库也对原子类型中的std::shared_ptr<>智能指针类型提供非成员函数,这打破了“只有原子类型,才能提供原子操作”的原则。std::shared_ptr<>不是原子类型,但是C++标准委员会认为这很重要。可使用的原子操作有:load, store, exchange和compare/exchange,这些操作重载了标准原子类型的操作,并且可获取std::shared_ptr<>*作为第一个参数:

1
2
3
4
5
6
7
8
9
10
11
std::shared_ptr<my_data> p;
void process_global_data()
{
std::shared_ptr<my_data> local=std::atomic_load(&p);
process_data(local);
}
void update_global_data()
{
std::shared_ptr<my_data> local(new my_data);
std::atomic_store(&p,local);
}

标准原子类型不仅仅是为了避免数据竞争所造成的未定义行为,还允许用户对不同线程上的操作进行强制排序。这种强制排序是数据保护和同步操作的基础,例如:std::mutexstd::future

同步操作和强制排序

假设两个线程,一个向数据结构中填充数据,另一个读取数据结构中的数据。为了避免恶性条件竞争,第一个线程设置一个标志,用来表明数据已经准备就绪,从而第二个线程在这个标志设置前不能读取数据。

代码5.2 不同线程对数据的读写

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
#include <vector>
#include <atomic>
#include <iostream>

std::vector<int> data;
std::atomic<bool> data_ready(false);

void reader_thread()
{
while(!data_ready.load()) // 1
{
std::this_thread::sleep(std::milliseconds(1));
}
std::cout<<"The answer="<<data[0]<<"\m"; // 2
}
void writer_thread()
{
data.push_back(42); // 3
data_ready=true; // 4
}

访问顺序通过对std::atomic<bool>类型的data_ready变量进行操作完成,这些操作通过*先行(happens-before)和同发*(synchronizes-with)确定顺序。写入数据③在写入data_ready④前发生,读取①发生在读取数据②之前。

强制了顺序:写入数据先行于读取数据。

当data_ready①为true,写操作就会与读操作同步,建立一个“先行”的关系。因为“先行”关系是可传递的,所以写入③先行于写入④,这两个行为又先行于读取操作①,之前的操作都先行于读取数据②。

5-2

图5.2 对非原子操作,使用原子操作对操作进行强制排序

同步发生

“同发”只在原子类型之间进行。例如:操作一个数据结构(对互斥量上锁),如果数据结构包含有原子类型,并且操作内部执行了一定的原子操作,那这些操作就是“同发”关系。

“同发”的基本想法:原子写操作W对变量x进行标记,同步与对x进行原子读操作,读取的是W操作写入的内容,或是W之后,同一线程上的原子写操作对x写入的值,亦或是任意线程对x的一系列原子读-改-写操作(例如,fetch_add()或compare_exchange_weak())。

因为对原子类型的操作默认都有“适当的标记”,如果线程A存储了一个值,并且线程B读取了这个值,线程A的存储操作与线程B的载入操作就是同步发生关系。

所有细微的差别都在“适当的标记”中,C++内存模型允许为原子类型提供各种约束顺序。

先行发生

“先行”关系是一个程序中基本构建块的操作顺序:指定了某个操作去影响另一个操作。

对于单线程来说:一个操作排在另一个之后,那这个操作就先执行。

如果源码中操作A发生在操作B之前,那A就先行于B。可以回看代码5.2:对data的写入③先于对data_ready④的写入。如果操作在同时发生,因为操作间无序执行,通常情况下就没有先行关系了。

代码5.3 对于参数中的函数调用顺序未指定顺序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
#include <iostream>
void foo(int a,int b)
{
std::cout<<a<<”,”<<b<<std::endl;
}
int get_num()
{
static int i=0;
return ++i;
}
int main()
{
foo(get_num(),get_num()); // 无序调用get_num()
}

程序会输出“1,2”或“2,1”,因为两个get_num()的执行顺序未指定。

这种情况下,操作在单一声明中可测序,例如:逗号操作符的使用或是表达式的结果作为参数传给另一个表达式。通常情况下,操作在单一声明中不可排序,所以无法先行安排顺序(也就没有先行发生了)。

线程间先行可以与排序先行相结合:如果操作A排序先行于操作B,并且操作B线程间先行于操作C,那么A线程间先行于C。同样的,如果A同步于B,并且B排序先于C,那么A线程间先行于C。当对数据进行一系列修改(单线程)时,只需要对数据进行一次同步即可。

如果操作A与操作B同步,或操作A的顺序在操作B之前,那么A就是强先行于B。也适用于顺序传递:如果A强先行于B,并且B强先行于C,那么A就肯定强先行于C。

原子操作的内存序

有六个内存序列选项可应用于对原子类型的操作:

  1. memory_order_relaxed
  2. memory_order_consume
  3. memory_order_acquire
  4. memory_order_release
  5. memory_order_acq_rel
  6. memory_order_seq_cst

除非为特定的操作指定一个序列选项,要不内存序列默认都是memory_order_seq_cst。

三种内存模型:

  • 顺序一致性(sequentially consistent)
  • 获取-释放序(memory_order_consume, memory_order_acquire, memory_order_release和memory_order_acq_rel)
  • 自由序(memory_order_relaxed)。

不同的内存序在不同的CPU架构下功耗不同。不同种类的内存序,允许使用其提升相关操作的性能。使用顺序一致序(相较于其他序列,它是最简单的)时,对于在通常情况来说就够用了。

顺序一致性

默认序命名为顺序一致性,因为程序中的行为从任意角度去看,序列都保持一定顺序。

如果原子实例的所有操作都是序列一致的,那么多线程就会如单线程那样以某种特殊的排序执行

不同的操作也要遵守相同的顺序。因为行为简单,可以使用原子变量进行编写。通过不同的线程,可以写出所有可能的操作消除那些不一致,以及确认代码的行为是否与预期相符。所以,操作都不能重排;如果代码在一个线程中,将一个操作放在另一个操作前面,那其他线程也需要了解这个顺序。

缺点:因为整个序列中的操作都必须在多个处理器上保持一致,可能需要对处理器间的同步操作进行扩展(代价很昂贵!)

代码5.4 全序——序列一致性

#include
#include
#include <assert.h>

std::atomic x,y;
std::atomic z;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
#include <atomic>
#include <thread>
#include <assert.h>

std::atomic<bool> x,y;
std::atomic<int> z;

void write_x()
{
x.store(true,std::memory_order_seq_cst); // 1
}

void write_y()
{
y.store(true,std::memory_order_seq_cst); // 2
}
void read_x_then_y()
{
while(!x.load(std::memory_order_seq_cst));
if(y.load(std::memory_order_seq_cst)) // 3
++z;
}
void read_y_then_x()
{
while(!y.load(std::memory_order_seq_cst));
if(x.load(std::memory_order_seq_cst)) // 4
++z;
}
int main()
{
x=false;
y=false;
z=0;
std::thread a(write_x);
std::thread b(write_y);
std::thread c(read_x_then_y);
std::thread d(read_y_then_x);
a.join();
b.join();
c.join();
d.join();
assert(z.load()!=0); // 5
}

assert⑤语句是永远不会触发的,因为不是存储x的操作①发生,就是存储y的操作②发生。

5-3

图5.3 序列一致与先行关系

序列一致性是最简单、直观的序列,因为需要对所有线程进行全局同步,所以也是开销最大的内存序。多处理器设备上需要在处理期间,在信息交换上耗费大量的时间。

为了避免这种消耗,就需考虑使用其他内存序。

非顺序一致性内存

不同线程看到相同操作,不一定有着相同的顺序。

不仅是考虑事情同时发生的问题,还有线程没办法保证一致性

自由序

原子类型上的操作以自由序执行。同一线程中对于同一变量的操作还是遵从先行关系,但不同线程不需要规定顺序。唯一的要求是在访问同一线程中的单个原子变量不能重排序,当给定线程看到原子变量的值时,随后线程的读操作就不会去检索较早的那个值。当使用memory_order_relaxed时,不需要任何额外的同步,对于每个变量的修改顺序只存在于线程间共享。

代码5.5 非限制操作只有非常少的顺序要求

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
#include <atomic>
#include <thread>
#include <assert.h>

std::atomic<bool> x,y;
std::atomic<int> z;

void write_x_then_y()
{
x.store(true,std::memory_order_relaxed); // 1
y.store(true,std::memory_order_relaxed); // 2
}
void read_y_then_x()
{
while(!y.load(std::memory_order_relaxed)); // 3
if(x.load(std::memory_order_relaxed)) // 4
++z;
}
int main()
{
x=false;
y=false;
z=0;
std::thread a(write_x_then_y);
std::thread b(read_y_then_x);
a.join();
b.join();
assert(z.load()!=0); // 5
}

这次assert⑤可能会触发,因为加载x的操作④可能读取到false,即使加载y的操作③读取到true,并且存储x的操作①先发与存储y的操作②。x和y是两个不同的变量,所以没有顺序去保证每个操作产生相关值的可见性。

非限制操作对于不同变量可以重排序,只要服从任意的先行关系即可(比如,在同一线程中)。

5-4

图5.4 非限制原子操作与先发执行

代码5.6 非限制操作——多线程版

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
#include <thread>
#include <atomic>
#include <iostream>

std::atomic<int> x(0),y(0),z(0); // 1
std::atomic<bool> go(false); // 2

unsigned const loop_count=10;

struct read_values
{
int x,y,z;
};

read_values values1[loop_count];
read_values values2[loop_count];
read_values values3[loop_count];
read_values values4[loop_count];
read_values values5[loop_count];

void increment(std::atomic<int>* var_to_inc,read_values* values)
{
while(!go)
std::this_thread::yield(); // 3 自旋,等待信号
for(unsigned i=0;i<loop_count;++i)
{
values[i].x=x.load(std::memory_order_relaxed);
values[i].y=y.load(std::memory_order_relaxed);
values[i].z=z.load(std::memory_order_relaxed);
var_to_inc->store(i+1,std::memory_order_relaxed); // 4
std::this_thread::yield();
}
}

void read_vals(read_values* values)
{
while(!go)
std::this_thread::yield(); // 5 自旋,等待信号
for(unsigned i=0;i<loop_count;++i)
{
values[i].x=x.load(std::memory_order_relaxed);
values[i].y=y.load(std::memory_order_relaxed);
values[i].z=z.load(std::memory_order_relaxed);
std::this_thread::yield();
}
}

void print(read_values* v)
{
for(unsigned i=0;i<loop_count;++i)
{
if(i)
std::cout<<",";
std::cout<<"("<<v[i].x<<","<<v[i].y<<","<<v[i].z<<")";
}
std::cout<<std::endl;
}

int main()
{
std::thread t1(increment,&x,values1);
std::thread t2(increment,&y,values2);
std::thread t3(increment,&z,values3);
std::thread t4(read_vals,values4);
std::thread t5(read_vals,values5);

go=true; // 6 开始执行主循环的信号

t5.join();
t4.join();
t3.join();
t2.join();
t1.join();

print(values1); // 7 打印最终结果
print(values2);
print(values3);
print(values4);
print(values5);
}

三个全局原子变量①和五个线程。每一个线程循环10次,使用时memory_order_relaxed读取三个原子变量的值,并且将它们存储在一个数组上。其中三个线程每次通过循环④来更新其中一个原子变量,这时剩下的两个线程就负责读取。当线程都汇入主线程,就能打印出来每个线程存到数组上的值了。

原子变量go②用来确保线程同时退出。启动线程是昂贵的操作,并且没有明确的延迟,第一个线程可能在最后一个线程开始前结束。每个线程都在go变为true前,都在循环③⑤。并且当go设置为true时,所有线程都会开始运行⑥。

程序一种可能的输出为:

1
2
3
4
5
(0,0,0),(1,0,0),(2,0,0),(3,0,0),(4,0,0),(5,7,0),(6,7,8),(7,9,8),(8,9,8),(9,9,10)
(0,0,0),(0,1,0),(0,2,0),(1,3,5),(8,4,5),(8,5,5),(8,6,6),(8,7,9),(10,8,9),(10,9,10)
(0,0,0),(0,0,1),(0,0,2),(0,0,3),(0,0,4),(0,0,5),(0,0,6),(0,0,7),(0,0,8),(0,0,9)
(1,3,0),(2,3,0),(2,4,1),(3,6,4),(3,9,5),(5,10,6),(5,10,8),(5,10,10),(9,10,10),(10,10,10)
(0,0,0),(0,0,0),(0,0,0),(6,3,7),(6,5,7),(7,7,7),(7,8,7),(8,8,7),(8,8,9),(8,8,9)

前三行中线程都做了更新,后两行线程只是做读取。每三个值都是一组x,y和z,并按照这样的顺序依次循环。对于输出,需要注意的是:

  1. 第一组值中x增1,第二组值中y增1,第三组中z增1。
  2. x元素只在给定集中增加,y和z也一样,但是是不均匀增加,并且每个线程中的相对顺序都不同。
  3. 线程3看不到x或y的任何更新,它能看到的只有z的更新。这并不妨碍别的线程观察z的更新,并同时观察x和y的更新。

对于非限制操作,这个结果没毛病(但是不是唯一合法的输出)。任意组都用三个变量保持一致,从0到10依次递增,并且线程对相应变量进行递增操作,所以打印出的值在0到10的范围内都合理。

要想获取额外的同步,且不使用全局排序一致,可以使用获取-释放序(acquire-release ordering)。

获取-释放序

自由序(relaxed ordering)的加强版,虽然操作依旧没有统一顺序,但引入了同步。

原子加载就是获取(acquire)操作(memory_order_acquire),原子存储就是释放(memory_order_release)操作,原子读-改-写操作(例如fetch_add()或exchange())在这里,不是“获取”就是“释放”,或者两者兼有的操作(memory_order_acq_rel),同步在线程释放和获取间是成对的(pairwise),释放操作与获取操作同步就能读取已写入的值。

代码5.7 获取-释放不意味着统一操作顺序 使用获取-释放序(而非序列一致方式),对代码5.4的一次重写。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
#include <atomic>
#include <thread>
#include <assert.h>

std::atomic<bool> x,y;
std::atomic<int> z;
void write_x()
{
x.store(true,std::memory_order_release);
}
void write_y()
{
y.store(true,std::memory_order_release);
}
void read_x_then_y()
{
while(!x.load(std::memory_order_acquire));
if(y.load(std::memory_order_acquire)) // 1
++z;
}
void read_y_then_x()
{
while(!y.load(std::memory_order_acquire));
if(x.load(std::memory_order_acquire)) // 2
++z;
}
int main()
{
x=false;
y=false;
z=0;
std::thread a(write_x);
std::thread b(write_y);
std::thread c(read_x_then_y);
std::thread d(read_y_then_x);
a.join();
b.join();
c.join();
d.join();
assert(z.load()!=0); // 3
}

断言③可能会触发(就如同自由排序那样),因为在加载x②和y①时,可能读取到false。因为x和y是由不同线程写入,所以序列中的每一次释放和获取都不会影响到其他线程的操作。

5-6

图5.6 获取-释放,以及先行过程

对于读取的结果,两个(读取)线程看到的是两个完全不同的世界。

代码5.8 获取-释放序操作会影响释放操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
#include <atomic>
#include <thread>
#include <assert.h>

std::atomic<bool> x,y;
std::atomic<int> z;

void write_x_then_y()
{
x.store(true,std::memory_order_relaxed); // 1
y.store(true,std::memory_order_release); // 2
}
void read_y_then_x()
{
while(!y.load(std::memory_order_acquire)); // 3 自旋,等待y被设置为true
if(x.load(std::memory_order_relaxed)) // 4
++z;
}
int main()
{
x=false;
y=false;
z=0;
std::thread a(write_x_then_y);
std::thread b(read_y_then_x);
a.join();
b.join();
assert(z.load()!=0); // 5
}

读取y③时会得到true,和存储时写入的一样②。存储使用的是memory_order_release,读取使用的是memory_order_acquire,存储与读取就同步了。因为这两个操作是由同一个线程串行完成的,所以存储x①的操作先行于存储y②的操作。

对y的存储同步与对y的加载,存储x也就先行于对y的加载,并且扩展先行于x的读取。因此,加载x的值必为true,并且断言⑤不会触发。如果对于y的加载不是在while循环中,情况可能就会有所不同。加载y的时候可能会读取到false,这种情况下对于读取到的x是什么值没有要求了。为了保证同步,加载和释放操作必须成对。

释放操作存储的值必须要让获取操作看到。当存储②或加载③都是一个释放操作时,对x的访问就无序了,也就无法保证④处读到的是true,并且还会触发断言。

获取-释放序传递同步

为了考虑传递顺序,至少需要三个线程。第一个线程用来修改共享变量,第二个线程使用“加载-获取”读取由“存储-释放”操作过的变量,并且再对第二个变量进行“存储-释放”操作。最后,由第三个线程通过“加载-获取”读取第二个共享变量,并提供“加载-获取”操作来读取被“存储-释放”操作写入的值。为了保证同步关系,即便是中间线程没有对共享变量做任何操作,第三个线程也可以读取第一个线程操作过的变量。

代码5.9 使用获取和释放序传递同步

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
std::atomic<int> data[5];
std::atomic<bool> sync1(false), sync2(false);

void thread_1()
{
data[0].store(42,std::memory_order_relaxed);
data[1].store(97,std::memory_order_relaxed);
data[2].store(17,std::memory_order_relaxed);
data[3].store(-141,std::memory_order_relaxed);
data[4].store(2003,std::memory_order_relaxed);
sync1.store(true,std::memory_order_release); // 1.设置sync1
}

void thread_2()
{
while(!sync1.load(std::memory_order_acquire)); // 2.直到sync1设置后,循环结束
sync2.store(true,std::memory_order_release); // 3.设置sync2
}
void thread_3()
{
while(!sync2.load(std::memory_order_acquire)); // 4.直到sync1设置后,循环结束
assert(data[0].load(std::memory_order_relaxed)==42);
assert(data[1].load(std::memory_order_relaxed)==97);
assert(data[2].load(std::memory_order_relaxed)==17);
assert(data[3].load(std::memory_order_relaxed)==-141);
assert(data[4].load(std::memory_order_relaxed)==2003);
}

thread_1将数据存储到data中先行于存储sync1①(它们在同一个线程内)。因为加载sync1①的是一个while循环,它最终会看到thread_1存储的值。因此,对于sync1的存储先行于最终对于sync1的加载(在while循环中)。thread_3的加载操作④,位于存储sync2③操作的前面。存储sync2③因此先行于thread_3的加载④,加载又先行于存储sync2③,存储sync2又先行于加载sync2④,加载syn2又先行于加载data。因此,thread_1存储数据到data的操作先行于thread_3中对data的加载,并且保证断言都不会触发。

将sync1和sync2通过在thread_2中使用“读-改-写”操作(memory_order_acq_rel)合并成一个独立的变量。其中会使用compare_exchange_strong()来保证thread_1对变量只进行一次更新:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
std::atomic<int> sync(0);
void thread_1()
{
// ...
sync.store(1,std::memory_order_release);
}

void thread_2()
{
int expected=1;
while(!sync.compare_exchange_strong(expected,2,
std::memory_order_acq_rel))
expected=1;
}
void thread_3()
{
while(sync.load(std::memory_order_acquire)<2);
// ...
}

使用memory_order_acq_rel语义的“读-改-写”操作,每一个动作都包含获取和释放操作,所以可以和之前的存储操作进行同步,并且可以对随后的加载操作进行同步,就像上面例子一样。

释放队列与同步

通过线程在存储和加载操作之间有(有序的)多个“读-改-写”操作(所有操作都已经做了适当的标记),所以可以获取原子变量存储与加载的同步关系。存储操作标记为memory_order_release,memory_order_acq_rel或memory_order_seq_cst,加载标记为memory_order_consum,memory_order_acquire或memory_order_sqy_cst,并且操作链上的每一加载操作都会读取之前操作写入的值,因此链上的操作构成了一个释放序列(release sequence),并且初始化存储同步(对应memory_order_acquire或memory_order_seq_cst)或是前序依赖(对应memory_order_consume)的最终加载,操作链上的任何原子“读-改-写”操作可以拥有任意个内存序(甚至是memory_order_relaxed)。

假设使用atomic<int>对共享队列的元素进行计数:

代码5.11 使用原子操作从队列中读取数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
#include <atomic>
#include <thread>

std::vector<int> queue_data;
std::atomic<int> count;

void populate_queue()
{
unsigned const number_of_items=20;
queue_data.clear();
for(unsigned i=0;i<number_of_items;++i)
{
queue_data.push_back(i);
}

count.store(number_of_items,std::memory_order_release); // 1 初始化存储
}

void consume_queue_items()
{
while(true)
{
int item_index;
if((item_index=count.fetch_sub(1,std::memory_order_acquire))<=0) // 2 一个“读-改-写”操作
{
wait_for_more_items(); // 3 等待更多元素
continue;
}
process(queue_data[item_index-1]); // 4 安全读取queue_data
}
}

int main()
{
std::thread a(populate_queue);
std::thread b(consume_queue_items);
std::thread c(consume_queue_items);
a.join();
b.join();
c.join();
}

只有一个消费者线程时还好,fetch_sub()带有memory_order_acquire的读取操作,并且存储操作是带有memory_order_release语义,所以存储与加载同步,线程可以从缓存中读取元素。当有两个读取线程时,第二个fetch_sub()操作将看到第一个线程修改的值,且没有值通过store写入其中。先不管释放序列的规则,第二个线程与第一个线程不存在先行关系,并且对共享内存中值的读取也不安全。除非第一个fetch_sub()是带有memory_order_release语义,为两个消费者线程建立了不必要的同步。无论是释放序列的规则,还是具有memory_order_release语义的fetch_sub操作,第二个消费者看到的是一个空的queue_data,无法从其获取任何数据,并且还会产生条件竞争。

不过,第一个fetch_sub()对释放顺序做了一些事情,所以store()能同步与第二个fetch_sub()操作。两个消费者线程间不需要同步关系。过程在图5.7中展示,其中虚线表示的就是释放顺序,实线表示的是先行关系。

5-7

图5.7 代码5.11中对队列操作的释放顺序

操作链中可以有任意数量的链接,提供的都是“读-改-写”操作,比如fetch_sub(),store(),每一个都会与使用memory_order_acquire语义的操作进行同步。

栅栏

栅栏操作会对内存序列进行约束,使其无法对任何数据进行修改,典型的做法是与使用memory_order_relaxed约束序的原子操作一起使用。

栅栏属于全局操作,执行栅栏操作可以影响到在线程中的其他原子操作。因为这类操作就像画了一条任何代码都无法跨越的线一样,所以栅栏操作通常也被称为内存栅栏(memory barriers)。

代码5.12 栅栏可以让自由操作变的有序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
#include <atomic>
#include <thread>
#include <assert.h>

std::atomic<bool> x,y;
std::atomic<int> z;

void write_x_then_y()
{
x.store(true,std::memory_order_relaxed); // 1
std::atomic_thread_fence(std::memory_order_release); // 2
y.store(true,std::memory_order_relaxed); // 3
}

void read_y_then_x()
{
while(!y.load(std::memory_order_relaxed)); // 4
std::atomic_thread_fence(std::memory_order_acquire); // 5
if(x.load(std::memory_order_relaxed)) // 6
++z;
}

int main()
{
x=false;
y=false;
z=0;
std::thread a(write_x_then_y);
std::thread b(read_y_then_x);
a.join();
b.join();
assert(z.load()!=0); // 7
}

因为加载y的操作④读取③处存储的值,所以释放栅栏②与获取栅栏⑤同步。①处存储x先行于⑥处加载x,最后x读取出来必为true,并且不会触发断言⑦。

这两个栅栏都是必要的:需要在一个线程中进行释放,然后在另一个线程中进行获取,这样才能构建同步关系。

当获取操作能看到释放栅栏操作后的存储结果,那么这个栅栏就与获取操作同步。并且,当加载操作在获取栅栏操作前,看到一个释放操作的结果,那么这个释放操作同步于获取栅栏。

虽然,栅栏同步依赖于读取/写入的操作发生于栅栏之前/后,但是这里有一点很重要:同步点,就是栅栏本身。当执行代码5.12中的write_x_then_y,并且在栅栏操作之后对x进行写入,就像下面的代码一样。触发断言的条件就不保证一定为true了,尽管写入x的操作在写入y的操作之前发生。

1
2
3
4
5
6
void write_x_then_y()
{
std::atomic_thread_fence(std::memory_order_release);
x.store(true,std::memory_order_relaxed);
y.store(true,std::memory_order_relaxed);
}

栅栏不会分开这里的两个操作,并且也不再有序。只有当栅栏出现在存储x和存储y操作之间时,顺序才是硬性的。当然,栅栏是否存在不会影响任何拥有先行关系的执行序列。

原子操作对非原子的操作排序

使用普通的非原子bool类型来替换代码5.12中的x,行为和替换前完全一样。

代码5.13 使用非原子操作执行序列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
#include <atomic>
#include <thread>
#include <assert.h>

bool x=false; // x现在是一个非原子变量
std::atomic<bool> y;
std::atomic<int> z;

void write_x_then_y()
{
x=true; // 1 在栅栏前存储x
std::atomic_thread_fe nce(std::memory_order_release);
y.store(true,std::memory_order_relaxed); // 2 在栅栏后存储y
}

void read_y_then_x()
{
while(!y.load(std::memory_order_relaxed)); // 3 在#2写入前,持续等待
std::atomic_thread_fence(std::memory_order_acquire);
if(x) // 4 这里读取到的值,是#1中写入
++z;
}
int main()
{
x=false;
y=false;
z=0;
std::thread a(write_x_then_y);
std::thread b(read_y_then_x);
a.join();
b.join();
assert(z.load()!=0); // 5 断言将不会触发
}

栅栏仍然为存储x①和存储y②,还为加载y③和加载x④提供一个执行序,并且这里存储x和加载x之间仍然有一个先行关系,所以不会触发断言⑤。②中的存储和③中对y的加载必须是原子操作,否则会在y上产生条件竞争。当读取线程看到存储到y的操作,栅栏将会对x执行有序的操作,这个执行序意味着x上不存在条件竞争。

不仅是栅栏可对非原子操作排序,memory_order_release/memory_order_consume也为非原子访问排序,可以动态分配对象,并且本章中的许多例子都可以使用普通的非原子操作,去替代memory_order_relaxed的操作。

非原子操作排序

某一内存位置上,lock()作为一个获取操作,在同样的位置上unlock()作为一个释放操作。

以下的工具都可以提供同步:

std::thread

  • std::thread构造新线程时,构造函数与调用函数或新线程的可调用对象间的同步。
  • 对std::thread对象调用join,可以和对应的线程进行同步。

std::mutex, std::timed_mutex, std::recursive_mutex, std::recursibe_timed_mutex

  • 对给定互斥量对象调用lock和unlock,以及对try_lock,try_lock_for或try_lock_until,会形成该互斥量的锁序。
  • 对给定的互斥量调用unlock,需要在调用lock或成功调用try_lock,try_lock_for或try_lock_until之后,这样才符合互斥量的锁序。
  • 对try_lock,try_lock_for或try_lock_until失败的调用,不具有任何同步关系。

std::shared_mutex , std::shared_timed_mutex

  • 对给定互斥量对象调用lock、unlock、lock_shared和unlock_shared,以及对 try_lock , try_lock_for , try_lock_until , try_lock_shared , try_lock_shared_for或 try_lock_shared_until的成功调用,会形成该互斥量的锁序。
  • 对给定的互斥量调用unlock,需要在调用lock或shared_lock,亦或是成功调用try_lock , try_lock_for, try_lock_until, try_lock_shared, try_lock_shared_for或try_lock_shared_until之后,才符合互斥量的锁序。
  • 对try_lock,try_lock_for,try_lock_until,try_lock_shared,try_lock_shared_for或try_lock_shared_until 失败的调用,不具有任何同步关系。

std::shared_mutex和std::shared_timed_mutex

  • 成功的调用std::promise对象的set_value或set_exception与成功的调用wait或get之间同步,或是调用wait_for或wait_until的返回例如future状态std::future_status::ready与promise共享同步状态。
  • 给定std::promise对象的析构函数,该对象存储了一个std::future_error异常,成功的调用wait或get后,共享同步状态与promise之间的同步,或是调用wait_for或wait_until返回的future状态std::future_status::ready时,与promise共享同步状态。

std::packaged_task , std::future和std::shared_future

  • 成功的调用std::packaged_task对象的函数操作符与成功的调用wait或get之间同步,或是调用wait_for或wait_until的返回future状态std::future_status::ready与打包任务共享同步状态。
  • std::packaged_task对象的析构函数,该对象存储了一个std::future_error异常,其共享同步状态与打包任务之间的同步在于成功的调用wait或get,或是调用wait_for或wait_until返回的future状态std::future_status::ready与打包任务共享同步状态。

std::async , std::future和std::shared_future

  • 使用std::launch::async策略性的通过std::async启动线程执行任务与成功的调用wait和get之间是同步的,或调用wait_for或wait_until返回的future状态std::future_status::ready与产生的任务共享同步状态。
  • 使用std::launch::deferred策略性的通过std::async启动任务与成功的调用wait和get之间是同步的,或调用wait_for或wait_until返回的future状态std::future_status::ready与promise共享同步状态。

std::experimental::future , std::experimental::shared_future和持续性

  • 异步共享状态变为就绪的事件与该共享状态上调度延续函数的调用同步。
  • 持续性函数的完成与成功调用wait或get的返回同步,或调用wait_for或wait_until返回的期望值状态std::future_status::ready与调用then构建的持续性返回的future同步,或是与在调度用使用这个future的操作同步。

std::experimental::latch

  • 对std::experimental::latch实例调用count_down或count_down_and_wait与在该对象上成功的调用wait或count_down_and_wait之间是同步的。

std::experimental::barrier

  • 对std::experimental::barrier实例调用arrive_and_wait或arrive_and_drop与在该对象上随后成功完成的arrive_and_wait之间是同步的。

std::experimental::flex_barrier

  • 对std::experimental::flex_barrier实例调用arrive_and_wait或arrive_and_drop与在该对象上随后成功完成的arrive_and_wait之间是同步的。
  • 对std::experimental::flex_barrier实例调用arrive_and_wait或arrive_and_drop与在该对象上随后完成的给定函数之间是同步的。
  • 对std::experimental::flex_barrier实例的给定函数的返回与每次对arrive_and_wait的调用同步,当调用给定函数线程会在栅栏处阻塞等待。

std::condition_variable和std::condition_variable_any

  • 条件变量不提供任何同步关系,它们是对忙等待的优化,所有同步都由互斥量提供。