Rails Rails 4.2 新增后端任务框架 - Active Job

leekelby · 2014年08月18日 · 最后由 xiao__liang 回复于 2016年05月10日 · 14371 次阅读
本帖已被管理员设置为精华贴

不同的延迟任务,一样的 API.

在 Delayed Job、Resque、Sidekiq 等延迟任务之间切换,还要改代码? 以后就不必了...

虽然这几个延迟任务 gem 使用上类似,但语法上多少有一点不同。 新的 ActiveJob 组件统一了接口,使用和切换都会变得更容易。

Queue Adapter

默认使用的 queue_adapter 是 :inline,你可以根据需要自己设置 queue_adapter.

已经支持 Delayed Job、Resque、Sidekiq 等常用延迟任务 gem.

# 默认 queue adapter
ActiveJob::Base.queue_adapter = :inline
# 或
Rails.application.config.active_job.queue_adapter = :test

# 所有可用 adapter: :backburner, :delayed_job, :qu, :que, 
# :queue_classic, :resque, :sidekiq, :sneakers, :sucker_punch, :inline, :test

Queue Name

默认使用的 queue_name 是 "default"

可以定制:

class MyJob < ActiveJob::Base
  queue_as :my_jobs

  # ...
end

通过 config.active_job.queue_name_prefix= 可给所有队列名加前缀。

Core

# 实例方法
serialize

# 类方法
set # 常用
deserialize

使用举例:

# Enqueue a job to be performed as soon the queueing system is free.
MyJob.perform_later record

# Enqueue a job to be performed tomorrow at noon.
MyJob.set(wait_until: Date.tomorrow.noon).perform_later(record)

# Enqueue a job to be performed 1 week from now.
MyJob.set(wait: 1.week).perform_later(record)

Enqueuing 入队与重试

常用方法:

enqueue

使用举例:

my_job_instance.enqueue

# 目前,只接受以下 3 种参数
my_job_instance.enqueue wait: 5.minutes
my_job_instance.enqueue queue: :important
my_job_instance.enqueue wait_until: Date.tomorrow.midnight

执行任务失败,还可以:

retry_job

使用举例:

class SiteScrapperJob < ActiveJob::Base
  rescue_from(ErrorLoadingSite) do
    retry_job queue: :low_priority
  end

  def perform(*args)
    # raise ErrorLoadingSite if cannot scrape
  end
end

除上述两实例方法外,还有类方法:

perform_later

Execution 执行

# 实例方法
perform, perform_now

# 类方法
perform_now # 简单封装了实例方法 perform_now

使用举例:

MyJob.new(*args).perform_now

MyJob.perform_now("mike")

Callbacks 回调

比某些延迟 gem 多做了一点点,除了队列&执行本身外,还可以有回调:

before_enqueue
around_enqueue
after_enqueue

before_perform
around_perform
after_perform

使用举例:

class VideoProcessJob < ActiveJob::Base
  queue_as :default

  after_perform do |job|
    UserMailer.notify_video_processed(job.arguments.first)
  end

  def perform(video_id)
    Video.find(video_id).process
  end
end

其它几个方法类似。

实现上,都是直接封装 set_callback

提示

有利必有弊,可能面临以下问题:
原 gem 本身的特点没能充分利用,灵活性降低,和其它 gem 的集成会变复杂。

---------------------------------------------------------------- 分隔线 ----------------------------------------------------------------

其它多个类或模块,统一在此列举。

Arguments 参数处理

接受的参数类型很广泛,需要先处理一下。

进队列时参数需要 serialize, 执行前参数需要 deserialize

当然,这都是自动完成的。

参数支持 Global ID

一般入队列 (enqueue_in、enqueue_at 和 enqueue) 只传能够标识对象的那部分参数 (如:class、id),出队列/执行的时候再根据这些参数获取对象。

但因为 serialize_argument 支持的类型有多种,其中就包括 GlobalID::Identification. 所以我们可以传递一个"活的对象"进队列,而不只是它的一部分 (如:class、id).

使用 Global ID 前后对比:

class TrashableCleanupJob
  def perform(trashable_class, trashable_id, depth)
    # 出队列/执行的时候需要根据 trashable_class 和 trashable_id 查询相应 trashable
    trashable = trashable_class.constantize.find(trashable_id)
    trashable.cleanup(depth)
  end
end

class TrashableCleanupJob
  def perform(trashable, depth)
    # 出队列/执行的时候直接使用 trashable
    trashable.cleanup(depth)
  end
end

Note: 不规范的写法里,也可以直接传递对象。

Railtie

设置 logger 和配置 (如:默认 queue_adapter)

