分享 Puma 源代码分析 - 完结篇

ylt · 2015年03月25日 · 最后由 ylt 回复于 2016年07月03日 · 4699 次阅读

puma 源代码分析之完结篇

这个系列的文章从 puma 的总体结构、启动流程开始分析起,到单进程/集群模式分析、IO 处理、http 协议解析,终于到了最后的完结篇。最后这部分主要讨论两个问题:1.Puma 是如何执行 rack app 来处理客户端请求的;2.运行时 puma server 如何接受和处理命令行工具 pumactl 发过来的请求。

处理客户端请求

Puma 是一个支持 rack 接口的 web 服务器,本节分析 puma 的 rack 接口部分,看看 puma 是如何执行 rack app 并输出 http 响应的。客户端请求的处理由 Server 类的 handle_request 方法完成。其中,req 代表客户端连接,lines 是输出缓冲区。本方法调用 rack app 并把响应写回给客户端。本方法有三种返回值:true/false/:async,分别代表启用 keep_alive, 不启用 keep_alive 和使用 socket-hijacking。

class Server
  # Given the request +env+ from +client+ and a partial request body
  # in +body+, finish reading the body if there is one and invoke
  # the rack app. Then construct the response and write it back to
  # +client+
  #
  # +cl+ is the previously fetched Content-Length header if there
  # was one. This is an optimization to keep from having to look
  # it up again.
  #
  def handle_request(req, lines)
    env = req.env
    client = req.io

    normalize_env env, client   #添加遵循Rack规范的一些env

    env[PUMA_SOCKET] = client
    env[HIJACK_P] = true
    env[HIJACK] = req
    body = req.body
    head = env[REQUEST_METHOD] == HEAD
    env[RACK_INPUT] = body
    env[RACK_URL_SCHEME] =  env[HTTPS_KEY] ? HTTPS : HTTP

    # A rack extension. If the app writes #call'ables to this
    # array, we will invoke them when the request is done.
    #
    after_reply = env[RACK_AFTER_REPLY] = []

    begin
      begin
        status, headers, res_body = @app.call(env)   #这里调用了rack app

        return :async if req.hijacked

        status = status.to_i

        if status == -1
          unless headers.empty? and res_body == []
            raise "async response must have empty headers and body"
          end

          return :async
        end
      rescue StandardError => e
        @events.unknown_error self, e, "Rack app"

        status, headers, res_body = lowlevel_error(e)
      end

      content_length = nil
      no_body = head

      if res_body.kind_of? Array and res_body.size == 1
        content_length = res_body[0].bytesize
      end

      cork_socket client

      line_ending = LINE_END
      colon = COLON

      if env[HTTP_VERSION] == HTTP_11
        allow_chunked = true
        keep_alive = env[HTTP_CONNECTION] != CLOSE
        include_keepalive_header = false

        # An optimization. The most common response is 200, so we can
        # reply with the proper 200 status without having to compute
        # the response header.
        #
        if status == 200
          lines << HTTP_11_200
        else
          lines.append "HTTP/1.1 ", status.to_s, " ",
                       fetch_status_code(status), line_ending

          no_body ||= status < 200 || STATUS_WITH_NO_ENTITY_BODY[status]
        end
      else
      ......
      end

      response_hijack = nil

      headers.each do |k, vs|
        case k
        when CONTENT_LENGTH2
          content_length = vs
          next
        when TRANSFER_ENCODING
          allow_chunked = false
          content_length = nil
        when HIJACK
          response_hijack = vs
          next
        end

        if vs.respond_to?(:to_s)
          vs.to_s.split(NEWLINE).each do |v|
            lines.append k, colon, v, line_ending
          end
        else
          lines.append k, colon, line_ending
        end
      end

      if no_body
        if content_length and status != 204
          lines.append CONTENT_LENGTH_S, content_length.to_s, line_ending
        end

        lines << line_ending
        fast_write client, lines.to_s
        return keep_alive
      end

      if include_keepalive_header
        lines << CONNECTION_KEEP_ALIVE
      elsif !keep_alive
        lines << CONNECTION_CLOSE
      end

      unless response_hijack
        if content_length
          lines.append CONTENT_LENGTH_S, content_length.to_s, line_ending
          chunked = false
        elsif allow_chunked
          lines << TRANSFER_ENCODING_CHUNKED
          chunked = true
        end
      end

      lines << line_ending

      fast_write client, lines.to_s

      if response_hijack
        response_hijack.call client
        return :async
      end

      begin
        res_body.each do |part|
          if chunked
            fast_write client, part.bytesize.to_s(16)
            fast_write client, line_ending
            fast_write client, part
            fast_write client, line_ending
          else
            fast_write client, part
          end

          client.flush
        end

        if chunked
          fast_write client, CLOSE_CHUNKED
          client.flush
        end
      rescue SystemCallError, IOError
        raise ConnectionError, "Connection error detected during write"
      end

    ensure
      uncork_socket client

      body.close
      res_body.close if res_body.respond_to? :close

      after_reply.each { |o| o.call }
    end

    return keep_alive
  end

