Rails 五问 Sidekiq

early · 2018年05月24日 · 最后由 early 回复于 2018年05月29日 · 3274 次阅读
本帖已被设为精华帖!

Sidekiq 是Ruby社区最受欢迎的异步任务框架之一,几乎是Rails项目标配。

本文,我将从实际使用者的角度来提出疑问,通过一一解答这些问题来剖析Sidekiq是如何工作的。(代码基于Sidekiq-5.1.3)

在Web请求中,有很多任务是可以放到后台执行的,比如用户购买商品付款成功后,就可以直接向用户购买成功,相应的短信发送,物流通知等等就可以放到后台任务去做,不用在用户购买的同时立即执行,这些任务也称作异步任务。在Rails中,使用Sidekiq是这样的:

class HardWorker
  include Sidekiq::Worker
  sidekiq_options :retry => 5, queue: 'hard'
  def perform(name)
    # do something
  end
end
HardWorker.perform_async('bob')     # 异步执行任务

我们通常使用上面的方式,注册一个任务,让它异步执行。那么本文的第一个问题来了:

1. 异步任务到底如何注册的?

很容易看出perform_async这个类方法定义在 Sidekiq::Worker 里面,在源码中找到相关的代码

def perform_async(*args)
  client_push('class' => self, 'args' => args)
end

def client_push(item) # :nodoc:
  pool = Thread.current[:sidekiq_via_pool] || get_sidekiq_options['pool'] || Sidekiq.redis_pool
  # 有省略
  Sidekiq::Client.new(pool).push(item)
end

可以看出,把自己的Class和方法的参数生成了一个hash,传递进了client_push方法, pool是一个redis连接, 调用了Client的实例方法, 通过一层层代码的查看,最终来到了我们最值得关注的一个方法

def atomic_push(conn, payloads)
  if payloads.first['at']  # 延时任务,例如指定了一分钟之后才执行
    conn.zadd('schedule', payloads.map do |hash|
      at = hash.delete('at').to_s
      [at, Sidekiq.dump_json(hash)]
    end)
  else
    q = payloads.first['queue']
    now = Time.now.to_f
    to_push = payloads.map do |entry|
      entry['enqueued_at'] = now
      Sidekiq.dump_json(entry)
    end
    conn.sadd('queues', q)
    conn.lpush("queue:#{q}", to_push)  #重点
  end
end

payloads是通过上面item参数加工后得到的,里面除了之前的class,args参数之外,还会有sidekiq_options方法定制的部分参数,例如queue。 我们重新专注于上面的代码,第一个问题的答案已经很明显了。

从上面可以清晰的看出,异步执行相关的数据被打包成json,然后使用redis的lpush命令塞进了一个队列中,这个队列的名字和指定的queue有关系,按照我们上面的配置,这个队列的就是 "queue:hard"。写入队列的数据大概像这样:

{'class' => MyWorker, 'args' => [1, 2, 3]}

通过这些参数,就可以知道需要被执行的是哪个Worker,并且参数是什么。 注册异步任务到此就结束了,第二个问题随之来了。

2. 上面这些队列里的数据是怎么被消费的?

通过第一个问题,我们知道了,所谓注册异步任务,就是将要执行的相关Class和参数写入redis队列,然后有其他专门的角色来处理这些异步任务,这样便实现了任务异步化,通过redis队列实现了完美的解耦合。那么这些队列数据是被谁消费了呢? 实际进行异步任务处理的就是我们熟悉的Sidekiq进程,通过这样可以启动Sidekiq进程:

bundle exec sidekiq 

启动的进程会去消费相应的队列数据,然后执行对应的代码。接下来,我们深入源码,看看这些具体是怎么回事。 首先简单的梳理一下Sidekiq的启动流程,通过启动流程顺藤摸瓜,找到我们的答案。Sidekiq命令本质上是下面的代码

#!/usr/bin/env ruby
$TESTING = false
require_relative '../lib/sidekiq/cli'
begin
  cli = Sidekiq::CLI.instance
  cli.parse
  cli.run
rescue => e
  raise e if $DEBUG
  STDERR.puts e.message
  STDERR.puts e.backtrace.join("\n")
  exit 1
end

重点在cli.parsecli.run上,我们依次看看它们到底是什么:

