原文地址:http://blog.dengqinghua.net/sidekiq_task_event.html
Github: https://github.com/dengqinghua/roses/blob/master/source/sidekiq_task_event.md
我们在项目中大量使用到了Sidekiq 作为队列任务处理,但是 Sidekiq 无法获取到每一个任务的处理情况。
在系统中有一类问题的抽象为:
批量处理 n 个任务,每个任务都比较耗时,希望可以快速地处理,并且能知道每一个任务的执行结果情况。
基于这类问题,我们研发了基于 Sidekiq 的异步任务管理引擎。
阅读完该文档之后,您将了解到:
Sidekiq 基于 Redis 作为存储,一个例子如下:
Sidekiq Client 部分为队列数据的生产者,在 Sidekiq 源码中可以看到
module Sidekiq
class Client
def push(item)
normed = normalize_item(item)
payload = process_single(item['class'.freeze], normed)
if payload
raw_push([payload])
payload['jid'.freeze]
end
end
def atomic_push(conn, payloads)
q = payloads.first['queue'.freeze]
now = Time.now.to_f
to_push = payloads.map do |entry|
entry['enqueued_at'.freeze] = now
Sidekiq.dump_json(entry)
end
conn.sadd('queues'.freeze, q)
conn.lpush("queue:#{q}", to_push)
end
end
end
最终会在 Redis 中存储下面这些信息
并将这些信息通过lpush存储在 Redis 的队列中。
Sidekiq4.0 之前,使用的是Celluloid作为多线程的抽象层
模型如下:
源码分析请参考 Working With Ruby Threads-Chapter 15
在 4.0 版本之后,Sidekiq 出于性能考虑,使用原生的Thread
实现了一个简易的 Actor 版本模型。相关文章请见
模型如下:
核心的组件包括
Manager
Manager
根据用户设置的并发数,生成处理队列任务的 Processor
, 并对 idle 或者 dead 的 Processsor
进行管理,包括:
1. start: Spin up Processors.
2. processor_died: Handle job failure, throw away Processor, create new one.
3. quiet: shutdown idle Processors.
4. stop: hard stop the Processors by deadline.
初始化 Manager
class Manager
def initialize(options={})
logger.debug { options.inspect }
@options = options
@count = options[:concurrency] || 25
raise ArgumentError, "Concurrency of #{@count} is not supported" if @count < 1
# @done代表是否结束处理任务
@done = false
@workers = Set.new
# 生成多个Processor, 每一个Processor对象在被调用start方法的时候, 会生成了一个线程
@count.times do
@workers << Processor.new(self)
end
# 添加一个锁, 用于修改 @workers 的数据, 管理Processor对象
@plock = Mutex.new
end
end
启动 Manager, 即调用Processor#start
class Manager
def start
@workers.each do |x|
x.start
end
end
end
Processor
Processor
是处理任务的类,包括下面的功能
1. fetches a job from Redis using brpop
2. executes the job
a. instantiate the Worker
b. run the middleware chain
c. call #perform
Processor#start
, 启动 Processor, 创建一个线程
class Processor
def start
# 生成一个线程, 并调用run方法
@thread ||= safe_thread("processor", &method(:run))
end
end
Processor#run
, 处理任务,去 Redis 获取队列数据
# @mgr 即为他对应的 Manager 对象
class Processor
def run
begin
while !@done
# 调用 perform 方法进行处理
process_one
end
# 一旦结束了, 则将 Processor对象中的manager对应的worker去掉, 即是改变了上述 Manager的 @workers 数组
@mgr.processor_stopped(self)
rescue Sidekiq::Shutdown
# 在接收到TERM SIGNAL之后, 等待超时的时候sidekiq会抛出异常 Sidekiq::Shutdown, 见下文分析
# 线程被关闭.
@mgr.processor_stopped(self)
rescue Exception => ex
# 程序报错了, Manager#processor_died 会重新生成一个新的Processor线程
@mgr.processor_died(self, ex)
end
end
end
当我们更新代码后,需要重启Sidekiq
的进程。一般来说,我们会发送一个 TERM SIGNAL
指令给 Sidekiq 进程,它的执行步骤如下
停止 Fetch jobs.
class Manager
def quiet
return if @done
# 将 @done 设置为 true
@done = true
logger.info { "Terminating quiet workers" }
@workers.each { |x| x.terminate } # 这里的每一个 x 都是一个Processor对象
fire_event(:quiet, reverse: true)
end
end
class Processsor
def terminate(wait=false)
@done = true # 将每一个Processor 的 @done 设置为 true, 下面的run方法则不再fetch新的job
return if !@thread
@thread.value if wait
end
def run
begin
while !@done
process_one
end
# 一旦结束了, 则将 Processor对象中的manager对应的worker去掉, 即是改变了上述 Manager的 @workers 数组
@mgr.processor_stopped(self)
rescue Sidekiq::Shutdown
@mgr.processor_stopped(self)
rescue Exception => ex
@mgr.processor_died(self, ex)
end
end
等待Sidekiq.options[:timeout]
秒 (默认为 8 秒) 的时间,使得 Processor 去处理完当前未完成的 jobs
class Manager
def stop(deadline)
quiet
fire_event(:shutdown, reverse: true)
# some of the shutdown events can be async,
# we don't have any way to know when they're done but
# give them a little time to take effect
sleep PAUSE_TIME
return if @workers.empty?
logger.info { "Pausing to allow workers to finish..." }
remaining = deadline - Time.now
# 等待默认的8s后, 如果 @workers 为空, 则代表在规定时间内任务都处理完, 退出
while remaining > PAUSE_TIME
return if @workers.empty?
sleep PAUSE_TIME
remaining = deadline - Time.now
end
return if @workers.empty?
# 等待默认的8s后, 如果 @workers 不为空, 则进行强制shutdown
hard_shutdown
end
end
如果在等待时间之后,仍存在正在处理的 job, 则将 job 通过 rpush 命令推入 Redis, 强制使 processor 退出
class Manager
def hard_shutdown
# We've reached the timeout and we still have busy workers.
# They must die but their jobs shall live on.
cleanup = nil
@plock.synchronize do
cleanup = @workers.dup
end
if cleanup.size > 0
# 获取没有处理完的job
jobs = cleanup.map {|p| p.job }.compact
logger.warn { "Terminating #{cleanup.size} busy worker threads" }
logger.warn { "Work still in progress #{jobs.inspect}" }
# Re-enqueue unfinished jobs
# NOTE: You may notice that we may push a job back to redis before
# the worker thread is terminated. This is ok because Sidekiq's
# contract says that jobs are run AT LEAST once. Process termination
# is delayed until we're certain the jobs are back in Redis because
# it is worse to lose a job than to run it twice.
strategy = (@options[:fetch] || Sidekiq::BasicFetch)
# 将未处理完的jobs推入队列的头部
strategy.bulk_requeue(jobs, @options)
end
# 强制kill掉线程
cleanup.each do |processor|
processor.kill
end
end
end
class Processor
def kill(wait=false)
@done = true
return if !@thread
# unlike the other actors, terminate does not wait
# for the thread to finish because we don't know how
# long the job will take to finish. Instead we
# provide a `kill` method to call after the shutdown
# timeout passes.
@thread.raise ::Sidekiq::Shutdown
@thread.value if wait
end
end
NOTE: 注意在接收到TERM SIGNAL
一些 job 有可能被重复执行。Sidekiq 的 FAQ 中有说明:Remember that Sidekiq will run your jobs AT LEAST once.
INFO: Sidekiq 还提供了 Scheduling Job 的功能,即到时执行任务,该部分使用了一个 SortedSet 的 redis 数据结构,排序的因子为任务的执行时间。在启动 Sidekiq 服务的时候,会启动了一个线程轮询所有执行时间小于等于当前时间的队列数据,将该部分的数据在 pop 至队列,再由 Processor 处理。
Sidekiq 在 client-side 和 server-side 都支持 AOP 操作,该部分和Rack的原理一致。
有了server-side middleware
的支持,我们可以
在sidekiq处理任务前后, 捕捉到任务的处理情况
如 Sidekiq 提供了 ActiveRecord
的 server-side middleware
module Sidekiq
module Middleware
module Server
class ActiveRecord
def initialize
# With Rails 5+ we must use the Reloader **always**.
# The reloader handles code loading and db connection management.
if defined?(::Rails) && ::Rails::VERSION::MAJOR >= 5
raise ArgumentError, "Rails 5 no longer needs or uses the ActiveRecord middleware."
end
end
def call(*args)
yield
ensure
::ActiveRecord::Base.clear_active_connections!
end
end
end
end
end
对于基于 Rails 的 Sidekiq 服务,Sidekiq 会确保在每次执行任务之后,都会清掉使用的连接,避免多线程占用过多的 Rails 数据库连接。
我们经常有一些这样的需求:
1. 给卖家批量报名活动, 一次可以报名200个商品, 如果报名失败的记录, 需要有提示信息
2. 批量创建活动, 一次导入一个1万条商品的excel, 需要给这1万条数据创建
3. 批量导出50万大促信息
最开始我们都是通过串行的方式进行处理,比如
1. 给卖家批量报名活动, 一次可以报名200个商品, 如果报名失败的记录, 需要有提示信息
我们提供一个商品的 HTTP 接口,然后由 JS 发 Ajax 请求进行调用,但是该方式有一些问题:
但是对于数据量大的情况,串行调用变得非常慢,如
2. 批量创建活动, 一次导入一个1万条商品的excel, 需要给这1万条数据创建
3. 批量导出50万大促信息
我们考虑使用 Sidekiq 进行处理,即每一个任务都放在 Redis 里面。调用 perform_async 方法,获取到任务的 job_id
job_id = ProductWorker.perform_async(params)
但是新的问题出现了:我们无法获取到这个 job 的完成情况,如果逻辑上处理失败,也无法获取到对应的错误信息。
NOTE: Sidekiq-Pro 支持 batches 功能,但是它是收费的。
我们最终决定利用 Sidekiq 的 Middleware 特性,研发出一套异步任务管理引擎,它支持
任务处理引擎架构图
它包含三部分
NOTE: 步骤一的 job_id 由 Sidekiq 生成
我们将 Task 和 Event 都创建了对应的数据库表,则
class Task
has_many :events
end
class Event
validates_uniqueness_of :job_id
belongs_to :task
end
Task 的数据结构为
字段 | 释义 |
---|---|
worker_name | worker 的名称 |
id | 主键 id |
Event 的数据结构为
字段 | 释义 |
---|---|
job_id | 任务 id,全局唯一 |
status | 当前状态,包括enqueue ,working , finish , failed , error
|
params | 任务执行的所有参数 |
added_messages | 增量的信息,记录整个任务的流程 |
NOTE: 注意到 status 包含了 falied 和 error 两个不同的状态。其中 failed 代表为 业务逻辑上的失败,如一个卖家因为资质不合格导致无法报名,为了获取该状态,处理时可直接抛出异常 (NormalException), 状态为 failed. 而 error 代表为系统错误,如程序 bug 或者接口超时等
在这里我们配置了 use_task_event
, 如果需要使用该插件,需要在 worker 中配置 use_task_event: true
.
class AWorker
include Sidekiq::Worker
sidekiq_options use_task_event: true
def perform(options)
handle_job(options)
end
end
Server-Side Middleware 代码和注释如下:
module AsyncTask
class MiddlewareServer
def call(worker, item, queue)
if item['use_task_event'] # 配置入口
begin
job_id = item['jid']
Task.record(job_id, :working, message: "处理中")
yield
# 正常处理成功, 设置 status 为 finish
Task.record(job_id, :finish, message: "已经完成")
rescue SystemExit, Interrupt => ex
# 被中断, 设置 status 为 error
Task.record(job_id, :error, message: "被中断")
# 如果之后会被重试, 则重新再设置为 :enqueue
if retry_status.is_a?(Integer) && (retry_status > 0) && retry_count &&
(retry_status - 1 != retry_count.to_i)
Task.record(job_id, :enqueue, message: "等待重试")
end
raise ex
rescue NormalException => ex
# 业务逻辑上的失败, 设置 status 为 failed, 错误信息放在 message 中
Task.record(job_id, :failed, message: "发生错误: #{ex.message}")
rescue Exception => ex
# 程序bug, 设置 status 为 error
Task.record(job_id, :error, message: "发生致命错误: #{ex.message}")
# 如果之后会被重试, 则重新再设置为 :enqueue
if retry_status.is_a?(Integer) && (retry_status > 0) && retry_count &&
(retry_status - 1 != retry_count.to_i)
Task.record(job_id, :enqueue, message: "等待重试")
end
raise ex
end
else
yield
end
end
end
end
在项目启动时加载该 Middleware
Sidekiq.configure_server do |config|
config.server_middleware do |chain|
chain.add AsyncTask::MiddlewareServer
end
end
回顾在文章开始时提到的需求
1. 给卖家批量报名, 一次可以报名200个商品, 进行活动, 如果报名失败的记录, 需要有提示信息
2. 批量创建活动, 一次导入一个1万条商品的excel, 需要给这1万条数据创建
3. 批量导出50万大促信息
对于需求 1, 2, 都可以用相同的处理方式,流程如下:
对于需求 3, 我们可以将 50 万信息分为不同的 worker 来处理,并用统一的 task_id 进行关联,也将大大提高导出的效率。