Ruby 尝试使用 Ruby 3 调度器

dsh0416 · 2020年08月18日 · 最后由 zzz6519003 回复于 2020年09月22日 · 2495 次阅读
本帖已被设为精华帖!

一次失败的提案

在准备 RubyConf China 2020 的时候,我仔细检查了 Fiber 调度器 提出的补丁。当我看调度器的样例代码的时候,我发现其调用的是 Ruby 中的 IO.select API。IO.select API 在 Ruby 内部有多种实现,它可能调用 poll、大尺寸 select、POSIX 兼容的 select 取决于不同的操作系统。于是我想用一些更快的 syscall 来实现,比如 epoll kqueueIOCP

我做了一个相关的提案但是被拒绝了。主要问题是 Ruby 的 IO.select API 是无状态的。如果没有含状态的注册,这些新 API 的性能甚至会不如 poll。在 Koichi Sasada 跑了 banchmark 证明了这一点后,提案被正式拒绝。在和 Samuel Williams 在 Twitter 上讨论后,它建议我从 Scheduler 的实现上来进行注入,因为 Scheduler 本身是有状态的。于是我开始写一个 gem 作为 Ruby 3 调度器接口的概念证明。

实现调度器

本文中的 Ruby 版本是:

ruby 2.8.0dev (2020-08-18T10:10:09Z master 172d44e809) [x86_64-linux]

基本的 Scheduler 例子来自于 Ruby 的单元测试。这是 Ruby 3 调度器的测试,而不是真正用于生产的,因此是使用 IO.select 进行 I/O 多路复用。因此我们可以基于此,开发一个性能更好的 Ruby 调度器。

我们需要做一些 C 开发来支持其它 syscall,因此第一件事是兼容原始的实现。

Fallback 到 Ruby IO.select

对于 select/poll API, 不需要预先创建文件描述符,也不需要在运行时注册文件描述符。所以唯一要做的就是处理调度器触发时的行为。

VALUE method_scheduler_wait(VALUE self) {
    // return IO.select(@readable.keys, @writable.keys, [], next_timeout)
    VALUE readable, writable, readable_keys, writable_keys, next_timeout;
    ID id_select = rb_intern("select");
    ID id_keys = rb_intern("keys");
    ID id_next_timeout = rb_intern("next_timeout");

    readable = rb_iv_get(self, "@readable");
    writable = rb_iv_get(self, "@writable");

    readable_keys = rb_funcall(readable, id_keys, 0);
    writable_keys = rb_funcall(writable, id_keys, 0);
    next_timeout = rb_funcall(self, id_next_timeout, 0);

    return rb_funcall(rb_cIO, id_select, 4, readable_keys, writable_keys, rb_ary_new(), next_timeout);
}

我们花了 10 行 C 干了原来 1 行 Ruby 就干好了的事。主要是这允许我们用 C 的宏定义来控制,从而使用其它 I/O 多路复用方法,例如 epoll and kqueue。我们需要实现 4 个 C 方法:

Scheduler.backend
scheduler = Scheduler.new

scheduler.register(io, interest)
scheduler.deregister(io)
scheduler.wait
#include <ruby.h>

VALUE Evt = Qnil;
VALUE Scheduler = Qnil;

void Init_evt_ext();
VALUE method_scheduler_init(VALUE self);
VALUE method_scheduler_register(VALUE self, VALUE io, VALUE interest);
VALUE method_scheduler_deregister(VALUE self, VALUE io);
VALUE method_scheduler_wait(VALUE self);
VALUE method_scheduler_backend();

void Init_evt_ext()
{
    Evt = rb_define_module("Evt");
    Scheduler = rb_define_class_under(Evt, "Scheduler", rb_cObject);
    rb_define_singleton_method(Scheduler, "backend", method_scheduler_backend, 0);
    rb_define_method(Scheduler, "init_selector", method_scheduler_init, 0);
    rb_define_method(Scheduler, "register", method_scheduler_register, 2);
    rb_define_method(Scheduler, "deregister", method_scheduler_deregister, 1);
    rb_define_method(Scheduler, "wait", method_scheduler_wait, 0);
}

