注:本文只是一个 demo,对用线程和 nio 做网络请求做了一个简单的比较,所以压测方式和代码并不十分严谨,仅供参考。
由于业务需要,应用往往需要访问外部资源,如果资源又恰好很慢(比如 trello、github),就很容易拖慢服务器。
本文使用线程池和 nio4r 做 HTTP get 请求,并进行比较。
考虑到网络的延迟,假设一个请求从建立到返回,需要 1s。
测试方法为发送 1000 个请求,并等待返回结果,通过总共所花费的时间,来进行比较。
因此 http 服务要有较好的并发能力,而网络的延迟不好模拟,需要 http 服务器接收到请求的时候,sleep 1s,再返回响应。
基于这两点,选择 phoenix 做为 http 服务的技术框架,因为 Erlang 有非常强的并发能力和软实时的 timer(sleep 只会阻塞当前 erlang process,而不是整个应用),。
相关代码非常简单:
def index(conn, _params) do
:timer.sleep(1000)
json(conn, "Hello Word")
end
简单压测了一下,如果没有 sleep 的话,大约有 6 ~ 8k 的并发能力 (本文只是一个简单的 demo,所以 http 服务和压测代码都跑在本地)。
代码如下
require 'curb'
url = "http://127.0.0.1:4000"
REQUESTS = 1000
start_time = Time.now
REQUESTS.times do |i|
Curl.get url
end
end_time = Time.now
spend = end_time - start_time
puts "Spend: #{spend} seconds"
测试结果
Spend: 1017.561072 seconds
大约花了 16 分钟,可以推测请求是串行的。
由于网络请求,大部分时间,都是花在傻等 I/O 上,所以我们可以通过线程来做优化。
在这里为了方便使用 concurrent-ruby 的 future 来实现。
require 'httparty'
require 'concurrent'
require 'curb'
@counter = 0
@lock = Mutex.new
def request
url = "http://127.0.0.1:4000"
Curl.get url
@lock.synchronize do
@counter += 1
end
end
REQUESTS = 1000
start_time = Time.now
REQUESTS.times do |i|
Concurrent::Future.execute {
request
}
end
# wait part of requests finished
while @counter < REQUESTS * 0.99
end
end_time = Time.now
spend = end_time - start_time
puts "Spend: #{spend} seconds"
puts "Request per second #{ REQUESTS.to_f / spend }"
puts "Finished #{@counter}"
Spend: 2.326971 seconds
Request per second 429.74321553642056
Finished 993
ruby 默认使用系统线程,而系统线程并不是免费的资源,所以 concurrent 使用线程池来做这件事。
Concurrent.global_io_executor.max_length
2.5.1 :004 > Concurrent.global_io_executor.max_length
=> 2147483647
我们看到,这个值非常大,但实际情况是,服务器的线程一般会设置为几百,所以实际性能要比我们看到的要差。
在优化之前,我们先看一下,请求的 timeline
通过抓包,sudo tcpdump -i lo0 -vv port 4000 -A
,
获得以下输出(为了方便看,只保留了重要信息)
13:39:57.216375 localhost.63934 > localhost.terabase: Flags [S]
13:39:57.216594 localhost.terabase > localhost.63934: Flags [S.]
13:39:57.216634 localhost.63934 > localhost.terabase: Flags [.]
13:39:57.216669 localhost.terabase > localhost.63934: Flags [.]
13:39:57.217850 localhost.63934 > localhost.terabase: Flags [P.]
GET / HTTP/1.1
Host: localhost:4000
13:39:57.217892 localhost.terabase > localhost.63934: Flags [.]
13:39:58.281002 localhost.terabase > localhost.63934: Flags [P.]
HTTP/1.1 200 OK
"Hello Word"
我们看到,连接建立后,在 13:39:57.217850 发起请求,在 13:39:58.281002 才收到返回。 也就是说,这 1s 内,客户端完全是在傻等的。
如果可以同时发 N 个请求,再同时等待这些请求的返回,就会大大提高请求的效率,这就是所谓的 I/O 多路复用。
不同的系统,有不同的系统方法来提供 I/O 多路复用能力,nio4r 对此进行了适配,并提供了一层封装,方便开发者使用。ActionCable 和 midori 都是使用的 nio4r 来做这件事。
nio 通过 NIO::Selector#register
对所感兴趣 I/O 事件进行注册,
之后通过 NIO::Selector#select
方法找出已经 ready 的事件,开发者可以通过遍历,对这些事件进行处理,从而避免了单独傻等某个 I/O 的情况。
@nio = NIO::Selector.new
@nio.register(io, :w)
monitors = @nio.select(0.1)
参考 ActionCable 的实现,这里使用单独的线程在做 monitor 相关的工作。
@thread = Thread.new { run }
run
的核心代码如下
def run
loop do
monitors = @nio.select(0.1)
next unless monitors
monitors.each do |monitor|
io = monitor.io
stream = monitor.value
if monitor.writable?
stream.writable
end
if monitor.readable?
incoming = io.read_nonblock(4096, exception: false)
case incoming
when :wait_readable
next
else
stream.readable incoming
end
end
end
end
end
需要监控的 I/O 时间被注册后 (EventLoop#attach),通过 @nio.select(0.1)
选出 ready monitors,然后对这些 monitor 进行遍历,如果可读或者可写,调用相应的方法。
相比 ActionCable 的实现,剥离了具体处理 I/O 的业务,代码也就更简单些。
对外的方法主要有,attach
, detach
和 update_interest
,用来对所感兴趣的 I/O 进行操作。
网络请求有两个地方是 block 操作,一个是创建连接,一个是发送请求后,等待请求返回。
所以需要一个 HttpClient 提供 nonblock 的能力,对应的代码如下:
def connect_nonblock
remote_addr = Socket.pack_sockaddr_in(@host.port, @host.host)
socket.connect_nonblock(remote_addr, exception: false)
end
def get(path)
lines = ["GET #{path} HTTP/1.0"]
lines << "Host: #{@host.host}\r\n"
lines << "\r\n"
line = lines.join("\r\n")
socket.write line
end
为了和 EventLoop
配合,所以实现了AsyncGetClient
,提供异步请求的能力。
AsyncGetClient
的状态变化是 connect_nonblock -> connected -> send get request -> receive response
AsyncGetClient
通过 start 方法开始请求,
def start
@event_loop.attach(@client.socket, self, :w)
@client.connect_nonblock
end
start 方法首先将 socket 的 write 事件挂在在 event_loop 上,并执行 connect_nonblock
操作。
当 writable 的时候,意味着链接已经创建成功,更新关心的事件为 readable,发起 get 请求,代码如下:
def writable
@event_loop.update_interest(@client.socket, :r)
@client.get(@path)
end
当 I/O readable 的时候,接收请求的返回,detach 掉相应的 I/O 代码如下:
def readable(data)
puts "receive #{data}"
@event_loop.detach(@client.socket, self)
end
使用方法如下:
event_loop = EventLoop.new
client = AsyncGetClient.new("http://127.0.0.1:4000", "/", event_loop)
client.start
sleep(10)
这里需要发送 1000 个请求,并等待其结果返回。所以需要一个 counter 来判断请求的执行情况。
常规的方法有全局变量或者将 counter 作为参数传进去,但无论哪种都会让代码耦合在一起。
Erlang 对类似的事情提供了 link 和 monitor。简单来说就是一个人(process)死了,需要告诉其关心的人。
所以我引入了 Supervisor 这个概念。
代码详见:benchmark.rb,不是本文关注的重点,不做详述。
压测结果:
Spend: 1.318983 seconds
Request per second 758.1598853055725
因为使用 nio 做超过 1000 次请求的时候,会有 Connection reset by peer 的异常(服务器发了 RST packet),说明请求超出了服务器的上限。
猜测是由于 cowboy 启动 ranch 的时候的 max_connections 参数是默认的 1024,并发请求超过 1024 时候,服务器会 rest。
但没找到 phoenix 设置这个参数的地方(ranch 还可以动态改这个参数,我也不知道 phoenix 要咋改。。。),这些也不是本分要关注的地方,所以就没深究。
nio 版的实现,直接使用字符串拼接做来做请求,也没对返回进行解析,和 future 比较性能,是有作弊的成分的。。。
压测也是本地 http 服务器 + 本地跑脚本,更好的是,两台服务器,走内网来做,没这么做除了懒外,还因为穷。
所以这篇文章只是个 demo。。。
一般来讲,一个对象只有存储状态,就是对象在内存里存储的内容。其运行状态,往往不是开发人员所关心的(除非内存泄露了)。
但加入 Supervisor 之后,开发者需要关注其运行状态,既这个对象是否完成了自身的任务。而 AsyncGetClient
本身也是有运行状态的,既连接是否创建,请求是否返回。并发变成本身就是要处理这些运行状态。
Erlang 为了处理这两个状态,采用 pure function programing + pattern match 来实现语言。
而运行状态非常复杂,所以 Erlang 提出了 let it crash + restart process。
Erlang 的 let it crash 不是说,代码可以有类似字符串 + 数字这样的错误,而是说,在运行时,有意外的话,与其修复,不如重启。
并发编程和编程是两件事情。
再扯远一点,Kubernetes 对 pod 的操作,可以理解为 Erlang/OTP 对 process 的操作。
使用 I/O 多路复用会大大提高请求的效率,但却引入了很大的复杂度。