分享 Puma 源代码分析 - 单进程模式

ylt · 2015年03月03日 · 最后由 Dounx 回复于 2020年04月21日 · 4422 次阅读

Puma 的单进程运行模式

Puma 的类结构

在 puma 的启动过程代码中可以看到,负责单进程运行模式的类是 Single。在分析单进程运行模式之前,先总体看一下 puma 的单进程/集群架构。其类图如下所示,可以看出很像策略模式。

最右边是 puma 的启动入口类 PUAM::CLI,它包含一个成员变量@runner,属于 Runner 类。Runner 类有两个子类 Single 和 Cluster,分别负责单进程和集群运行模式。Single 和 Cluster 都提供一个 run 接口,CLI 的 run 方法会调用对应类的 run 方法。可以看出,这类似 GOF 的策略模式。但是 ruby 是动态语言,puma 的作者其实应该不是按照策略模式来实现这套框架的。

下面是 puma 早期的代码:

clustered = @options[:workers] > 0
if clustered
  run_cluster
else
  run_single
end

可见在 puma 早期,单进程执行 run_single 方法,集群执行 run_cluster 方法,没有 Runner/Single/Cluster 的类继承体系,CLI 里直接写单进程和集群的运行模式代码。

所以,现在的这个架构是逐步重构出来的。Single 和 Cluster 有很多雷同的代码,于是增加一个 Runner 类,把共同的代码提取到基类中,形成了现在的架构。我自己本人也是这样的编程习惯,通过不断的重构,清除重复代码,厘清代码的结构。

Runner 中有四个成员变量,分别是 cli/options/app/control,各代表命令行启动类,命令行参数类、配置的 rack app,运行时控制 puma 的 PUMA::ControlCLI 类。这四个类对单进程和多进程来说都是一样的。然后 Single 包含一个 PUAM::Server 类,而 Cluster 则包含一个 PUMA::Worker 的数组,这是单进程与集群模式真正有区别的地方。

单进程的执行流程

单进程的执行流程由 Single 类的 run 方法实现,它首先判断是否要以 daemon 的方法运行 puma,然后加载配置文件并确定监听哪些网络接口,接着把 puma 的进程 pid 以及状态信息写入状态文件中,然后启动运行时控制服务器的网络接口,然后通知启动监听者 puma 启动了,最后是运行 Server 类的 run 方法。

class Single < Runner
    def run
       if daemon?
         log "* Daemonizing..."
         Process.daemon(true)
         redirect_io
       end
      load_and_bind
      @cli.write_state
      start_control
      @server = server = start_server
      @cli.events.fire_on_booted!
      begin
        server.run.join
      rescue Interrupt
        # Swallow it
      end
end       

绑定网络端口

绑定网络端口的实现代码如下:

class Runner
    def load_and_bind
      ……
      begin
        @app = @cli.config.app
      rescue Exception => e
        log "! Unable to load application: #{e.class}: #{e.message}"
        raise e
      end
      @cli.binder.parse @options[:binds], self  #实际绑定网络接口发生在这里
    end
end

首先加载 Rack 的@app,然后调用 Binder 类的 parse 方法实现网络绑定端口的解析和绑定。我感觉这个 parse 方法名字取的不好,叫 parse_and_bind 会更合适一些。另外一个违反直觉的是启动 TCPServer 的代码在 Binder 类中,而不在 Server 类中。下面看看 parse 方法的实现:

class Binder
    def parse(binds, logger)
      binds.each do |str|
        uri = URI.parse str
        case uri.scheme
        when "tcp"
          params = Rack::Utils.parse_query uri.query
          opt = params.key?('low_latency')
          bak = params.fetch('backlog', 1024).to_i
          logger.log "* Listening on #{str}"
          io = add_tcp_listener uri.host, uri.port, opt, bak
          @listeners << [str, io]
        when "unix"
          path = "#{uri.host}#{uri.path}".gsub("%20", " ")
          logger.log "* Listening on #{str}"
          umask = nil
          mode = nil
          if uri.query
            params = Rack::Utils.parse_query uri.query
            if u = params['umask']
              # Use Integer() to respect the 0 prefix as octal
              umask = Integer(u)
            end
            if u = params['mode']
              mode = Integer('0'+u)
            end
          end
          io = add_unix_listener path, umask, mode
          @listeners << [str, io]
        when "ssl"
          ......
         end
    end
end

这里的核心代码是 add_tcp_listener 和 add_unix_listener,分别启动 TCPServer 和 UNIXServer 并监听对应的网络端口。Puma 可以一次监听多个网络端口,其中 unix 快于 tcp,tcp 快于 ssl。由于实际应用中,ssl 都是前端接入的 web 服务器做 ssl offloading,所以这里的 ssl 相关代码就不分析了。

def add_tcp_listener(host, port, optimize_for_latency=true, backlog=1024)
  host = host[1..-2] if host[0..0] == '['
  s = TCPServer.new(host, port)
  if optimize_for_latency
    s.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)
  end
  s.setsockopt(Socket::SOL_SOCKET,Socket::SO_REUSEADDR, true)
  s.listen backlog
  @ios << s
  s
end

