实现无锁算法的常见陷阱

翻译自:Common Pitfalls in Writing Lock-Free Algorithms

通常,只要两个操作之间的步骤是有穷的,一个多线程算法就认为可以实现为无锁(lock-free)。理论上无锁算法也早已被证明,看起来实现一个无锁算法也很简单。但其实不然,每一步都隐藏着陷阱:并发的线程可以修改共享的对象,甚至在执行一个操作时线程可以突然暂停或中止,而这是另一个线程当作好像若无其事。

线程同步是多线程程序设计的核心,传统的做法上就是代码临界区上加锁。锁可以防止多个线程同一时间进入临界区代码。在高度并发的程序里,锁可能成为严重的性能瓶颈。无锁编程的目标是不用锁也能解决并发问题。无锁编程一般依赖的是原子操作,比如“compare-and-swap”1原子的执行下面的操作:

1
2
3
4
5
6
7
8
1 bool CompareAndSwap(Value* addr, Value oldVal, Value newVal){
2     if(*addr == oldVal){
3         *addr = newVal;
4         return true;
5     }else{
6         return false;
7     }
8 }

使用无锁算法的最大缺陷是:

  • 无锁算法并不总是可实现的
  • 无锁算法的代码很难写
  • 写出正确的无锁算法代码更是难如登天

为了证明以上三点,我们来看一个错误实现的一个无锁栈(lock-free stack),可能大部分人第一次都会写出这样保护这些错误无锁栈。这个无锁栈算法主要是使用一个链表(linked-list)来存放节点,并用CompareAndSwap来修改链表的表头。 Push一个元素时,我们首先创建一个节点保存数据,并将这个节点设为栈顶,并使用CompareAndSwap将原栈顶指向新的元素。CompareAndSwap操作保证只有我们的新节点指向老的栈顶节点,才会替换老的栈顶(因为多线程可能改变老的栈顶)。当Pop一个元素时,我们快照当前的栈顶节点,然后替换当前的栈顶节点到下一个节点。我们再一次使用CompareAndSwap保证替换的节点等于快照的节点。

C++代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
 1 template <class Entry>
 2 class LockFreeStack{
 3     struct Node{
 4         Entry* entry;
 5         Node* next;
 6     };
 7
 8     Node* m_head;
 9
10     void Push(Entry* e){
11         Node* n = new Node;
12         n->entry = e;
13         do{
14             n->next = m_head;
15         }while(!CompareAndSwap(&m_head, n->next, n));
16     }
17
18     Entry* Pop(){
19         Node* old_head;
20         Entry* result;
21         do{
22             old_head = m_head;
23             if(old_head == NULL){
24                 return NULL;
25             }
26         }while(!CompareAndSwap(&m_head, old_head, old_head->next));
27
28         result = old_head->entry;
29         delete old_head;
30         return result;
31     }
32 }

遗憾的是,这个无锁栈充满的错误:

Segfault

Push操作分配内存保存节点信息,Pop操作释放这些内存。然而,线程T1在顺序执行22行和26行之间的时间里,另一个线程T2可能已经释放了这个节点,然后程序Crash了。

Corruption

仅仅对比新值与老值是否相等,CompareAndSwap方法并不能保证是否值发生了变化。假如快照在22行的值,被修改了,然后又被恢复了,然后CompareAndSwap会成功。这就是著名的ABA问题。假如栈中前两个节点是A和C,如果以下面的序列操作:

  • 线程1执行Pop,并在22行读到了m_head(A),在26行读到了old_head->next(C),然后突然阻塞在执行在CompareAndSwap之前。
  • 线程2执行pop,删掉节点A
  • 线程2调用push,push了一个新节点B
  • 线程2又调用一次push,这次push的新节点正好占用了原来节点A的内存。
  • 线程1被唤醒,调用CompareAndSwap

然后26行的CompareAndSwap会成功,虽然m_head已经被改变3次了,因为它只检测old_head是否等于m_head。这是有问题的,因为新的栈顶本应指向B,然而却指向了C

Not lock-free

C++标准并不保证new和delete是lock-free的。一个无锁的数据结构去调用非无锁的库函数不是什么好主意,所以我们需要一个无锁的内存分配子。

