分享 Webrick 源码阅读笔记

davidqhr · October 28, 2013 · Last by dujiakun replied at September 02, 2014 · 8254 hits
Topic has been selected as the excellent topic by the admin.

写在前面

从 rack webrick handler 开始,顺藤摸瓜一样的往下读。

一些基础

SizedQueue

一个线程安全的队列,有大小限制。队列为空的时候,pop 操作的线程会被阻塞。队列满的时候,push 操作的线程会被阻塞。

q = SizedQueue.new 1

q.push 1

Thread.start {
    loop do
        puts q.pop
        sleep 10
    end
}


q.push 2
q.push 3

TCPServer

TCP/IP stream 型连接的服务器端套接字的类。accept 实例方法会受理客户端的连接请求,返回已连接的 TCPSocket 的实例。

IO::select

多路复用 IO。参数列表前三项为输入/输出/异常的 IO(或者子类)的实例数组。第四个参数是 timeout。第四个参数是 timeout 可以是整数、Float 或 nil(省略时的默认值)。指定为 nil 时,将会一直等到 IO 变成就绪状态。timeout 时将返回 nil,除此以外将返回一个包含 3 个元素的数组,这 3 个元素分别是等待输入/输出/异常的对象的数组 (指定数组的子集)。

从 rack 开始

rack 可以简单的理解成 ruby frameword 和 webserver 之间的一个通用接口。一份基于 rack 开发的 web 服务可以使用 rack 支持的各种 server 来运行。rack 中的所有 server 都具有一个叫做 run 的方法,这个是 web server 的入口。那么从 rack/lib/rack/handler/webrick.rb 中可以找到如下代码。

def self.run(app, options={})
    environment  = ENV['RACK_ENV'] || 'development'
    default_host = environment == 'development' ? 'localhost' : '0.0.0.0'

    options[:BindAddress] = options.delete(:Host) || default_host
    options[:Port] ||= 8080
    options[:OutputBufferSize] = 5
    @server = ::WEBrick::HTTPServer.new(options)
    @server.mount "/", Rack::Handler::WEBrick, app
    yield @server  if block_given?
    @server.start
end

那么就从 WEBrick::HTTPServer 开始,看看 mount 和 start 方法是怎么工作的。

进入 webrick

class HTTPServer < ::WEBrick::GenericServer
    ...
end

这里有必要说说 GenericServer。 其中有两个只读的实例变量:listeners, tokens。 listeners 是监听连接的 socket 数组。 tokens 是最大连接数量(并发数量)。

start 方法

def start(&block)
    ...
      while @status == :Running
        ...
          if svrs = IO.select(@listeners, nil, nil, 2.0)
            @logger.debug(svrs.to_s)
            svrs[0].each{|svr|
              @tokens.pop          # blocks while no token is there.
              if sock = accept_client(svr)
                sock.do_not_reverse_lookup = config[:DoNotReverseLookup]
                th = start_thread(sock, &block)
                th[:WEBrickThread] = true
                thgroup.add(th)
              else
                @tokens.push(nil)
              end
            }
          end
        ...
      end
    ...
  }
end

start 中,是一个循环。当没有请求的时候,主线程会被 select 阻塞。有请求的时候,针对每个输入就绪的 socket,会通过调用 socket 的 accept 方法,来产生一个与客户端通信的新 socket,而原来的 socket 依然在端口上监听。

针对每个与客户端通信的 socket,webrick 会创建一个线程(相关代码在 start_thread 中,稍后提及)来处理请求,这里@tokens的作用类似信号量,初始化 server 的时候,会把@tokens用 nil 填充满,只有能从@token获取到信号的时候,才可以创建线程,获取不到信号的时候,会阻塞主线程,以此控制并发数量。这里参见之前提到的 SizedQueue。

每个请求的具体行为,就要继续查看 start_thread 了。

start_thread

这个方法中是一些异常和 logger 的处理,主要的一句是

