Ruby 浅谈 Ruby 中的并发, 并行和全局锁

xiongbo · 发布于 2017年4月29日 · 最后由 xiaoping_rubyist 回复于 2017年5月19日 · 3322 次阅读
1731
本帖已被设为精华帖!

最近在看《Working With Ruby Thread》这本书, 以下是我对前几章内容的一点总结 : )

并发不等于并行

几乎所有谈到并发和并行的文章都会提到一点: 并发并不等于并行。 那么如何理解这句话呢, 这里以餐馆下订单为例子进行说明:

并发: 同时有2桌客人点了菜, 厨师同时接收到了两个菜单
顺序执行: 如果只有一个厨师, 那么他只能一个菜单, 一个菜单的去完成
并行执行: 如果有两个厨师, 那么就可以并行, 两个人一起做菜

将这个例子扩展到我们的web开发中, 就可以这样理解:

并发:同时有两个客户端对服务器发起了请求
顺序执行: 服务器只有一个进程(线程)处理请求, 完成了第一个请求才能完成第二个请求, 所以第二个请求就需要等待。
并行执行: 服务器有两个进程(线程)处理请求, 两个请求都能得到响应, 而不存在先后的问题。

线程的处理

那么, ruby中如何描述一个并发的行为呢, 看这样一段代码:

threads = 3.times.map do 
  Thread.new do
    sleep 3 
  end
end
puts "不用等3秒就可以看到我"
threads.map(&:join)

Thread的创建是非阻塞的, 所以文字立即就可以输出, 这样就模拟了一个并发的行为。
接下来, 对代码做一点修改:

time = Time.now
threads = 3.times.map do 
  Thread.new do
    sleep 3 
  end
end
threads.map(&:join)
puts "现在需要等3秒才可以看到我"
p Time.now - time

当我们执行join的时候, 只有等到所有线程的任务都执行完成, 才会最后输出。 所以我们需要等待3秒才能看到输出的文字。

但是, 等等, 这里是不是就是实现了并行了呢?
从表面上来看是这样, 但是很遗憾, 这是一种伪并行, 我们再对代码做一点修改:

require 'benchmark'
def multiple_threads
  count = 0
  threads = 4.times.map do 
    Thread.new do
      2_500_000.times { count += 1}
    end
  end
  threads.map(&:join)
end

def single_threads
  time = Time.now
  count = 0
  Thread.new do
    10_000_000.times { count += 1}
  end.join
end

Benchmark.bm do |b|
  b.report { multiple_threads }
  b.report { single_threads }
end

  user     system      total        real
0.510000   0.000000   0.510000 (  0.508958)
0.500000   0.000000   0.500000 (  0.506755)

从这里可以看出, 即便我们将同一个任务分成了4个线程并行, 但是时间并没有减少, 这是为什么呢?

因为有GIL的存在!!!

全局锁

MRI, 也就是我们通常使用的ruby采用了一种称之为GIL的机制, 看看它的解释:

The GIL is a global lock around the execution of Ruby code

If one of those MRI processes spawns multiple threads, that group of threads will share the GIL for that process.

If one of these threads wants to execute some Ruby code, it will have to acquire this lock. One, and only one, thread can hold the lock at any given time. While one thread holds the lock, other threads need to wait for their turn to acquire the lock

--------- Working With Ruby Threads By Jesse Storimer -----------

也就是说, 即便我们希望使用多线程来实现代码的并行, 由于这个全局锁的存在, 每次只有一个线程能够执行代码,至于哪个线程能够执行, 这个取决于底层操作系统的实现。
即便我们拥有多个CPU, 也只是为每个线程的执行多提供了几个选择而已。

但是我们之前sleep的时候, 明明实现了并行啊!

这个就是Ruby设计高级的地方——所有的阻塞操作是可以并行的, 也就是说包括读写文件, 网络请求在内的操作都是可以并行的, 有代码为证:)

require 'benchmark'
require 'net/http'

def multiple_threads
  uri = URI("http://www.baidu.com")
  threads = 4.times.map do 
    Thread.new do
      25.times { Net::HTTP.get(uri) }
    end
  end
  threads.map(&:join)
end

def single_threads
  uri = URI("http://www.baidu.com")
  Thread.new do
    100.times { Net::HTTP.get(uri) }
  end.join
