Ruby 基于 Sidekiq 的异步任务管理引擎设计

dengqinghua · May 28, 2018 · Last by superjagger replied at January 08, 2019 · 7233 hits
Topic has been selected as the excellent topic by the admin.

基于 Sidekiq 的异步任务管理引擎

原文地址:http://blog.dengqinghua.net/sidekiq_task_event.html

Github: https://github.com/dengqinghua/roses/blob/master/source/sidekiq_task_event.md

我们在项目中大量使用到了Sidekiq 作为队列任务处理,但是 Sidekiq 无法获取到每一个任务的处理情况。

在系统中有一类问题的抽象为:

批量处理 n 个任务,每个任务都比较耗时,希望可以快速地处理,并且能知道每一个任务的执行结果情况。

基于这类问题,我们研发了基于 Sidekiq 的异步任务管理引擎。

阅读完该文档之后,您将了解到:

  • Sidekiq 基本框架源码分析。
  • Sidekiq Middleware.
  • 异步任务管理引擎设计。

Sidekiq 基本框架

Sidekiq 基于 Redis 作为存储,一个例子如下:

sidekiq_exmaple

Sidekiq Client

Sidekiq Client 部分为队列数据的生产者,在 Sidekiq 源码中可以看到

module Sidekiq
  class Client
    def push(item)
      normed = normalize_item(item)
      payload = process_single(item['class'.freeze], normed)

      if payload
        raw_push([payload])
        payload['jid'.freeze]
      end
    end

    def atomic_push(conn, payloads)
      q = payloads.first['queue'.freeze]
      now = Time.now.to_f
      to_push = payloads.map do |entry|
        entry['enqueued_at'.freeze] = now
        Sidekiq.dump_json(entry)
      end
      conn.sadd('queues'.freeze, q)
      conn.lpush("queue:#{q}", to_push)
    end
  end
end

最终会在 Redis 中存储下面这些信息

  • retry 重试次数
  • queue 队列名称
  • backtrace 错误栈
  • class 处理类名称
  • args 参数
  • jid job_id
  • enqueued_at 进入队列的时间

并将这些信息通过lpush存储在 Redis 的队列中。

Sidekiq Server

Before 4.0

Sidekiq4.0 之前,使用的是Celluloid作为多线程的抽象层

模型如下:

sidekiq_actor_architecture

源码分析请参考 Working With Ruby Threads-Chapter 15

After 4.0

在 4.0 版本之后,Sidekiq 出于性能考虑,使用原生的Thread实现了一个简易的 Actor 版本模型。相关文章请见

模型如下:

sidekiq_new_framework

核心的组件包括

  1. Manager

    Manager 根据用户设置的并发数,生成处理队列任务的 Processor, 并对 idle 或者 dead 的 Processsor 进行管理,包括:

    1. start: Spin up Processors.
    2. processor_died: Handle job failure, throw away Processor, create new one.
    3. quiet: shutdown idle Processors.
    4. stop: hard stop the Processors by deadline.
    

    初始化 Manager

    class Manager
      def initialize(options={})
        logger.debug { options.inspect }
        @options = options
        @count = options[:concurrency] || 25
        raise ArgumentError, "Concurrency of #{@count} is not supported" if @count < 1
    
        # @done代表是否结束处理任务
        @done = false
        @workers = Set.new
    
        # 生成多个Processor, 每一个Processor对象在被调用start方法的时候, 会生成了一个线程
        @count.times do
          @workers << Processor.new(self)
        end
    
        # 添加一个锁, 用于修改 @workers 的数据, 管理Processor对象
        @plock = Mutex.new
      end
    end
    

    启动 Manager, 即调用Processor#start

    class Manager
      def start
        @workers.each do |x|
          x.start
        end
      end
    end
    
  2. Processor

    Processor 是处理任务的类,包括下面的功能

    1. fetches a job from Redis using brpop
    2. executes the job
      a. instantiate the Worker
      b. run the middleware chain
      c. call #perform
    

    Processor#start, 启动 Processor, 创建一个线程

    class Processor
      def start
        # 生成一个线程, 并调用run方法
        @thread ||= safe_thread("processor", &method(:run))
      end
    end
    

    Processor#run, 处理任务,去 Redis 获取队列数据

    # @mgr 即为他对应的 Manager 对象
    class Processor
      def run
        begin
          while !@done
            # 调用 perform 方法进行处理
            process_one
          end
    
          # 一旦结束了, 则将 Processor对象中的manager对应的worker去掉, 即是改变了上述 Manager的 @workers 数组
          @mgr.processor_stopped(self)
        rescue Sidekiq::Shutdown
          # 在接收到TERM SIGNAL之后, 等待超时的时候sidekiq会抛出异常 Sidekiq::Shutdown, 见下文分析
          # 线程被关闭.
          @mgr.processor_stopped(self)
        rescue Exception => ex
          # 程序报错了, Manager#processor_died 会重新生成一个新的Processor线程
          @mgr.processor_died(self, ex)
        end
      end
    end
    

