Sidekiq 是 Ruby 社区最受欢迎的异步任务框架之一,几乎是 Rails 项目标配。
本文,我将从实际使用者的角度来提出疑问,通过一一解答这些问题来剖析 Sidekiq 是如何工作的。(代码基于 Sidekiq-5.1.3)
在 Web 请求中,有很多任务是可以放到后台执行的,比如用户购买商品付款成功后,就可以直接向用户购买成功,相应的短信发送,物流通知等等就可以放到后台任务去做,不用在用户购买的同时立即执行,这些任务也称作异步任务。在 Rails 中,使用 Sidekiq 是这样的:
class HardWorker
include Sidekiq::Worker
sidekiq_options :retry => 5, queue: 'hard'
def perform(name)
# do something
end
end
HardWorker.perform_async('bob') # 异步执行任务
我们通常使用上面的方式,注册一个任务,让它异步执行。那么本文的第一个问题来了:
很容易看出 perform_async 这个类方法定义在 Sidekiq::Worker 里面,在源码中找到相关的代码:
def perform_async(*args)
client_push('class' => self, 'args' => args)
end
def client_push(item) # :nodoc:
pool = Thread.current[:sidekiq_via_pool] || get_sidekiq_options['pool'] || Sidekiq.redis_pool
# 有省略
Sidekiq::Client.new(pool).push(item)
end
可以看出,把自己的 Class 和方法的参数生成了一个 hash,传递进了 client_push 方法,pool 是一个 redis 连接,调用了 Client 的实例方法,通过一层层代码的查看,最终来到了我们最值得关注的一个方法:
def atomic_push(conn, payloads)
if payloads.first['at'] # 延时任务,例如指定了一分钟之后才执行
conn.zadd('schedule', payloads.map do |hash|
at = hash.delete('at').to_s
[at, Sidekiq.dump_json(hash)]
end)
else
q = payloads.first['queue']
now = Time.now.to_f
to_push = payloads.map do |entry|
entry['enqueued_at'] = now
Sidekiq.dump_json(entry)
end
conn.sadd('queues', q)
conn.lpush("queue:#{q}", to_push) #重点
end
end
payloads 是通过上面 item 参数加工后得到的,里面除了之前的 class,args 参数之外,还会有 sidekiq_options 方法定制的部分参数,例如 queue。我们重新专注于上面的代码,第一个问题的答案已经很明显了。
从上面可以清晰的看出,异步执行相关的数据被打包成 json,然后使用 redis 的 lpush 命令塞进了一个队列中,这个队列的名字和指定的 queue 有关系,按照我们上面的配置,这个队列的就是 "queue:hard"。写入队列的数据大概像这样:
{'class' => MyWorker, 'args' => [1, 2, 3]}
通过这些参数,就可以知道需要被执行的是哪个 Worker,并且参数是什么。注册异步任务到此就结束了,第二个问题随之来了。
通过第一个问题,我们知道了,所谓注册异步任务,就是将要执行的相关 Class 和参数写入 redis 队列,然后有其他专门的角色来处理这些异步任务,这样便实现了任务异步化,通过 redis 队列实现了完美的解耦合。那么这些队列数据是被谁消费了呢?实际进行异步任务处理的就是我们熟悉的 Sidekiq 进程,通过这样可以启动 Sidekiq 进程:
bundle exec sidekiq
启动的进程会去消费相应的队列数据,然后执行对应的代码。接下来,我们深入源码,看看这些具体是怎么回事。 首先简单的梳理一下 Sidekiq 的启动流程,通过启动流程顺藤摸瓜,找到我们的答案。Sidekiq 命令本质上是下面的代码:
#!/usr/bin/env ruby
$TESTING = false
require_relative '../lib/sidekiq/cli'
begin
cli = Sidekiq::CLI.instance
cli.parse
cli.run
rescue => e
raise e if $DEBUG
STDERR.puts e.message
STDERR.puts e.backtrace.join("\n")
exit 1
end
重点在cli.parse 和cli.run上,我们依次看看它们到底是什么:
def parse(args=ARGV)
@code = nil
setup_options(args)
initialize_logger
validate!
daemonize
write_pid
end
parse 方法就是解析了一下配置数据,例如日志路径,要消费的队列等等,关键在于run方法上,下面截取一部分:
def run
boot_system
print_banner
self_read, self_write = IO.pipe
require 'sidekiq/launcher'
@launcher = Sidekiq::Launcher.new(options)
begin
launcher.run # 关键方法
while readable_io = IO.select([self_read]) # 死循环,等待消息
signal = readable_io.first[0].gets.strip
handle_signal(signal)
end
rescue Interrupt
logger.info 'Shutting down'
launcher.stop
logger.info "Bye!"
exit(0)
end
end
可以看出,在进行了一些初始化之后,执行了 launcher.run 方法,然后就进入了一个死循环,这个死循环会一直读 pipe 消息,根据消息执行相关的命令,常见的就是重启等等命令。进程中的秘密自然的被聚焦到了launcher.run方法上,我们去一探究竟。
class Launcher # 部分截取
include Util
attr_accessor :manager, :poller, :fetcher
def initialize(options)
@manager = Sidekiq::Manager.new(options)
@poller = Sidekiq::Scheduled::Poller.new
@done = false
@options = options
end
def run
@thread = safe_thread("heartbeat", &method(:start_heartbeat)) # 第一行
@poller.start # 第二行
@manager.start # 第三行
end
def start_heartbeat
while true
heartbeat
sleep 5
end
Sidekiq.logger.info("Heartbeat stopping...")
end
end
可以看到,执行了三行代码,先来看看 safe_thread 方法:
def safe_thread(name, &block)
Thread.new do
Thread.current['sidekiq_label'] = name
watchdog(name, &block)
end
end
safe_thread 方法就是返回了一个 Thread 实例,线程中会执行传递进去的 block,上面传递的是一个 start_heartbeat 方法。可以看出其实是启动了一个心跳线程,用来定时做一些数据统计,同时也是一种 TCP 连接保活机制。第二行执行了@poller.start
,它是 Poller 类的一个实例,下面看看源码中的start方法:
class Poller # 有删减
def initialize
@enq = (Sidekiq.options[:scheduled_enq] || Sidekiq::Scheduled::Enq).new
@sleeper = ConnectionPool::TimedStack.new
@done = false
@thread = nil
end
def start
@thread ||= safe_thread("scheduler") do
initial_wait
while !@done
enqueue
wait
end
Sidekiq.logger.info("Scheduler exiting...")
end
end
end
通过 safe_thread 方法返回了一个线程,线程中和 start_heartbeat 一样是一个死循环,wait 方法就是随机地 sleep 几秒,重点在enqueue方法:
# @enq = (Sidekiq.options[:scheduled_enq] || Sidekiq::Scheduled::Enq).new
def enqueue
begin
@enq.enqueue_jobs
rescue => ex
logger.error ex.message
handle_exception(ex)
end
end
再转到enqueue_job方法:
SETS = %w(retry schedule)
def enqueue_jobs(now=Time.now.to_f.to_s, sorted_sets=SETS)
Sidekiq.redis do |conn|
sorted_sets.each do |sorted_set|
while job = conn.zrangebyscore(sorted_set, '-inf', now, :limit => [0, 1]).first do
if conn.zrem(sorted_set, job)
Sidekiq::Client.push(Sidekiq.load_json(job))
Sidekiq::Logging.logger.debug { "enqueued #{sorted_set}: #{job}" }
end
end
end
end
end
可以看出@poller.start
就是在 retry,schedule 队列间分别去取重试和延时的任务,使用的是 redis 的 sorted_set,具体的细节,第三个问题还会继续详细探讨。总之到这里,只要知道,@poller.start
这个方法就是起了一个线程,这个线程去 retry 和 schedule 队列中取任务,然后将取到的任务放到任务队列中等待执行,注意这里的任务队列,其实就是上面问题 1 我们所说的注册异步任务的队列。
到现在为止,真正的任务还是没有被执行,接下来,我们就看看任务如何被执行,关注第三行代码: @manager.start
代码:
class Manager # 有删减
def initialize(options={})
@options = options
@count = options[:concurrency] || 25
@workers = Set.new
@count.times do
@workers << Processor.new(self)
end
@plock = Mutex.new
end
def start
@workers.each do |x|
x.start
end
end
end
start 方法启动了@count
个线程,@count
的值可以自己设定,默认为 25。Sidekiq 默认每个进程中有 25 个线程 worker,这些@workers
就是用来消费上面所说的任务队列中的数据,并执行相关任务的代码,看看他们是如何执行的:代码
class Processor # 有删减
def start
@thread ||= safe_thread("processor", &method(:run))
end
private unless $TESTING
def run
begin
while !@done
process_one
end
@mgr.processor_stopped(self)
end
def process_one
@job = fetch
process(@job) if @job
@job = nil
end
end
注意力转到核心的 process_one 方法,可以看出流程大概为:去任务队列中取出一个任务,如果任务存在,就执行这个任务。
去任务队列取数据的细节值得我们关注,将注意力放到 fetch 方法,随着调用往下走,我们会发现一个关键的方法:
def queues_cmd
if @strictly_ordered_queues
@queues
else
queues = @queues.shuffle.uniq
queues << TIMEOUT
queues
end
end
def retrieve_work # 关键方法
work = Sidekiq.redis { |conn| conn.brpop(*queues_cmd) }
UnitOfWork.new(*work) if work
end
可以看到,@worker
取数据的方式就是使用 redis 的 brpop 命令去指定的队列中,阻塞式的取数据。queue_cmd 参数中指定了要监听的队列以及超时时间。
这里引申一下,在日常的开发中,我们常常会为不同的异步任务类型指定不同的 queue,从上面我们知道,通过参数传递,可以指定@worker
去哪个队列消费数据。也就是说,我们可以启动不同的进程去消费不同队列的数据,一对一,或者一对多都行。这些进程可以在同一台机器上,也可以在不同的机器上。
拿到数据后,实际执行任务的方式就是,通过任务参数中的 class 参数,通过 class.constantize 拿实际的 class,然后执行 class.new.perform(*args),细节可以查看源码,这里就不再赘述。 总结一下,我们谈到了 3 种队列:
@poller.start
线程会定时去 retry 和 schedule 中取数据,将它放入 queue 中,@wokers
会自动去执行 queue 中的任务。
上面两个问题,分别回答了异步任务是如何注册的,同时任务又是如何被消费执行的,其中有些细节问题,还是值得我们思考,在使用 Sidekiq 的时候,会有如下的使用方式:
HardWorker.perform_in(5.minutes, 'bob', 5)
这种延时任务是如何实现的呢?这也是第二个问题中我留的一个尾巴,接下来回答第三个问题:
在第二个问题中,我们解释了
@poller.start
这条代码的工作内容就是在 retry 和 schedule 队列中去取数据,然后将取到的数据写入任务队列中,等待@workers
去执行,这就是延时和重试任务的关键之处。
上面我们说了,这两个队列是 redis 的 sorted_set。当有延时任务时,perform_in 方法就会将相应的 {class: 'HardWorker', args: *args} 参数写入 schedule 队列,并且将它被指定执行的时间点的时间戳当作 score 写入。上面延时 5 分钟,那么写入的 score 就大致为 (Time.now + 5.minutes).to_i。
当有任务失败的时候,也是同理,将下一次重试的时间当作 score,和对应的 class 等数据写入 retry 队列。
@poller.start
通过读取 score 比现在小的数据,然后将这些任务数据写入 任务队列。
job = conn.zrangebyscore(sorted_set, '-inf', Time.now.to_i, :limit => [0, 1])
当 score 比当前时间戳小,说明指定的执行时间已经到了,那么就将它读取出来,写入任务队列,等待被@workers
执行。
在日常的使用中,我们常常需要有定时的周期任务需要执行,比如每分钟需要执行一次,每天需要跑一个统计等等。Sidekiq 的免费版中没有提供这个功能,然而有很多的其他插件提供了这个功能,比如sidekiq-scheduler,接下来,我们就来看看它是如何实现周期性任务的。
在这之前,我们先思考一下,通过上面的内容,我们明白,只要有什么东西周期性地往任务队列中写入任务,然后@worker
本身会自动去执行这些任务,这样就实现了周期性任务。
通过使用这个插件,我发现它只需要增加一个配置文件就行了,其他的什么都没有做,便实现了这点,看源码:
Sidekiq.configure_server do |config| # 有删减
config.on(:startup) do
scheduler_options = {
dynamic: dynamic,
dynamic_every: dynamic_every,
enabled: enabled,
schedule: schedule,
listened_queues_only: listened_queues_only
}
schedule_manager = SidekiqScheduler::Manager.new(scheduler_options)
config.options[:schedule_manager] = schedule_manager
config.options[:schedule_manager].start # 重点
end
在自动加载这段配置代码的时候,它会先解析配置文件,然后执行一个 start 方法,这个 start 方法通过rufus-scheduler,通过线程模拟了类似 crontab 的功能,周期性地往任务队列写入数据。
这里会有个问题值得注意,每一个读取了相关配置文件的进程,都会通过执行上面的 start 方法,实现定时写入任务的功能。当我们启动了 N 个进程时,每个进程都会定时产生定时任务,原来配置的 1 分钟执行一次,就会变成 1 分钟执行 N 次。
简单的解决方法是,只让一个进程读相关配置文件,让这个读了配置文件的进程单独扮演生产定时写入任务的角色。
Rails 在 4 之后的版本中就集成了 Active::Job 的功能,它本质上就是 ruby 异步任务框架的一种再封装,实际的任务还是依靠像 Sidekiq 这种来执行的。
ruby 社区有众多的异步任务框架,Sidekiq,Resque,DelayJob 等等,这些异步框架的使用都有不少差异。如果项目早期使用的 Resque,但是随着项目的发展,后面觉得 Sidekiq 更加适合,那么从 Resque 切换到 Sidekiq 就会有很多代码的改动,只要有改动,则就有可能出问题。
Active::Job 就是在异步任务框架和 Rails 之间扮演了一个解耦合的角色,并且更方便地提供了一些回调功能。换任务框架,不需要改 Rails 相关的代码,只需要修改 Active::Job 的任务适配器就行。在有任务框架切换背景的时候还是很有价值。
但是从我个人的角度来看,如果切换任务框架的可能性不大时,就不太值得使用它,Active::Job 多了一层封装,无论从性能和复杂度上来讲都有所增加,毕竟简单的场景在很多时候会更好,Rails 已经封装的有些臃肿了。
如发现有错误或不恰当的地方,欢迎您的指正。