end

Benchmark.bm do |b|
  b.report { multiple_threads }
  b.report { single_threads }
end

  user     system      total        real
0.240000   0.110000   0.350000 (  3.659640)
0.270000   0.120000   0.390000 ( 14.167703)

那么, 既然有了这个锁的存在, 是否意味着我们的代码就是线程安全了呢? 很遗憾, 不是!因为我们无法控制什么时候操作系统会终止我们当前线程的执行, 并切换到另外一个线程上。

class MultipleThreadTest
  @n = 0
  def self.cal
    10000.times.map do
      Thread.start { @n += 1  }
    end
    @n
  end
end
p MultipleThreadTest.cal # 9584

对于n += 1这种非线程安全的代码, 即便有锁的存在, 依旧是不安全的。

最后, 我们用Sidekiq的作者Mike Perham的话来结束这篇入门文章:

As soon as you introduce the Thread constant, you've probably just introduced 5 new bugs into your code.

更多的内容请参考:《Working With Ruby Thread》这本书 : )

共收到 20 条回复
28316

结束语真是一把辛酸泪

De6df3 huacnlee 将本帖设为了精华贴 4月29日 23:01
16154

不错

273

被结束语“喷”笑。

11901

👍

96

如果用puma多线程来跑一个rails应用。假设该应用中有类似 n += 1 的代码。那会不会出现楼主提到的的错误呢?

845
32ningzhang422 回复

可能会,但很少出现。一般业务处理都是在一起请求内的,有状态也都在数据库里。除非用到了共享变量或者类变量。

8134
require 'thwait'

class MultipleThreadTest
  @n = 0

  def self.cal
    threads = 10000.times.map do
      Thread.start {
        @n += 1
      }
    end

    ThreadsWait.all_waits(*threads)
    @n
  end
end

p MultipleThreadTest.cal # 10000

出现 9584的原因,其实只是部分线程没有执行完,就输出结果了。如果等待所有线程执行完成,输出结果就是10000

1342

使用 java8 + jruby 9.1.7.0 运行如下代码

require "thwait"
require "java"

java_import "java.util.concurrent.atomic.LongAdder"

class MultipleThreadTest
  @n = LongAdder.new

  def self.cal
    threads = 10000.times.map do
      Thread.start {
        @n.increment
      }
    end

    ThreadsWait.all_waits(*threads)
    @n.sum
  end
end

p MultipleThreadTest.cal # 10000

可以得到 10000 这个正确结果哦

96

共享变量的问题 多个线程共用一个变量 这个过程是不可预测的 要想不出错貌似只能读不能写 除非能做到线程之间同步和互斥 要不以后可能就会出现一堆bug

15420
32hjf_coding 回复

存redis,利用redis的原子性

96
15420pathbox 回复

redis现在正在看 redis的原子操作只能保证 写是唯一的 最终的值是我们预期的 但是中间过程还是不可预测的吧

15420
32hjf_coding 回复

出现竞争错误的几率会有,但是很小,redis的性能还是很强的。不可预测指的是什么?或者说你在担忧什么问题。原子操作无非 不是成功就是失败

96
15420pathbox 回复

其实我说的不可预测是指的 线程调度不可预测 线程调度不可预测 所以我觉得不能用共享变量来参与计算 顶多是起到一个计数作用 例如:

@a = 1
10.times do |e|
Thread.new {
   @c = 1
   @c += @a
}
p "#{e}  #{@c}"
end

这段代码的输出就依赖 线程调度 不同的线程调度会出现不同的输出

这个跟redis的原子操作没有关系

96
32hjf_coding 回复

中间过程不一定要对吧,比如你统计票数,实时票数多一票少一票又有何关系?只要投票结束以后票数对了就行了嘛。

96
32hjf_coding 回复

你这个程序肯定是依赖于线程调度的,但实际使用中如果把变量初始化放在线程中,肯定要加锁的吧,否则不是作死么…… 而且我不确定 @c += @a 这行是不是原子的,如果是 Redis 的话就没问题了

96
32ningzhang422 回复

不用共享变量就不会。没事不要在线程中修改全局变量($foobar += 1)或者 static 属性(Foo.bar += 3)就好了,Rails 不作死的话是不会踩坑的。

2376
8134zjyzxun 回复

