Ruby 基于 Sidekiq 的异步任务管理引擎设计

dengqinghua · 2018年05月28日 · 最后由 admins_admin 回复于 2018年06月07日 · 1477 次阅读
本帖已被设为精华帖!

基于Sidekiq的异步任务管理引擎

原文地址: http://blog.dengqinghua.net/sidekiq_task_event.html

Github: https://github.com/dengqinghua/roses/blob/master/source/sidekiq_task_event.md

我们在项目中大量使用到了Sidekiq 作为队列任务处理, 但是Sidekiq无法获取到每一个任务的处理情况.

在系统中有一类问题的抽象为:

批量处理n个任务, 每个任务都比较耗时, 希望可以快速地处理, 并且能知道每一个任务的执行结果情况.

基于这类问题, 我们研发了基于Sidekiq的异步任务管理引擎.

阅读完该文档之后, 您将了解到:

  • Sidekiq基本框架源码分析.
  • Sidekiq Middleware.
  • 异步任务管理引擎设计.

Sidekiq基本框架

Sidekiq基于Redis作为存储, 一个例子如下:

sidekiq_exmaple

Sidekiq Client

Sidekiq Client部分为队列数据的生产者, 在 Sidekiq 源码中可以看到

module Sidekiq
  class Client
    def push(item)
      normed = normalize_item(item)
      payload = process_single(item['class'.freeze], normed)

      if payload
        raw_push([payload])
        payload['jid'.freeze]
      end
    end

    def atomic_push(conn, payloads)
      q = payloads.first['queue'.freeze]
      now = Time.now.to_f
      to_push = payloads.map do |entry|
        entry['enqueued_at'.freeze] = now
        Sidekiq.dump_json(entry)
      end
      conn.sadd('queues'.freeze, q)
      conn.lpush("queue:#{q}", to_push)
    end
  end
end

最终会在Redis中存储下面这些信息

  • retry 重试次数
  • queue 队列名称
  • backtrace 错误栈
  • class 处理类名称
  • args 参数
  • jid job_id
  • enqueued_at 进入队列的时间

并将这些信息通过lpush存储在Redis的队列中.

Sidekiq Server

Before 4.0

Sidekiq4.0之前, 使用的是Celluloid作为多线程的抽象层

模型如下:

sidekiq_actor_architecture

源码分析请参考 Working With Ruby Threads-Chapter 15

After 4.0

在4.0版本之后, Sidekiq出于性能考虑, 使用原生的Thread实现了一个简易的Actor版本模型. 相关文章请见

模型如下:

sidekiq_new_framework

核心的组件包括

  1. Manager

    Manager 根据用户设置的并发数, 生成处理队列任务的 Processor, 并对idle或者dead的 Processsor 进行管理, 包括:

    1. start: Spin up Processors.
    2. processor_died: Handle job failure, throw away Processor, create new one.
    3. quiet: shutdown idle Processors.
    4. stop: hard stop the Processors by deadline.
    

    初始化Manager

    class Manager
      def initialize(options={})
        logger.debug { options.inspect }
        @options = options
        @count = options[:concurrency] || 25
        raise ArgumentError, "Concurrency of #{@count} is not supported" if @count < 1
    
        # @done代表是否结束处理任务
        @done = false
        @workers = Set.new
    
        # 生成多个Processor, 每一个Processor对象在被调用start方法的时候, 会生成了一个线程
        @count.times do
          @workers << Processor.new(self)
        end
    
        # 添加一个锁, 用于修改 @workers 的数据, 管理Processor对象
        @plock = Mutex.new
      end
    end
    

    启动Manager, 即调用Processor#start

    class Manager
      def start
        @workers.each do |x|
          x.start
        end
      end
    end
    
  2. Processor

    Processor 是处理任务的类, 包括下面的功能

    1. fetches a job from Redis using brpop
    2. executes the job
      a. instantiate the Worker
      b. run the middleware chain
      c. call #perform
    

    Processor#start, 启动Processor, 创建一个线程

    class Processor
      def start
        # 生成一个线程, 并调用run方法
        @thread ||= safe_thread("processor", &method(:run))
      end
    end
    

    Processor#run, 处理任务, 去Redis获取队列数据

    # @mgr 即为他对应的 Manager 对象
    class Processor
      def run
        begin
          while !@done
            # 调用 perform 方法进行处理
            process_one
          end
    
          # 一旦结束了, 则将 Processor对象中的manager对应的worker去掉, 即是改变了上述 Manager的 @workers 数组
          @mgr.processor_stopped(self)
        rescue Sidekiq::Shutdown
          # 在接收到TERM SIGNAL之后, 等待超时的时候sidekiq会抛出异常 Sidekiq::Shutdown, 见下文分析
          # 线程被关闭.
          @mgr.processor_stopped(self)
        rescue Exception => ex
          # 程序报错了, Manager#processor_died 会重新生成一个新的Processor线程
          @mgr.processor_died(self, ex)
        end
      end
    end
    

