不同的延迟任务,一样的 API.
在 Delayed Job、Resque、Sidekiq 等延迟任务之间切换,还要改代码? 以后就不必了...
虽然这几个延迟任务 gem 使用上类似,但语法上多少有一点不同。 新的 ActiveJob 组件统一了接口,使用和切换都会变得更容易。
默认使用的 queue_adapter 是 :inline,你可以根据需要自己设置 queue_adapter.
已经支持 Delayed Job、Resque、Sidekiq 等常用延迟任务 gem.
# 默认 queue adapter
ActiveJob::Base.queue_adapter = :inline
# 或
Rails.application.config.active_job.queue_adapter = :test
# 所有可用 adapter: :backburner, :delayed_job, :qu, :que,
# :queue_classic, :resque, :sidekiq, :sneakers, :sucker_punch, :inline, :test
默认使用的 queue_name 是 "default"
可以定制:
class MyJob < ActiveJob::Base
queue_as :my_jobs
# ...
end
通过 config.active_job.queue_name_prefix=
可给所有队列名加前缀。
# 实例方法
serialize
# 类方法
set # 常用
deserialize
使用举例:
# Enqueue a job to be performed as soon the queueing system is free.
MyJob.perform_later record
# Enqueue a job to be performed tomorrow at noon.
MyJob.set(wait_until: Date.tomorrow.noon).perform_later(record)
# Enqueue a job to be performed 1 week from now.
MyJob.set(wait: 1.week).perform_later(record)
常用方法:
enqueue
使用举例:
my_job_instance.enqueue
# 目前,只接受以下 3 种参数
my_job_instance.enqueue wait: 5.minutes
my_job_instance.enqueue queue: :important
my_job_instance.enqueue wait_until: Date.tomorrow.midnight
执行任务失败,还可以:
retry_job
使用举例:
class SiteScrapperJob < ActiveJob::Base
rescue_from(ErrorLoadingSite) do
retry_job queue: :low_priority
end
def perform(*args)
# raise ErrorLoadingSite if cannot scrape
end
end
除上述两实例方法外,还有类方法:
perform_later
# 实例方法
perform, perform_now
# 类方法
perform_now # 简单封装了实例方法 perform_now
使用举例:
MyJob.new(*args).perform_now
MyJob.perform_now("mike")
比某些延迟 gem 多做了一点点,除了队列&执行本身外,还可以有回调:
before_enqueue
around_enqueue
after_enqueue
before_perform
around_perform
after_perform
使用举例:
class VideoProcessJob < ActiveJob::Base
queue_as :default
after_perform do |job|
UserMailer.notify_video_processed(job.arguments.first)
end
def perform(video_id)
Video.find(video_id).process
end
end
其它几个方法类似。
实现上,都是直接封装 set_callback
有利必有弊,可能面临以下问题:
原 gem 本身的特点没能充分利用,灵活性降低,和其它 gem 的集成会变复杂。
---------------------------------------------------------------- 分隔线 ----------------------------------------------------------------
其它多个类或模块,统一在此列举。
接受的参数类型很广泛,需要先处理一下。
进队列时参数需要 serialize, 执行前参数需要 deserialize
当然,这都是自动完成的。
一般入队列 (enqueue_in、enqueue_at 和 enqueue) 只传能够标识对象的那部分参数 (如:class、id),出队列/执行的时候再根据这些参数获取对象。
但因为 serialize_argument 支持的类型有多种,其中就包括 GlobalID::Identification. 所以我们可以传递一个"活的对象"进队列,而不只是它的一部分 (如:class、id).
使用 Global ID 前后对比:
class TrashableCleanupJob
def perform(trashable_class, trashable_id, depth)
# 出队列/执行的时候需要根据 trashable_class 和 trashable_id 查询相应 trashable
trashable = trashable_class.constantize.find(trashable_id)
trashable.cleanup(depth)
end
end
class TrashableCleanupJob
def perform(trashable, depth)
# 出队列/执行的时候直接使用 trashable
trashable.cleanup(depth)
end
end
Note: 不规范的写法里,也可以直接传递对象。
设置 logger 和配置 (如:默认 queue_adapter)
原来,不同的延迟任务 gem 有各自不同的 self.perform、perform、run、work,现在:
都有同名的 self.enqueue 和 self.enqueue_at
around_enqueue、around_perform 和 before_enqueue 有日志记录
enqueue、enqueue_at、perform_start、perform 等过程也有日志记录
每个任务都有全局唯一的 job_id
配置实例,对应着 Core 的 set 类方法。
queue_adapter 是 Delayed Job、Resque、Sidekiq 等不同的延迟任务抽象而来。
而 queue_adapter 所用的 API(enqueue_at、enqueue_in、enqueue 等),也是从原延迟任务所提供的 API 抽象而来。
rails generate job NAME [options]
使用 ActiveSupport 的异常捕获方法 rescue_from
class GuestsCleanupJob < ActiveJob::Base
queue_as :default
# 异常捕获
rescue_from(ActiveRecord::RecordNotFound) do |exception|
# 异常处理
end
def perform
# ...
end
end