新手问题 nio4r 的 Selector 为什么在 B 线程中 select,然后在 A 线程中 register 会造成两个线程都阻塞?

Dounx · 2020年06月02日 · 最后由 Dounx 回复于 2020年06月03日 · 1431 次阅读

在服务器代码中,每当 Thread A 运行至 monitor = selector.register(socket, :r) 这一行时,线程 A、B 都会阻塞(看起来像是死锁?

请教下阻塞是因为什么原因,以及正确的用法应该是怎么样的?

服务端代码:

# frozen_string_literal: true

require "socket"
require "nio"

def thread_loop
  Thread.new do
    loop do 
      yield
    end
  end
end

def run(host, port)
  server = TCPServer.new(host, port)
  puts "Listening to #{host}:#{port}"

  selector = NIO::Selector.new

  threads = []
  sockets = []

  # Thread A
  threads << thread_loop do
    ios = IO.select [server]
    ios.first.each do |server|
      socket = server.accept
      sockets << socket
      puts "New client #{socket}"
      monitor = selector.register(socket, :r)
      monitor.value = proc { puts "Got: #{monitor.io.read_nonblock(4096)}" }
    end
  end

  # Thread B
  threads << thread_loop do
    selector.select do |monitor|
      monitor.value.call
    end
  end

  threads.each(&:join)
rescue Interrupt
  sockets.each do |socket|
    socket.close
    selector.deregister(socket)
  end
end

run('localhost', 1234)

客户端代码:

# frozen_string_literal: true

require "socket"

def test(host, port, cnt = 10, interval = 1)
  sockets = []

  cnt.times do
    sockets << TCPSocket.new(host, port)
  end

  loop do
    index = rand(cnt)
    socket = sockets[index]
    puts "Try to send 'Hello from no.#{index} socket!'"
    socket << "Hello from no.#{index} socket!"
    sleep(interval)
  end
rescue Interrupt
  puts "Disconnected!"
ensure
  sockets.each(&:close)
end

test("localhost", 1234, 3)

是 Thread B 里的selector.select do |monitor| block Thread A 里的monitor = selector.register(socket, :r)

  • NIO:: Selector#selectIO.select的替代品,不应该同时用.
  • TCPServer 先 register 到 nio selector, 然后再调用NIO:: Selector#select。server monitor 的 callback 里接收客户端 io 对象,然后把 io 对象再注册到 nio selector 里。
piecehealth 回复
  • Puma 中有同时使用 IO.select 和 NIO::Selector#select。
  • IO.select 用的应该是 select,NIO::Selector#select 用的是 epoll/kqueue,这两者应该不会有冲突吧?
Dounx 回复

Puma 是多个地方用到IO.select,reactor 从IO.select替换到NIO::Selector#select https://github.com/puma/puma/commit/e83a4954e4c0214d18beb594ddf598fafdf058d7#diff-8b7db4d5cc4b8f6dc8feb7030baa2478

IO.select 跟 NIO::Selector 的区别官方 wiki 就有 https://tonyarcieri.com/a-gentle-introduction-to-nio4r

require "socket"
require "nio"

def run(host, port)
  server = TCPServer.new(host, port)
  puts "Listening to #{host}:#{port}"

  selector = NIO::Selector.new
  monitor = selector.register(server, :r)
  monitor.value = ->(_) do
    socket = server.accept
    puts "New client #{socket}"
    client_monitor = selector.register(socket, :r)
    client_monitor.value = ->(mon) do
      s = mon.io
      puts "Got: #{s.read_nonblock(4096)}"
    end
  end

  loop do
    selector.select { |monitor| monitor.value.call(monitor) }
  end
end

run('localhost', 1234)
piecehealth 回复

是 Thread B 里的 selector.select do |monitor| block Thread A 里的 monitor = selector.register(socket, :r)

所以如果一旦进行了 select 操作,就只能在 select 块内再进行 register 吗?

你给 selector 加个等待时间, selector.select(2)这样,不然 selector 会一直堵塞。 而且推荐 selector 各自在各自线程里,交互用 queue,不然线程安全怎么都是个问题。

nickoan 回复

好的

piecehealth 回复

明白了,当 select 的时候会用 lock 锁住,其它的操作无法进行;只有当获取到就绪 IO 开始执行 select 块中代码时,才会释放 lock,然后就可以在块中继续 register 或者其它操作。

9楼 已删除
Dounx 关闭了讨论 06月04日 14:59
需要 登录 后方可回复, 如果你还没有账号请点击这里 注册