队列重启时job的处理

当我们更新代码后, 需要重启Sidekiq的进程. 一般来说, 我们会发送一个 TERM SIGNAL 指令给Sidekiq进程, 它的执行步骤如下

  1. 停止Fetch jobs.

    class Manager
      def quiet
        return if @done
        # 将 @done 设置为 true
        @done = true
    
        logger.info { "Terminating quiet workers" }
        @workers.each { |x| x.terminate } # 这里的每一个 x 都是一个Processor对象
        fire_event(:quiet, reverse: true)
      end
    end
    
    class Processsor
      def terminate(wait=false)
        @done = true # 将每一个Processor 的 @done 设置为 true, 下面的run方法则不再fetch新的job
        return if !@thread
        @thread.value if wait
      end
    
      def run
        begin
          while !@done
            process_one
          end
          # 一旦结束了, 则将 Processor对象中的manager对应的worker去掉, 即是改变了上述 Manager的 @workers 数组
          @mgr.processor_stopped(self)
        rescue Sidekiq::Shutdown
          @mgr.processor_stopped(self)
        rescue Exception => ex
          @mgr.processor_died(self, ex)
      end
    end
    
  2. 等待Sidekiq.options[:timeout]秒(默认为8秒)的时间, 使得Processor去处理完当前未完成的jobs

    class Manager
      def stop(deadline)
        quiet
        fire_event(:shutdown, reverse: true)
    
        # some of the shutdown events can be async,
        # we don't have any way to know when they're done but
        # give them a little time to take effect
        sleep PAUSE_TIME
        return if @workers.empty?
    
        logger.info { "Pausing to allow workers to finish..." }
        remaining = deadline - Time.now
    
        # 等待默认的8s后, 如果 @workers 为空, 则代表在规定时间内任务都处理完, 退出
        while remaining > PAUSE_TIME
          return if @workers.empty?
          sleep PAUSE_TIME
          remaining = deadline - Time.now
        end
        return if @workers.empty?
    
        # 等待默认的8s后, 如果 @workers 不为空, 则进行强制shutdown
        hard_shutdown
      end
    end
    
  3. 如果在等待时间之后, 仍存在正在处理的job, 则将job通过rpush命令推入Redis, 强制使 processor 退出

    class Manager
      def hard_shutdown
        # We've reached the timeout and we still have busy workers.
        # They must die but their jobs shall live on.
        cleanup = nil
        @plock.synchronize do
          cleanup = @workers.dup
        end
    
        if cleanup.size > 0
          # 获取没有处理完的job
          jobs = cleanup.map {|p| p.job }.compact
    
          logger.warn { "Terminating #{cleanup.size} busy worker threads" }
          logger.warn { "Work still in progress #{jobs.inspect}" }
    
          # Re-enqueue unfinished jobs
          # NOTE: You may notice that we may push a job back to redis before
          # the worker thread is terminated. This is ok because Sidekiq's
          # contract says that jobs are run AT LEAST once. Process termination
          # is delayed until we're certain the jobs are back in Redis because
          # it is worse to lose a job than to run it twice.
          strategy = (@options[:fetch] || Sidekiq::BasicFetch)
    
          # 将未处理完的jobs推入队列的头部
          strategy.bulk_requeue(jobs, @options)
        end
    
        # 强制kill掉线程
        cleanup.each do |processor|
          processor.kill
        end
      end
    end
    
    class Processor
      def kill(wait=false)
        @done = true
        return if !@thread
        # unlike the other actors, terminate does not wait
        # for the thread to finish because we don't know how
        # long the job will take to finish.  Instead we
        # provide a `kill` method to call after the shutdown
        # timeout passes.
        @thread.raise ::Sidekiq::Shutdown
        @thread.value if wait
      end
    end
    