Data races

当一个线程向内存中写入数据,而另一个线程同时从相同的内存读数据时,所产生的结果是未定义的,除非使用std::atomic。读和写操作都必须是原子的。在C++11以前一个通用的方法是使用volatile关键字来生命原子变量,然而这个关键字有很大的缺陷

在我们的例子中,多个线程读栈顶指针可能会引起竞争,push和pop操作都有可能,因为其它线程可能在修改他。

Memory reordering

印象中,代码会按照我们指定的顺序执行,最少也会满足”happens before“关系。不幸的是,不管理论还是实际上,下面代码的执行可能出现x,y都是0的结果。:

C++11以前标准对于多线程是讳莫如深的,所以编译器的优化是着眼于单线程的。上面的代码,交换执行顺序,并不会影响单线程中程序的语义。所以可能会产生这种结果。

如何写正确的lock-free栈

上面大部分问题都有多种解决方案,这里我会把自己工作中使用的方法描述出来。

Segfault

解引用节点之前,必须确保该节点没有被删掉。每一个线程都有一个全局可见的”hazard pointer”。当访问一个节点之前,会先设置Hazard pointer执行这个节点。只要设置过Hazard pointer就可以保证这个节点此时还是栈顶节点。如果其它线程此时移除这个栈顶节点,要检测没有Hazard pointer指向这个节点才能清除节点的内存。

Corruption

解决ABA问题的一个方法是确保栈顶不会有同样的值两次。我们使用“tagged pointers”来确保栈头值的唯一。一个“tagged pointers”包含一个指针和64位计数器。每当栈顶变化,计数器加一。

Not lock-free

Data races

我们目前使用的是boost::atomic。现在我们使用gcc4.6也已经支持std::atomic,但实现的效率没有boost高。在gcc4.6中,所有需要原子操作的地方都被应用了memory barriers,即使本不必使用的地方。

Memory reordering

C++11为原子操作提供了一种新的内存模型和内存序语义,以解决乱序的问题。CompareAndSwap需要顺序一致性(sequentially consistent)的语义保证。顺序一致性意味着所有的线程以一种一致的次序执行操作。事实证明hazard pointers也一样需要顺序一致性保证内存语义。 如果不使用内存一致性,下面这种情况下会有问题:

  • 线程1准备Pop操作,读取了栈顶节点
  • 线程1将当前节点写到hazard pointer中
  • 线程1再次读取栈顶指针
  • 线程2将栈顶指针移走,并传递到垃圾收集线程
  • 垃圾收集器扫描所有的hazard pointer节点,因为没有顺序一致性,可能看不到线程1的hazard pointer已经指向了这个节点
  • 垃圾收集器删除了这个节点
  • 线程1解引用这个节点,然后程序Crash

而如果有顺序一致性应用到hazard pointer的赋值和节点的修改,竞争就不会发生了。因为任意两个操作,所有线程看到的顺序都是一样的。如果线程2先移除这个节点,那么线程1第二次读时会看到一个不同的节点,也就不会去解引用它。假如线程1先将节点写到hazard pointer中,则垃圾收集器肯定可以看到这个值而不会去删除它。

性能

到现在我们解决了所有的问题。让我们看一下性能。测试使用的是一台8核Intel® Xeon® 处理器。每个线程的工作是随机的执行数量几乎相等的Push和Pop操作。每个线程不加限制的执行机器可以处理的操作。

我们修改栈顶的次数越多,CompareAndSwap失败的次数也会越多。一个简单有效的减少失败的方法是失败后Sleep一下,这可以调节Stack可以高效的处理数据。下面是每次失败后Sleep(250)的数据:

太好了,增加Sleep后栈的吞吐量增加了7倍。并且Sleep减少的处理器的消耗。让我们看一下处理器的使用情况:

加锁的栈:

无锁的栈,不加Sleep:

无锁的栈,Sleep(250):

看起来无锁更好?等等,锁一样可以达到好的性能,我们不用std::mutex,我们使用Sleep(250)的自旋锁:

结果

大量数据时,额外的线程会降低吞吐量。Sleep可以降低操作冲突,增加吞吐量的同时减小处理器消耗。3个线程以上的性能没有变化。单线程是性能最佳的。