队列重启时 job 的处理

当我们更新代码后,需要重启Sidekiq的进程。一般来说,我们会发送一个 TERM SIGNAL 指令给 Sidekiq 进程,它的执行步骤如下

  1. 停止 Fetch jobs.

    class Manager
      def quiet
        return if @done
        # 将 @done 设置为 true
        @done = true
    
        logger.info { "Terminating quiet workers" }
        @workers.each { |x| x.terminate } # 这里的每一个 x 都是一个Processor对象
        fire_event(:quiet, reverse: true)
      end
    end
    
    class Processsor
      def terminate(wait=false)
        @done = true # 将每一个Processor 的 @done 设置为 true, 下面的run方法则不再fetch新的job
        return if !@thread
        @thread.value if wait
      end
    
      def run
        begin
          while !@done
            process_one
          end
          # 一旦结束了, 则将 Processor对象中的manager对应的worker去掉, 即是改变了上述 Manager的 @workers 数组
          @mgr.processor_stopped(self)
        rescue Sidekiq::Shutdown
          @mgr.processor_stopped(self)
        rescue Exception => ex
          @mgr.processor_died(self, ex)
      end
    end
    
  2. 等待Sidekiq.options[:timeout]秒 (默认为 8 秒) 的时间,使得 Processor 去处理完当前未完成的 jobs

    class Manager
      def stop(deadline)
        quiet
        fire_event(:shutdown, reverse: true)
    
        # some of the shutdown events can be async,
        # we don't have any way to know when they're done but
        # give them a little time to take effect
        sleep PAUSE_TIME
        return if @workers.empty?
    
        logger.info { "Pausing to allow workers to finish..." }
        remaining = deadline - Time.now
    
        # 等待默认的8s后, 如果 @workers 为空, 则代表在规定时间内任务都处理完, 退出
        while remaining > PAUSE_TIME
          return if @workers.empty?
          sleep PAUSE_TIME
          remaining = deadline - Time.now
        end
        return if @workers.empty?
    
        # 等待默认的8s后, 如果 @workers 不为空, 则进行强制shutdown
        hard_shutdown
      end
    end
    
  3. 如果在等待时间之后,仍存在正在处理的 job, 则将 job 通过 rpush 命令推入 Redis, 强制使 processor 退出

    class Manager
      def hard_shutdown
        # We've reached the timeout and we still have busy workers.
        # They must die but their jobs shall live on.
        cleanup = nil
        @plock.synchronize do
          cleanup = @workers.dup
        end
    
        if cleanup.size > 0
          # 获取没有处理完的job
          jobs = cleanup.map {|p| p.job }.compact
    
          logger.warn { "Terminating #{cleanup.size} busy worker threads" }
          logger.warn { "Work still in progress #{jobs.inspect}" }
    
          # Re-enqueue unfinished jobs
          # NOTE: You may notice that we may push a job back to redis before
          # the worker thread is terminated. This is ok because Sidekiq's
          # contract says that jobs are run AT LEAST once. Process termination
          # is delayed until we're certain the jobs are back in Redis because
          # it is worse to lose a job than to run it twice.
          strategy = (@options[:fetch] || Sidekiq::BasicFetch)
    
          # 将未处理完的jobs推入队列的头部
          strategy.bulk_requeue(jobs, @options)
        end
    
        # 强制kill掉线程
        cleanup.each do |processor|
          processor.kill
        end
      end
    end
    
    class Processor
      def kill(wait=false)
        @done = true
        return if !@thread
        # unlike the other actors, terminate does not wait
        # for the thread to finish because we don't know how
        # long the job will take to finish.  Instead we
        # provide a `kill` method to call after the shutdown
        # timeout passes.
        @thread.raise ::Sidekiq::Shutdown
        @thread.value if wait
      end
    end
    