NOTE: 注意在接收到TERM SIGNAL一些job有可能被重复执行. Sidekiq的FAQ中有说明: Remember that Sidekiq will run your jobs AT LEAST once.

INFO: Sidekiq 还提供了 Scheduling Job 的功能, 即到时执行任务, 该部分使用了一个 SortedSet 的redis数据结构, 排序的因子为任务的执行时间. 在启动 Sidekiq 服务的时候, 会启动了一个线程轮询所有执行时间小于等于当前时间的队列数据, 将该部分的数据在pop至队列, 再由 Processor 处理.

Sidekiq Middleware

Sidekiq 在 client-side 和 server-side 都支持AOP操作, 该部分和Rack的原理一致.

有了server-side middleware的支持, 我们可以

在sidekiq处理任务前后, 捕捉到任务的处理情况

如Sidekiq提供了 ActiveRecordserver-side middleware

module Sidekiq
  module Middleware
    module Server
      class ActiveRecord
        def initialize
          # With Rails 5+ we must use the Reloader **always**.
          # The reloader handles code loading and db connection management.
          if defined?(::Rails) && ::Rails::VERSION::MAJOR >= 5
            raise ArgumentError, "Rails 5 no longer needs or uses the ActiveRecord middleware."
          end
        end

        def call(*args)
          yield
        ensure
          ::ActiveRecord::Base.clear_active_connections!
        end
      end
    end
  end
end

对于基于Rails的Sidekiq服务, Sidekiq会确保在每次执行任务之后, 都会清掉使用的连接, 避免多线程占用过多的Rails数据库连接.

AsyncTask

需求分析

我们经常有一些这样的需求:

1. 给卖家批量报名活动, 一次可以报名200个商品, 如果报名失败的记录, 需要有提示信息
2. 批量创建活动, 一次导入一个1万条商品的excel, 需要给这1万条数据创建
3. 批量导出50万大促信息

最开始我们都是通过串行的方式进行处理, 比如

1. 给卖家批量报名活动, 一次可以报名200个商品, 如果报名失败的记录, 需要有提示信息

我们提供一个商品的HTTP接口, 然后由JS发Ajax请求进行调用, 但是该方式有一些问题:

  • 数据容易丢失
  • 一些接口请求很慢, 容易造成超时
  • JS交互复杂, 大量的逻辑都放在了前端, 出问题不好排查

但是对于数据量大的情况, 串行调用变得非常慢, 如

2. 批量创建活动, 一次导入一个1万条商品的excel, 需要给这1万条数据创建
3. 批量导出50万大促信息

我们考虑使用Sidekiq进行处理, 即每一个任务都放在Redis里面. 调用perform_async方法, 获取到任务的job_id

job_id = ProductWorker.perform_async(params)

但是新的问题出现了: 我们无法获取到这个job的完成情况, 如果逻辑上处理失败, 也无法获取到对应的错误信息.

NOTE: Sidekiq-Pro 支持batches功能, 但是它是收费的.

我们最终决定利用 Sidekiq 的 Middleware 特性, 研发出一套异步任务管理引擎, 它支持

  • 任务的聚合管理. 一个task和多个job进行关联
  • 可以获得job的执行状态
  • 所有执行过程可视化