Scheduler.backend 是专门给调试用的,剩下 4 个 API 会注入到调度器的 Scheduelr#run, Scheduelr#wait_readable, Scheduelr#wait_writable, Scheduelr#wait_any 中。

使用 epollkqueue

epoll 的三个核心 API 是 epoll_create epoll_ctl epoll_wait。很好理解,我们只要在调度器初始化的时候初始化 epoll fd,然后在注册 I/O 事件的时候调用 epoll_ctl,最后用 epoll_wait 替换掉 IO.select

#if defined(__linux__) // TODO: Do more checks for using epoll
#include <sys/epoll.h>
#define EPOLL_MAX_EVENTS 64

VALUE method_scheduler_init(VALUE self) {
    rb_iv_set(self, "@epfd", INT2NUM(epoll_create(1))); // Size of epoll is ignored after Linux 2.6.8.
    return Qnil;
}

VALUE method_scheduler_register(VALUE self, VALUE io, VALUE interest) {
    struct epoll_event event;
    ID id_fileno = rb_intern("fileno");
    int epfd = NUM2INT(rb_iv_get(self, "@epfd"));
    int fd = NUM2INT(rb_funcall(io, id_fileno, 0));
    int ruby_interest = NUM2INT(interest);
    int readable = NUM2INT(rb_const_get(rb_cIO, rb_intern("WAIT_READABLE")));
    int writable = NUM2INT(rb_const_get(rb_cIO, rb_intern("WAIT_WRITABLE")));

    if (ruby_interest & readable) {
        event.events |= EPOLLIN;
    } else if (ruby_interest & writable) {
        event.events |= EPOLLOUT;
    }
    event.data.ptr = (void*) io;

    epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &event);
    return Qnil;
}

VALUE method_scheduler_deregister(VALUE self, VALUE io) {
    ID id_fileno = rb_intern("fileno");
    int epfd = NUM2INT(rb_iv_get(self, "@epfd"));
    int fd = NUM2INT(rb_funcall(io, id_fileno, 0));
    epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL); // Require Linux 2.6.9 for NULL event.
    return Qnil;
}

VALUE method_scheduler_wait(VALUE self) {
    int n, epfd, i, event_flag, timeout;
    VALUE next_timeout, obj_io, readables, writables, result;
    ID id_next_timeout = rb_intern("next_timeout");
    ID id_push = rb_intern("push");

    epfd = NUM2INT(rb_iv_get(self, "@epfd"));
    next_timeout = rb_funcall(self, id_next_timeout, 0);
    readables = rb_ary_new();
    writables = rb_ary_new();

    if (next_timeout == Qnil) {
        timeout = -1;
    } else {
        timeout = NUM2INT(next_timeout);
    }

    struct epoll_event* events = (struct epoll_event*) xmalloc(sizeof(struct epoll_event) * EPOLL_MAX_EVENTS);

    n = epoll_wait(epfd, events, EPOLL_MAX_EVENTS, timeout);
    // TODO: Check if n >= 0

    for (i = 0; i < n; i++) {
        event_flag = events[i].events;
        if (event_flag & EPOLLIN) {
            obj_io = (VALUE) events[i].data.ptr;
            rb_funcall(readables, id_push, 1, obj_io);
        } else if (event_flag & EPOLLOUT) {
            obj_io = (VALUE) events[i].data.ptr;
            rb_funcall(writables, id_push, 1, obj_io);
        }
    }

    result = rb_ary_new2(2);
    rb_ary_store(result, 0, readables);
    rb_ary_store(result, 1, writables);

    xfree(events);
    return result;
}

VALUE method_scheduler_backend() {
    return rb_str_new_cstr("epoll");
}
#endif