NOTE: 注意在接收到TERM SIGNAL一些 job 有可能被重复执行。Sidekiq 的 FAQ 中有说明:Remember that Sidekiq will run your jobs AT LEAST once.

INFO: Sidekiq 还提供了 Scheduling Job 的功能,即到时执行任务,该部分使用了一个 SortedSet 的 redis 数据结构,排序的因子为任务的执行时间。在启动 Sidekiq 服务的时候,会启动了一个线程轮询所有执行时间小于等于当前时间的队列数据,将该部分的数据在 pop 至队列,再由 Processor 处理。

Sidekiq Middleware

Sidekiq 在 client-side 和 server-side 都支持 AOP 操作,该部分和Rack的原理一致。

有了server-side middleware的支持,我们可以

在sidekiq处理任务前后, 捕捉到任务的处理情况

如 Sidekiq 提供了 ActiveRecordserver-side middleware

module Sidekiq
  module Middleware
    module Server
      class ActiveRecord
        def initialize
          # With Rails 5+ we must use the Reloader **always**.
          # The reloader handles code loading and db connection management.
          if defined?(::Rails) && ::Rails::VERSION::MAJOR >= 5
            raise ArgumentError, "Rails 5 no longer needs or uses the ActiveRecord middleware."
          end
        end

        def call(*args)
          yield
        ensure
          ::ActiveRecord::Base.clear_active_connections!
        end
      end
    end
  end
end

对于基于 Rails 的 Sidekiq 服务,Sidekiq 会确保在每次执行任务之后,都会清掉使用的连接,避免多线程占用过多的 Rails 数据库连接。

AsyncTask

需求分析

我们经常有一些这样的需求:

1. 给卖家批量报名活动, 一次可以报名200个商品, 如果报名失败的记录, 需要有提示信息
2. 批量创建活动, 一次导入一个1万条商品的excel, 需要给这1万条数据创建
3. 批量导出50万大促信息

最开始我们都是通过串行的方式进行处理,比如

1. 给卖家批量报名活动, 一次可以报名200个商品, 如果报名失败的记录, 需要有提示信息

我们提供一个商品的 HTTP 接口,然后由 JS 发 Ajax 请求进行调用,但是该方式有一些问题:

  • 数据容易丢失
  • 一些接口请求很慢,容易造成超时
  • JS 交互复杂,大量的逻辑都放在了前端,出问题不好排查

但是对于数据量大的情况,串行调用变得非常慢,如

2. 批量创建活动, 一次导入一个1万条商品的excel, 需要给这1万条数据创建
3. 批量导出50万大促信息

我们考虑使用 Sidekiq 进行处理,即每一个任务都放在 Redis 里面。调用 perform_async 方法,获取到任务的 job_id

job_id = ProductWorker.perform_async(params)

但是新的问题出现了:我们无法获取到这个 job 的完成情况,如果逻辑上处理失败,也无法获取到对应的错误信息。

NOTE: Sidekiq-Pro 支持 batches 功能,但是它是收费的。

我们最终决定利用 Sidekiq 的 Middleware 特性,研发出一套异步任务管理引擎,它支持

  • 任务的聚合管理。一个 task 和多个 job 进行关联
  • 可以获得 job 的执行状态
  • 所有执行过程可视化

AsyncTask

任务处理引擎架构图

async_task