AsyncTask

任务处理引擎架构图

async_task

它包含三部分

  1. 创建Task, 生成task_id, 将每一个任务都推入Redis, 并获取到对应的job_id
  2. 生成Event记录, 该Event和job_id一一对应, 记录了整个job的生命周期
  3. 利用Server-Side Middleware, 记录Event的状态和相关信息

NOTE: 步骤一的job_id由Sidekiq生成

Task 和 Event 创建

我们将Task和Event都创建了对应的数据库表, 则

class Task
  has_many :events
end

class Event
  validates_uniqueness_of :job_id
  belongs_to :task
end

Task的数据结构为

字段 释义
worker_name worker的名称
id 主键id

Event的数据结构为

字段 释义
job_id 任务id,全局唯一
status 当前状态,包括enqueue,working, finish, failed, error
params 任务执行的所有参数
added_messages 增量的信息,记录整个任务的流程

NOTE: 注意到status包含了 falied 和 error 两个不同的状态. 其中 failed 代表为 业务逻辑上的失败, 如一个卖家因为资质不合格导致无法报名, 为了获取该状态, 处理时可直接抛出异常(NormalException), 状态为failed. 而 error 代表为系统错误, 如程序bug或者接口超时等

Server-Side Middleware

在这里我们配置了 use_task_event, 如果需要使用该插件, 需要在 worker 中配置 use_task_event: true.

class AWorker
  include Sidekiq::Worker

  sidekiq_options use_task_event: true

  def perform(options)
    handle_job(options)
  end
end

Server-Side Middleware代码和注释如下:

module AsyncTask
  class MiddlewareServer
    def call(worker, item, queue)
      if item['use_task_event'] # 配置入口
        begin
          job_id = item['jid']
          Task.record(job_id, :working, message: "处理中")

          yield

          # 正常处理成功, 设置 status 为 finish
          Task.record(job_id, :finish, message: "已经完成")
        rescue SystemExit, Interrupt => ex
          # 被中断, 设置 status 为 error
          Task.record(job_id, :error, message: "被中断")

          # 如果之后会被重试, 则重新再设置为 :enqueue
          if retry_status.is_a?(Integer) && (retry_status > 0) && retry_count &&
            (retry_status - 1 != retry_count.to_i)

            Task.record(job_id, :enqueue, message: "等待重试")
          end

          raise ex
        rescue NormalException => ex
          # 业务逻辑上的失败, 设置 status 为 failed, 错误信息放在 message 中
          Task.record(job_id, :failed, message: "发生错误: #{ex.message}")
        rescue Exception => ex
          # 程序bug, 设置 status 为 error
          Task.record(job_id, :error, message: "发生致命错误: #{ex.message}")

          # 如果之后会被重试, 则重新再设置为 :enqueue
          if retry_status.is_a?(Integer) && (retry_status > 0) && retry_count &&
            (retry_status - 1 != retry_count.to_i)

            Task.record(job_id, :enqueue, message: "等待重试")
          end

          raise ex
        end
      else
        yield
      end
    end
  end
end

在项目启动时加载该Middleware

Sidekiq.configure_server do |config|
  config.server_middleware do |chain|
    chain.add AsyncTask::MiddlewareServer
  end
end

问题剖析

回顾在文章开始时提到的需求

1. 给卖家批量报名, 一次可以报名200个商品, 进行活动, 如果报名失败的记录, 需要有提示信息
2. 批量创建活动, 一次导入一个1万条商品的excel, 需要给这1万条数据创建
3. 批量导出50万大促信息

对于需求1, 2, 都可以用相同的处理方式, 流程如下:

  1. 前端一次将所有的数据全部提到给后端.
  2. 后端根据数据量拆分为n个jobs, 并生成一个task_id, 返回给前端.
  3. 前端每隔一段时间, 调用后端的接口来询问 task_id 对于的 job 的状态, 如果出错, 则一同返回错误信息.

