在之前的文章《Sidekiq 任务调度流程分析》中,我们一起仔细分析了 Sidekiq 是如何基于多线程完成队列任务处理以及调度的。我们在之前的分析里,看到了不管是 Sidekiq::Scheduled::Poller
还是 Sidekiq::Processor
的核心代码里,都会有一个由 @done
实例变量控制的循环体:
# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/scheduled.rb#L63-L73
def start
@thread ||= safe_thread("scheduler") do
initial_wait
while !@done # 这是 poller 的循环控制
enqueue
wait
end
Sidekiq.logger.info("Scheduler exiting...")
end
end
# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/processor.rb#L66-L77
def run
begin
while !@done # 这是我们常说的 worker 循环控制
process_one
end
@mgr.processor_stopped(self)
rescue Sidekiq::Shutdown
@mgr.processor_stopped(self)
rescue Exception => ex
@mgr.processor_died(self, ex)
end
end
也就是说,这些 @done
实例变量决定了 poller
线程跟 worker
线程是否循环执行?一旦 @done
被改为 true
,那循环体就不再执行,线程自然也就是退出了。于是,单从这些代码,我们可以断定,Sidekiq 就是通过设置 @done
的值来通知一个线程安全退出(graceful exit)的。我们也知道,生产环境中,我们是通过发送信号的方式来告诉 sidekiq 退出或者进入静默 (quiet) 状态的,那么,这里的 @done
是怎么跟信号处理联系起来的呢?这些就是今天这篇文章的重点了!
USR1
信号(即进入 quiet
模式)需要尽可能早,而进程的退出重启需要尽可能晚。因为前一篇文章着眼于任务调度,所以略过了其他无关细节,包括信号处理,这篇文章则将镜头对准信号处理,所以让我们从头再来一遍,只是这一次,我们只关心与信号处理有关的代码。
依旧是从 cli.rb
文件开始,它是 Sidekiq 核心代码的生命起点,因为 Sidekiq 命令行启动后,它是第一个被执行的代码,Sidekiq 启动过程中调用了 Sidekiq::CLI#run
方法:
# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/cli.rb#L49-L106
def run
boot_system
print_banner
self_read, self_write = IO.pipe
%w(INT TERM USR1 USR2 TTIN).each do |sig|
begin
trap sig do
self_write.puts(sig)
end
rescue ArgumentError
puts "Signal #{sig} not supported"
end
end
# ... other codes
begin
launcher.run
while readable_io = IO.select([self_read])
signal = readable_io.first[0].gets.strip
handle_signal(signal)
end
rescue Interrupt
logger.info 'Shutting down'
launcher.stop
# Explicitly exit so busy Processor threads can't block
# process shutdown.
logger.info "Bye!"
exit(0)
end
以上的代码就是整个 Sidekiq 最顶层的信号处理的核心代码了,让我们慢慢分析!
首先,self_read, self_write = IO.pipe
创建了一个模拟管道的 IO 对象,并且同时返回这个 管道的一个写端以及一个读端,通过这两端,就可以实现对管道的读写了。需要注意的是,IO.pipe
创建的读端在读的时候不会自动生成 EOF
符,所以这就要求读时,写端是关闭的,而写时,读端是关闭的,一句话说,就是这样的管道不允许读写端同时打开。关于 IO.pipe
还有挺多细节跟需要注意的点,如果还需要了解,请阅读官方文档。
上面说的管道本质上只是一个 IO 对象而已,暂时不用纠结太多,让我们接着往下读:
%w(INT TERM USR1 USR2 TTIN).each do |sig|
begin
trap sig do
self_write.puts(sig)
end
rescue ArgumentError
puts "Signal #{sig} not supported"
end
end
这段代码就比较有意思了,最外层遍历了一个系统信号的数组,然后逐个信号进行监听(trap,或者叫捕捉?)。让我们聚焦在 trap
方法的调用跟其 block 上,查阅 Ruby 文档,发现 trap
是 Signal
模块下的一个方法,Signal
主要是处理与系统信号有关的任务,然后 trap
的作用是:
Specifies the handling of signals. The first parameter is a signal name (a string such as “SIGALRM”, “SIGUSR1”, and so on) or a signal number...
所以,前面的那段代码的意思就很容易理解了,Sidekiq 注册了对 INT
、TERM
、USR1
、USR2
以及TTIN
等系统信号的处理,而在进程收到这些信号时,就会执行 self_write.puts(sig)
,也就是将收到的信号通过之前介绍的管道写端 self_write
记录下来。什么?只记录下来,那还得处理啊?!
稍安勿躁,让我们接着往下分析 Sidekiq::CLI#run
方法末尾的代码:
begin
launcher.run
while readable_io = IO.select([self_read])
signal = readable_io.first[0].gets.strip
handle_signal(signal)
end
rescue Interrupt
logger.info 'Shutting down'
launcher.stop
# Explicitly exit so busy Processor threads can't block
# process shutdown.
logger.info "Bye!"
exit(0)
end
看到没有,这里有个循环,循环控制条件里,readable_io = IO.select([self_read])
是从前面的管道的读端 self_read
阻塞地等待信号的到达。对于 IO.select
,Ruby 官方文档介绍如下:
Calls select(2) system call. It monitors given arrays of IO objects, waits until one or more of IO objects are ready for reading, are ready for writing, and have pending exceptions respectively, and returns an array that contains arrays of those IO objects.
所以这里就是说 Sidekiq 主线程首先负责执行完其他初始化工作,最后阻塞在信号等待以及处理。在其等到新的信号之后,进入上面代码展示的循环体:
signal = readable_io.first[0].gets.strip
handle_signal(signal)
这里语法细节先不深究,我们看下这两行代码第一行是从前面说的管道中读取信号,并且将信号传递给 handle_signal
方法,让我们接着往下看 handle_signal
方法的定义:
# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/cli.rb#L125-L153
def handle_signal(sig)
Sidekiq.logger.debug "Got #{sig} signal"
case sig
when 'INT'
# Handle Ctrl-C in JRuby like MRI
# http://jira.codehaus.org/browse/JRUBY-4637
raise Interrupt
when 'TERM'
# Heroku sends TERM and then waits 10 seconds for process to exit.
raise Interrupt
when 'USR1'
Sidekiq.logger.info "Received USR1, no longer accepting new work"
launcher.quiet
when 'USR2'
if Sidekiq.options[:logfile]
Sidekiq.logger.info "Received USR2, reopening log file"
Sidekiq::Logging.reopen_logs
end
when 'TTIN'
Thread.list.each do |thread|
Sidekiq.logger.warn "Thread TID-#{thread.object_id.to_s(36)} #{thread['label']}"
if thread.backtrace
Sidekiq.logger.warn thread.backtrace.join("\n")
else
Sidekiq.logger.warn "<no backtrace available>"
end
end
end
end
这里的代码挺长,但是一点都不难理解,我简单解释下就够了。当进程:
TERM
或者 INT
信号时,直接抛出 Interrupt
中断;USR1
信号时,则通知 launcher
执行 .quiet
方法,Sidekiq 在这里进入 Quiet 模式(怎么进入?);USR2
信号时,重新打开日志;TTIN
信号时,打印所有线程当前正在执行的代码列表。到此,一个信号从收到被存下,到被取出处理的大致过程就是这样的,至于具体的处理方式,我们下个章节详细展开。现在有一点需要补充的是,上面讲当 Sidekiq 收到 TERM
或者 INT
信号时,都会抛出 Interrupt
中断异常,那这个异常又是如何处理的呢?我们回过头去看刚才最开始的 Sidekiq::CLI#run
方法末尾的代码:
begin
launcher.run
while readable_io = IO.select([self_read])
signal = readable_io.first[0].gets.strip
handle_signal(signal)
end
rescue Interrupt
logger.info 'Shutting down'
launcher.stop
# Explicitly exit so busy Processor threads can't block
# process shutdown.
logger.info "Bye!"
exit(0)
end
原来是 run
方法在处理信号时,声明了 rescue Interrupt
,捕捉了 Interrupt
中断异常,并且在异常处理时打印必要日志,同时执行 launcher.stop
通知各个线程停止工作,最后调用 exit
方法强制退出进程,到此,一个 Sidekiq 进程就彻底退出了。
但是问题又来了,信号处理的大致过程我是知道了,但是具体的 launcher.quiet
跟 launcher.stop
都干了些什么呢?
老规矩,先上代码:
# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/launcher.rb#L32-L36
def quiet
@done = true
@manager.quiet
@poller.terminate
end
代码只有短短三行。Launcher 对象首先设置自己的实例变量 @done
的值为 true
,接着执行 @manager.quiet
以及 @poller.terminate
。看方法命名上理解,应该是 Luancher 对象又将 quiet 的消息传递给了 @manager
即 Sidekiq::Manager
对象,同时通知 @poller
即 Sidekiq::Scheduled::Poller
对象结束工作。那到底是不是真的这样呢?让我们继续深挖!
让我们来看看 Sidekiq::Manager#quiet
方法的代码
# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/manager.rb#L51-L58
def quiet
return if @done
@done = true
logger.info { "Terminating quiet workers" }
@workers.each { |x| x.terminate }
fire_event(:quiet, true)
end
上面的代码也很短,首先将 Sidekiq::Manager
对象自身的 @done
实例变量的值设置为 true
,接着对其所管理的每一个 worker,都发出一个 terminate
消息。让我们接着往下看 worker 对象(Sidekiq::Processor
对象)的 #terminate
方法定义:
# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/processor.rb#L42-L46
def terminate(wait=false)
@done = true
return if !@thread
@thread.value if wait
end
这里的代码依然保持了精短的特点!跟上一层逻辑一样,worker 在处理 terminate
时,同样设置自己的 @done
实例变量为 true
后返回,但是,如果其参数 wait
为 true
,则会保持主线程等待,直到 @thread
线程退出(@thread.value
相当于执行 @thread.join
并且返回线程的返回值,可参考 Ruby 文档)。
那么,这里就要问了,worker 设置 @done
为 true 是要干嘛?这里好像也没有做什么特别的事啊?!勿急,还记得上篇文章介绍 worker 运行时的核心代码吗?
# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/processor.rb#L66-L77
def run
begin
while !@done
process_one
end
@mgr.processor_stopped(self)
rescue Sidekiq::Shutdown
@mgr.processor_stopped(self)
rescue Exception => ex
@mgr.processor_died(self, ex)
end
end
看到了吧,@done
变量可是一个重要的开关,当 @done
为 false
时,worker 一直周而复始地从队列中取任务并且老老实实干活;而当 @done
为 true
时,worker 在处理完当前的任务之后,便不再执行新的任务,执行 @msg.processor_stopped(self)
通知 worker 管理器自己已经退出工作,最终 #run
方法返回。由于 #run
方法是在独立线程里执行的,所以当 #run
方法返回时,其所在的线程自然也就退出了。
那关于 worker 的 quiet 模式进入过程就是这么简单,通过一个共享变量 @done
便实现了对工作线程的控制。
前面说到 Sidekiq::Launcher#quiet
执行时,先将消息传递给了 worker 管理器,随后执行了 @poller.terminate
,那我们来看看 #terminate
方法的定义:
# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/scheduled.rb#L53-L61
def terminate
@done = true
if @thread
t = @thread
@thread = nil
@sleeper << 0
t.value
end
end
又是如此简短的代码。poller 退出的逻辑跟 worker 退出的逻辑非常一致,都是同样先设置自己的 @done
实例变量的值为 true
,接着等待线程 @thread
退出,最后 poller 返回。
那么,poller 的 @done
是不是也是用来控制线程退出呢?答案是肯定的!
# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/scheduled.rb#L63-L73
def start
@thread ||= safe_thread("scheduler") do
initial_wait
while !@done
enqueue
wait
end
Sidekiq.logger.info("Scheduler exiting...")
end
end
还记得上面这段代码吗?poller 在每次将定时任务压回任务队列之后,等待一定时间,然后重新检查 @done
的值,如果为 true
,则 poller 直接返回退出,因为 #start
方法里的循环体在新线程中执行,当循环结束时,线程自然也退出了。
USR1
系统信号时,Sidekiq 主线程向 @launcher
发送 quiet
消息,@launcher
又将消息传递给 @manager
,同时向 @poller
发出 terminate
消息;@manager
在收到 quiet
消息时,逐一对运行中的 worker 发送 terminate
消息,worker 收到消息后,设置自己的 @done
为 true
,标识不再处理新任务,当前任务处理完成后退出线程;@poller
在收到 terminate
消息后,也是设置自己的 @done
为 true
,在本次任务执行完毕后,线程也退出;前面介绍的是 Sidekiq 进入 quiet 模式的过程,那 Sidekiq 的停止过程又是怎样的呢?
让我们从 Sidekiq::Launcher#stop
方法开始寻找答案:
# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/launcher.rb#L41-L56
def stop
deadline = Time.now + @options[:timeout]
@done = true
@manager.quiet
@poller.terminate
@manager.stop(deadline)
# Requeue everything in case there was a worker who grabbed work while stopped
# This call is a no-op in Sidekiq but necessary for Sidekiq Pro.
strategy = (@options[:fetch] || Sidekiq::BasicFetch)
strategy.bulk_requeue([], @options)
clear_heartbeat
end
首先,Sidekiq::Launcher
对象设定了一个强制退出的 deadline
,时间是以当前时间加上配置的 timeout
,这个时间默认是 8 秒。
接着,设定对象本身的 @done
变量的值为 true
,然后分别对 @manager
和 @poller
发送 quiet
和 terminate
消息,这个过程就是我们上面说的 Sidekiq::Launcher#quiet
的过程,所以,这里的代码主要是 Sidekiq 要确保退出前已经通知各个线程准备退出。
接下来的代码就比较重要了,我们先看这一行:
@manager.stop(deadline)
在通知完 @manager
进入 quiet 模式之后,launcher 向 @manager
发送了 stop
消息,并且同时传递了 deadline
参数。让我们接着继续往下看:
# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/manager.rb#L61-L83
PAUSE_TIME = STDOUT.tty? ? 0.1 : 0.5
def stop(deadline)
quiet
fire_event(:shutdown, true)
# some of the shutdown events can be async,
# we don't have any way to know when they're done but
# give them a little time to take effect
sleep PAUSE_TIME
return if @workers.empty?
logger.info { "Pausing to allow workers to finish..." }
remaining = deadline - Time.now
while remaining > PAUSE_TIME
return if @workers.empty?
sleep PAUSE_TIME
remaining = deadline - Time.now
end
return if @workers.empty?
hard_shutdown
end
上面的代码,manager 首先调用了自身的 quiet
方法(这里就真的多此一举了,因为外层的 launcher 已经调用过一次了),然后 manager 执行 sleep
系统调用进入休眠,持续时间为 0.5 秒,休眠结束后检查所有 worker 是否已经都退出,如果退出,则直接返回,任务提前结束;如果仍有 worker 未退出,则检查当前时间是否接近强制退出的 deadline,如果不是,则重复“检查所有 worker 退出 - 休眠”的过程,直到 deadline 来临,或者 worker 线程都已经全部退出。如果最后到达 deadline,仍有 worker 线程未退出,则最后执行 hard_shutdown
。
# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/manager.rb#L108-L135
def hard_shutdown
cleanup = nil
@plock.synchronize do
cleanup = @workers.dup
end
if cleanup.size > 0
jobs = cleanup.map {|p| p.job }.compact
# ... other codes
strategy = (@options[:fetch] || Sidekiq::BasicFetch)
strategy.bulk_requeue(jobs, @options)
end
cleanup.each do |processor|
processor.kill
end
end
这里 hard_shutdown
方法在执行时,首先克隆了当前仍未退出的 @workers
列表,接着获取每个 worker 当前正在处理的任务,将这些正在执行中的任务数据通过 strategy.bulk_requeue(jobs, @options)
重新写回队列,而最后对每一个 worker 发送 kill
消息:
# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/processor.rb#L48-L58
def kill(wait=false)
@done = true
return if !@thread
@thread.raise ::Sidekiq::Shutdown
@thread.value if wait
end
worker 在收到 kill
消息时,首先设置自己的 @done
为 true
,最后向 worker 所关联的线程抛出 ::Sidekiq::Shutdown
异常。让我们看看 worker 的线程又是如何处理异常的:
# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/processor.rb#L66-L77
def run
begin
while !@done
process_one
end
@mgr.processor_stopped(self)
rescue Sidekiq::Shutdown
@mgr.processor_stopped(self)
rescue Exception => ex
@mgr.processor_died(self, ex)
end
end
又回到 worker 的 run
方法这里,可以看到,run
方法捕捉了 Sidekiq::Shutdown
异常,并且在处理异常时,只是执行 @mgr.processor_stopped(self)
,通知 manager 自己已经退出,由于已经跳出正常流程,worker 的 run
方法返回,线程也因此得以退出。至此,worker 也都正常退出了。
stop
指令,并且给出最后期限(deadline
);graceful
以及 hard
两种方式的退出处理。好了,今天就写到这吧!仍然挺长一篇,啰嗦了。感谢看到这里!