它包含三部分

  1. 创建 Task, 生成 task_id, 将每一个任务都推入 Redis, 并获取到对应的 job_id
  2. 生成 Event 记录,该 Event 和 job_id 一一对应,记录了整个 job 的生命周期
  3. 利用 Server-Side Middleware, 记录 Event 的状态和相关信息

NOTE: 步骤一的 job_id 由 Sidekiq 生成

Task 和 Event 创建

我们将 Task 和 Event 都创建了对应的数据库表,则

class Task
  has_many :events
end

class Event
  validates_uniqueness_of :job_id
  belongs_to :task
end

Task 的数据结构为

字段 释义
worker_name worker 的名称
id 主键 id

Event 的数据结构为

字段 释义
job_id 任务 id,全局唯一
status 当前状态,包括enqueue,working, finish, failed, error
params 任务执行的所有参数
added_messages 增量的信息,记录整个任务的流程

NOTE: 注意到 status 包含了 falied 和 error 两个不同的状态。其中 failed 代表为 业务逻辑上的失败,如一个卖家因为资质不合格导致无法报名,为了获取该状态,处理时可直接抛出异常 (NormalException), 状态为 failed. 而 error 代表为系统错误,如程序 bug 或者接口超时等

Server-Side Middleware

在这里我们配置了 use_task_event, 如果需要使用该插件,需要在 worker 中配置 use_task_event: true.

class AWorker
  include Sidekiq::Worker

  sidekiq_options use_task_event: true

  def perform(options)
    handle_job(options)
  end
end

Server-Side Middleware 代码和注释如下:

module AsyncTask
  class MiddlewareServer
    def call(worker, item, queue)
      if item['use_task_event'] # 配置入口
        begin
          job_id = item['jid']
          Task.record(job_id, :working, message: "处理中")

          yield

          # 正常处理成功, 设置 status 为 finish
          Task.record(job_id, :finish, message: "已经完成")
        rescue SystemExit, Interrupt => ex
          # 被中断, 设置 status 为 error
          Task.record(job_id, :error, message: "被中断")

          # 如果之后会被重试, 则重新再设置为 :enqueue
          if retry_status.is_a?(Integer) && (retry_status > 0) && retry_count &&
            (retry_status - 1 != retry_count.to_i)

            Task.record(job_id, :enqueue, message: "等待重试")
          end

          raise ex
        rescue NormalException => ex
          # 业务逻辑上的失败, 设置 status 为 failed, 错误信息放在 message 中
          Task.record(job_id, :failed, message: "发生错误: #{ex.message}")
        rescue Exception => ex
          # 程序bug, 设置 status 为 error
          Task.record(job_id, :error, message: "发生致命错误: #{ex.message}")

          # 如果之后会被重试, 则重新再设置为 :enqueue
          if retry_status.is_a?(Integer) && (retry_status > 0) && retry_count &&
            (retry_status - 1 != retry_count.to_i)

            Task.record(job_id, :enqueue, message: "等待重试")
          end

          raise ex
        end
      else
        yield
      end
    end
  end
end

在项目启动时加载该 Middleware

Sidekiq.configure_server do |config|
  config.server_middleware do |chain|
    chain.add AsyncTask::MiddlewareServer
  end
end

问题剖析

回顾在文章开始时提到的需求

1. 给卖家批量报名, 一次可以报名200个商品, 进行活动, 如果报名失败的记录, 需要有提示信息
2. 批量创建活动, 一次导入一个1万条商品的excel, 需要给这1万条数据创建
3. 批量导出50万大促信息

对于需求 1, 2, 都可以用相同的处理方式,流程如下:

  1. 前端一次将所有的数据全部提到给后端。
  2. 后端根据数据量拆分为 n 个 jobs, 并生成一个 task_id, 返回给前端。
  3. 前端每隔一段时间,调用后端的接口来询问 task_id 对于的 job 的状态,如果出错,则一同返回错误信息。

对于需求 3, 我们可以将 50 万信息分为不同的 worker 来处理,并用统一的 task_id 进行关联,也将大大提高导出的效率。

