Don't communicate by sharing memory; share memory by communicating.
并发有一个非常常见的问题 ---- 生产者/消费者问题,CSP 可以很很直观的解决这个问题。
简单来说,Goroutine 通过创建 channel 来进行数据交互,生产者往 channel 里塞消息,消费者消费消息。
当 channel 为空的时候,消费者会“等待”生产者产生数据。
(ns example
(:require [clojure.core.async
:refer [chan <! >!! go]]))
(def ch (chan))
(go
(let [x (<! ch)
y (<! ch)]
(println "Sum: " (+ x y))))
channels.core=> (>!! ch 3)
nil
channels.core=> (>!! ch 4)
nil
Sum: 7
我们创建 channel 后,从 channel 内拿出两个元素并相加。但在取元素的时候,channel 里并没有数据。
在我们往 channel 里塞入数据后,go block 内的代码才被执行。
由此我们可以看出,在 go
block 内 <!
会等待 channel 数据塞入,且不会阻塞当前的线程,类似 callback。
但 channel 改变了代码运行的顺序,并不会向 callback 一样带来维护和理解成本。
我们可以把上面的代码改成 callback 的形式。
(take-one-item
ch
(fn [x]
(take-one-item
ch
(fn [y]
(println "Sum: " (+ x y))))))
Fiber 一般被翻译为纤程,可以理解为协程,用非抢占式来做纤程间切换。
Fiber 用 Fiber.yield
主动让出执行权,使用 resume
再次唤醒这个 fiber,从而改变代码的执行顺序。
由于 Fiber 这个特性,Fiber 可以做很多有趣的事情,比如 Midori 就用 Fibefr 实现了一个优雅的异步 web 框架。再比如,可以用 Fiber 实现 Actor 模型。
我们现在用 Fiber 实现一个简单的 one-to-one goroute channel。
Fiber 通过 fiber = Fiber.new
进行创建,在 fiber 内,可以通过 Fiber.yield
让出执行权,之后可通过 fiber.resume
唤醒。
我们再来看 goroutine 的 channel,在 go block 内调用 <!
方法时,如果 channel 里没有数据,则后面的代码不会被执行,等有数据写入,才会执行 <!
后面的代码。
所以,我们需要一个 go
方法来创建 Fiber,一个 Chanel 类处理 channel 相关状态及操作。用 Channel#take
取元素,Channel#add
存元素。
调用 Channel#take 时,需要将 go
创建的 fiber 传到 channel
里,当有元素塞入的时候,用以唤醒。
代码如下:
def take
@go_fiber = Fiber.yield unless @go_fiber
end
def go(&block)
fiber = Fiber.new &block
# 唤起 fiber
fiber.resume
# 设置 fiber
fiber.resume(fiber)
end
Channel#take
需要处理两种情况,如果 @queue
已有元素,直接将元素返回,如果没有元素,则让出执行权。
代码如下:
def take
@go_fiber = Fiber.yield unless @go_fiber
if @queue.empty?
@waiting = true
Fiber.yield
else
@queue.shift
end
end
Channel#add
也很简单,先将元素入队,如果有 take
方法在等待元素,则将第一个元素传出去即可。
代码如下:
def add(e)
@queue << e
if @waiting
@waiting = false
@go_fiber.resume(@queue.shift)
end
end