对于需求3, 我们可以将50万信息分为不同的worker来处理, 并用统一的task_id进行关联, 也将大大提高导出的效率.

共收到 22 条回复

sidekiq 自带 web 页面的。 sidekiq 也有 api 可以拿到 job 情况的。

lithium4010 回复

可以拿到 job 情况. 但是在sidekiq的存储结构中, 并没有将job_id当做key来存储

通过sidekiq 的api 获取 job 是非常低效的, 在 API 文档 中提到

Find a job by JID (WARNING: this is very inefficient if your queue is big!)

另外, 我们需要获取到 job 的整个状态, 包括 进入队列, 处理中, 成功, 业务校验的失败, 系统的失败 等, 这是 sidekiq 的 api 不具备的.

sidekiq-pro 提供了类似 batches 功能, 但是我们因为一些原因没有使用.

对sidekiq的研究还是比较深入,这一点不可否认,点N个赞。

过一年再来回头来看今天你们这个设计,可能会发现其实还可以走另外的设计方式。具体怎么设计,我并没有细看,所以不发表意见。

zfjoy520 回复

嗯, 可以贴一下链接或者设计思路吗? 我们也都学习一下.

这些其实都是造轮子, 我们的痛点是一些资源申请不下来.

dengqinghua 回复

容我先细细看下你们的思路先^_^

1、sidekiq平民版支持一次批量插入成千上万的job,可以用 Sidekiq::Client.push_bulk(items)这个方法来生成待执行的任务;源码和注释见: https://github.com/mperham/sidekiq/blob/master/lib/sidekiq/client.rb#L135 https://github.com/mperham/sidekiq/blob/master/lib/sidekiq/client.rb#L92

这个方法同样也返回了所有生成job的jid,效率如何需要自测。

2、关于失败job的提示信息记录或者是执行过程全记录,可用第1步生成的jid数组来自由发挥,选择适合自身项目的一种即可。 方法有很多种,比如照原来的方式存mysql,步步更新。

3、对于需求3. 批量导出50万大促信息,ruby本身即有一个对大数据的分批方法: each_slice,文档见: http://ruby-doc.org/core-2.5.1/Enumerable.html#method-i-each_slice

可将这50万,按10_000或者更多分为一批,做一些简单拼凑然后导出,降低数据库或其他数据源的查询压力,最后将结果拼装成一个excel或者是你们想要的其他的数据导出格式即可。

临时想到的一些平时用过的方式,也不知道是否符合你们项目的胃口,有说得不恰当的地方请不要笑话。

zfjoy520 回复

我的理解如下:

  1. 消息的生成和消费, 是在不同的经常里面的, 如果需要用 job_id, 则还是需要将 job_id 存储起来
  2. Sidekiq的 API 通过 job_id 寻找这个job 是非常低效的, 这在前文已经说过
  3. each_slice 本身还是在一个进程里面, 无法使用 多核 来进行操作. 虽然在一个进程内的Sidekiq多线程处理, 是无法使用多核的(这里任务是耗CPU的), 另外, 如果是用each_slice 处理, 真的是很慢, 要是中途不小心报错, 整个任务就失败了.
  4. job_id生成是有成本的.
def push_bulk(items)
   ...

  raw_push(payloads) if !payloads.empty?

  # 如果有十万个任务, 这里需要循环十万次得到jid
  payloads.collect { |payload| payload['jid'] }
end

我们部署Sidekiq多个进程, 就有一定概率使用到多核.

50万数据我们是分成多个sidekiq job处理, 并且有一个task_id跟这些job关联. 这样可以用到了多核, 也能观察到各个任务进行的进度, 并且可以重试.

当然单独针对这个业务逻辑, 也可以设计单独的表结构来存储任务.

Sidekiq的 API 通过 job_id 寻找这个job 是非常低效的, 这在前文已经说过

我不是很明白,为什么会有通过一个jid去寻找job的需求

each_slice多核问题