sidekiq 自带 web 页面的。sidekiq 也有 api 可以拿到 job 情况的。

可以拿到 job 情况。但是在 sidekiq 的存储结构中,并没有将 job_id 当做 key 来存储

通过 sidekiq 的 api 获取 job 是非常低效的,在 API 文档 中提到

Find a job by JID (WARNING: this is very inefficient if your queue is big!)

另外,我们需要获取到 job 的整个状态,包括 进入队列,处理中,成功,业务校验的失败,系统的失败 等,这是 sidekiq 的 api 不具备的。

sidekiq-pro 提供了类似 batches 功能,但是我们因为一些原因没有使用。

对 sidekiq 的研究还是比较深入,这一点不可否认,点 N 个赞。

过一年再来回头来看今天你们这个设计,可能会发现其实还可以走另外的设计方式。具体怎么设计,我并没有细看,所以不发表意见。

Reply to zfjoy520 #2

嗯,可以贴一下链接或者设计思路吗?我们也都学习一下。

这些其实都是造轮子,我们的痛点是一些资源申请不下来。

容我先细细看下你们的思路先^_^

1、sidekiq 平民版支持一次批量插入成千上万的 job,可以用 Sidekiq::Client.push_bulk(items) 这个方法来生成待执行的任务;源码和注释见: https://github.com/mperham/sidekiq/blob/master/lib/sidekiq/client.rb#L135 https://github.com/mperham/sidekiq/blob/master/lib/sidekiq/client.rb#L92

这个方法同样也返回了所有生成 job 的 jid,效率如何需要自测。

2、关于失败 job 的提示信息记录或者是执行过程全记录,可用第 1 步生成的 jid 数组来自由发挥,选择适合自身项目的一种即可。 方法有很多种,比如照原来的方式存 mysql,步步更新。

3、对于需求 3. 批量导出 50 万大促信息,ruby 本身即有一个对大数据的分批方法:each_slice,文档见: http://ruby-doc.org/core-2.5.1/Enumerable.html#method-i-each_slice

可将这 50 万,按 10_000 或者更多分为一批,做一些简单拼凑然后导出,降低数据库或其他数据源的查询压力,最后将结果拼装成一个 excel 或者是你们想要的其他的数据导出格式即可。

临时想到的一些平时用过的方式,也不知道是否符合你们项目的胃口,有说得不恰当的地方请不要笑话。

Reply to zfjoy520 #5

我的理解如下:

  1. 消息的生成和消费,是在不同的经常里面的,如果需要用 job_id, 则还是需要将 job_id 存储起来
  2. Sidekiq 的 API 通过 job_id 寻找这个 job 是非常低效的,这在前文已经说过
  3. each_slice 本身还是在一个进程里面,无法使用 多核 来进行操作。虽然在一个进程内的 Sidekiq 多线程处理,是无法使用多核的 (这里任务是耗 CPU 的), 另外,如果是用 each_slice 处理,真的是很慢,要是中途不小心报错,整个任务就失败了。
  4. job_id 生成是有成本的。
def push_bulk(items)
   ...

  raw_push(payloads) if !payloads.empty?

  # 如果有十万个任务, 这里需要循环十万次得到jid
  payloads.collect { |payload| payload['jid'] }
end

我们部署 Sidekiq 多个进程,就有一定概率使用到多核。

50 万数据我们是分成多个 sidekiq job 处理,并且有一个 task_id 跟这些 job 关联。这样可以用到了多核,也能观察到各个任务进行的进度,并且可以重试.

当然单独针对这个业务逻辑,也可以设计单独的表结构来存储任务。

Sidekiq 的 API 通过 job_id 寻找这个 job 是非常低效的,这在前文已经说过

我不是很明白,为什么会有通过一个 jid 去寻找 job 的需求

each_slice 多核问题

是不是可以考虑将 50 万数据,each_slice 成 50 份或者是 100 份,enqueue 至 sidekiq,然后利用多进程 sidekiq 来跑多核,我并没有要将 50 万数据扔一个 job 里面跑的意思。我指是的你可以用 each_slice 来将数据分片,然后批量创建 job。