kqueue 是类似的。唯一不同的是,BSD 的注册和等待用的是同一个 API,只是参数不同,所以有点难懂。

#if defined(__FreeBSD__) || defined(__NetBSD__) || defined(__APPLE__)
#include <sys/event.h>
#define KQUEUE_MAX_EVENTS 64

VALUE method_scheduler_init(VALUE self) {
    rb_iv_set(self, "@kq", INT2NUM(kqueue()));
    return Qnil;
}

VALUE method_scheduler_register(VALUE self, VALUE io, VALUE interest) {
    struct kevent event;
    u_short event_flags = 0;
    ID id_fileno = rb_intern("fileno");
    int kq = NUM2INT(rb_iv_get(self, "@kq"));
    int fd = NUM2INT(rb_funcall(io, id_fileno, 0));
    int ruby_interest = NUM2INT(interest);
    int readable = NUM2INT(rb_const_get(rb_cIO, rb_intern("WAIT_READABLE")));
    int writable = NUM2INT(rb_const_get(rb_cIO, rb_intern("WAIT_WRITABLE")));

    if (ruby_interest & readable) {
        event_flags |= EVFILT_READ;
    } else if (ruby_interest & writable) {
        event_flags |= EVFILT_WRITE;
    }

    EV_SET(&event, fd, event_flags, EV_ADD|EV_ENABLE, 0, 0, (void*) io);
    kevent(kq, &event, 1, NULL, 0, NULL); // TODO: Check the return value
    return Qnil;
}

VALUE method_scheduler_deregister(VALUE self, VALUE io) {
    struct kevent event;
    ID id_fileno = rb_intern("fileno");
    int kq = NUM2INT(rb_iv_get(self, "@kq"));
    int fd = NUM2INT(rb_funcall(io, id_fileno, 0));
    EV_SET(&event, fd, 0, EV_DELETE, 0, 0, NULL);
    kevent(kq, &event, 1, NULL, 0, NULL); // TODO: Check the return value
    return Qnil;
}

VALUE method_scheduler_wait(VALUE self) {
    int n, kq, i;
    u_short event_flags = 0;

    struct kevent* events; // Event Triggered
    struct timespec timeout;
    VALUE next_timeout, obj_io, readables, writables, result;
    ID id_next_timeout = rb_intern("next_timeout");
    ID id_push = rb_intern("push");

    kq = NUM2INT(rb_iv_get(self, "@kq"));
    next_timeout = rb_funcall(self, id_next_timeout, 0);
    readables = rb_ary_new();
    writables = rb_ary_new();

   events = (struct kevent*) xmalloc(sizeof(struct kevent) * KQUEUE_MAX_EVENTS);

    if (next_timeout == Qnil || NUM2INT(next_timeout) == -1) {
        n = kevent(kq, NULL, 0, events, KQUEUE_MAX_EVENTS, NULL);
    } else {
        timeout.tv_sec = next_timeout / 1000;
        timeout.tv_nsec = next_timeout % 1000 * 1000 * 1000;
        n = kevent(kq, NULL, 0, events, KQUEUE_MAX_EVENTS, &timeout);
    }

    // TODO: Check if n >= 0
    for (i = 0; i < n; i++) {
        event_flags = events[i].filter;
        if (event_flags & EVFILT_READ) {
            obj_io = (VALUE) events[i].udata;
            rb_funcall(readables, id_push, 1, obj_io);
        } else if (event_flags & EVFILT_WRITE) {
            obj_io = (VALUE) events[i].udata;
            rb_funcall(writables, id_push, 1, obj_io);
        }
    }

    result = rb_ary_new2(2);
    rb_ary_store(result, 0, readables);
    rb_ary_store(result, 1, writables);

    xfree(events);
    return result;
}

VALUE method_scheduler_backend() {
    return rb_str_new_cstr("kqueue");
}
#endif