整个方法很长,但是逻辑不复杂。主要就是根据 rack app 的调用情况和 keep_alive/chunked/hijiack 等 http 选项,输出 http 响应的状态码/响应头部/响应体。其中最核心的执行 rack app 的代码只有一行:status, headers, res_body = @app.call(env)。此外,normalize_env 方法用于增加和修正一些 Rack 的 env 变量,代码如下:

# Given a Hash +env+ for the request read from +client+, add
# and fixup keys to comply with Rack's env guidelines.
#
def normalize_env(env, client)
  if host = env[HTTP_HOST]
    if colon = host.index(":")
      env[SERVER_NAME] = host[0, colon]
      env[SERVER_PORT] = host[colon+1, host.bytesize]
    else
      env[SERVER_NAME] = host
      env[SERVER_PORT] = default_server_port(env)
    end
  else
    env[SERVER_NAME] = LOCALHOST
    env[SERVER_PORT] = default_server_port(env)
  end

  unless env[REQUEST_PATH]
    # it might be a dumbass full host request header
    uri = URI.parse(env[REQUEST_URI])
    env[REQUEST_PATH] = uri.path

    raise "No REQUEST PATH" unless env[REQUEST_PATH]
  end

  env[PATH_INFO] = env[REQUEST_PATH]

  # From http://www.ietf.org/rfc/rfc3875 :
  # "Script authors should be aware that the REMOTE_ADDR and
  # REMOTE_HOST meta-variables (see sections 4.1.8 and 4.1.9)
  # may not identify the ultimate source of the request.
  # They identify the client for the immediate request to the
  # server; that client may be a proxy, gateway, or other
  # intermediary acting on behalf of the actual source client."
  #

  unless env.key?(REMOTE_ADDR)
    addr = client.peeraddr.last

    # Set unix socket addrs to localhost
    addr = "127.0.0.1" if addr.empty?

    env[REMOTE_ADDR] = addr
  end
end

最后看看处理客户端请求的循环的 process_client 方法。这个方法的核心是调用 handle_request 方法,然后根据返回值决定下一步的处理。如果 handle_request 返回 false,也就是没有 keep_alive,那么本方法执行完立刻关闭客户端连接;如果返回 true,那么就是 keep_alive,就会循环调用 handle_request,此时会重用同一个客户端连接处理多个 http 请求;如果 handle_request 返回 async,那么就是 socket hijack 模式,此时由 rack app 来直接处理 socket 连接,puma 也不关闭客户端 socket。关于 hijacking 的说明可以参考这篇文章rack socket hijacking。Socket hijack 模式可以用来实现 WebSocket 功能。

# Given a connection on +client+, handle the incoming requests.
#
# This method support HTTP Keep-Alive so it may, depending on if the client
# indicates that it supports keep alive, wait for another request before
# returning.
#
def process_client(client, buffer)
  begin
    close_socket = true

    while true
      case handle_request(client, buffer)
      when false
        return
      when :async
        close_socket = false
        return
      when true
        return unless @queue_requests
        buffer.reset

        unless client.reset(@status == :run)
          close_socket = false
          client.set_timeout @persistent_timeout
          @reactor.add client
          return
        end
      end
    end

  # The client disconnected while we were reading data
  rescue ConnectionError
    # Swallow them. The ensure tries to close +client+ down

  # The client doesn't know HTTP well
  rescue HttpParserError => e
    client.write_400

    @events.parse_error self, client.env, e

  # Server error
  rescue StandardError => e
    client.write_500

    @events.unknown_error self, e, "Read"

  ensure
    buffer.reset

    begin
      client.close if close_socket
    rescue IOError, SystemCallError
      # Already closed
    rescue StandardError => e
      @events.unknown_error self, e, "Client"
    end
  end
end

至此把 puma 处理 rack app 的流程总体上过了一遍。

pumactl 如何控制 puma 的运行状态

Pumactl 是一个命令行工具,用来控制 puma 的运行状态。Pumactl 可以通过两种手段来控制 puma,一个是 pid,另一个是 control_url。

!/usr/bin/env ruby
require 'puma/control_cli'
cli = Puma::ControlCLI.new ARGV.dup
cli.run