def add_unix_listener(path, umask=nil, mode=nil)
  @unix_paths << path
  ......
  s = UNIXServer.new(path)
  @ios << s
  env = @proto_env.dup
  env[REMOTE_ADDR] = "127.0.0.1"
  @envs[s] = env
  s
end

写状态文件

当网络端口绑定以后,下一步是写入 pid 文件和状态文件,这两个文件的地址都是在 config 文件中通过 pidfile/state_path 命令配置的。

def write_state
  write_pid
  require 'yaml'
  if path = @options[:state]
    state = { "pid" => Process.pid }
    cfg = @config.dup
    state["config"] = cfg
    File.open(path, "w") do |f|
      f.write state.to_yaml
    end
  end
end

def write_pid
  if path = @options[:pidfile]
    File.open(path, "w") do |f|
      f.puts Process.pid
    end
    cur = Process.pid
    at_exit do
      if cur == Process.pid
        delete_pidfile
      end
    end
  end
end

写状态文件的代码很简单,基本就是把 Configuration 的对象实例序列化为 yaml 格式并写入文件中。写 pid 文件也很简单,只是 ruby 进程退出的时候要把老的 pid 文件删除。

下一步是启动 puma 的 control server,这是通过在 config 文件中配置 control_url 实现的。这块代码和主流程没有太大关系,先放放。

启动 Server

class Runner
  def start_server
    min_t = @options[:min_threads]
    max_t = @options[:max_threads]

    server = Puma::Server.new app, @cli.events, @options
    server.min_threads = min_t
    server.max_threads = max_t
    server.inherit_binder @cli.binder

    if @options[:mode] == :tcp
      server.tcp_mode!
    end
    unless development?
      server.leak_stack_on_error = false
    end
    server
  end

启动 Server 的方法 start_server 其实只是初始化了一个 Server 类的实例,赋值一些变量。

class Server
  def initialize(app, events=Events.stdio, options={})
    @app = app
    @events = events
    @check, @notify = Puma::Util.pipe
    @status = :stop
    @min_threads = 0
    @max_threads = 16
    @auto_trim_time = 1
    @thread = nil
    @thread_pool = nil
    @persistent_timeout = PERSISTENT_TIMEOUT
    @binder = Binder.new(events)
    @own_binder = true
    @first_data_timeout = FIRST_DATA_TIMEOUT
    @leak_stack_on_error = true
    @options = options
    @queue_requests = options[:queue_requests].nil? ? true : options[:queue_requests]
    ENV['RACK_ENV'] ||= "development"
    @mode = :http
  end

一个 Server 类用来服务一个 rack app。

boot 的事件通知

当 Server 初始化完成以后,通知事件监听者 puma 启动了。

class Events
  def fire_on_booted!
    @on_booted.each { |b| b.call }
  end

Server 的 run 方法

单进程模式的最后一步是调用 Server 的 run 方法。如果 background 设置为 true,那么会在一个新线程中运行实际的 handle_servers 代码,否则会同步执行。

class Server  
  def run(background=true)
    BasicSocket.do_not_reverse_lookup = true
    @events.fire :state, :booting
    @status = :run
    queue_requests = @queue_requests
    @thread_pool = ThreadPool.new(@min_threads,
                                  @max_threads,
                                  IOBuffer) do |client, buffer|
      #当有数据来的时候的回调block,后面详细分析io代码时分析此处省略的代码。
    end
    if queue_requests
      @reactor = Reactor.new self, @thread_pool
      @reactor.run_in_thread
    end
    @events.fire :state, :running
    if background
      @thread = Thread.new { handle_servers }
      return @thread
    else
      handle_servers
    end
  end

这一章里就不讨论 io 处理部分的代码(io 也是最复杂的一块),先看总体流程。

def handle_servers
  begin
    check = @check
    sockets = [check] + @binder.ios
    pool = @thread_pool
    queue_requests = @queue_requests

    while @status == :run
      begin
        ios = IO.select sockets
        #网络数据处理部分,后面详细分析io代码时分析此处省略的代码。
      rescue Errno::ECONNABORTED
        # client closed the socket even before accept
        client.close rescue nil
      rescue Object => e
        @events.unknown_error self, e, "Listen loop"
      end
    end

    @events.fire :state, @status
    graceful_shutdown if @status == :stop || @status == :restart
    if queue_requests
      @reactor.clear! if @status == :restart
      @reactor.shutdown
    end
    @check.close
    @notify.close
    if @status != :restart and @own_binder
      @binder.close
    end
  end
  @events.fire :state, :done
end

Puma 服务器通过@status变量控制其运行。当@status为:run 的时候,服务器处于运行状态,当外部操作导致@status变化时,puma 进入关闭/重启流程。

先占个沙发 :plus1:

puma ?

怎么有点熟悉?衣服还是鞋子品牌?

#2 楼 @pynix 学习新概念英语的时候,puma 是美洲狮,有一篇文章 puma at large 印象很深;当运动的时候,puma 指运动服装品牌;对于我们 ruby 程序员,puma 当然是指一个高性能的 web 服务器了。

持续关注,了解 web 服务器设计

ylt Puma 源代码分析 - 概述 提及了此话题。 07月03日 22:43
BasicSocket.do_not_reverse_lookup = true

这一句的作用具体是什么?

需要 登录 后方可回复, 如果你还没有账号请 注册新账号