是不是可以考虑将50万数据,each_slice成50份或者是100份,enqueue至sidekiq,然后利用多进程sidekiq来跑多核,我并没有要将50万数据扔一个job里面跑的意思。我指是的你可以用each_slice来将数据分片,然后批量创建job。

job_id生成是有成本的

跟第3一样,既然明知是一个很耗时的方法,我们何必非要让push_bulk(items)一次跑10万个,我们将数据切片打散成多个job也是可以的。

zfjoy520 回复

因为我们需要一个job的状态, 以及业务失败时, 这个job的错误信息.

如下面的信息:

{ status: failed, message: "发生错误: 您的资质不满足报名条件" }

我们是通过 raise NormalException 来中断任务, 并将信息存储在异常中的.

所以说, 我们需要通过job_id, 来找到这个job, 访问这个job的状态(status), 并拿到业务逻辑的错误信息(message)

这儿其实跟 Erlang 的 Let It Crash 有点儿像, 但是使用上会稍微简单一点.

另外, 所有跟job相关的参数, 状态信息也都会入到数据库. 因为Sidekiq 的 任务是使用 Redis#brpop 命令的, 消费了之后, 消息就没有了, 就找不到这个 job 了.

huacnlee 将本帖设为了精华贴 05月29日 10:11
zfjoy520 回复

each_slice 这块我理解得不对, 我们在实际业务的做法跟您所说的是一致的.

dengqinghua 回复

喔,懂了,可能设计思路不太相同。

其实你已经有了数据库的持久化形式了,在job的运行过程中,也对整个流程做了步进日志的记录,你可以不用再去访问job的状态,前端展示只需要轮询持久化之后的结果就可以了。

zfjoy520 回复

是的, 我们是这样做的.

job_id 在执行

job_id = AWorker.perform_async(params)

的时候, 已经生成了. 但是需要利用 Sidekiq 的 Middleware, 去更新这个 job 的执行情况.

对于前端, 只需要轮询状态就可以了.

请问 Sidekiq 在使用redis的时候,有场景用到 RPOPLPUSH 了木有?还是 Pro 版才用到了

pathbox 回复

Pro版本使用到了, 用来做高可用, 保证数据不丢的.

Sidekiq Pro offers an alternative fetch strategy, super_fetch, for job processing using Redis' RPOPLPUSH command which keeps jobs in Redis.

wiki: 这里

dengqinghua 回复

你单独做了一套任务状态跟踪,我认为这一块可以独立出来的不用和 sidekiq 这个东西耦合。

lithium4010 回复

是的,您说得对。

但是这一块是专门为sidekiq定制的,数据结构设计还包括 sidekiq_class 等,这些其实也可以用在 kafka 中。

dengqinghua 回复

是的。Pro版的代码应该没有开源吧,要想用RPOPLPUSH保证更高可用,得自己考虑改下源码。。。。。。

pathbox 回复

嗯, 是的.

这个文档解决的问题, 是希望耗时的任务 高性能并且状态可跟踪, 跟高可用关系不大, 所以没有考虑自己实现 RPOPLPUSH

你的需求不用hack也能实现。 需求无非是下面几点

  1. 导入需要高性能
  2. 导出需要高性能
  3. 需要有跟踪报错功能

我的做法是把任务分拆,利用sidekiq多个worker同时执行任务,将结果合并即可。

实现1导出可以利用find_in_batches, find_each 实现2导入可以利用CSV.foreach 实现3跟踪报错可以用一个model记录Job执行过程中产生的数据和日志即可

