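This series started with Puma's overall architecture and startup sequence, then moved on to single/cluster mode, IO handling and HTTP parsing, and has now reached its final installment. This last part covers two questions: 1. how Puma runs a rack app to handle client requests; 2. how a running Puma server accepts and handles the requests sent by the command-line tool pumactl.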
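Puma is a web server that implements the Rack interface. This section looks at that Rack side: how Puma runs a rack app and writes out the HTTP response. Client requests are handled by the handle_request method of the Server class. Its argument req is the client connection object (the same client that process_client passes in), and lines is the output buffer. The method calls the rack app and writes the response back to the client. It has three possible return values: true, false and :async. They mean, respectively, that keep-alive is enabled, that keep-alive is not enabled, and that the socket has been hijacked.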
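handle_request depends only on the Rack contract. The app is any object that responds to call(env) and returns a [status, headers, body] triple, where body responds to each (and optionally close). The following is a minimal sketch of both sides of that contract. The env here is hand-built and contains only the keys this toy app reads; a real server such as Puma fills in many more.

```ruby
# A minimal Rack app: any object with #call(env) returning [status, headers, body].
hello_app = lambda do |env|
  text = "Hello from #{env['PATH_INFO']}\n"
  [200, { 'Content-Type' => 'text/plain', 'Content-Length' => text.bytesize.to_s }, [text]]
end

# Hand-built env for illustration; Puma builds the real one from the parsed request.
env = { 'REQUEST_METHOD' => 'GET', 'PATH_INFO' => '/ping' }

# The server side of the contract: call the app, iterate the body, then close it.
status, headers, body = hello_app.call(env)
chunks = []
body.each { |part| chunks << part }
body.close if body.respond_to?(:close)

puts status
puts headers['Content-Type']
puts chunks.join
```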
class Server
  # Given the client +req+ (which carries the parsed +env+, the client
  # socket and the request body), invoke the rack app, then construct
  # the response and write it back to the client. +lines+ is the output
  # buffer used to assemble the response head.
  #
  # Returns true (keep the connection alive), false (close it) or
  # :async (the socket has been hijacked).
  #
  def handle_request(req, lines)
    env = req.env
    client = req.io
    normalize_env env, client # add env values required by the Rack spec
    env[PUMA_SOCKET] = client
    env[HIJACK_P] = true
    env[HIJACK] = req
    body = req.body
    head = env[REQUEST_METHOD] == HEAD
    env[RACK_INPUT] = body
    env[RACK_URL_SCHEME] = env[HTTPS_KEY] ? HTTPS : HTTP
    # A rack extension. If the app writes #call'ables to this
    # array, we will invoke them when the request is done.
    #
    after_reply = env[RACK_AFTER_REPLY] = []
    begin
      begin
        status, headers, res_body = @app.call(env) # the rack app is invoked here
        return :async if req.hijacked
        status = status.to_i
        if status == -1
          unless headers.empty? and res_body == []
            raise "async response must have empty headers and body"
          end
          return :async
        end
      rescue StandardError => e
        @events.unknown_error self, e, "Rack app"
        status, headers, res_body = lowlevel_error(e)
      end
      content_length = nil
      no_body = head
      if res_body.kind_of? Array and res_body.size == 1
        content_length = res_body[0].bytesize
      end
      cork_socket client
      line_ending = LINE_END
      colon = COLON
      if env[HTTP_VERSION] == HTTP_11
        allow_chunked = true
        keep_alive = env[HTTP_CONNECTION] != CLOSE
        include_keepalive_header = false
        # An optimization. The most common response is 200, so we can
        # reply with the proper 200 status without having to compute
        # the response header.
        #
        if status == 200
          lines << HTTP_11_200
        else
          lines.append "HTTP/1.1 ", status.to_s, " ",
                       fetch_status_code(status), line_ending
          no_body ||= status < 200 || STATUS_WITH_NO_ENTITY_BODY[status]
        end
      else
        # ...... (non-HTTP/1.1 branch omitted)
      end
      response_hijack = nil
      headers.each do |k, vs|
        case k
        when CONTENT_LENGTH2
          content_length = vs
          next
        when TRANSFER_ENCODING
          allow_chunked = false
          content_length = nil
        when HIJACK
          response_hijack = vs
          next
        end
        if vs.respond_to?(:to_s)
          vs.to_s.split(NEWLINE).each do |v|
            lines.append k, colon, v, line_ending
          end
        else
          lines.append k, colon, line_ending
        end
      end
      if no_body
        if content_length and status != 204
          lines.append CONTENT_LENGTH_S, content_length.to_s, line_ending
        end
        lines << line_ending
        fast_write client, lines.to_s
        return keep_alive
      end
      if include_keepalive_header
        lines << CONNECTION_KEEP_ALIVE
      elsif !keep_alive
        lines << CONNECTION_CLOSE
      end
      unless response_hijack
        if content_length
          lines.append CONTENT_LENGTH_S, content_length.to_s, line_ending
          chunked = false
        elsif allow_chunked
          lines << TRANSFER_ENCODING_CHUNKED
          chunked = true
        end
      end
      lines << line_ending
      fast_write client, lines.to_s
      if response_hijack
        response_hijack.call client
        return :async
      end
      begin
        res_body.each do |part|
          if chunked
            fast_write client, part.bytesize.to_s(16)
            fast_write client, line_ending
            fast_write client, part
            fast_write client, line_ending
          else
            fast_write client, part
          end
          client.flush
        end
        if chunked
          fast_write client, CLOSE_CHUNKED
          client.flush
        end
      rescue SystemCallError, IOError
        raise ConnectionError, "Connection error detected during write"
      end
    ensure
      uncork_socket client
      body.close
      res_body.close if res_body.respond_to? :close
      after_reply.each { |o| o.call }
    end
    return keep_alive
  end
end
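Strip away the Puma constants and the HEAD, keep-alive, hijack and HTTP/1.0 handling, and the header and body writing above comes down to the sketch below. write_response is a hypothetical helper, not a Puma method, and it writes into a StringIO instead of a socket. As in handle_request, a single-element Array body gets a Content-Length. Any other body falls back to chunked transfer encoding: each chunk is its size in hex, CRLF, the data and CRLF, and the body ends with a zero-length chunk.

```ruby
require 'stringio'

CRLF = "\r\n"

# Hypothetical helper, a simplified version of handle_request's output step.
# Ignores HEAD, keep-alive, hijacking and HTTP/1.0; always writes HTTP/1.1.
def write_response(io, status, headers, body, reason = 'OK')
  head = "HTTP/1.1 #{status} #{reason}#{CRLF}"
  content_length = nil
  # Same shortcut as Puma: a one-element Array body has a known length.
  content_length = body[0].bytesize if body.is_a?(Array) && body.size == 1

  headers.each do |name, value|
    if name.casecmp?('Content-Length')
      content_length = value # emitted below, after the other headers
      next
    end
    # A value containing newlines becomes several header lines.
    value.to_s.split("\n").each { |v| head << "#{name}: #{v}#{CRLF}" }
  end

  chunked = content_length.nil?
  head << (chunked ? "Transfer-Encoding: chunked#{CRLF}" : "Content-Length: #{content_length}#{CRLF}")
  head << CRLF
  io.write(head)

  body.each do |part|
    if chunked
      # One chunk: size in hex, CRLF, data, CRLF.
      io.write(part.bytesize.to_s(16) + CRLF + part + CRLF)
    else
      io.write(part)
    end
  end
  io.write("0#{CRLF}#{CRLF}") if chunked # zero-length chunk ends the body
ensure
  body.close if body.respond_to?(:close)
end

out = StringIO.new
write_response(out, 200, { 'Content-Type' => 'text/plain' }, %w[hello world!])
p out.string
```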
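The method is long, but its logic is simple. Based on the result of calling the rack app and on HTTP options such as keep_alive, chunked encoding and hijack, it writes out the HTTP status line, the response headers and the response body. The core step, executing the rack app, is a single line: status, headers, res_body = @app.call(env).
In addition, the normalize_env method adds some Rack env variables and fixes up others. Its code is as follows: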
# Given a Hash +env+ for the request read from +client+, add
# and fixup keys to comply with Rack's env guidelines.
#
def normalize_env(env, client)
  if host = env[HTTP_HOST]
    if colon = host.index(":")
      env[SERVER_NAME] = host[0, colon]
      env[SERVER_PORT] = host[colon+1, host.bytesize]
    else
      env[SERVER_NAME] = host
      env[SERVER_PORT] = default_server_port(env)
    end
  else
    env[SERVER_NAME] = LOCALHOST
    env[SERVER_PORT] = default_server_port(env)
  end
  unless env[REQUEST_PATH]
    # it might be a dumbass full host request header
    uri = URI.parse(env[REQUEST_URI])
    env[REQUEST_PATH] = uri.path
    raise "No REQUEST PATH" unless env[REQUEST_PATH]
  end
  env[PATH_INFO] = env[REQUEST_PATH]
  # From http://www.ietf.org/rfc/rfc3875 :
  # "Script authors should be aware that the REMOTE_ADDR and
  # REMOTE_HOST meta-variables (see sections 4.1.8 and 4.1.9)
  # may not identify the ultimate source of the request.
  # They identify the client for the immediate request to the
  # server; that client may be a proxy, gateway, or other
  # intermediary acting on behalf of the actual source client."
  #
  unless env.key?(REMOTE_ADDR)
    addr = client.peeraddr.last
    # Set unix socket addrs to localhost
    addr = "127.0.0.1" if addr.empty?
    env[REMOTE_ADDR] = addr
  end
end
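The Host and request-path handling above can be reproduced with two small hypothetical helpers (they are not Puma methods). The helpers make some assumptions. LOCALHOST is taken to be "localhost". The default port is assumed to be "443" for HTTPS and "80" otherwise, because default_server_port itself is not shown here. The path fallback covers an absolute-form request target such as http://example.com/a?b=1, where the path has to be parsed out of the full URI.

```ruby
require 'uri'

# Hypothetical helper mirroring the Host-header branch of normalize_env.
# Assumptions: LOCALHOST is "localhost"; default port is "443" for HTTPS, else "80".
def server_name_and_port(host_header, https: false)
  default_port = https ? '443' : '80'
  return ['localhost', default_port] if host_header.nil?

  if (colon = host_header.index(':'))
    # Everything before the first ':' is the name, the rest is the port.
    [host_header[0, colon], host_header[(colon + 1)..-1]]
  else
    [host_header, default_port]
  end
end

# Fallback used when the parser did not set REQUEST_PATH, e.g. for an
# absolute-form request target such as "http://example.com/a?b=1".
def request_path_from(request_uri)
  path = URI.parse(request_uri).path
  raise 'No REQUEST PATH' if path.nil?
  path
end

p server_name_and_port('example.com:8080')
p server_name_and_port('example.com', https: true)
p request_path_from('http://example.com/a?b=1')
```

Because both the sketch and the code above split at the first ':', a bracketed IPv6 Host value such as [::1]:9292 would be split at the wrong colon.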
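Finally, let's look at process_client, the loop that handles client requests. At its core, it calls handle_request and decides what to do next based on the return value. If handle_request returns false, there is no keep-alive: the method returns and the ensure clause closes the client connection. If it returns true, the connection is kept alive. When request queueing (@queue_requests) is enabled, the buffer is reset and client.reset is called. If that returns true, the loop calls handle_request again, reusing the same client connection for another HTTP request. Otherwise, the connection is handed to @reactor with @persistent_timeout to wait for the next request, and is not closed. When @queue_requests is off, the method simply returns and the connection is closed. If handle_request returns :async, the socket has been hijacked. The rack app now handles the socket connection directly, so Puma does not close the client socket either. For background on hijacking, see the article "rack socket hijacking". Socket hijacking can be used to implement WebSocket.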
# Given a connection on +client+, handle the incoming requests.
#
# This method supports HTTP Keep-Alive so it may, depending on if the client
# indicates that it supports keep alive, wait for another request before
# returning.
#
def process_client(client, buffer)
  begin
    close_socket = true
    while true
      case handle_request(client, buffer)
      when false
        return
      when :async
        close_socket = false
        return
      when true
        return unless @queue_requests
        buffer.reset
        unless client.reset(@status == :run)
          close_socket = false
          client.set_timeout @persistent_timeout
          @reactor.add client
          return
        end
      end
    end
  # The client disconnected while we were reading data
  rescue ConnectionError
    # Swallow them. The ensure tries to close +client+ down
  # The client doesn't know HTTP well
  rescue HttpParserError => e
    client.write_400
    @events.parse_error self, client.env, e
  # Server error
  rescue StandardError => e
    client.write_500
    @events.unknown_error self, e, "Read"
  ensure
    buffer.reset
    begin
      client.close if close_socket
    rescue IOError, SystemCallError
      # Already closed
    rescue StandardError => e
      @events.unknown_error self, e, "Client"
    end
  end
end
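The control flow of process_client can be modeled in isolation. In this hypothetical sketch, serve_connection takes a handler that stands in for handle_request and returns true, false or :async. The Reactor hand-off and the error handling of the real method are left out, so a true result always means "read the next request on this socket".

```ruby
# Hypothetical model of process_client's loop. `handler` stands in for
# handle_request; the Reactor hand-off and error handling are omitted.
def serve_connection(handler)
  close_socket = true
  served = 0
  while true
    result = handler.call
    served += 1
    case result
    when false
      break                 # no keep-alive: close after this response
    when :async
      close_socket = false  # hijacked: the app now owns the socket
      break
    when true
      next                  # keep-alive: handle the next request on this socket
    else
      raise ArgumentError, "unexpected result: #{result.inspect}"
    end
  end
  { served: served, closed: close_socket }
end

results = [true, true, false].each
p serve_connection(-> { results.next })
```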
That completes an overall walkthrough of how Puma runs a rack app.
pumactl is a command-line tool for controlling a running Puma. It can control Puma in two ways: through the pid, or through the control_url.
#!/usr/bin/env ruby
require 'puma/control_cli'
cli = Puma::ControlCLI.new ARGV.dup
cli.run
# Puma::ControlCLI#run (excerpt)
class ControlCLI
  def run
    start if @options[:command] == "start"
    prepare_configuration
    @options.has_key?(:control_url) ? send_request : send_signal
  end
end
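Let's first look at control through the pid, which is simpler but more limited. As the code shows, controlling Puma by pid just uses standard Unix signals. With only the process pid, however, the stats and reload-worker-directory commands are not supported.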
def send_signal
  # ......
  case @options[:command]
  when "restart"
    Process.kill "SIGUSR2", pid
  when "halt"
    Process.kill "QUIT", pid
  when "stop"
    Process.kill "SIGTERM", pid
  when "stats"
    puts "Stats not available via pid only"
    return
  when "reload-worker-directory"
    puts "reload-worker-directory not available via pid only"
    return
  when "phased-restart"
    Process.kill "SIGUSR1", pid
  else
    message "Puma is started"
    return
  end
  message "Command #{@options[:command]} sent success"
end
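The pid-based path amounts to a table from command to signal, plus Process.kill. The sketch below copies that table from send_signal. It simplifies one thing: every other command raises, whereas send_signal prints a message. To show the receiving side, the sketch traps SIGUSR1 in the current process and signals itself. This part is Unix-only.

```ruby
# Command-to-signal table copied from send_signal above.
PUMACTL_SIGNALS = {
  'restart'        => 'SIGUSR2',
  'halt'           => 'QUIT',
  'stop'           => 'SIGTERM',
  'phased-restart' => 'SIGUSR1'
}.freeze

# Simplification: any other command is rejected (send_signal prints instead).
def signal_for(command)
  PUMACTL_SIGNALS.fetch(command) do
    raise ArgumentError, "#{command} not available via pid only"
  end
end

# Receiving side (Unix only): trap the signal, then signal our own process.
received = nil
previous = Signal.trap('USR1') { received = 'USR1' }
Process.kill(signal_for('phased-restart'), Process.pid)
100.times { break if received; sleep 0.01 } # the handler runs asynchronously
Signal.trap('USR1', previous || 'DEFAULT')
puts received.inspect
```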
To control Puma through a control_url, the control options must be supplied when Puma is started, for example:
puma --control tcp://127.0.0.1:9293 --control-token foo
Then, during startup, Puma uses these control options to start a separate Puma server that runs its own rack app:
class Runner
  def start_control
    str = @options[:control_url]
    require 'puma/app/status'
    uri = URI.parse str
    app = Puma::App::Status.new @cli # the rack app that answers control requests
    if token = @options[:control_auth_token]
      app.auth_token = token unless token.empty? or token == :none
    end
    control = Puma::Server.new app, @cli.events # a separate puma server running the built-in app
    control.min_threads = 0
    control.max_threads = 1
    case uri.scheme
    when "tcp"
      log "* Starting control server on #{str}"
      control.add_tcp_listener uri.host, uri.port
    when "unix"
      log "* Starting control server on #{str}"
      path = "#{uri.host}#{uri.path}"
      control.add_unix_listener path
    else
      error "Invalid control URI: #{str}"
    end
    control.run
    @control = control
  end
end
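As the code shows, with the control_url mechanism Puma starts an additional Puma server that runs the Puma::App::Status app, limited to a single thread (min_threads = 0, max_threads = 1). In other words, a Puma process using control_url listens on two different addresses. One runs the rack app supplied by whoever started Puma, and the other runs Puma's internal Status app.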
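The scheme dispatch in start_control can be tried on its own. control_listener is a hypothetical helper that returns what would be listened on instead of opening a listener. It raises on an unknown scheme, whereas start_control only logs an error. The unix branch concatenates host and path as start_control does. With that concatenation, unix:///tmp/x.sock yields an absolute path, and unix://tmp/x.sock yields the relative path tmp/x.sock.

```ruby
require 'uri'

# Hypothetical helper reproducing start_control's scheme dispatch.
def control_listener(url)
  uri = URI.parse(url)
  case uri.scheme
  when 'tcp'
    [:tcp, uri.host, uri.port]
  when 'unix'
    # host + path, as in start_control, so "unix://tmp/x.sock" keeps "tmp".
    [:unix, "#{uri.host}#{uri.path}"]
  else
    raise ArgumentError, "Invalid control URI: #{url}"
  end
end

p control_listener('tcp://127.0.0.1:9293')
p control_listener('unix:///tmp/puma-ctl.sock')
```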
class Status
  def call(env)
    unless authenticate(env)
      return rack_response(403, 'Invalid auth token', 'text/plain')
    end
    case env['PATH_INFO']
    when /\/stop$/
      @cli.stop
      return rack_response(200, OK_STATUS)
    when /\/halt$/
      @cli.halt
      return rack_response(200, OK_STATUS)
    when /\/restart$/
      @cli.restart
      return rack_response(200, OK_STATUS)
    when /\/phased-restart$/
      if !@cli.phased_restart
        return rack_response(404, '{ "error": "phased restart not available" }')
      else
        return rack_response(200, OK_STATUS)
      end
    when /\/reload-worker-directory$/
      if !@cli.reload_worker_directory
        return rack_response(404, '{ "error": "reload_worker_directory not available" }')
      else
        return rack_response(200, OK_STATUS)
      end
    when /\/stats$/
      return rack_response(200, @cli.stats)
    else
      rack_response 404, "Unsupported action", 'text/plain'
    end
  end
end
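The shape of Status (authenticate, then dispatch on PATH_INFO, then return a Rack triple) can be reproduced in a small self-contained app. MiniControlApp and FakeController are hypothetical. The body of authenticate is not shown in the article, so the check here is an assumption: the token arrives as token=... in the query string. The stats body is a made-up example string.

```ruby
# Hypothetical, trimmed-down control app in the style of Puma::App::Status.
class MiniControlApp
  def initialize(controller, auth_token = nil)
    @controller = controller
    @auth_token = auth_token
  end

  def call(env)
    unless authenticate(env)
      return rack_response(403, 'Invalid auth token', 'text/plain')
    end

    case env['PATH_INFO']
    when %r{/stop\z}
      @controller.stop
      rack_response(200, '{ "status": "ok" }')
    when %r{/stats\z}
      rack_response(200, @controller.stats)
    else
      rack_response(404, 'Unsupported action', 'text/plain')
    end
  end

  private

  # Assumption: the token arrives as "token=..." in the query string.
  def authenticate(env)
    return true if @auth_token.nil?
    env['QUERY_STRING'].to_s.split('&').include?("token=#{@auth_token}")
  end

  # Builds the [status, headers, body] triple every Rack app must return.
  def rack_response(status, body, content_type = 'application/json')
    headers = { 'Content-Type' => content_type, 'Content-Length' => body.bytesize.to_s }
    [status, headers, [body]]
  end
end

# Stand-in for the object Status calls (@cli in the original).
FakeController = Struct.new(:stopped) do
  def stop
    self.stopped = true
  end

  def stats
    '{ "backlog": 0, "running": 1 }'
  end
end

controller = FakeController.new(false)
app = MiniControlApp.new(controller, 'foo')
p app.call('PATH_INFO' => '/stats', 'QUERY_STRING' => 'token=foo')
```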
Status is a standard rack app. It provides a call method that takes an env argument and returns a Rack response built by rack_response. Here is how stats is implemented:
class Single < Runner
  def stats
    b = @server.backlog
    r = @server.running
    %Q!{ "backlog": #{b}, "running": #{r} }!
  end
end
class Server
  def backlog
    @thread_pool and @thread_pool.backlog
  end
  def running
    @thread_pool and @thread_pool.spawned
  end
end
class ThreadPool
  def backlog
    @mutex.synchronize { @todo.size }
  end
end
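So backlog is the number of tasks waiting to be executed in the thread pool's queue, and running is the number of threads the pool has currently spawned.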
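The two numbers can be reproduced with a toy pool. ToyPool is hypothetical and is not Puma's ThreadPool. It keeps only a queue read under a Mutex, like ThreadPool#backlog, and a spawned counter. stats_json builds the string the same way Single#stats does, with the nil guard of Server#backlog and Server#running.

```ruby
require 'json'

# Toy stand-in for ThreadPool: only the two counters Single#stats reports.
# Nothing consumes @todo, so queued work stays queued.
class ToyPool
  def initialize
    @mutex = Mutex.new
    @todo = []
    @spawned = 0
  end

  def <<(work)
    @mutex.synchronize { @todo << work }
  end

  # Bookkeeping only: pretend a worker thread was started.
  def record_spawn
    @mutex.synchronize { @spawned += 1 }
  end

  # Like ThreadPool#backlog: queued work, read while holding the lock.
  def backlog
    @mutex.synchronize { @todo.size }
  end

  def spawned
    @mutex.synchronize { @spawned }
  end
end

# Same string building as Single#stats, with Server's `pool and ...` guard.
def stats_json(pool)
  b = pool && pool.backlog
  r = pool && pool.spawned
  %Q!{ "backlog": #{b}, "running": #{r} }!
end

pool = ToyPool.new
3.times { |i| pool << i }
2.times { pool.record_spawn }
p JSON.parse(stats_json(pool))
```

Because the JSON is built by string interpolation, a nil @thread_pool makes both values interpolate as empty strings. The resulting text ({ "backlog": , "running":  }) is not valid JSON.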