Queue Adapters

原来,不同的延迟任务 gem 有各自不同的 self.perform、perform、run、work,现在:

都有同名的 self.enqueue 和 self.enqueue_at

Logging

around_enqueue、around_perform 和 before_enqueue 有日志记录

enqueue、enqueue_at、perform_start、perform 等过程也有日志记录

Identifier

每个任务都有全局唯一的 job_id

Configured Job

配置实例,对应着 Core 的 set 类方法。

解析 queue_adapter 及其 API

queue_adapter 是 Delayed Job、Resque、Sidekiq 等不同的延迟任务抽象而来。

而 queue_adapter 所用的 API(enqueue_at、enqueue_in、enqueue 等),也是从原延迟任务所提供的 API 抽象而来。

命令行快捷生成

rails generate job NAME [options]

异常捕获与处理

使用 ActiveSupport 的异常捕获方法 rescue_from

class GuestsCleanupJob < ActiveJob::Base
  queue_as :default

  # 异常捕获
  rescue_from(ActiveRecord::RecordNotFound) do |exception|
   # 异常处理
  end

  def perform
    # ...
  end
end

这个赞!

前几天看到 DHH 的 pull request, 马上要合并到 rails 了!!!

记得最早 4.0 的时候就有这个计划,终于要来了。

是看到已经在做集成的工作了

这个好久发布呢?现在是 4.1.5

@huacnlee 内容排版更好了

好东西啊,计划任务经常用到的

还在犹豫要用Sidekiq还是Resque 呢?

#8 楼 @debugger Preparing for 4.1.5 release,很快就发布了。

#13 楼 @Justin sidekiq 啊,多好的项目

#13 楼 @Justin 我两个都用过,感觉 sidekiq 写起来比较简单,不用像 resque 一样非要建立 worker 和写 perform。直接 classname.delay.methodname 就行了。。。

不过感觉 sidekiq 不太稳定,不知道是内存的原因还是什么,经常就自己关掉了。。妈的,到现在也没找到原因。

#17 楼 @wudixiaotie 我这边也有一个 sidekiq 进程在部署完之后老是不见了,因为我的并发数设置成了 1,其他几个 sidekiq 进程就没问题,你的 sidekiq 版本 是多少?

数据放到哪?是 redis 吗?

22 楼 已删除

#19 楼 @hz_qiuyuanxin 3.1.4 应该是最近的版本,因为是新项目。不过以下是我的设置,您看可以给个建议到底是哪里的毛病么?:

:concurrency: 3
:max_threads: 1
:logfile: log/sidekiq.log
:pidfile: tmp/pids/sidekiq.pid
staging:
  :concurrency: 3
production:
  :concurrency: 3
  :max_threads: 1

运行 sidekiq 的命令是:nohup bundle exec sidekiq -d -e production -l log/sidekiq.log &

#19 楼 @hz_qiuyuanxin 我的并发是 3 阿,也会经常不见了,倒是 max_threads 是 1 这个有影响么

#24 楼 @wudixiaotie 有的,你试试调高后看看怎么样。还有 AR 的连接数记得设置成 concurrency + 2

Sidekiq.configure_server do |config|
  config.redis = {
    url: "redis://localhost:6379/1",
    namespace: "app_queue"
  }

  # make new config for sidekiq AR connections
  new_config = Rails.application.config.database_configuration[Rails.env]
  new_config['pool'] = config.options[:concurrency] + 2

  ActiveRecord::Base.establish_connection new_config
end

赞一个

赞! @liyijie 可以考虑升级用啊

#16 楼 @wudixiaotie delay 方式将方法异步化有坑...他内部实现是个代理,把类和参数信息序列化成 YAML,然后内置的 Worker 反序列化之,入队量太大的时候 YAML 那个 lib 反序列化会出现一些很奇怪的问题,比如只反序列化出一个空类,导致之后出现 undefined method 异常... 而且 YAML 库是 C 实现,debug 很难 另外是 YAML 序列化,如果参数信息过于复杂或者太大,也是应付不了的... 综上,这个功能几乎不用...

统一 API 是好事,虽然项目内更换 Gem 的概率很小,但是统一 API 还是有助于开发风格统一和减少学习成本的...

这个不是 4.1 就有吗?

赞一个!!!

#17 楼 @wudixiaotie 会不会是你还在用 Rails 3 呢?Sidekiq 3 只跟 Rails 4 兼容

部分 API 已改,请修改文章内容……

#35 楼 @chunlea 好吧,已修改。

需要 登录 后方可回复, 如果你还没有账号请 注册新账号