# QueryThread用来把一个大量数据的查询拆分成多个小任务
# a = Concurrent::Array.new
# QueryThread.split(Subscription.where("status = ?", "canceled").where("id < 1000"), 5) do |subquery, idx, min_id, max_id|
#   subquery.find_each do |sub|
#     a << [idx, sub.id, min_id, max_id]
#   end
# end
#
class QueryThread
  def initialize(query, min_id, max_id, index, &block)
    @thread = Thread.new do
      subquery = query.where("#{query.table_name}.id >= ? and #{query.table_name}.id <= ?", min_id, max_id)
      block.call(subquery, index, min_id, max_id)
    end
  end

  def join
    @thread.join
  end

  class << self

    def split(query, thread_count, &block)
      arguments_groups = split_to_array(query, thread_count)

      query_threads = []
      arguments_groups.each do |argument|
        query_threads << QueryThread.new(query, argument[:min_id], argument[:max_id], argument[:index], &block)
      end
      query_threads.each(&:join)
    end

   #根据查询语句的最大id和最小id分组
    def split_to_array(query, worker_number)
      total_min_id = query.pluck("min(#{query.table_name}.id)").first.to_i
      total_max_id = query.pluck("max(#{query.table_name}.id)").first.to_i
      split_range(total_min_id, total_max_id, worker_number)
    end

    def split_range(total_min_id, total_max_id, worker_number)
      if total_max_id == 0
        return []
      elsif (total_max_id - total_min_id + 1) < worker_number
        # needn't so much worker
        return [{min_id: total_min_id, max_id: total_max_id, index: 0}]
      end

      range_in_thread = (total_max_id - total_min_id + 1) / worker_number
      result = []
      (worker_number - 1).times.each do |i|
        min_id = total_min_id + i * range_in_thread
        max_id = total_min_id + (i + 1) * range_in_thread - 1
        result << {min_id: min_id, max_id: max_id, index: i}
      end

      # last thread
      min_id = total_min_id + (worker_number - 1) * range_in_thread
      max_id = total_max_id
      result << {min_id: min_id, max_id: max_id, index: worker_number - 1}
      result
    end

  end
end

设计一个BatchJob用来做数据导出

class BatchJob < ApplicationJob
  queue_as :default

  class_attribute :query_block, :worker_number, :job_block
  def self.set_query(&block)
    self.query_block = block
  end

  def self.set_worker_number(number = 10)
    self.worker_number = number
  end

  def self.set_job(&block)
    self.job_block = block
  end

  def self.perform_batch_later
    query = query_block.call
    argu_group = QueryThread.split_to_array(query, worker_number || 10)
    argu_group.each do |argus|
      self.perform_later(argus)
    end
  end

  def perform(options = {})
    min_id = options.fetch(:min_id)
    max_id = options.fetch(:max_id)

    q = self.class.query_block.call
    subquery = q.where("#{q.table_name}.id >= ? and #{q.table_name}.id <= ?", min_id, max_id)
    self.class.job_block.call(subquery, options)
  end

end

如下用法,写个job继承BatchJob

  • set_query 里写查询语句,
  • set_worker_number 设定分几次执行(可以同时执行),
  • set_job里写具体查询到的结果进行逻辑
  • 执行的时候ExportDataJob.perform_batch_later即可,以下例子会把查询拆成40个job,可同时导出,最后再根据Attachment的数据合并即可。
class ExportDataJob < BatchJob
  set_query {
    Customer.where("updated_at > ?", Time.parse("Tue Apr 3 14:08:33 2018 -0500"))
  }

  set_worker_number 40
  set_job do |query, thread_hash|
    Attachment.upload_csv!(Rails.root.join("tmp/fix_grandfater_#{thread_hash[:index]}.csv"), "wb") do |csv|
      query.includes(:user).find_each(batch_size: 100) do |customer|
        csv << [customer.id, customer.name, ...] #自己写逻辑
      end
    end
  end

end

根据以上原理,可以写出类似的方法做导入