job_id 生成是有成本的

跟第 3 一样,既然明知是一个很耗时的方法,我们何必非要让 push_bulk(items) 一次跑 10 万个,我们将数据切片打散成多个 job 也是可以的。

Reply to zfjoy520 #7

因为我们需要一个 job 的状态,以及业务失败时,这个 job 的错误信息。

如下面的信息:

{ status: failed, message: "发生错误: 您的资质不满足报名条件" }

我们是通过 raise NormalException 来中断任务,并将信息存储在异常中的。

所以说,我们需要通过 job_id, 来找到这个 job, 访问这个 job 的状态 (status), 并拿到业务逻辑的错误信息 (message)

这儿其实跟 Erlang 的 Let It Crash 有点儿像,但是使用上会稍微简单一点。

另外,所有跟 job 相关的参数,状态信息也都会入到数据库。因为 Sidekiq 的 任务是使用 Redis#brpop 命令的,消费了之后,消息就没有了,就找不到这个 job 了。

huacnlee mark as excellent topic. 29 May 10:11
Reply to zfjoy520 #7

each_slice 这块我理解得不对,我们在实际业务的做法跟您所说的是一致的。

喔,懂了,可能设计思路不太相同。

其实你已经有了数据库的持久化形式了,在 job 的运行过程中,也对整个流程做了步进日志的记录,你可以不用再去访问 job 的状态,前端展示只需要轮询持久化之后的结果就可以了。

是的,我们是这样做的。

job_id 在执行

job_id = AWorker.perform_async(params)

的时候,已经生成了。但是需要利用 Sidekiq 的 Middleware, 去更新这个 job 的执行情况。

对于前端,只需要轮询状态就可以了。

请问 Sidekiq 在使用 redis 的时候,有场景用到 RPOPLPUSH 了木有?还是 Pro 版才用到了

Reply to pathbox #13

Pro 版本使用到了,用来做高可用,保证数据不丢的。

Sidekiq Pro offers an alternative fetch strategy, super_fetch, for job processing using Redis' RPOPLPUSH command which keeps jobs in Redis.

wiki: 这里

你单独做了一套任务状态跟踪,我认为这一块可以独立出来的不用和 sidekiq 这个东西耦合。

是的,您说得对。

但是这一块是专门为 sidekiq 定制的,数据结构设计还包括 sidekiq_class 等,这些其实也可以用在 kafka 中。

是的。Pro 版的代码应该没有开源吧,要想用 RPOPLPUSH 保证更高可用,得自己考虑改下源码。。。。。。

Reply to pathbox #17

嗯,是的。

这个文档解决的问题,是希望耗时的任务 高性能并且状态可跟踪, 跟高可用关系不大,所以没有考虑自己实现 RPOPLPUSH

你的需求不用 hack 也能实现。 需求无非是下面几点

  1. 导入需要高性能
  2. 导出需要高性能
  3. 需要有跟踪报错功能

我的做法是把任务分拆,利用 sidekiq 多个 worker 同时执行任务,将结果合并即可。

实现 1 导出可以利用 find_in_batches, find_each 实现 2 导入可以利用 CSV.foreach 实现 3 跟踪报错可以用一个 model 记录 Job 执行过程中产生的数据和日志即可

