Ruby Understand Non-blocking Michael-Scott queue algorithm as a concurrency beginner

yfractal · 2019年02月04日 · 2421 次阅读

最近看了 Non-blocking Michael-Scott queue algorithm,介绍了一个 lock free queue 算法,有一页 slide 是这样的:

这篇文章就尝试从 beginner 的角度来理解这个 lock free queue。

这偏文章主要会尝试讲清楚并发问题和这个算法的原理。

Concurrency Counter

我们先来看并发问题是怎么产生的。

假设有一个人,这个人只能记住一个数字,别人可以问他这个数字是什么,也可以让他记住另一个数字。我们把这个人称为 S

这个时候,另外一个人 M,需要计数,每次都问 S,现在是几,然后在此基础上加 1,并让 S 记住这个结果。

比如:

M: 现在是几?
S: 1
M: 存 2

M: 现在是几?
S: 2
M: 存 3

目前 M 这个计数器工作完全正常。

这个时候,M1 加入了数数的行列。情况就会就变成了这样。

M: 现在是几?
S: 1

M1: 现在是几?
S: 1

M: 存 2
M1: 存 2

M: 现在是几?
S: 2

我们看到,M,M1 分别作了一次 + 1 的操作,但最后存的却是 2。显然这是有问题的。

只要不让 M、M1 同时读、存,就可以解决这个问题。

Locking Solution

为了不让 S 被多个人同时读取,我们把 S 放到一个带锁的房间里, 如果 M、M1 想问 S 问题,就需要先看门有没有上锁,没有锁上,就进入,并锁上锁,避免其他人再进来。

场景就变成了这样:

M 看到门没有锁,M 进门
M: 现在是几?
S: 1

M1 看到锁已经锁了,决定却睡觉,一会再来。

M: 存 2
M 把锁打开,出门

M1 醒来,看到门没有锁,进门

M1: 现在是几?
S: 2

锁避免了多个线程对同一个数据进行操作。

Compare and Set

从上面的场景,我们可以看到,如果 M、M1 发现 S 被锁上了,就会去睡觉,一会锁被打开了,才会被叫醒,这个过程相对来讲,十分耗时。

我们其实可以换一种方式来叫 S 存数据。

我们可以问 S,现在是 1 吗?是的话,请存 2,不是的话,请告诉我。

那么场景就是这样了:

M: 现在是几?
S: 1

M1: 现在是几?
S: 1

M: 现在是 1 吗?是的话,请存 2,不是的话,请告诉我。
S: 是

M1: 现在是 1 吗?是的话,请存 2,不是的话,请告诉我。
S: 不是

M1: 现在是几?
S: 2

M1: 现在是 2 吗?是的话,请存 3,不是的话,请告诉我。
S: 是

我们看到,最后得到了我们期望的结果 3。

用代码实现的话就是:

require "concurrent"
@counter = Concurrent::AtomicFixnum.new(0)

def inc
 current_val = @counter.value

 while @counter.compare_and_set(current_val, current_val + 1) != true
   current_val = @counter.value
 end
end

compare_and_set 的能力一般都是由硬件提供,硬件会保证这个操作的原子性,也就是 compare 和 set 这两个操作,要么都完成,要么都失败。

也就是 M1 完成 compare_and_set 之后,M2 才能做 compare_and_set 这个操作,这样就保证了数据的正确性。

Michael-Scott queue

理解 compare_and_set 的原理 Michael-Scott queue 也就非常好理解了。

首先,我们用链表来实现这个队列

class Node
  def initialize(item, successor)
    @item          = item
    self.successor = successor
  end
end

class LockFreeQueue
  def initialize
    dummy_node = Node.new(:dummy, nil)

    self.head = dummy_node
    self.tail = dummy_node
  end
end

入队操作就是

def push(item)
  new_node = Node.new(item, nil)

  tail.successor = new_node
  self.tail = new_node
end

当然,这种在多线程的情况下,是有问题的。我们按照 counter 的思路改造一下。

def push(item)
  new_node = Node.new(item, nil)
  while true
    # 读取当前的 tail
    current_tail = tail

    # 如果当前的 tail,如果 tail 的 successor 是 nil 的话,则将其设置为新节点
    if current_tail.compare_and_set_successor(nil, new_node)
      self.tail = new_node

      return true
    else
      current_tail = tail
    end
  end
end

现在,我们再来看 concurrent-ruby-edge 的实现。

def push(item)
  # allocate a new node with the item embedded
  new_node = Node.new(item, nil)

  # keep trying until the operation succeeds
  while true
    current_tail_node      = tail
    current_tail_successor = current_tail_node.successor

    # if our stored tail is still the current tail
    if current_tail_node == tail
      # if that tail was really the last node
      if current_tail_successor.nil?
        # if we can update the previous successor of tail to point to this new node
        if current_tail_node.compare_and_set_successor(nil, new_node)
          # then update tail to point to this node as well
          compare_and_set_tail(current_tail_node, new_node)
          # and return
          return true
          # else, start the loop over
        end
      else
        # in this case, the tail ref we had wasn't the real tail
        # so we try to set its successor as the real tail, then start the loop again
        compare_and_set_tail(current_tail_node, current_tail_successor)
      end
    end
  end
end

Why we need lock free queue?

mutex 会进入 kernel mode,如果有其他线程占用锁,则需要挂起当前线程,之后还要唤醒该线程,这些都是耗时的操作。mutex 更适合需要长时间锁的情况,比如 IO 操作。

我们还可以用 compare_and_set 来实现锁,这种锁被称为 spinlock,这种锁的特点是,如果发现被锁住,就会反复尝试直到获得锁为止。具体的可以看 自旋锁 spinlock 及 nginx 的实现使用 ruby 的一个简单实现

我们看到 spinlock 和 lock free queue 实现十分相似,如果我们使用 spinlock 替换掉 mutex 来实现队列,会怎样?

下一篇文章会尝试分析两种实现,并在多核 CPU 下进行比较。

链接

暂无回复。
需要 登录 后方可回复, 如果你还没有账号请 注册新账号