class CsvJob < ApplicationJob
  queue_as :default

  class_attribute :worker_number, :job_block

  def self.set_worker_number(number = 10)
    self.worker_number = number
  end

  def self.set_job(&block)
    self.job_block = block
  end

  # CsvJob.perform_batch_later(123, csv: {headers: true}, job: {store: "123"})
  def self.perform_batch_later(attachment_id, options = {})
    options[:csv] ||= {}
    csv_data = Attachment.find(attachment_id).file.read
    total_lines = CSV.parse(csv_data, options[:csv]).count

    worker_group = QueryThread.split_range(0, total_lines - 1, worker_number || 10)
    worker_group.each do |worker_options|
      self.perform_later(attachment_id, options, worker_options)
    end
  end

  def perform(attachment_id, options = {}, worker_options = {})
    temp_filename = Rails.root.join("tmp/temp_csv_#{attachment_id}_#{options[:index]}.csv")
    File.open(temp_filename, "wb") do |f|
      f.write Attachment.find(attachment_id).file.read
    end

    csv = CSV.foreach(temp_filename, options[:csv])

    self.class.job_block.call csv, worker_options, options[:job]
  end
end

用法如下,

  • set_worker_number 设置分几次完成
  • set_job 设置逻辑,这里手动和worker_options[:min_id] [:max_id]做对比 代码丑陋了点。

调用时 参数为含有csv文件信息的Attachment的id,以及其他自定义参数 #ImportCustomerJob.perform_batch_later(attachment_id, csv: {headers: true}, job: {store: "stage"})


class ImportCustomerJob < CsvJob

  set_worker_number 20

  set_job do |csv, worker_options, job_options|
    job_options ||= {}

    Attachment.log("import_#{job_options[:store]}_#{worker_options[:index]}") do |logger|
      logger.info "start.."
      index = 0
      csv.each do |row|

        if index < worker_options[:min_id]
          index += 1
          next
        end
        if index > worker_options[:max_id]
          break
        end

        Customer.create_or_update_by!(store: job_options[:store], remote_id: row["remote_id"]) do |c|
          c.attributes = row.to_h
        end

        index += 1
      end
      logger.info "finish.."
    end
  end
end

最后附上Attachment,代码随便写了写。起个持久化的作用

class Attachment < ApplicationRecord
  mount_uploader :file, S3FileUploader


  def self.upload_file!(filename, name = nil)
    a = Attachment.new
    file = File.open(filename, "r")
    a.file = file
    a.name = name
    a.save!
    a
  ensure
    file.close if file
  end

  def self.upload_csv!(filename, csv_options, &block)
    CSV.open(filename, csv_options) do |csv|
      block.call(csv)
    end
  ensure
    upload_file!(filename)
  end

  # todo add log and upload to s3
  def self.log(name, &block)
    log_file = open(Rails.root.join("tmp/#{name}.log"), "w")
    log_file.sync = true
    logger = Logger.new(log_file)
    logger.formatter = proc{|severity, time, program, msg|  "#{severity}: #{msg}\n" }

    begin
      block.call(logger)
    rescue Exception => e
      logger.error e.message
      logger.error e.backtrace.join("\n")
      raise e
    ensure
      log_file.close
      Attachment.upload_file!(Rails.root.join("tmp/#{name}.log"), name)
    end
  end
end

这样不修改Sidekiq就可以完成导入,导出。也不用担心Thread的问题。全程有记录,报错的话Sidekiq会重启任务(一般是你自己的逻辑有问题),符合你的要求。

yakjuly 回复

感谢分享, 我们会参考一下.

我们在设计的时候会考虑几点:

1. Sidekiq Worker不需要去管理job的状态, 也就是并发的任务本身对job状态是无感知的, 也没有状态的概念
2. 工具是通用的, 不仅仅在导出场景, 任何批量多任务处理, 都可以使用.

在新版本的Rails中, 有ActiveJob, 里面的callback可以做到这些功能, 但是我们并未在上面做扩展, 因为

  • 我们使用的Rails版本太低了(<5)
  • 我们希望这个更底层一些, 所以选择使用了Sidekiq的Middleware

您说的这个也是一个解决方案, 但是个人觉得太偏向于某一种场景了, 如果换一种场景的话, 可能又得再实现一遍; 建议可以抽离出一个插件, 做成一个执行引擎, 相信从可读性和实用性而言会更好一些.

学习到了

需要 登录 后方可回复, 如果你还没有账号请点击这里 注册