新手问题 遇到 Eventmachine 的 EM.run 不知如何解决阻塞问题

activeliang1 · 2018年07月03日 · 最后由 guange 回复于 2018年07月09日 · 1849 次阅读

我做了一工具,程序的工作流程是这样的:

  1. 利用 EventMachine 监听 2000 号端口开启 Server:

  1. 当程序收到 HTTP 请求时(大概每秒 20 次请求),进入相应的 Module 执行对应的方法:

这里的 process 操作简单描述下:先查找全局变量$depth_data里捞出对应数据返回结束此次请求。如果捞出来是空的,就开启一个 websocket 连接订阅数据,其后保持这个连接,每当收到远端服务发来的数据(大概每秒 10 次),把数据存入$depth_data

流程看起来不难,但问题来了:

我用的 websocket-client gem 是faye-websocket-ruby ,它官方介绍的用法需要使用 EM.run

想不通,全村人想不通:

对 Eventmachine、reactor 模式有点懵,谷歌了几天没能解决问题。

  1. 当我程序执行到这里就会被 EM.run 阻塞,就不会继续往下执行了。绞尽脑汁不知道怎么弄成异步的合理。
  2. 一开始的 Server 就是在 EM.run 里跑的,程序执行到 websocket-client 再开启多一个 EM.run 是不是有冲突。

菜鸟一枚,或者有更好的实现方式,或者解法。折腾了好久,望各大神不吝赐教。

我做过的尝试:

  1. 换一个已经实现异步的 gem: websocket-client-simple ,但这个 gem 在连接后首次收到数据会慢上 5 秒。因为拿的是交易所数据,不能忍,这个做备选。
  2. 直接加个 Thread.new 包起来实现异步,这个做法在 console 里还能跑起来。但真正开启 Http 就不会执行。猜测:EM.run 嵌套后出问题。

顶一个,有意思的问题。把你的代码放出来?

early 回复

项目地址:https://gitlab.com/activeliang/bv_helper

项目调试 websocket 和 EventMachine:

重要提示:终端需要能科学上网

在 console 调试:

$ bin/console #进入 console

$ item = Ex.bitmex.new('xbtusd')

$ item.depth{|x| puts x }
停止 websocket 的方法:item.stop

调试 EM.run 版:

$ bin/start #启动服务

$ curl -H "Content-Type: application/json" \ -X POST -d '{ "access_key": "access_key","secret_key": "secret_key", "nonce": "1500793319499", "method": "depth","params": {"symbol": "XBTUSD"}}' \ http://127.0.0.1:2000/bitmex

程序是为这个而生:https://www.fmz.com/bbs-topic/1052 换个角度描述下这个程序的需求:用 websocket 接(多个)数字货币交易所拿到订单薄数据。然后提供一个 API 接口http://127.0.0.1:2000/ex给同一主机的另一个程序提供数据。或许各位有更简单粗暴的实现思路。

阻塞怎么回事

假设一个请求,需要 2 秒,那这 2 秒,耗时在哪?

一个 request 的大体流程,三次握手,发请求,接收请求,关闭连接。

CPU、内存的操作,速度都很快,不需要考虑。

我们单独考虑请求发出去,等待放回这个过程。

请求已经被发出去,IO#read 读数据,由于数据还没从服务器返回,一直等数据返回后,才能读到数据,在等待数据返回这段时间,process 什么也没做的(被挂起了),所以是一个阻塞操作。

就好比,你要一个人帮忙买东西,你跟这个人说,帮忙买 A,然后你就睡觉了。A 回来的时候,上帝(操作系统)会再叫醒你。

如果想高效一点,很自然的可以让多个人同时去买,然后让他们把买好的东西放到一张桌子上,你时长去检查,桌子上有没有东西就可以了。这样就从,同一时间买一样东西变成了同一时间买多样东西。

你在系统中就是线程,跑腿的就是一个 IO 对象,检查桌子上是否有东西,就是 IO#select 操作。

这个 IO 模型就是 IO 多路复用。

yfractal 回复

这里的阻塞是这样,比如运行下面的代码:

#!/usr/bin/env ruby
require 'eventmachine'

EM.run{ puts "1" }
puts "2"

由于 EM 的 run 是主事件循环,相当于一个无限循环。底下的puts "2"永远不会被执行到。

可以试下 EventMachine.next_tick

看了一下你的代码,创建 Websockt 连接去获取数据的操作,是在用户的 http 请求中被动触发的。这里面数据和数据是打包耦合在一起的。你看能不能换一种实现方式,单独开几个线程来实现数据,让这两个过程分开。比如在BvHelper::Daemon.run方法中先创建几个线程去获取数据,然后才EventMachine.start_server:

require 'eventmachine'

module EchoServer

  def receive_data data
    puts data
    puts $data
  end

end

class Server
  def self.run
    3.times do |i|
      Thread.new do
        loop do
          $data ||= {}
          $data[i] = Time.now.to_i
          sleep 1
        end
      end
    end

    EventMachine.run {
      EventMachine.start_server "127.0.0.1", 8081, EchoServer
    }
  end
end

Server.run
early 回复

不错的思路,我遇到另外一个问题,在收到 request 之前并不知道需要启用哪些 websocket 的,一共有 8 条线,一般最多会启用随机的 3 条。如果 8 条全开,有 5 条是无用的。

aabbcc456aa 回复

试了下没成功,我再研究下这个的用法……🙏

或许可以考虑不将两个功能放在一起,websocket 收集为 A, 提供 API 接口为 B,AB 之间通过 rpc 消息来通信。

activeliang1 关闭了讨论。 05月18日 09:56
需要 登录 后方可回复, 如果你还没有账号请 注册新账号