结论

无锁不会阻碍进度,但也并不会提高效率。当你想在你的项目中使用无锁算法时,切记要衡量值不值的-性能还有复杂度。

代码

加锁的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
 1 #include <mutex>
 2 #include <stack>
 3
 4 template<class T>
 5 class LockedStack{
 6 public:
 7     void Push(T* entry){
 8         std::lock_guard<std::mutex> lock(m_mutex);
 9         m_stack.push(entry);
10     }
11
12     // For compatability with the LockFreeStack interface,
13     // add an unused int parameter.
14     //
15     T* Pop(int){
16         std::lock_guard<std::mutex> lock(m_mutex);
17         if(m_stack.empty()){
18             return nullptr;
19         }
20         T* ret = m_stack.top();
21         m_stack.pop();
22         return ret;
23     }
24
25 private:
26     std::stack<T*> m_stack;
27     std::mutex m_mutex;
28 };

Lock-Free的: (垃圾收集相关的代码没贴出来)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
 1 class LockFreeStack{
 2 public:
 3     // The elements we wish to store should inherit Node
 4     //
 5     struct Node{
 6         boost::atomic<Node*> next;
 7     };
 8
 9     // Unfortunately, there is no platform independent way to
10     // define this class.  The following definition works in
11     // gcc on x86_64 architectures
12     //
13     class TaggedPointer{
14     public:
15         TaggedPointer(): m_node(nullptr), m_counter(0) {}
16
17         Node* GetNode(){
18             return m_node.load(boost::memory_order_acquire);
19         }
20
21         uint64_t GetCounter(){
22             return m_counter.load(boost::memory_order_acquire);
23         }
24
25         bool CompareAndSwap(Node* oldNode, uint64_t oldCounter, Node* newNode, uint64_t newCounter){
26             bool cas_result;
27             __asm__ __volatile__
28             (
29                 "lock;"           // This makes the following instruction atomic (it is non-blocking)
30                 "cmpxchg16b %0;"  // cmpxchg16b sets ZF on success
31                 "setz       %3;"  // if ZF set, set cas_result to 1
32
33                 : "+m" (*this), "+a" (oldNode), "+d" (oldCounter), "=q" (cas_result)
34                 : "b" (newNode), "c" (newCounter)
35                 : "cc", "memory"
36             );
37             return cas_result;
38         }
39     private:
40         boost::atomic<Node*> m_node;
41         boost::atomic<uint64_t> m_counter;
42     }
43     // 16-byte alignment is required for double-width
44     // compare and swap
45     //
46     __attribute__((aligned(16)));
47
48     bool TryPushStack(Node* entry){
49         Node* oldHead;
50         uint64_t oldCounter;
51
52         oldHead = m_head.GetNode();
53         oldCounter = m_head.GetCounter();
54         entry->next.store(oldHead, boost::memory_order_relaxed);
55         return m_head.CompareAndSwap(oldHead, oldCounter, entry, oldCounter + 1);
56     }
57
58     bool TryPopStack(Node*& oldHead, int threadId){
59         oldHead = m_head.GetNode();
60         uint64_t oldCounter = m_head.GetCounter();
61         if(oldHead == nullptr){
62             return true;
63         }
64         m_hazard[threadId*8].store(oldHead, boost::memory_order_seq_cst);
65         if(m_head.GetNode() != oldHead){
66             return false;
67         }
68         return m_head.CompareAndSwap(oldHead, oldCounter, oldHead->next.load(boost::memory_order_acquire), oldCounter + 1);
69     }
70
71     void Push(Node* entry){
72         while(true){
73             if(TryPushStack(entry)){
74                 return;
75             }
76             usleep(250);
77         }
78     }
79
80     Node* Pop(int threadId){
81         Node* res;
82         while(true){
83             if(TryPopStack(res, threadId)){
84                 return res;
85             }
86             usleep(250);
87         }
88     }
89
90 private:
91     TaggedPointer m_head;
92     // Hazard pointers are separated into different cache lines to avoid contention
93     //
94     boost::atomic<Node*> m_hazard[MAX_THREADS*8];
95 };

  1. 硬件相关,Windows已提供函数InterlockedCompareExchange

