最近看了 Non-blocking Michael-Scott queue algorithm,介绍了一个 lock free queue 算法,有一页 slide 是这样的:
这篇文章就尝试从 beginner 的角度来理解这个 lock free queue。
这偏文章主要会尝试讲清楚并发问题和这个算法的原理。
我们先来看并发问题是怎么产生的。
假设有一个人,这个人只能记住一个数字,别人可以问他这个数字是什么,也可以让他记住另一个数字。我们把这个人称为 S
这个时候,另外一个人 M,需要计数,每次都问 S,现在是几,然后在此基础上加 1,并让 S 记住这个结果。
比如:
M: 现在是几?
S: 1
M: 存 2
M: 现在是几?
S: 2
M: 存 3
目前 M 这个计数器工作完全正常。
这个时候,M1 加入了数数的行列。情况就会就变成了这样。
M: 现在是几?
S: 1
M1: 现在是几?
S: 1
M: 存 2
M1: 存 2
M: 现在是几?
S: 2
我们看到,M,M1 分别作了一次 + 1 的操作,但最后存的却是 2。显然这是有问题的。
只要不让 M、M1 同时读、存,就可以解决这个问题。
为了不让 S 被多个人同时读取,我们把 S 放到一个带锁的房间里, 如果 M、M1 想问 S 问题,就需要先看门有没有上锁,没有锁上,就进入,并锁上锁,避免其他人再进来。
场景就变成了这样:
M 看到门没有锁,M 进门
M: 现在是几?
S: 1
M1 看到锁已经锁了,决定却睡觉,一会再来。
M: 存 2
M 把锁打开,出门
M1 醒来,看到门没有锁,进门
M1: 现在是几?
S: 2
锁避免了多个线程对同一个数据进行操作。
从上面的场景,我们可以看到,如果 M、M1 发现 S 被锁上了,就会去睡觉,一会锁被打开了,才会被叫醒,这个过程相对来讲,十分耗时。
我们其实可以换一种方式来叫 S 存数据。
我们可以问 S,现在是 1 吗?是的话,请存 2,不是的话,请告诉我。
那么场景就是这样了:
M: 现在是几?
S: 1
M1: 现在是几?
S: 1
M: 现在是 1 吗?是的话,请存 2,不是的话,请告诉我。
S: 是
M1: 现在是 1 吗?是的话,请存 2,不是的话,请告诉我。
S: 不是
M1: 现在是几?
S: 2
M1: 现在是 2 吗?是的话,请存 3,不是的话,请告诉我。
S: 是
我们看到,最后得到了我们期望的结果 3。
用代码实现的话就是:
require "concurrent"
@counter = Concurrent::AtomicFixnum.new(0)
def inc
current_val = @counter.value
while @counter.compare_and_set(current_val, current_val + 1) != true
current_val = @counter.value
end
end
compare_and_set 的能力一般都是由硬件提供,硬件会保证这个操作的原子性,也就是 compare 和 set 这两个操作,要么都完成,要么都失败。
也就是 M1 完成 compare_and_set 之后,M2 才能做 compare_and_set 这个操作,这样就保证了数据的正确性。
理解 compare_and_set 的原理 Michael-Scott queue 也就非常好理解了。
首先,我们用链表来实现这个队列
class Node
def initialize(item, successor)
@item = item
self.successor = successor
end
end
class LockFreeQueue
def initialize
dummy_node = Node.new(:dummy, nil)
self.head = dummy_node
self.tail = dummy_node
end
end
入队操作就是
def push(item)
new_node = Node.new(item, nil)
tail.successor = new_node
self.tail = new_node
end
当然,这种在多线程的情况下,是有问题的。我们按照 counter 的思路改造一下。
def push(item)
new_node = Node.new(item, nil)
while true
# 读取当前的 tail
current_tail = tail
# 如果当前的 tail,如果 tail 的 successor 是 nil 的话,则将其设置为新节点
if current_tail.compare_and_set_successor(nil, new_node)
self.tail = new_node
return true
else
current_tail = tail
end
end
end
现在,我们再来看 concurrent-ruby-edge 的实现。
def push(item)
# allocate a new node with the item embedded
new_node = Node.new(item, nil)
# keep trying until the operation succeeds
while true
current_tail_node = tail
current_tail_successor = current_tail_node.successor
# if our stored tail is still the current tail
if current_tail_node == tail
# if that tail was really the last node
if current_tail_successor.nil?
# if we can update the previous successor of tail to point to this new node
if current_tail_node.compare_and_set_successor(nil, new_node)
# then update tail to point to this node as well
compare_and_set_tail(current_tail_node, new_node)
# and return
return true
# else, start the loop over
end
else
# in this case, the tail ref we had wasn't the real tail
# so we try to set its successor as the real tail, then start the loop again
compare_and_set_tail(current_tail_node, current_tail_successor)
end
end
end
end
mutex
会进入 kernel mode
,如果有其他线程占用锁,则需要挂起当前线程,之后还要唤醒该线程,这些都是耗时的操作。mutex
更适合需要长时间锁的情况,比如 IO 操作。
我们还可以用 compare_and_set 来实现锁,这种锁被称为 spinlock,这种锁的特点是,如果发现被锁住,就会反复尝试直到获得锁为止。具体的可以看 自旋锁 spinlock 及 nginx 的实现,使用 ruby 的一个简单实现。
我们看到 spinlock 和 lock free queue 实现十分相似,如果我们使用 spinlock 替换掉 mutex 来实现队列,会怎样?
下一篇文章会尝试分析两种实现,并在多核 CPU 下进行比较。