Ruby 关于 Ruby 线程之间控制的一个问题

long_chn · 2013年10月15日 · 最后由 long_chn 回复于 2015年09月15日 · 5292 次阅读

线程 A 中有个数组,其余有多个 B,C 线程,同时并发,想要线程 A 中的数组遍历的时候,每个元素能在其它线程中同步输出,也就是每从线程 A 中的数组中弹出一个元素,其余线程就能把这个元素取出来,依次逐步迭代 基础代码如下:

thread = []
arr = [1,2,3,4,5]
thread << Thread.new{
  arr.each{|i|
     @value =i
}
}
thread << Thread.new{
p    @value
}
thread << Thread.new{
p    @value
}
thread.each{|t|t.join}

需要在这些代码上面加上控制结构,完成上面的需求,期待大牛!

这样不是顺序么,你多线程的目的是什么?

#1 楼 @jjym 我 A 线程的数组迭代的时候每个元素,会等待其他线程都读取完这个元素后,进行下一次迭代,直到数组遍历完全,也就是这三个线程中的值始终是相等的

用 Mutex 加锁,每个 thread pop array 的内容,要不无法保证运行顺序

用一个 Queue 不行么?

哥们,你这个需求不就是生产者 - 消费者模型吗,用队列吧。 我的这个视频里也介绍过:Ruby 的线程, http://edu.51cto.com/lesson/id-10742.html ,免费的噢。

不想用队列就用条件变量来同步吧。

#2 楼 @long_chn 同时把元素 push 到三个 queue 就行了

require 'thread'
mutex = Mutex.new
cv = ConditionVariable.new
arr = [1, 2, 3, 4, 5, 6, 7, 8]
thread =[]
thread<< Thread.new do
  mutex.synchronize do
    arr.each { |a|
      @value = a
      cv.broadcast
      cv.wait(mutex)
    }
  end
end

thread<< Thread.new do
    mutex.synchronize do
      while  thread[0].alive?
        puts  @value
      cv.wait(mutex)
    end
  end
end
thread<< Thread.new do
    mutex.synchronize do
      while  thread[0].alive?
        puts  @value
      cv.signal  if Thread.list.pop==Thread.current
      cv.wait(mutex)
    end
  end
end
thread<< Thread.new do
    mutex.synchronize do
      while  thread[0].alive?
      puts  @value
      cv.signal  if Thread.list.pop==Thread.current
      cv.wait(mutex)
    end
  end
end
sleep 2

########################################
1
1
1
2
2
2
3
3
3
4
4
4
5
5
5
6
6
6
7
7
7
8
8
8

Process finished with exit code 0

#7 楼 @long_chn 你这个代码不稳定,在不同 ruby 版本上得到了不同的结果。这段代码在 Ruby 2.0.0 版本上的稳定性最好,运行十次平均有 7 次能得到预期结果,三次不行。2.2.0 版本得到的结果一次都不满足预期,具体的测试代码见此: https://github.com/crhan/ruby_condition_variable_test

ci 结果见此: Build Status

#8 楼 @ruohanc 这代码写得有点绕... 按照需求来说,其实是协作调度或者是 reactor 的模式

def main_method
  3.times do |i|
    @thread << Fiber.new do
      while true
        @arr_mutex << @value
        Fiber.yield
      end
    end
  end

  @arr.each do |a|
    @value = a
    @thread.each do |t|
      t.resume
    end
  end
end

#9 楼 @luikore 你这个写法确实一下就看懂了,不过我的本意是想学习下 ConditionVariable 的用法才找到的这个帖子。。。。现在我还是对这个类有点摸不着头脑,不知道啥时候用得上,也不知道为什么 connection_pool 这个库会用这个玩意..

嗯。。然后你这个代码果断在所有版本上 Pass 了,除了 1.8.7 没有 Fiber 这个玩意以外: https://travis-ci.org/crhan/ruby_condition_variable_test/builds/80104965

#9 楼 @luikore 其实 Fiber 就是一个 coroutine,或者叫做 generator 也可以。这不是 reactor 吧。