读《寻找家园》

诚然,这是一本文辞优美的自传集。自以为博学杂收,但此前竟从未听说过高尔泰,概和主流媒介打压有关。80后的我,有幸避过了那段岁月,所以对于书中,高先生凄苦的人生,只能报以同情,很难产生共鸣。高先生悲惨的际遇和余华《活着》中的福贵的一生很像,波诡云谲的时代,至亲“莫名其妙”的离去,种种不公平待遇,最后还活着,随波逐流。

但逆水的鱼,这是天生的命运。高先生最终去了标榜自由平等的米国,广阔天地,但难以大有作为,老来无依,故国不堪回首,所以整理出这寻找家园来。

读此书,看高尔泰传奇一生,随便拈来一件小事都是我们当代常人一生难逢的际遇,而且即使选中了你我,也并非你我可以承受。反观我们的生活,钢筋水泥的城市,一成不变的工作,三两个不勾心斗角的同事,做着不着调的工作,两点一线,直至老去,有幸的留下荒冢一堆,无运的骨灰都不是自己的!

都市中渺小的我,搞不清高尔泰追求的自由为何物,自由和活着孰重孰轻。年轻的高尔泰似乎没有追求自由的能力,只能任命运摆弄,想着往上爬的,却成了别人的梯子,80年代后,环境宽松,如果懂的迎合,以他的名望和学识,想必日子不会艰难,也不会老来流落米国。

在我看来,自由是,不妨碍他人的前提下,随意做自己想做的事情。所以活着是自由的前提,“不自由,毋宁死”,死了不还是得不到自由!如果高尔泰可以顺从,是否可以摆脱暴力的关照,安心做自己喜欢的事情呢,这不就实现了他的自由了吗?

这种想法有些小人之心度君子之腹了,高尔泰是不会仰人鼻息的!

关于重构的一些想法

很早之前听说过这样一个故事:

一个漂亮的女子与马戏团的小丑坠入爱河,并迅速结婚。小丑十分珍惜得来的幸福,努力挣钱,瞒着女子做了整容手术,为了给女子惊喜。整容后的小丑出现在女子眼前时,女子提出离婚,因为他喜欢的是原来长的并不帅的小丑。

一个项目重构的动机(目的)无外乎两种:1. 给用户带来更好的体验、2. 项目代码失控(难以维护、添加新特性)。重构必然导致变化,用户接受现在的产品,重构后的版本并不一定喜欢。至于极端,哪怕重构后的版本比老版本好的多,也会有用户高声喊:给我老版本。这和小丑的故事一样了,哪怕整容帅的像郭德纲一样,女子照样甩一甩衣袖走了。

现实生活中的重构往往是程序员自己发起的,Martin Fowler都有一本书起名为《重构》,并被许多程序员奉为圭臬。程序员的初衷是好的,但往往会低估重构的难度,项目越大耦合越多,往往牵一发而动全身,当老板跑过来问你进度时,你只能回以尴尬的笑,因为进度会进入无法掌控的地步。最近自己差点陷入这种进退维谷的境地,还好项目比较小,最后算是挺了过来。

但大的项目移筋动骨的重构就真的很难成功了。记得在核新软件的时候,项目庞大臃肿,添加新功能困难,软件的开发与执行效率都很低。记得一段时间内,大家重构的呼声很高,不过最后不了了之,因为工程浩大、重构后的兼容性、重构后软件多长时间可以稳定、重构后版本的测试等等需要考虑的各种问题已不单单是几个开发可以掌控的了的。而同时产品的需求还是不会减少,因为产品根本不关心软件的技术架构、可靠性、可维护性等等。

产品不关心重构是正常的。确实,一个可以正常运行,用户没有太多投诉、谨慎的编码也不会出太大问题的项目为什么要重构呢?假如某地发现了一个煤矿,第一批人来开采,一年时间采掉了50%的煤; 第二批人来了,煤已不那么好采,必须小心谨慎的对待,因为矿难不断,不过还好花了两年时间采到了30%的煤; 第三批人来了,矿上留下的全是前人留下的坑,无处落脚,须将前面的坑填掉,才能保证采得到煤,先花费了三年填坑,最后又花了一年时间采到了10%的煤。软件开发也一样,软件的生命期有限,是否需要花费人力将软件重构的完美值得商榷,可能重构完成之日,为项目终结之时,留下完美的架构与设计又有何用呢?

