Ruby 使用 Fiber 实现简单的 CSP (Goroutine channel)

yfractal · 2019年01月24日 · 2314 次阅读

Don't communicate by sharing memory; share memory by communicating.

并发有一个非常常见的问题 ---- 生产者/消费者问题,CSP 可以很很直观的解决这个问题。

Clojure Example

简单来说,Goroutine 通过创建 channel 来进行数据交互,生产者往 channel 里塞消息,消费者消费消息。

当 channel 为空的时候,消费者会“等待”生产者产生数据。

(ns example
  (:require [clojure.core.async
             :refer [chan <! >!! go]]))

(def ch (chan))

(go
  (let [x (<! ch)
        y (<! ch)]
    (println "Sum: " (+ x y))))

channels.core=> (>!! ch 3)
nil
channels.core=> (>!! ch 4)
nil
Sum: 7

我们创建 channel 后,从 channel 内拿出两个元素并相加。但在取元素的时候,channel 里并没有数据。

在我们往 channel 里塞入数据后,go block 内的代码才被执行。

由此我们可以看出,在 go block 内 <! 会等待 channel 数据塞入,且不会阻塞当前的线程,类似 callback。

但 channel 改变了代码运行的顺序,并不会向 callback 一样带来维护和理解成本。

我们可以把上面的代码改成 callback 的形式。

(take-one-item
  ch
  (fn [x]
    (take-one-item
      ch
      (fn [y]
        (println "Sum: " (+ x y))))))

Fiber 简介

Fiber 一般被翻译为纤程,可以理解为协程,用非抢占式来做纤程间切换。

Fiber 用 Fiber.yield 主动让出执行权,使用 resume 再次唤醒这个 fiber,从而改变代码的执行顺序。

由于 Fiber 这个特性,Fiber 可以做很多有趣的事情,比如 Midori 就用 Fibefr 实现了一个优雅的异步 web 框架。再比如,可以用 Fiber 实现 Actor 模型。

使用 Fiber 实现 Goroute

我们现在用 Fiber 实现一个简单的 one-to-one goroute channel。

Fiber 通过 fiber = Fiber.new 进行创建,在 fiber 内,可以通过 Fiber.yield 让出执行权,之后可通过 fiber.resume 唤醒。

我们再来看 goroutine 的 channel,在 go block 内调用 <! 方法时,如果 channel 里没有数据,则后面的代码不会被执行,等有数据写入,才会执行 <! 后面的代码。

所以,我们需要一个 go 方法来创建 Fiber,一个 Chanel 类处理 channel 相关状态及操作。用 Channel#take 取元素,Channel#add 存元素。

调用 Channel#take 时,需要将 go 创建的 fiber 传到 channel 里,当有元素塞入的时候,用以唤醒。

代码如下:

def take
  @go_fiber = Fiber.yield unless @go_fiber
end

def go(&block)
  fiber = Fiber.new &block
  # 唤起 fiber
  fiber.resume
  # 设置 fiber
  fiber.resume(fiber)
end

Channel#take 需要处理两种情况,如果 @queue 已有元素,直接将元素返回,如果没有元素,则让出执行权。

代码如下:

def take
  @go_fiber = Fiber.yield unless @go_fiber

  if @queue.empty?
    @waiting = true
    Fiber.yield
  else
    @queue.shift
  end
end

Channel#add 也很简单,先将元素入队,如果有 take 方法在等待元素,则将第一个元素传出去即可。 代码如下:

def add(e)
  @queue << e

  if @waiting
    @waiting = false
    @go_fiber.resume(@queue.shift)
  end
end

完整的代码

参考

暂无回复。
需要 登录 后方可回复, 如果你还没有账号请 注册新账号