class ControlCLI
  def run
    start if @options[:command] == "start"
    prepare_configuration
    @options.has_key?(:control_url) ? send_request : send_signal
  end

我们先看通过 pid 控制 puma 的运行,这个方式简单一些,但是功能也受限。从代码中可以看到,通过 pid 控制 puma 其实使用的是标准的 unix 信号机制。但是仅使用进程 pid 无法支持 stats 和 reload-worker-directory 命令。

def send_signal
  ......
  case @options[:command]
  when "restart"
    Process.kill "SIGUSR2", pid

  when "halt"
    Process.kill "QUIT", pid

  when "stop"
    Process.kill "SIGTERM", pid

  when "stats"
    puts "Stats not available via pid only"
    return

  when "reload-worker-directory"
    puts "reload-worker-directory not available via pid only"
    return

  when "phased-restart"
    Process.kill "SIGUSR1", pid

  else
    message "Puma is started"
    return
  end

  message "Command #{@options[:command]} sent success"
end

要使用 control_url 来控制 puma,必须在启动 puma 的时候提供 control 相关的参数,比如:

puma --control tcp://127.0.0.1:9293 --control-token foo

然后,在启动 puma 的时候,会根据配置的 control 参数启动一个单独的 puma server,并执行一个单独的 rack app。

class Runner
   def start_control
     str = @options[:control_url]
     require 'puma/app/status'
     uri = URI.parse str
     app = Puma::App::Status.new @cli    #响应control请求时执行的rack app
     if token = @options[:control_auth_token]
       app.auth_token = token unless token.empty? or token == :none
     end

     control = Puma::Server.new app, @cli.events  #启动一个独立的puma server, 并执行内置的app
     control.min_threads = 0
     control.max_threads = 1

     case uri.scheme
     when "tcp"
       log "* Starting control server on #{str}"
       control.add_tcp_listener uri.host, uri.port
     when "unix"
       log "* Starting control server on #{str}"
       path = "#{uri.host}#{uri.path}"

       control.add_unix_listener path
     else
       error "Invalid control URI: #{str}"
     end

     control.run
     @control = control
   end

从代码中可以看出,如果使用 control_url 机制,会启动一个 puma server,并执行 Puma::App::Status 这个 app。也就是说,control_url 机制下的 puma 进程会监听两个不同的端口,一个执行启动者提供的 rack app,另一个执行自己内部的 Status 这个 app。

class Status
  def call(env)
    unless authenticate(env)
      return rack_response(403, 'Invalid auth token', 'text/plain')
    end

    case env['PATH_INFO']
    when /\/stop$/
      @cli.stop
      return rack_response(200, OK_STATUS)

    when /\/halt$/
      @cli.halt
      return rack_response(200, OK_STATUS)

    when /\/restart$/
      @cli.restart
      return rack_response(200, OK_STATUS)

    when /\/phased-restart$/
      if !@cli.phased_restart
        return rack_response(404, '{ "error": "phased restart not available" }')
      else
        return rack_response(200, OK_STATUS)
      end

    when /\/reload-worker-directory$/
      if !@cli.reload_worker_directory
        return rack_response(404, '{ "error": "reload_worker_directory not available" }')
      else
        return rack_response(200, OK_STATUS)
      end

    when /\/stats$/
      return rack_response(200, @cli.stats)
    else
      rack_response 404, "Unsupported action", 'text/plain'
    end
  end

可以看到,Status 是一个标准的 rack app 实现,提供一个 call 方法,接受一个 env 参数并返回 rack_response。下面看一下 stats 的实现:

 class Single < Runner
   def stats
     b = @server.backlog
     r = @server.running
     %Q!{ "backlog": #{b}, "running": #{r} }!
   end

 class Server
   def backlog
     @thread_pool and @thread_pool.backlog
   end

   def running
     @thread_pool and @thread_pool.spawned
   end
 end

class ThreadPool  
 def backlog
   @mutex.synchronize { @todo.size }
 end

可以看到,backlog 是线程池中等待执行的任务的个数,running 指当前线程池中的线程数量。

有没有人用 Einhorn?

#2 楼 @est 没用过,看名字也是怪兽系列的啊

#2 楼 @est 第一次听说 Einhorn,看了一下,应该是一个 ruby 写的父子进程 socket 管理器,相当于把 unicorn 和 puma cluster 的父子进程管理部分独立出来。这样可以在 Einhorn 后面挂其它的 web server,提供了架构的灵活性。

ylt Puma 源代码分析 - 概述 提及了此话题。 07月03日 22:43
需要 登录 后方可回复, 如果你还没有账号请 注册新账号