既然程序员呼吁重构的理由是,项目代码难以维护,那为什么不从一开始写出易维护的代码?

Linux下开启TRIM

新买得一块SSD,听说开启TRIM才能更好的发挥SSD的性能,Linux并没有默认开启TRIM,但开启还是比较简单。

首先要检测SSD是否支持TRIM:

1
sudo hdparm -I /dev/sda | grep "TRIM supported"

如果支持则会出现:”Data Set Management TRIM supported”

如果SSD支持TRIM, 则可以开启TRIM了, 这里介绍常用的两种方法。

方法1:修改fstab, 添加discard属性

1
sudo vim /etc/fstab

下面是我机器上fstab的配置

UUID=27dd31b4-8aa4-4043-b921-540a312c056c / ext4 rw,relatime,data=ordered,discard 0 1

UUID=42f79958-0776-4b2f-8aa3-db827bf257b6 /home ext4 rw,relatime,data=ordered,discard 0 2

方法2:使用fstrim定期执行trim任务

以我用的archlinux为例,首先安装并运行cron服务:

1
2
3
sudo pacman -S cronie
sudo systemctl start cronie
sudo systemctl enable cronie

然后创建一个任务配置:

1
sudo gedit /etc/cron.daily/trim

并将下面的内容拷贝到里面:

1
2
3
4
5
#!/bin/sh
LOG=/var/log/trim.log
echo "*** $(date -R) ***" >> $LOG
fstrim -v / >> $LOG
fstrim -v /home >> $LOG

参考:

  1. http://www.webupd8.org/2013/01/enable-trim-on-ssd-solid-state-drives.html

  2. https://wiki.archlinux.org/index.php/Solid_State_Drives

Dock项目个人总结

Dock说白了是电脑桌面上常驻的一个工作区,Dock的目的是占领用户的桌面,况且我们追赶的团队和追赶我们的团队都已做出Dock,所以应用宝必须也要做。关于应用宝Dock的具体进度、功能与界面不做介绍,现在从开发的角度讲一讲Dock项目两个月来的一点体会。

自己的角色:负责逻辑部分接口和整体框架的设计,及相关功能开发。

可圈可点之处:


1. 逻辑与界面完全分离

Dock项目应该是,我们团队第一次在多人协作下,采用了界面与逻辑分离的开发方案。这样的好处是:降低了耦合,使开发简单化(做UI的无需关心逻辑,反之依然)。当然这样做必然会增加一些工作量,尤其是后来界面和逻辑分属不同进程,逻辑的接口必须做IPC转发。

逻辑与界面分离的方案,最重要的是定义好接口。

2. 最小完备接口

最小完备接口是我一贯主张的一种接口风格,因为一旦增加冗余的接口,并被客户端代码使用,那就很难移除了,势必造成接口膨胀。项目期间,主管提出接口要提前考虑、功能要丰富的建议,不过自己还是坚持了自己想法。因为加一个接口容易,去掉一个就难了,好比韩愈曾说的,从善如登、从恶如崩。

败笔之处


1. 任务分配粗略

需求接下来,四个人开始做,一开始定下两个人做界面另两个做逻辑,但没有细化具体的产品细节、开发难度耗时等,也没有分配具体谁做什么事情,只是口头上大概分了一下。这极易导致任务分配不均和任务遗漏的现象。好的方法应该是,细化任务并明确责任,公诸于众。但团队积习如此,非朝夕可改。

2. 线程的滥用

线程滥用当然指逻辑部分。从进入应用宝团队初期,就发现代码中线程滥用,好似每天不开几个线程吃不下饭一般。所以最初Dock的逻辑代码的主框架考虑到了这点,以使后面添加具体业务时无需创建线程。但后期,需求的演变和功能的要求迫使不得不又增加了几个线程,其实有些线程是非必须的。而这些不必要的线程,往往会滋生一批Bug。