# QueryThread用来把一个大量数据的查询拆分成多个小任务
# a = Concurrent::Array.new
# QueryThread.split(Subscription.where("status = ?", "canceled").where("id < 1000"), 5) do |subquery, idx, min_id, max_id|
#   subquery.find_each do |sub|
#     a << [idx, sub.id, min_id, max_id]
#   end
# end
#
class QueryThread
  def initialize(query, min_id, max_id, index, &block)
    @thread = Thread.new do
      subquery = query.where("#{query.table_name}.id >= ? and #{query.table_name}.id <= ?", min_id, max_id)
      block.call(subquery, index, min_id, max_id)
    end
  end

  def join
    @thread.join
  end

  class << self

    def split(query, thread_count, &block)
      arguments_groups = split_to_array(query, thread_count)

      query_threads = []
      arguments_groups.each do |argument|
        query_threads << QueryThread.new(query, argument[:min_id], argument[:max_id], argument[:index], &block)
      end
      query_threads.each(&:join)
    end

   #根据查询语句的最大id和最小id分组
    def split_to_array(query, worker_number)
      total_min_id = query.pluck("min(#{query.table_name}.id)").first.to_i
      total_max_id = query.pluck("max(#{query.table_name}.id)").first.to_i
      split_range(total_min_id, total_max_id, worker_number)
    end

    def split_range(total_min_id, total_max_id, worker_number)
      if total_max_id == 0
        return []
      elsif (total_max_id - total_min_id + 1) < worker_number
        # needn't so much worker
        return [{min_id: total_min_id, max_id: total_max_id, index: 0}]
      end

      range_in_thread = (total_max_id - total_min_id + 1) / worker_number
      result = []
      (worker_number - 1).times.each do |i|
        min_id = total_min_id + i * range_in_thread
        max_id = total_min_id + (i + 1) * range_in_thread - 1
        result << {min_id: min_id, max_id: max_id, index: i}
      end

      # last thread
      min_id = total_min_id + (worker_number - 1) * range_in_thread
      max_id = total_max_id
      result << {min_id: min_id, max_id: max_id, index: worker_number - 1}
      result
    end

  end
end

设计一个 BatchJob 用来做数据导出

class BatchJob < ApplicationJob
  queue_as :default

  class_attribute :query_block, :worker_number, :job_block
  def self.set_query(&block)
    self.query_block = block
  end

  def self.set_worker_number(number = 10)
    self.worker_number = number
  end

  def self.set_job(&block)
    self.job_block = block
  end

  def self.perform_batch_later
    query = query_block.call
    argu_group = QueryThread.split_to_array(query, worker_number || 10)
    argu_group.each do |argus|
      self.perform_later(argus)
    end
  end

  def perform(options = {})
    min_id = options.fetch(:min_id)
    max_id = options.fetch(:max_id)

    q = self.class.query_block.call
    subquery = q.where("#{q.table_name}.id >= ? and #{q.table_name}.id <= ?", min_id, max_id)
    self.class.job_block.call(subquery, options)
  end

end

如下用法,写个 job 继承 BatchJob

  • set_query 里写查询语句,
  • set_worker_number 设定分几次执行(可以同时执行),
  • set_job 里写具体查询到的结果进行逻辑
  • 执行的时候 ExportDataJob.perform_batch_later 即可,以下例子会把查询拆成 40 个 job,可同时导出,最后再根据 Attachment 的数据合并即可。
class ExportDataJob < BatchJob
  set_query {
    Customer.where("updated_at > ?", Time.parse("Tue Apr 3 14:08:33 2018 -0500"))
  }

  set_worker_number 40
  set_job do |query, thread_hash|
    Attachment.upload_csv!(Rails.root.join("tmp/fix_grandfater_#{thread_hash[:index]}.csv"), "wb") do |csv|
      query.includes(:user).find_each(batch_size: 100) do |customer|
        csv << [customer.id, customer.name, ...] #自己写逻辑
      end
    end
  end

end

根据以上原理,可以写出类似的方法做导入

class CsvJob < ApplicationJob
  queue_as :default

  class_attribute :worker_number, :job_block

  def self.set_worker_number(number = 10)
    self.worker_number = number
  end

  def self.set_job(&block)
    self.job_block = block
  end

  # CsvJob.perform_batch_later(123, csv: {headers: true}, job: {store: "123"})
  def self.perform_batch_later(attachment_id, options = {})
    options[:csv] ||= {}
    csv_data = Attachment.find(attachment_id).file.read
    total_lines = CSV.parse(csv_data, options[:csv]).count

    worker_group = QueryThread.split_range(0, total_lines - 1, worker_number || 10)
    worker_group.each do |worker_options|
      self.perform_later(attachment_id, options, worker_options)
    end
  end

  def perform(attachment_id, options = {}, worker_options = {})
    temp_filename = Rails.root.join("tmp/temp_csv_#{attachment_id}_#{options[:index]}.csv")
    File.open(temp_filename, "wb") do |f|
      f.write Attachment.find(attachment_id).file.read
    end

    csv = CSV.foreach(temp_filename, options[:csv])

    self.class.job_block.call csv, worker_options, options[:job]
  end
