线程 A 中有个数组,其余有多个 B,C 线程,同时并发,想要线程 A 中的数组遍历的时候,每个元素能在其它线程中同步输出,也就是每从线程 A 中的数组中弹出一个元素,其余线程就能把这个元素取出来,依次逐步迭代 基础代码如下:
thread = []
arr = [1,2,3,4,5]
thread << Thread.new{
arr.each{|i|
@value =i
}
}
thread << Thread.new{
p @value
}
thread << Thread.new{
p @value
}
thread.each{|t|t.join}
需要在这些代码上面加上控制结构,完成上面的需求,期待大牛!
哥们,你这个需求不就是生产者 - 消费者模型吗,用队列吧。 我的这个视频里也介绍过:Ruby 的线程, http://edu.51cto.com/lesson/id-10742.html ,免费的噢。
不想用队列就用条件变量来同步吧。
require 'thread'
mutex = Mutex.new
cv = ConditionVariable.new
arr = [1, 2, 3, 4, 5, 6, 7, 8]
thread =[]
thread<< Thread.new do
mutex.synchronize do
arr.each { |a|
@value = a
cv.broadcast
cv.wait(mutex)
}
end
end
thread<< Thread.new do
mutex.synchronize do
while thread[0].alive?
puts @value
cv.wait(mutex)
end
end
end
thread<< Thread.new do
mutex.synchronize do
while thread[0].alive?
puts @value
cv.signal if Thread.list.pop==Thread.current
cv.wait(mutex)
end
end
end
thread<< Thread.new do
mutex.synchronize do
while thread[0].alive?
puts @value
cv.signal if Thread.list.pop==Thread.current
cv.wait(mutex)
end
end
end
sleep 2
########################################
1
1
1
2
2
2
3
3
3
4
4
4
5
5
5
6
6
6
7
7
7
8
8
8
Process finished with exit code 0
#7 楼 @long_chn 你这个代码不稳定,在不同 ruby 版本上得到了不同的结果。这段代码在 Ruby 2.0.0 版本上的稳定性最好,运行十次平均有 7 次能得到预期结果,三次不行。2.2.0 版本得到的结果一次都不满足预期,具体的测试代码见此: https://github.com/crhan/ruby_condition_variable_test
#9 楼 @luikore 你这个写法确实一下就看懂了,不过我的本意是想学习下 ConditionVariable 的用法才找到的这个帖子。。。。现在我还是对这个类有点摸不着头脑,不知道啥时候用得上,也不知道为什么 connection_pool 这个库会用这个玩意..
嗯。。然后你这个代码果断在所有版本上 Pass 了,除了 1.8.7 没有 Fiber 这个玩意以外: https://travis-ci.org/crhan/ruby_condition_variable_test/builds/80104965
#10 楼 @ruohanc 用 ConditionVariable 的这种方式其实统称为 Monitor。 https://en.wikipedia.org/wiki/Monitor_synchronization)( Monitor 可以看做是一个数据结构,用来处理同步问题,内部可以用 Semaphore 实现。Condition Variable 拥有一个 condition queue 保存正在 wait 这个 cond 的线程,如果 signal 这个 cond 的话,就会在下次调度中运行这个 queue 中的线程。
ConditionVariable 应该和 pthread_cond_wait(), pthread_cond_signal(), pthread_cond_broadcast() 行为一致才对...
Ruby 2.1 将 ConditionVariable 的实现改成了原生实现,可能是这个修改导致了行为不一致? JRuby 9000 直接抛错了所以感觉暂时还不太该用...
condition variable 的 using pattern 是
mutex.lock
while <<some-condition>>
cv.wait mutex
end
<<do-something>>
mutex.unlock
主要效用是 broadcast
对于这个一个生产者 - 多消费者问题,得用两个 condition variable, 一个生产者等待 empty_cv, 消费者等待 fill_cv
def main_method
mutex = Mutex.new
fill_cv = ConditionVariable.new
empty_cv = ConditionVariable.new
consumer_count = 3
product_count = 0
value = nil
producer = Thread.new do
@arr.each { |a|
mutex.synchronize do
while product_count > 0
empty_cv.wait mutex
end
value = a
product_count = consumer_count
fill_cv.broadcast
end
}
end
consumers = consumer_count.times.map do
Thread.new do
while producer.alive?
mutex.synchronize do
while product_count == 0
fill_cv.wait mutex
end
@arr_mutex << value
product_count -= 1
empty_cv.signal
end
end
end
end
empty_cv.signal
consumers.each &:join
end
7 楼代码的问题不稳定的原因在于cv.signal if Thread.list.pop==Thread.current
这行的逻辑假定了,Thread.list 的最后一个 thread 是每次循环里最后一个得到锁的。
不考虑代码很绕的问题,只是保证结果稳定的话,改个几行就出来了
require 'minitest/autorun'
require 'thread'
class ConditionVarTest < Minitest::Test
def main_method
@thread<< Thread.new do
@mutex.synchronize do
@arr.each { |a|
@value = a
@runned = 0
@cv.broadcast
@cv.wait(@mutex)
}
end
end
@thread<< Thread.new do
@mutex.synchronize do
while @thread[0].alive?
if @value
@arr_mutex << @value
@runned += 1
@cv.signal if @runned == 3
end
@cv.wait(@mutex)
end
end
end
@thread<< Thread.new do
@mutex.synchronize do
while @thread[0].alive?
if @value
@arr_mutex << @value
@runned += 1
@cv.signal if @runned == 3
end
@cv.wait(@mutex)
end
end
end
@thread<< Thread.new do
@mutex.synchronize do
while @thread[0].alive?
if @value
@arr_mutex << @value
@runned += 1
@cv.signal if @runned == 3
end
@cv.wait(@mutex)
end
end
end
sleep 0.5
end
def setup
@mutex = Mutex.new
@cv = ConditionVariable.new
@arr = [1, 2, 3, 4, 5, 6, 7, 8]
@thread =[]
@arr_mutex = []
@expected_arr = @arr.reduce([]) { |memo, obj| 3.times { memo << obj }; memo }
end
def test_1
main_method
assert_equal(@arr_mutex, @expected_arr)
end
end