#10 楼 @ruohanc 用 ConditionVariable 的这种方式其实统称为 Monitor。 https://en.wikipedia.org/wiki/Monitor_synchronization)( Monitor 可以看做是一个数据结构,用来处理同步问题,内部可以用 Semaphore 实现。Condition Variable 拥有一个 condition queue 保存正在 wait 这个 cond 的线程,如果 signal 这个 cond 的话,就会在下次调度中运行这个 queue 中的线程。

#10 楼 @ruohanc

ConditionVariable 应该和 pthread_cond_wait(), pthread_cond_signal(), pthread_cond_broadcast() 行为一致才对...

Ruby 2.1 将 ConditionVariable 的实现改成了原生实现,可能是这个修改导致了行为不一致? JRuby 9000 直接抛错了所以感觉暂时还不太该用...

@ruohanc

condition variable 的 using pattern 是

mutex.lock
while <<some-condition>>
  cv.wait mutex
end
<<do-something>>
mutex.unlock

主要效用是 broadcast

对于这个一个生产者 - 多消费者问题,得用两个 condition variable, 一个生产者等待 empty_cv, 消费者等待 fill_cv

def main_method
  mutex = Mutex.new
  fill_cv = ConditionVariable.new
  empty_cv = ConditionVariable.new
  consumer_count = 3
  product_count = 0
  value = nil

  producer = Thread.new do
    @arr.each { |a|
      mutex.synchronize do
        while product_count > 0
          empty_cv.wait mutex
        end
        value = a
        product_count = consumer_count
        fill_cv.broadcast
      end
    }
  end

  consumers = consumer_count.times.map do
    Thread.new do
      while producer.alive?
        mutex.synchronize do
          while product_count == 0
            fill_cv.wait mutex
          end
          @arr_mutex << value
          product_count -= 1
          empty_cv.signal
        end
      end
    end
  end

  empty_cv.signal
  consumers.each &:join
end

#10 楼 @ruohanc

7 楼代码的问题不稳定的原因在于cv.signal if Thread.list.pop==Thread.current 这行的逻辑假定了,Thread.list 的最后一个 thread 是每次循环里最后一个得到锁的。

不考虑代码很绕的问题,只是保证结果稳定的话,改个几行就出来了

require 'minitest/autorun'
require 'thread'

class ConditionVarTest < Minitest::Test

  def main_method

    @thread<< Thread.new do
      @mutex.synchronize do
        @arr.each { |a|
          @value = a
          @runned = 0
          @cv.broadcast
          @cv.wait(@mutex)
        }
      end
    end

    @thread<< Thread.new do
      @mutex.synchronize do
        while @thread[0].alive?
          if @value
            @arr_mutex << @value
            @runned += 1
            @cv.signal if @runned == 3
          end
          @cv.wait(@mutex)
        end
      end
    end

    @thread<< Thread.new do
      @mutex.synchronize do
        while @thread[0].alive?
          if @value
            @arr_mutex << @value
            @runned += 1
            @cv.signal if @runned == 3
          end
          @cv.wait(@mutex)
        end
      end
    end
    @thread<< Thread.new do
      @mutex.synchronize do
        while @thread[0].alive?
          if @value
            @arr_mutex << @value
            @runned += 1
            @cv.signal if @runned == 3
          end
          @cv.wait(@mutex)
        end
      end
    end
    sleep 0.5

  end

  def setup
    @mutex = Mutex.new
    @cv = ConditionVariable.new
    @arr = [1, 2, 3, 4, 5, 6, 7, 8]
    @thread =[]
    @arr_mutex = []
    @expected_arr = @arr.reduce([]) { |memo, obj| 3.times { memo << obj }; memo }
  end

  def test_1
    main_method
    assert_equal(@arr_mutex, @expected_arr)
  end
end

#8 楼 @ruohanc 这么老的帖子都被挖出来了。。 当时的本意是想通过一个线程顺序的分发数据给多个线程,实际操作结果来看,正如你所说不稳定而且互相等待性能很差,后面改用 C 实现的

需要 登录 后方可回复, 如果你还没有账号请 注册新账号