Ruby Understand Non-blocking Michael-Scott queue algorithm as a concurrency beginner

yfractal · February 04, 2019 · 2365 hits

最近看了 Non-blocking Michael-Scott queue algorithm,介绍了一个 lock free queue 算法,有一页 slide 是这样的:

这篇文章就尝试从 beginner 的角度来理解这个 lock free queue。

这偏文章主要会尝试讲清楚并发问题和这个算法的原理。

Concurrency Counter

我们先来看并发问题是怎么产生的。

假设有一个人,这个人只能记住一个数字,别人可以问他这个数字是什么,也可以让他记住另一个数字。我们把这个人称为 S

这个时候,另外一个人 M,需要计数,每次都问 S,现在是几,然后在此基础上加 1,并让 S 记住这个结果。

比如:

M: 现在是几?
S: 1
M: 存 2

M: 现在是几?
S: 2
M: 存 3

目前 M 这个计数器工作完全正常。

这个时候,M1 加入了数数的行列。情况就会就变成了这样。

M: 现在是几?
S: 1

M1: 现在是几?
S: 1

M: 存 2
M1: 存 2

M: 现在是几?
S: 2

我们看到,M,M1 分别作了一次 + 1 的操作,但最后存的却是 2。显然这是有问题的。

只要不让 M、M1 同时读、存,就可以解决这个问题。

Locking Solution

为了不让 S 被多个人同时读取,我们把 S 放到一个带锁的房间里, 如果 M、M1 想问 S 问题,就需要先看门有没有上锁,没有锁上,就进入,并锁上锁,避免其他人再进来。

场景就变成了这样:

M 看到门没有锁,M 进门
M: 现在是几?
S: 1

M1 看到锁已经锁了,决定却睡觉,一会再来。

M: 存 2
M 把锁打开,出门

M1 醒来,看到门没有锁,进门

M1: 现在是几?
S: 2

锁避免了多个线程对同一个数据进行操作。

Compare and Set

从上面的场景,我们可以看到,如果 M、M1 发现 S 被锁上了,就会去睡觉,一会锁被打开了,才会被叫醒,这个过程相对来讲,十分耗时。

我们其实可以换一种方式来叫 S 存数据。

我们可以问 S,现在是 1 吗?是的话,请存 2,不是的话,请告诉我。

那么场景就是这样了:

M: 现在是几?
S: 1

M1: 现在是几?
S: 1

M: 现在是 1 吗?是的话,请存 2,不是的话,请告诉我。
S: 是

M1: 现在是 1 吗?是的话,请存 2,不是的话,请告诉我。
S: 不是

M1: 现在是几?
S: 2

M1: 现在是 2 吗?是的话,请存 3,不是的话,请告诉我。
S: 是

我们看到,最后得到了我们期望的结果 3。

用代码实现的话就是:

require "concurrent"
@counter = Concurrent::AtomicFixnum.new(0)

def inc
 current_val = @counter.value

 while @counter.compare_and_set(current_val, current_val + 1) != true
   current_val = @counter.value
 end
end

compare_and_set 的能力一般都是由硬件提供,硬件会保证这个操作的原子性,也就是 compare 和 set 这两个操作,要么都完成,要么都失败。

也就是 M1 完成 compare_and_set 之后,M2 才能做 compare_and_set 这个操作,这样就保证了数据的正确性。

Michael-Scott queue

理解 compare_and_set 的原理 Michael-Scott queue 也就非常好理解了。

首先,我们用链表来实现这个队列

class Node
  def initialize(item, successor)
    @item          = item
    self.successor = successor
  end
end

class LockFreeQueue
  def initialize
    dummy_node = Node.new(:dummy, nil)

    self.head = dummy_node
    self.tail = dummy_node
  end
end

入队操作就是

def push(item)
  new_node = Node.new(item, nil)

  tail.successor = new_node
  self.tail = new_node
end

当然,这种在多线程的情况下,是有问题的。我们按照 counter 的思路改造一下。

def push(item)
  new_node = Node.new(item, nil)
  while true
    # 读取当前的 tail
    current_tail = tail

    # 如果当前的 tail,如果 tail 的 successor 是 nil 的话,则将其设置为新节点
    if current_tail.compare_and_set_successor(nil, new_node)
      self.tail = new_node

      return true
    else
      current_tail = tail
    end
  end
end

现在,我们再来看 concurrent-ruby-edge 的实现。

def push(item)
  # allocate a new node with the item embedded
  new_node = Node.new(item, nil)

  # keep trying until the operation succeeds
  while true
    current_tail_node      = tail
    current_tail_successor = current_tail_node.successor

    # if our stored tail is still the current tail
    if current_tail_node == tail
      # if that tail was really the last node
      if current_tail_successor.nil?
        # if we can update the previous successor of tail to point to this new node
        if current_tail_node.compare_and_set_successor(nil, new_node)
          # then update tail to point to this node as well
          compare_and_set_tail(current_tail_node, new_node)
          # and return
          return true
          # else, start the loop over
        end
      else
        # in this case, the tail ref we had wasn't the real tail
        # so we try to set its successor as the real tail, then start the loop again
        compare_and_set_tail(current_tail_node, current_tail_successor)
      end
    end
  end
end

Why we need lock free queue?

mutex 会进入 kernel mode,如果有其他线程占用锁,则需要挂起当前线程,之后还要唤醒该线程,这些都是耗时的操作。mutex 更适合需要长时间锁的情况,比如 IO 操作。

我们还可以用 compare_and_set 来实现锁,这种锁被称为 spinlock,这种锁的特点是,如果发现被锁住,就会反复尝试直到获得锁为止。具体的可以看 自旋锁 spinlock 及 nginx 的实现使用 ruby 的一个简单实现

我们看到 spinlock 和 lock free queue 实现十分相似,如果我们使用 spinlock 替换掉 mutex 来实现队列,会怎样?

下一篇文章会尝试分析两种实现,并在多核 CPU 下进行比较。

链接

No Reply at the moment.
You need to Sign in before reply, if you don't have an account, please Sign up first.