def start_thread(sock, &block)
    ...
    block ? block.call(sock) : run(sock)
    ...
end

显而易见,run(sock) 就是下个目标。

run

这个方法,就要回到::WEBrick::HTTPServer 了。

def run(sock)
      while true
        res = HTTPResponse.new(@config)
        req = HTTPRequest.new(@config)
        server = self
        begin
          timeout = @config[:RequestTimeout]
          while timeout > 0
            break if IO.select([sock], nil, nil, 0.5)
            timeout = 0 if @status != :Running
            timeout -= 0.5
          end
          raise HTTPStatus::EOFError if timeout <= 0
          raise HTTPStatus::EOFError if sock.eof?
          req.parse(sock)
          res.request_method = req.request_method
          res.request_uri = req.request_uri
          res.request_http_version = req.http_version
          res.keep_alive = req.keep_alive?
          server = lookup_server(req) || self
          if callback = server[:RequestCallback]
            callback.call(req, res)
          elsif callback = server[:RequestHandler]
            msg = ":RequestHandler is deprecated, please use :RequestCallback"
            @logger.warn(msg)
            callback.call(req, res)
          end
          server.service(req, res)
        rescue HTTPStatus::EOFError, HTTPStatus::RequestTimeout => ex
          res.set_error(ex)
        rescue HTTPStatus::Error => ex
          @logger.error(ex.message)
          res.set_error(ex)
        rescue HTTPStatus::Status => ex
          res.status = ex.code
        rescue StandardError => ex
          @logger.error(ex)
          res.set_error(ex, true)
        ensure
          if req.request_line
            if req.keep_alive? && res.keep_alive?
              req.fixup()
            end
            res.send_response(sock)
            server.access_log(@config, req, res)
          end
        end
        break if @http_version < "1.1"
        break unless req.keep_alive?
        break unless res.keep_alive?
      end
    end

req.parse

从 socket 读取请求报文,构造 request 实例。

def parse(socket=nil)
  ...
  read_request_line(socket)
  ...

  if @http_version.major > 0
    read_header(socket)
    ...
  end

  ...

  if /close/io =~ self["connection"]
    @keep_alive = false
  elsif /keep-alive/io =~ self["connection"]
    @keep_alive = true
  elsif @http_version < "1.1"
    @keep_alive = false
  else
    @keep_alive = true
  end
end

先是解析请求行,再是请求报文头部解析,最后确定 keep_alive

回到 run。

一般的使用情况下 server 都是 self,lookup_server 与 virtual_hosts 有关。server.service 就是 self.service,其中,找到了真正的 servlet 的实例,并调用实例的 service 方法。其中可以看看 mount 方法的作用:可以把不同的 servlet mount 不同的 url 上,形成一个路由表。

rack 的 webrick handler 就是一个 webrick servlet,并且复写了 service 这个方法。server.service(req, res) 调用完毕,那么 response 的各个属性也就填好了,接着res.send_response(sock)会通过 socket 来发送数据。最后根据链接是否 keep-alive 来决定是否跳出循环。

收藏了以后慢慢读…

Thanks for sharing

有意思。

同收藏,楼主不能停,各种 F5 伺候

不加精 天理不容啊

好吧,原来 webrick 用while(true)如此粗暴的方式来处理keep_alive

不错,真的很好,顶一下!

收藏~好帖子,下来慢慢看!

#6 楼 @donnior 其实还可以,因为有 select 来阻塞

#9 楼 @davidqhr 是的,毕竟 webrick 很简单,也只使用于开发环境下而已;我只是最近刚好在看一些 http server 实现,有感而发

好帖,收藏

学习了

Very Nice!

年轻是和楼主一样有精力去读开源代码,现在 33 了,精力不足了。看上 100 行就开始犯困了。唉

少侠,我是来顶的

先顶后看,年薪百万

真不错。

顶一个先

You need to Sign in before reply, if you don't have an account, please Sign up first.