end

用法如下,

  • set_worker_number 设置分几次完成
  • set_job 设置逻辑,这里手动和 worker_options[:min_id] [:max_id] 做对比 代码丑陋了点。

调用时 参数为含有 csv 文件信息的 Attachment 的 id,以及其他自定义参数 #ImportCustomerJob.perform_batch_later(attachment_id, csv: {headers: true}, job: {store: "stage"})


class ImportCustomerJob < CsvJob

  set_worker_number 20

  set_job do |csv, worker_options, job_options|
    job_options ||= {}

    Attachment.log("import_#{job_options[:store]}_#{worker_options[:index]}") do |logger|
      logger.info "start.."
      index = 0
      csv.each do |row|

        if index < worker_options[:min_id]
          index += 1
          next
        end
        if index > worker_options[:max_id]
          break
        end

        Customer.create_or_update_by!(store: job_options[:store], remote_id: row["remote_id"]) do |c|
          c.attributes = row.to_h
        end

        index += 1
      end
      logger.info "finish.."
    end
  end
end

最后附上 Attachment,代码随便写了写。起个持久化的作用

class Attachment < ApplicationRecord
  mount_uploader :file, S3FileUploader


  def self.upload_file!(filename, name = nil)
    a = Attachment.new
    file = File.open(filename, "r")
    a.file = file
    a.name = name
    a.save!
    a
  ensure
    file.close if file
  end

  def self.upload_csv!(filename, csv_options, &block)
    CSV.open(filename, csv_options) do |csv|
      block.call(csv)
    end
  ensure
    upload_file!(filename)
  end

  # todo add log and upload to s3
  def self.log(name, &block)
    log_file = open(Rails.root.join("tmp/#{name}.log"), "w")
    log_file.sync = true
    logger = Logger.new(log_file)
    logger.formatter = proc{|severity, time, program, msg|  "#{severity}: #{msg}\n" }

    begin
      block.call(logger)
    rescue Exception => e
      logger.error e.message
      logger.error e.backtrace.join("\n")
      raise e
    ensure
      log_file.close
      Attachment.upload_file!(Rails.root.join("tmp/#{name}.log"), name)
    end
  end
end

这样不修改 Sidekiq 就可以完成导入,导出。也不用担心 Thread 的问题。全程有记录,报错的话 Sidekiq 会重启任务(一般是你自己的逻辑有问题),符合你的要求。

Reply to yakjuly #19

感谢分享,我们会参考一下。

我们在设计的时候会考虑几点:

1. Sidekiq Worker不需要去管理job的状态, 也就是并发的任务本身对job状态是无感知的, 也没有状态的概念
2. 工具是通用的, 不仅仅在导出场景, 任何批量多任务处理, 都可以使用.

在新版本的 Rails 中,有 ActiveJob, 里面的 callback 可以做到这些功能,但是我们并未在上面做扩展,因为

  • 我们使用的 Rails 版本太低了 (<5)
  • 我们希望这个更底层一些,所以选择使用了 Sidekiq 的 Middleware

您说的这个也是一个解决方案,但是个人觉得太偏向于某一种场景了,如果换一种场景的话,可能又得再实现一遍; 建议可以抽离出一个插件,做成一个执行引擎,相信从可读性和实用性而言会更好一些。

学习到了

👏 👏 👏 不错不错 学习

最近使用了 rundeck,rundeck 对每个 job 的一个 execution 提供了详细的信息。execution 还支持回调

大佬,我这边有一个问题,就是我的 sidekiq 并发 5 个以上的任务运行一段时间就会报错:stream closed 然后杀死当前的 work,不会重启该 work。怎么破

You need to Sign in before reply, if you don't have an account, please Sign up first.