def parse(args=ARGV)
     @code = nil
     setup_options(args)
     initialize_logger
     validate!
     daemonize
     write_pid
   end

parse方法就是解析了一下配置数据,例如日志路径,要消费的队列等等,关键在于run方法上,下面截取一部分:

def run
  boot_system
  print_banner
  self_read, self_write = IO.pipe
  require 'sidekiq/launcher'
  @launcher = Sidekiq::Launcher.new(options)
  begin
    launcher.run                                # 关键方法
    while readable_io = IO.select([self_read])  # 死循环,等待消息
      signal = readable_io.first[0].gets.strip
      handle_signal(signal)
    end
  rescue Interrupt
    logger.info 'Shutting down'
    launcher.stop
    logger.info "Bye!"
    exit(0)
  end
end

可以看出,在进行了一些初始化之后,执行了 launcher.run方法,然后就进入了一个死循环,这个死循环会一直读pipe消息,根据消息执行相关的命令,常见的就是重启等等命令。 进程中的秘密自然的被聚焦到了launcher.run方法上,我们去一探究竟。

  class Launcher  # 部分截取
    include Util
    attr_accessor :manager, :poller, :fetcher
    def initialize(options)
      @manager = Sidekiq::Manager.new(options)
      @poller = Sidekiq::Scheduled::Poller.new
      @done = false
      @options = options
    end

    def run
      @thread = safe_thread("heartbeat", &method(:start_heartbeat))  # 第一行
      @poller.start    # 第二行
      @manager.start    # 第三行
    end 

    def start_heartbeat
      while true
        heartbeat
        sleep 5
      end
      Sidekiq.logger.info("Heartbeat stopping...")
    end
end

可以看到,执行了三行代码,先来看看safe_thread 方法:

def safe_thread(name, &block)
  Thread.new do
    Thread.current['sidekiq_label'] = name
    watchdog(name, &block)
  end
end

safe_thread方法就是返回了一个Thread实例,线程中会执行传递进去的block,上面传递的是一个start_heartbeat方法。可以看出其实是启动了一个心跳线程, 用来定时做一些数据统计,同时也是一种TCP连接保活机制。第二行执行了@poller.start, 它是Poller类的一个实例,下面看看源码中的start方法:

 class Poller  # 有删减
   def initialize
     @enq = (Sidekiq.options[:scheduled_enq] || Sidekiq::Scheduled::Enq).new
     @sleeper = ConnectionPool::TimedStack.new
     @done = false
     @thread = nil
   end

   def start
     @thread ||= safe_thread("scheduler") do
       initial_wait
       while !@done
         enqueue
         wait
       end
       Sidekiq.logger.info("Scheduler exiting...")
     end
   end
end

通过safe_thread方法返回了一个线程,线程中和start_heartbeat一样是一个死循环,wait方法就是随机地sleep几秒,重点在enqueue方法:

# @enq = (Sidekiq.options[:scheduled_enq] || Sidekiq::Scheduled::Enq).new      
def enqueue
   begin
     @enq.enqueue_jobs
   rescue => ex
     logger.error ex.message
     handle_exception(ex)
   end
end

再转到enqueue_job方法:

SETS = %w(retry schedule)
def enqueue_jobs(now=Time.now.to_f.to_s, sorted_sets=SETS)
   Sidekiq.redis do |conn|
     sorted_sets.each do |sorted_set|
       while job = conn.zrangebyscore(sorted_set, '-inf', now, :limit => [0, 1]).first do
         if conn.zrem(sorted_set, job)
           Sidekiq::Client.push(Sidekiq.load_json(job))
           Sidekiq::Logging.logger.debug { "enqueued #{sorted_set}: #{job}" }
         end
       end
     end
   end
 end

可以看出@poller.start就是在 retry, schedule 队列间分别去取重试和延时的任务,使用的是redis的sorted_set,具体的细节,第三个问题还会继续详细探讨。总之到这里,只要知道,@poller.start这个方法就是起了一个线程,这个线程去retry和schedule队列中取任务,然后将取到的任务放到任务队列中等待执行,注意这里的任务队列,其实就是上面问题1我们所说的注册异步任务的队列。