你这个答案是靠谱的, 其实题主最后一个代码是不存在线程安全问题的

GIL的工作机制不像大家想象的一样, 在ruby代码的任何一个点都会切换到另外一个线程去工作

而是有几个明确的工作点的

  • 方法的调用和方法的返回, 在这两个地方都会检查一下当前线程的gil的锁是否超时,是否要调度到另外线程去工作
  • 所有io相关的操作, 也会释放gil的锁让其它线程来工作
  • 在c扩展的代码中手动释放gil的锁
  • 还有一个比较难理解, 就是ruby stack 进入 c stack的时候也会触发gil的检测
static void
gvl_acquire_common(rb_vm_t *vm)
{
    if (vm->gvl.acquired) {

    vm->gvl.waiting++;
    if (vm->gvl.waiting == 1) {
        /*
         * Wake up timer thread iff timer thread is slept.
         * When timer thread is polling mode, we don't want to
         * make confusing timer thread interval time.
         */
        rb_thread_wakeup_timer_thread_low();
    }

    while (vm->gvl.acquired) {
        native_cond_wait(&vm->gvl.cond, &vm->gvl.lock);
    }

    vm->gvl.waiting--;

    if (vm->gvl.need_yield) {
        vm->gvl.need_yield = 0;
        native_cond_signal(&vm->gvl.switch_cond);
    }
    }

    vm->gvl.acquired = 1;
}

static void
gvl_acquire(rb_vm_t *vm, rb_thread_t *th)
{
    native_mutex_lock(&vm->gvl.lock);
    gvl_acquire_common(vm);
    native_mutex_unlock(&vm->gvl.lock);
}

static void
gvl_release_common(rb_vm_t *vm)
{
    vm->gvl.acquired = 0;
    if (vm->gvl.waiting > 0)
    native_cond_signal(&vm->gvl.cond);
}

static void
gvl_release(rb_vm_t *vm)
{
    native_mutex_lock(&vm->gvl.lock);
    gvl_release_common(vm);
    native_mutex_unlock(&vm->gvl.lock);
}

这几个方法是用来控制gil的锁的, 可以参考一下gil在c里面的调用情况

2376
32hjf_coding 回复

你这个代码的线程调度是不会影响 @a@c的值得, 可以用这个代码来验证

@a = 1
r = []
10.times do |e|

Thread.new {
   @c = 1
   @c += @a
   r << [e, @c]
}
end

r 里面的值, 虽然e的前后顺序不一样, 但是, @c的值是一致的

但是你在@c=1@c += @a中间加一个puts的话, 这个就会触发gil的lock, 数据异常了

@a = 1
r = []
10.times do |e|

Thread.new {
   @c = 1
   puts 1
   @c += @a
   r << [e, @c]
}
end
96

以前用ruby写一些测试脚本,使用thread来测试并发代码能力,结果遇到了文章中描述的情况。现在写脚本执行点并发的东西,喜欢用node+go。

96

为什么我这边测得跟楼主测得完全不一样。看得我云里雾里,哪位大神解释下。

require 'benchmark'
def multiple_threads
  count = 0
  threads = 4.times.map do 
    Thread.new do
      2_500_000.times { count += 1}
    end
  end
  threads.map(&:join)
end

def single_threads
  time = Time.now
  count = 0
  Thread.new do
    10_000_000.times { count += 1}
  end.join
end

Benchmark.bm do |b|
  b.report { multiple_threads }
  b.report { single_threads }
end

       user     system      total        real
   0.820000   0.000000   0.820000 (  0.822396)
   0.520000   0.000000   0.520000 (  0.518962)
require 'benchmark'
require 'net/http'

def multiple_threads
  uri = URI("http://www.baidu.com")
  threads = 4.times.map do 
    Thread.new do
      25.times { Net::HTTP.get(uri) }
    end
  end
  threads.map(&:join)
end

def single_threads
  uri = URI("http://www.baidu.com")
  Thread.new do
    100.times { Net::HTTP.get(uri) }
  end.join
end

Benchmark.bm do |b|
  b.report { multiple_threads }
  b.report { single_threads }
end
       user     system      total        real
   0.460000   0.130000   0.590000 (  2.501665)
   0.400000   0.070000   0.470000 (  2.867489)
需要 登录 后方可回复, 如果你还没有账号请点击这里 注册