使用调度器的 HTTP 服务器例子

在实现好调度器后,我们要测试调度器的性能。因此我写了一个简单的 HTTP 服务器 benchmark

require 'evt'

puts "Using Backend: #{Evt::Scheduler.backend}"
Thread.current.scheduler = Evt::Scheduler.new

@server = Socket.new Socket::AF_INET, Socket::SOCK_STREAM
@server.bind Addrinfo.tcp '127.0.0.1', 3002
@server.listen Socket::SOMAXCONN
@scheduler = Thread.current.scheduler

def handle_socket(socket)
  line = socket.gets
  until line == "\r\n" || line.nil?
    line = socket.gets
  end
  socket.write("HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n")
  socket.close
end

Fiber.new(blocking: false) do
  while true
    socket, addr = @server.accept
    Fiber.new(blocking: false) do
      handle_socket(socket)
    end.resume
  end
end.resume

@scheduler.run

比起原先阻塞的 I/O,使用 Ruby 3 非阻塞 I/O 后可以达到 3.33x 的性能,而使用 epoll 后可以达到 4.21x。服务器的例子很简单,所以当 JIT 启动时,不容易造成 ICache 不命中,因此性能进一步提升到了 4.54x。

Benchmark Result

测试是基于 Intel(R) Xeon(R) CPU E3-1220L V2 @ 2.30GHz CPU 的,而且程序是单线程的。如果有更好的 CPU,epollpoll 的差距会更大。欢迎尝试,相关 gem 代码已开源。

未来工作

未来工作主要是两部分。一个是提升现有 API 的稳定性,还有就是加入 io_uringIOCP 的支持。io_uring 倒是还好,但我是一点都不懂 Windows 开发。所以欢迎大家来提供意见和贡献。

源码

dsh0416/evt

Rei 将本帖设为了精华贴 08月18日 21:24

楼主谦虚了,我觉得懂 Windows 开发的一般也不懂 IOCP。

然后,我会告诉你,这种性能还是很低的,因为你只用了单线程,ruby thread 和 fiber 的关系是 thread 持有 fiber。

就算不用文中提到的 fiber,也是可以利用【面向状态】来实现让 fiber 内部 handler 处理的时候实现 schedule。

因此,fiber 跟并发还是扯不上关系,作者觉得并发了,是 set file descriptor flag non_block 了不让当前 thread 阻塞,好切入别的 subprocess。

魔术师喜欢用障眼法来让观众感到真实和快乐

amber 早就试过用这种模型,但是只是解决了编程风格问题,没有解决本质的 performance

作者需要认清本质问题【利用多核】,要想利用多核,Thread 是需要的,有了 Thread 再把 fiber 加入 cpu time 抢夺我觉得多此一举,要是这样还不如学 go 封装一下 thread,说到这里,表达的意思明确了,fiber 早期设计不是去解决 io block。

官方说明:

Fibers are primitives for implementing light weight cooperative concurrency in Ruby. Basically they are a means of creating code blocks that can be paused and resumed, much like threads. The main difference is that they are never preempted and that the scheduling must be done by the programmer and not the VM.

对于并发,Thread 是需要的,但是 GIL 的出现,让 Thread 捆成了无法 Parallel,这时候真正 3x3 表达的的,正是 Guild,我不知道为何要提出 Fiber 3x3,估计是对 fiber 的迷恋。

题主自己应该也发现,max performance 图也只有 8k qps,其实题主可能没意识到【这是单核的】,所以性能瓶颈其实不在 block nonblock,我建议博主试试一个 IO 往 另一个 IO write 1GB 的数据,你可以试试【阻塞 write】快,还是【非阻塞 write】快,你会得出更惊喜的答案。

extern "C"

jakit 回复

thread 持有 fiber 所以呢?

jakit 回复

非阻塞就是异步吧?

需要 登录 后方可回复, 如果你还没有账号请点击这里 注册