到现在为止,真正的任务还是没有被执行,接下来,我们就看看任务如何被执行,关注第三行代码: @manager.start 代码:

  class Manager # 有删减
    def initialize(options={})
      @options = options
      @count = options[:concurrency] || 25
      @workers = Set.new
      @count.times do
        @workers << Processor.new(self)
      end
      @plock = Mutex.new
    end

    def start
      @workers.each do |x|
        x.start
      end
    end
end

start方法启动了@count个线程,@count的值可以自己设定,默认为25。Sidekiq默认每个进程中有25个线程worker,这些@workers就是用来消费上面所说的任务队列中的数据,并执行相关任务的代码, 看看他们是如何执行的:代码

  class Processor  # 有删减
    def start
      @thread ||= safe_thread("processor", &method(:run))
    end

    private unless $TESTING

    def run
      begin
        while !@done
          process_one
        end
        @mgr.processor_stopped(self)
    end

    def process_one
      @job = fetch
      process(@job) if @job
      @job = nil
    end
end

注意力转到核心的process_one方法,可以看出流程大概为: 去任务队列中取出一个任务,如果任务存在,就执行这个任务。

去任务队列取数据的细节值得我们关注,将注意力放到fetch方法,随着调用往下走,我们会发现一个关键的方法

def queues_cmd
  if @strictly_ordered_queues
    @queues
  else
    queues = @queues.shuffle.uniq
    queues << TIMEOUT
    queues
  end
end

def retrieve_work # 关键方法
  work = Sidekiq.redis { |conn| conn.brpop(*queues_cmd) }
  UnitOfWork.new(*work) if work
end

可以看到,@worker取数据的方式就是使用redis的brpop命令去指定的队列中,阻塞式的取数据。queue_cmd参数中指定了要监听的队列以及超时时间。

这里引申一下,在日常的开发中,我们常常会为不同的异步任务类型指定不同的queue,从上面我们知道,通过参数传递,可以指定@worker去哪个队列消费数据。也就是说,我们可以启动不同的进程去消费不同队列的数据,一对一,或者一对多都行。这些进程可以在同一台机器上,也可以在不同的机器上。

拿到数据后,实际执行任务的方式就是,通过任务参数中的 class 参数,通过class.constantize拿实际的class,然后执行 class.new.perform(*args), 细节可以查看源码,这里就不再赘述。 总结一下,我们谈到了3种队列:

  • retry , 放重试任务的队列
  • schedule,放延迟任务的队列
  • "queue:#{queue}",可能是多个队列,存放即将被执行的任务

@poller.start线程会定时去retry和schedule中取数据,将它放入 queue中,@wokers会自动去执行queue中的任务。

上面两个问题,分别回答了异步任务是如何注册的,同时任务又是如何被消费执行的,其中有些细节问题,还是值得我们思考,在使用Sidekiq的时候,会有如下的使用方式:

HardWorker.perform_in(5.minutes, 'bob', 5)

这种延时任务是如何实现的呢? 这也是第二个问题中我留的一个尾巴,接下来回答第三个问题:

3. 延时任务以及重试任务是如何实现的?

在第二个问题中,我们解释了

@poller.start

这条代码的工作内容就是在 retry和schedule队列中去取数据,然后将取到的数据写入任务队列中,等待@workers去执行,这就是延时和重试任务的关键之处。

上面我们说了,这两个队列是redis的sorted_set。 当有延时任务时,perform_in方法就会将相应的 {class: 'HardWorker', args: *args} 参数写入schedule队列, 并且将它被指定执行的时间点的时间戳当作score写入。上面延时5分钟,那么写入的score就大致为 (Time.now + 5.minutes).to_i 。

当有任务失败的时候,也是同理,将下一次重试的时间当作score,和对应的class等数据写入retry队列

@poller.start 通过读取score比现在小的数据,然后将这些任务数据写入 任务队列。

job = conn.zrangebyscore(sorted_set, '-inf', Time.now.to_i, :limit => [0, 1])

当score比当前时间戳小,说明指定的执行时间已经到了,那么就将它读取出来,写入任务队列,等待被@workers执行。

4. 定时任务如何实现的?

在日常的使用中,我们常常需要有定时的周期任务需要执行,比如每分钟需要执行一次,每天需要跑一个统计等等。Sidekiq的免费版中没有提供这个功能,然而有很多的其他插件提供了这个功能,比如sidekiq-scheduler,接下来,我们就来看看它是如何实现周期性任务的。

在这之前,我们先思考一下,通过上面的内容,我们明白,只要有什么东西周期性地往任务队列中写入任务,然后@worker本身会自动去执行这些任务,这样就实现了周期性任务。

通过使用这个插件,我发现它只需要增加一个配置文件就行了,其他的什么都没有做,便实现了这点,看源码

Sidekiq.configure_server do |config|  # 有删减
  config.on(:startup) do
    scheduler_options = {
      dynamic:       dynamic,
      dynamic_every: dynamic_every,
      enabled:       enabled,
      schedule:      schedule,
      listened_queues_only: listened_queues_only
    }
    schedule_manager = SidekiqScheduler::Manager.new(scheduler_options)
    config.options[:schedule_manager] = schedule_manager
    config.options[:schedule_manager].start   # 重点
  end

在自动加载这段配置代码的时候, 它会先解析配置文件,然后执行一个start方法, 这个start方法通过rufus-scheduler,通过线程模拟了类似crontab的功能,周期性地往任务队列写入数据。

这里会有个问题值得注意,每一个读取了相关配置文件的进程,都会通过执行上面的start方法,实现定时写入任务的功能。 当我们启动了N个进程时,每个进程都会定时产生定时任务,原来配置的1分钟执行一次,就会变成1分钟执行N次。

简单的解决方法是,只让一个进程读相关配置文件,让这个读了配置文件的进程单独扮演生产定时写入任务的角色。

5. Active::Job 有什么用?

Rails在4之后的版本中就集成了Active::Job的功能,它本质上就是ruby异步任务框架的一种再封装,实际的任务还是依靠像Sidekiq这种来执行的。

ruby社区有众多的异步任务框架,Sidekiq,Resque,DelayJob等等, 这些异步框架的使用都有不少差异。如果项目早期使用的Resque,但是随着项目的发展,后面觉得Sidekiq更加适合,那么从Resque切换到Sidekiq就会有很多代码的改动,只要有改动,则就有可能出问题。

Active::Job就是在异步任务框架和Rails之间扮演了一个解耦合的角色,并且更方便地提供了一些回调功能。换任务框架,不需要改Rails相关的代码,只需要修改Active::Job的任务适配器就行。在有任务框架切换背景的时候还是很有价值。

但是从我个人的角度来看,如果切换任务框架的可能性不大时,就不太值得使用它,Active::Job多了一层封装,无论从性能和复杂度上来讲都有所增加,毕竟简单的场景在很多时候会更好,Rails已经封装的有些臃肿了。

如发现有错误或不恰当的地方,欢迎您的指正。

共收到 9 条回复

在 csdn 写博客? 汪汪汪?

IChou 回复

有些时候,你的选择是很正确的,垃圾文章确实有点多。面对低质量信息源,一般有两种应对模式:

  • “悲观锁”模式,相信都是垃圾,直接全部屏蔽。
  • “乐观锁”模式,相信还是有优质内容,自己去过滤出好的东西

有一天也许可以通过AI,帮我们判断,在搜索不同的话题的时候,恰当地在这两种模式间切换。

huacnlee 将本帖设为了精华贴 05月25日 10:33

分析还不错, 但是源码引用, 我建议还是附带源码地址..不然看得一脸雾水.

ruby_sky 回复

好建议,已加上链接。

实例变量这样写 @worker 否则会变成link

ruby里面 resque 和 sidekiq。实现方式差不多。将数据放到redis,此时能立即返回,实现异步。剩下的就是开启另外的进程去redis中轮询数据

hooopo 回复

确实,没太注意到

楼主很棒, 感谢分享!

希望楼主也能分析一下 sidekiq 中 线程的监控, 以及 sidekiq 由原来的 Celluloid 换成 Raw Threads 的方式后, 线程之间的通信情况.

还有一些队列的基本问题: 如 为什么 Sidekiq 是 At Least Once 等, 我们能做到 Exactly Once 吗

等等之类的问题.

dengqinghua 回复

感谢提出这么值得思考的问题

需要 登录 后方可回复, 如果你还没有账号请点击这里 注册