Ruby 基于 Sidekiq 的异步任务管理引擎设计

dengqinghua · 2018年05月28日 · 最后由 superjagger 回复于 2019年01月08日 · 7172 次阅读
本帖已被管理员设置为精华贴

基于 Sidekiq 的异步任务管理引擎

原文地址:http://blog.dengqinghua.net/sidekiq_task_event.html

Github: https://github.com/dengqinghua/roses/blob/master/source/sidekiq_task_event.md

我们在项目中大量使用到了Sidekiq 作为队列任务处理,但是 Sidekiq 无法获取到每一个任务的处理情况。

在系统中有一类问题的抽象为:

批量处理 n 个任务,每个任务都比较耗时,希望可以快速地处理,并且能知道每一个任务的执行结果情况。

基于这类问题,我们研发了基于 Sidekiq 的异步任务管理引擎。

阅读完该文档之后,您将了解到:

  • Sidekiq 基本框架源码分析。
  • Sidekiq Middleware.
  • 异步任务管理引擎设计。

Sidekiq 基本框架

Sidekiq 基于 Redis 作为存储,一个例子如下:

sidekiq_exmaple

Sidekiq Client

Sidekiq Client 部分为队列数据的生产者,在 Sidekiq 源码中可以看到

module Sidekiq
  class Client
    def push(item)
      normed = normalize_item(item)
      payload = process_single(item['class'.freeze], normed)

      if payload
        raw_push([payload])
        payload['jid'.freeze]
      end
    end

    def atomic_push(conn, payloads)
      q = payloads.first['queue'.freeze]
      now = Time.now.to_f
      to_push = payloads.map do |entry|
        entry['enqueued_at'.freeze] = now
        Sidekiq.dump_json(entry)
      end
      conn.sadd('queues'.freeze, q)
      conn.lpush("queue:#{q}", to_push)
    end
  end
end

最终会在 Redis 中存储下面这些信息

  • retry 重试次数
  • queue 队列名称
  • backtrace 错误栈
  • class 处理类名称
  • args 参数
  • jid job_id
  • enqueued_at 进入队列的时间

并将这些信息通过lpush存储在 Redis 的队列中。

Sidekiq Server

Before 4.0

Sidekiq4.0 之前,使用的是Celluloid作为多线程的抽象层

模型如下:

sidekiq_actor_architecture

源码分析请参考 Working With Ruby Threads-Chapter 15

After 4.0

在 4.0 版本之后,Sidekiq 出于性能考虑,使用原生的Thread实现了一个简易的 Actor 版本模型。相关文章请见

模型如下:

sidekiq_new_framework

核心的组件包括

  1. Manager

    Manager 根据用户设置的并发数,生成处理队列任务的 Processor, 并对 idle 或者 dead 的 Processsor 进行管理,包括:

    1. start: Spin up Processors.
    2. processor_died: Handle job failure, throw away Processor, create new one.
    3. quiet: shutdown idle Processors.
    4. stop: hard stop the Processors by deadline.
    

    初始化 Manager

    class Manager
      def initialize(options={})
        logger.debug { options.inspect }
        @options = options
        @count = options[:concurrency] || 25
        raise ArgumentError, "Concurrency of #{@count} is not supported" if @count < 1
    
        # @done代表是否结束处理任务
        @done = false
        @workers = Set.new
    
        # 生成多个Processor, 每一个Processor对象在被调用start方法的时候, 会生成了一个线程
        @count.times do
          @workers << Processor.new(self)
        end
    
        # 添加一个锁, 用于修改 @workers 的数据, 管理Processor对象
        @plock = Mutex.new
      end
    end
    

    启动 Manager, 即调用Processor#start

    class Manager
      def start
        @workers.each do |x|
          x.start
        end
      end
    end
    
  2. Processor

    Processor 是处理任务的类,包括下面的功能

    1. fetches a job from Redis using brpop
    2. executes the job
      a. instantiate the Worker
      b. run the middleware chain
      c. call #perform
    

    Processor#start, 启动 Processor, 创建一个线程

    class Processor
      def start
        # 生成一个线程, 并调用run方法
        @thread ||= safe_thread("processor", &method(:run))
      end
    end
    

    Processor#run, 处理任务,去 Redis 获取队列数据

    # @mgr 即为他对应的 Manager 对象
    class Processor
      def run
        begin
          while !@done
            # 调用 perform 方法进行处理
            process_one
          end
    
          # 一旦结束了, 则将 Processor对象中的manager对应的worker去掉, 即是改变了上述 Manager的 @workers 数组
          @mgr.processor_stopped(self)
        rescue Sidekiq::Shutdown
          # 在接收到TERM SIGNAL之后, 等待超时的时候sidekiq会抛出异常 Sidekiq::Shutdown, 见下文分析
          # 线程被关闭.
          @mgr.processor_stopped(self)
        rescue Exception => ex
          # 程序报错了, Manager#processor_died 会重新生成一个新的Processor线程
          @mgr.processor_died(self, ex)
        end
      end
    end
    

队列重启时 job 的处理

当我们更新代码后,需要重启Sidekiq的进程。一般来说,我们会发送一个 TERM SIGNAL 指令给 Sidekiq 进程,它的执行步骤如下

  1. 停止 Fetch jobs.

    class Manager
      def quiet
        return if @done
        # 将 @done 设置为 true
        @done = true
    
        logger.info { "Terminating quiet workers" }
        @workers.each { |x| x.terminate } # 这里的每一个 x 都是一个Processor对象
        fire_event(:quiet, reverse: true)
      end
    end
    
    class Processsor
      def terminate(wait=false)
        @done = true # 将每一个Processor 的 @done 设置为 true, 下面的run方法则不再fetch新的job
        return if !@thread
        @thread.value if wait
      end
    
      def run
        begin
          while !@done
            process_one
          end
          # 一旦结束了, 则将 Processor对象中的manager对应的worker去掉, 即是改变了上述 Manager的 @workers 数组
          @mgr.processor_stopped(self)
        rescue Sidekiq::Shutdown
          @mgr.processor_stopped(self)
        rescue Exception => ex
          @mgr.processor_died(self, ex)
      end
    end
    
  2. 等待Sidekiq.options[:timeout]秒 (默认为 8 秒) 的时间,使得 Processor 去处理完当前未完成的 jobs

    class Manager
      def stop(deadline)
        quiet
        fire_event(:shutdown, reverse: true)
    
        # some of the shutdown events can be async,
        # we don't have any way to know when they're done but
        # give them a little time to take effect
        sleep PAUSE_TIME
        return if @workers.empty?
    
        logger.info { "Pausing to allow workers to finish..." }
        remaining = deadline - Time.now
    
        # 等待默认的8s后, 如果 @workers 为空, 则代表在规定时间内任务都处理完, 退出
        while remaining > PAUSE_TIME
          return if @workers.empty?
          sleep PAUSE_TIME
          remaining = deadline - Time.now
        end
        return if @workers.empty?
    
        # 等待默认的8s后, 如果 @workers 不为空, 则进行强制shutdown
        hard_shutdown
      end
    end
    
  3. 如果在等待时间之后,仍存在正在处理的 job, 则将 job 通过 rpush 命令推入 Redis, 强制使 processor 退出

    class Manager
      def hard_shutdown
        # We've reached the timeout and we still have busy workers.
        # They must die but their jobs shall live on.
        cleanup = nil
        @plock.synchronize do
          cleanup = @workers.dup
        end
    
        if cleanup.size > 0
          # 获取没有处理完的job
          jobs = cleanup.map {|p| p.job }.compact
    
          logger.warn { "Terminating #{cleanup.size} busy worker threads" }
          logger.warn { "Work still in progress #{jobs.inspect}" }
    
          # Re-enqueue unfinished jobs
          # NOTE: You may notice that we may push a job back to redis before
          # the worker thread is terminated. This is ok because Sidekiq's
          # contract says that jobs are run AT LEAST once. Process termination
          # is delayed until we're certain the jobs are back in Redis because
          # it is worse to lose a job than to run it twice.
          strategy = (@options[:fetch] || Sidekiq::BasicFetch)
    
          # 将未处理完的jobs推入队列的头部
          strategy.bulk_requeue(jobs, @options)
        end
    
        # 强制kill掉线程
        cleanup.each do |processor|
          processor.kill
        end
      end
    end
    
    class Processor
      def kill(wait=false)
        @done = true
        return if !@thread
        # unlike the other actors, terminate does not wait
        # for the thread to finish because we don't know how
        # long the job will take to finish.  Instead we
        # provide a `kill` method to call after the shutdown
        # timeout passes.
        @thread.raise ::Sidekiq::Shutdown
        @thread.value if wait
      end
    end
    

NOTE: 注意在接收到TERM SIGNAL一些 job 有可能被重复执行。Sidekiq 的 FAQ 中有说明:Remember that Sidekiq will run your jobs AT LEAST once.

INFO: Sidekiq 还提供了 Scheduling Job 的功能,即到时执行任务,该部分使用了一个 SortedSet 的 redis 数据结构,排序的因子为任务的执行时间。在启动 Sidekiq 服务的时候,会启动了一个线程轮询所有执行时间小于等于当前时间的队列数据,将该部分的数据在 pop 至队列,再由 Processor 处理。

Sidekiq Middleware

Sidekiq 在 client-side 和 server-side 都支持 AOP 操作,该部分和Rack的原理一致。

有了server-side middleware的支持,我们可以

在sidekiq处理任务前后, 捕捉到任务的处理情况

如 Sidekiq 提供了 ActiveRecordserver-side middleware

module Sidekiq
  module Middleware
    module Server
      class ActiveRecord
        def initialize
          # With Rails 5+ we must use the Reloader **always**.
          # The reloader handles code loading and db connection management.
          if defined?(::Rails) && ::Rails::VERSION::MAJOR >= 5
            raise ArgumentError, "Rails 5 no longer needs or uses the ActiveRecord middleware."
          end
        end

        def call(*args)
          yield
        ensure
          ::ActiveRecord::Base.clear_active_connections!
        end
      end
    end
  end
end

对于基于 Rails 的 Sidekiq 服务,Sidekiq 会确保在每次执行任务之后,都会清掉使用的连接,避免多线程占用过多的 Rails 数据库连接。

AsyncTask

需求分析

我们经常有一些这样的需求:

1. 给卖家批量报名活动, 一次可以报名200个商品, 如果报名失败的记录, 需要有提示信息
2. 批量创建活动, 一次导入一个1万条商品的excel, 需要给这1万条数据创建
3. 批量导出50万大促信息

最开始我们都是通过串行的方式进行处理,比如

1. 给卖家批量报名活动, 一次可以报名200个商品, 如果报名失败的记录, 需要有提示信息

我们提供一个商品的 HTTP 接口,然后由 JS 发 Ajax 请求进行调用,但是该方式有一些问题:

  • 数据容易丢失
  • 一些接口请求很慢,容易造成超时
  • JS 交互复杂,大量的逻辑都放在了前端,出问题不好排查

但是对于数据量大的情况,串行调用变得非常慢,如

2. 批量创建活动, 一次导入一个1万条商品的excel, 需要给这1万条数据创建
3. 批量导出50万大促信息

我们考虑使用 Sidekiq 进行处理,即每一个任务都放在 Redis 里面。调用 perform_async 方法,获取到任务的 job_id

job_id = ProductWorker.perform_async(params)

但是新的问题出现了:我们无法获取到这个 job 的完成情况,如果逻辑上处理失败,也无法获取到对应的错误信息。

NOTE: Sidekiq-Pro 支持 batches 功能,但是它是收费的。

我们最终决定利用 Sidekiq 的 Middleware 特性,研发出一套异步任务管理引擎,它支持

  • 任务的聚合管理。一个 task 和多个 job 进行关联
  • 可以获得 job 的执行状态
  • 所有执行过程可视化

AsyncTask

任务处理引擎架构图

async_task

它包含三部分

  1. 创建 Task, 生成 task_id, 将每一个任务都推入 Redis, 并获取到对应的 job_id
  2. 生成 Event 记录,该 Event 和 job_id 一一对应,记录了整个 job 的生命周期
  3. 利用 Server-Side Middleware, 记录 Event 的状态和相关信息

NOTE: 步骤一的 job_id 由 Sidekiq 生成

Task 和 Event 创建

我们将 Task 和 Event 都创建了对应的数据库表,则

class Task
  has_many :events
end

class Event
  validates_uniqueness_of :job_id
  belongs_to :task
end

Task 的数据结构为

字段 释义
worker_name worker 的名称
id 主键 id

Event 的数据结构为

字段 释义
job_id 任务 id,全局唯一
status 当前状态,包括enqueue,working, finish, failed, error
params 任务执行的所有参数
added_messages 增量的信息,记录整个任务的流程

NOTE: 注意到 status 包含了 falied 和 error 两个不同的状态。其中 failed 代表为 业务逻辑上的失败,如一个卖家因为资质不合格导致无法报名,为了获取该状态,处理时可直接抛出异常 (NormalException), 状态为 failed. 而 error 代表为系统错误,如程序 bug 或者接口超时等

Server-Side Middleware

在这里我们配置了 use_task_event, 如果需要使用该插件,需要在 worker 中配置 use_task_event: true.

class AWorker
  include Sidekiq::Worker

  sidekiq_options use_task_event: true

  def perform(options)
    handle_job(options)
  end
end

Server-Side Middleware 代码和注释如下:

module AsyncTask
  class MiddlewareServer
    def call(worker, item, queue)
      if item['use_task_event'] # 配置入口
        begin
          job_id = item['jid']
          Task.record(job_id, :working, message: "处理中")

          yield

          # 正常处理成功, 设置 status 为 finish
          Task.record(job_id, :finish, message: "已经完成")
        rescue SystemExit, Interrupt => ex
          # 被中断, 设置 status 为 error
          Task.record(job_id, :error, message: "被中断")

          # 如果之后会被重试, 则重新再设置为 :enqueue
          if retry_status.is_a?(Integer) && (retry_status > 0) && retry_count &&
            (retry_status - 1 != retry_count.to_i)

            Task.record(job_id, :enqueue, message: "等待重试")
          end

          raise ex
        rescue NormalException => ex
          # 业务逻辑上的失败, 设置 status 为 failed, 错误信息放在 message 中
          Task.record(job_id, :failed, message: "发生错误: #{ex.message}")
        rescue Exception => ex
          # 程序bug, 设置 status 为 error
          Task.record(job_id, :error, message: "发生致命错误: #{ex.message}")

          # 如果之后会被重试, 则重新再设置为 :enqueue
          if retry_status.is_a?(Integer) && (retry_status > 0) && retry_count &&
            (retry_status - 1 != retry_count.to_i)

            Task.record(job_id, :enqueue, message: "等待重试")
          end

          raise ex
        end
      else
        yield
      end
    end
  end
end

在项目启动时加载该 Middleware

Sidekiq.configure_server do |config|
  config.server_middleware do |chain|
    chain.add AsyncTask::MiddlewareServer
  end
end

问题剖析

回顾在文章开始时提到的需求

1. 给卖家批量报名, 一次可以报名200个商品, 进行活动, 如果报名失败的记录, 需要有提示信息
2. 批量创建活动, 一次导入一个1万条商品的excel, 需要给这1万条数据创建
3. 批量导出50万大促信息

对于需求 1, 2, 都可以用相同的处理方式,流程如下:

  1. 前端一次将所有的数据全部提到给后端。
  2. 后端根据数据量拆分为 n 个 jobs, 并生成一个 task_id, 返回给前端。
  3. 前端每隔一段时间,调用后端的接口来询问 task_id 对于的 job 的状态,如果出错,则一同返回错误信息。

对于需求 3, 我们可以将 50 万信息分为不同的 worker 来处理,并用统一的 task_id 进行关联,也将大大提高导出的效率。

sidekiq 自带 web 页面的。sidekiq 也有 api 可以拿到 job 情况的。

lithium4010 回复

可以拿到 job 情况。但是在 sidekiq 的存储结构中,并没有将 job_id 当做 key 来存储

通过 sidekiq 的 api 获取 job 是非常低效的,在 API 文档 中提到

Find a job by JID (WARNING: this is very inefficient if your queue is big!)

另外,我们需要获取到 job 的整个状态,包括 进入队列,处理中,成功,业务校验的失败,系统的失败 等,这是 sidekiq 的 api 不具备的。

sidekiq-pro 提供了类似 batches 功能,但是我们因为一些原因没有使用。

对 sidekiq 的研究还是比较深入,这一点不可否认,点 N 个赞。

过一年再来回头来看今天你们这个设计,可能会发现其实还可以走另外的设计方式。具体怎么设计,我并没有细看,所以不发表意见。

zfjoy520 回复

嗯,可以贴一下链接或者设计思路吗?我们也都学习一下。

这些其实都是造轮子,我们的痛点是一些资源申请不下来。

dengqinghua 回复

容我先细细看下你们的思路先^_^

1、sidekiq 平民版支持一次批量插入成千上万的 job,可以用 Sidekiq::Client.push_bulk(items) 这个方法来生成待执行的任务;源码和注释见: https://github.com/mperham/sidekiq/blob/master/lib/sidekiq/client.rb#L135 https://github.com/mperham/sidekiq/blob/master/lib/sidekiq/client.rb#L92

这个方法同样也返回了所有生成 job 的 jid,效率如何需要自测。

2、关于失败 job 的提示信息记录或者是执行过程全记录,可用第 1 步生成的 jid 数组来自由发挥,选择适合自身项目的一种即可。 方法有很多种,比如照原来的方式存 mysql,步步更新。

3、对于需求 3. 批量导出 50 万大促信息,ruby 本身即有一个对大数据的分批方法:each_slice,文档见: http://ruby-doc.org/core-2.5.1/Enumerable.html#method-i-each_slice

可将这 50 万,按 10_000 或者更多分为一批,做一些简单拼凑然后导出,降低数据库或其他数据源的查询压力,最后将结果拼装成一个 excel 或者是你们想要的其他的数据导出格式即可。

临时想到的一些平时用过的方式,也不知道是否符合你们项目的胃口,有说得不恰当的地方请不要笑话。

zfjoy520 回复

我的理解如下:

  1. 消息的生成和消费,是在不同的经常里面的,如果需要用 job_id, 则还是需要将 job_id 存储起来
  2. Sidekiq 的 API 通过 job_id 寻找这个 job 是非常低效的,这在前文已经说过
  3. each_slice 本身还是在一个进程里面,无法使用 多核 来进行操作。虽然在一个进程内的 Sidekiq 多线程处理,是无法使用多核的 (这里任务是耗 CPU 的), 另外,如果是用 each_slice 处理,真的是很慢,要是中途不小心报错,整个任务就失败了。
  4. job_id 生成是有成本的。
def push_bulk(items)
   ...

  raw_push(payloads) if !payloads.empty?

  # 如果有十万个任务, 这里需要循环十万次得到jid
  payloads.collect { |payload| payload['jid'] }
end

我们部署 Sidekiq 多个进程,就有一定概率使用到多核。

50 万数据我们是分成多个 sidekiq job 处理,并且有一个 task_id 跟这些 job 关联。这样可以用到了多核,也能观察到各个任务进行的进度,并且可以重试.

当然单独针对这个业务逻辑,也可以设计单独的表结构来存储任务。

Sidekiq 的 API 通过 job_id 寻找这个 job 是非常低效的,这在前文已经说过

我不是很明白,为什么会有通过一个 jid 去寻找 job 的需求

each_slice 多核问题

是不是可以考虑将 50 万数据,each_slice 成 50 份或者是 100 份,enqueue 至 sidekiq,然后利用多进程 sidekiq 来跑多核,我并没有要将 50 万数据扔一个 job 里面跑的意思。我指是的你可以用 each_slice 来将数据分片,然后批量创建 job。

job_id 生成是有成本的

跟第 3 一样,既然明知是一个很耗时的方法,我们何必非要让 push_bulk(items) 一次跑 10 万个,我们将数据切片打散成多个 job 也是可以的。

zfjoy520 回复

因为我们需要一个 job 的状态,以及业务失败时,这个 job 的错误信息。

如下面的信息:

{ status: failed, message: "发生错误: 您的资质不满足报名条件" }

我们是通过 raise NormalException 来中断任务,并将信息存储在异常中的。

所以说,我们需要通过 job_id, 来找到这个 job, 访问这个 job 的状态 (status), 并拿到业务逻辑的错误信息 (message)

这儿其实跟 Erlang 的 Let It Crash 有点儿像,但是使用上会稍微简单一点。

另外,所有跟 job 相关的参数,状态信息也都会入到数据库。因为 Sidekiq 的 任务是使用 Redis#brpop 命令的,消费了之后,消息就没有了,就找不到这个 job 了。

huacnlee 将本帖设为了精华贴。 05月29日 10:11
zfjoy520 回复

each_slice 这块我理解得不对,我们在实际业务的做法跟您所说的是一致的。

dengqinghua 回复

喔,懂了,可能设计思路不太相同。

其实你已经有了数据库的持久化形式了,在 job 的运行过程中,也对整个流程做了步进日志的记录,你可以不用再去访问 job 的状态,前端展示只需要轮询持久化之后的结果就可以了。

zfjoy520 回复

是的,我们是这样做的。

job_id 在执行

job_id = AWorker.perform_async(params)

的时候,已经生成了。但是需要利用 Sidekiq 的 Middleware, 去更新这个 job 的执行情况。

对于前端,只需要轮询状态就可以了。

请问 Sidekiq 在使用 redis 的时候,有场景用到 RPOPLPUSH 了木有?还是 Pro 版才用到了

pathbox 回复

Pro 版本使用到了,用来做高可用,保证数据不丢的。

Sidekiq Pro offers an alternative fetch strategy, super_fetch, for job processing using Redis' RPOPLPUSH command which keeps jobs in Redis.

wiki: 这里

dengqinghua 回复

你单独做了一套任务状态跟踪,我认为这一块可以独立出来的不用和 sidekiq 这个东西耦合。

lithium4010 回复

是的,您说得对。

但是这一块是专门为 sidekiq 定制的,数据结构设计还包括 sidekiq_class 等,这些其实也可以用在 kafka 中。

dengqinghua 回复

是的。Pro 版的代码应该没有开源吧,要想用 RPOPLPUSH 保证更高可用,得自己考虑改下源码。。。。。。

pathbox 回复

嗯,是的。

这个文档解决的问题,是希望耗时的任务 高性能并且状态可跟踪, 跟高可用关系不大,所以没有考虑自己实现 RPOPLPUSH

你的需求不用 hack 也能实现。 需求无非是下面几点

  1. 导入需要高性能
  2. 导出需要高性能
  3. 需要有跟踪报错功能

我的做法是把任务分拆,利用 sidekiq 多个 worker 同时执行任务,将结果合并即可。

实现 1 导出可以利用 find_in_batches, find_each 实现 2 导入可以利用 CSV.foreach 实现 3 跟踪报错可以用一个 model 记录 Job 执行过程中产生的数据和日志即可

# QueryThread用来把一个大量数据的查询拆分成多个小任务
# a = Concurrent::Array.new
# QueryThread.split(Subscription.where("status = ?", "canceled").where("id < 1000"), 5) do |subquery, idx, min_id, max_id|
#   subquery.find_each do |sub|
#     a << [idx, sub.id, min_id, max_id]
#   end
# end
#
class QueryThread
  def initialize(query, min_id, max_id, index, &block)
    @thread = Thread.new do
      subquery = query.where("#{query.table_name}.id >= ? and #{query.table_name}.id <= ?", min_id, max_id)
      block.call(subquery, index, min_id, max_id)
    end
  end

  def join
    @thread.join
  end

  class << self

    def split(query, thread_count, &block)
      arguments_groups = split_to_array(query, thread_count)

      query_threads = []
      arguments_groups.each do |argument|
        query_threads << QueryThread.new(query, argument[:min_id], argument[:max_id], argument[:index], &block)
      end
      query_threads.each(&:join)
    end

   #根据查询语句的最大id和最小id分组
    def split_to_array(query, worker_number)
      total_min_id = query.pluck("min(#{query.table_name}.id)").first.to_i
      total_max_id = query.pluck("max(#{query.table_name}.id)").first.to_i
      split_range(total_min_id, total_max_id, worker_number)
    end

    def split_range(total_min_id, total_max_id, worker_number)
      if total_max_id == 0
        return []
      elsif (total_max_id - total_min_id + 1) < worker_number
        # needn't so much worker
        return [{min_id: total_min_id, max_id: total_max_id, index: 0}]
      end

      range_in_thread = (total_max_id - total_min_id + 1) / worker_number
      result = []
      (worker_number - 1).times.each do |i|
        min_id = total_min_id + i * range_in_thread
        max_id = total_min_id + (i + 1) * range_in_thread - 1
        result << {min_id: min_id, max_id: max_id, index: i}
      end

      # last thread
      min_id = total_min_id + (worker_number - 1) * range_in_thread
      max_id = total_max_id
      result << {min_id: min_id, max_id: max_id, index: worker_number - 1}
      result
    end

  end
end

设计一个 BatchJob 用来做数据导出

class BatchJob < ApplicationJob
  queue_as :default

  class_attribute :query_block, :worker_number, :job_block
  def self.set_query(&block)
    self.query_block = block
  end

  def self.set_worker_number(number = 10)
    self.worker_number = number
  end

  def self.set_job(&block)
    self.job_block = block
  end

  def self.perform_batch_later
    query = query_block.call
    argu_group = QueryThread.split_to_array(query, worker_number || 10)
    argu_group.each do |argus|
      self.perform_later(argus)
    end
  end

  def perform(options = {})
    min_id = options.fetch(:min_id)
    max_id = options.fetch(:max_id)

    q = self.class.query_block.call
    subquery = q.where("#{q.table_name}.id >= ? and #{q.table_name}.id <= ?", min_id, max_id)
    self.class.job_block.call(subquery, options)
  end

end

如下用法,写个 job 继承 BatchJob

  • set_query 里写查询语句,
  • set_worker_number 设定分几次执行(可以同时执行),
  • set_job 里写具体查询到的结果进行逻辑
  • 执行的时候 ExportDataJob.perform_batch_later 即可,以下例子会把查询拆成 40 个 job,可同时导出,最后再根据 Attachment 的数据合并即可。
class ExportDataJob < BatchJob
  set_query {
    Customer.where("updated_at > ?", Time.parse("Tue Apr 3 14:08:33 2018 -0500"))
  }

  set_worker_number 40
  set_job do |query, thread_hash|
    Attachment.upload_csv!(Rails.root.join("tmp/fix_grandfater_#{thread_hash[:index]}.csv"), "wb") do |csv|
      query.includes(:user).find_each(batch_size: 100) do |customer|
        csv << [customer.id, customer.name, ...] #自己写逻辑
      end
    end
  end

end

根据以上原理,可以写出类似的方法做导入

class CsvJob < ApplicationJob
  queue_as :default

  class_attribute :worker_number, :job_block

  def self.set_worker_number(number = 10)
    self.worker_number = number
  end

  def self.set_job(&block)
    self.job_block = block
  end

  # CsvJob.perform_batch_later(123, csv: {headers: true}, job: {store: "123"})
  def self.perform_batch_later(attachment_id, options = {})
    options[:csv] ||= {}
    csv_data = Attachment.find(attachment_id).file.read
    total_lines = CSV.parse(csv_data, options[:csv]).count

    worker_group = QueryThread.split_range(0, total_lines - 1, worker_number || 10)
    worker_group.each do |worker_options|
      self.perform_later(attachment_id, options, worker_options)
    end
  end

  def perform(attachment_id, options = {}, worker_options = {})
    temp_filename = Rails.root.join("tmp/temp_csv_#{attachment_id}_#{options[:index]}.csv")
    File.open(temp_filename, "wb") do |f|
      f.write Attachment.find(attachment_id).file.read
    end

    csv = CSV.foreach(temp_filename, options[:csv])

    self.class.job_block.call csv, worker_options, options[:job]
  end
end

用法如下,

  • set_worker_number 设置分几次完成
  • set_job 设置逻辑,这里手动和 worker_options[:min_id] [:max_id] 做对比 代码丑陋了点。

调用时 参数为含有 csv 文件信息的 Attachment 的 id,以及其他自定义参数 #ImportCustomerJob.perform_batch_later(attachment_id, csv: {headers: true}, job: {store: "stage"})


class ImportCustomerJob < CsvJob

  set_worker_number 20

  set_job do |csv, worker_options, job_options|
    job_options ||= {}

    Attachment.log("import_#{job_options[:store]}_#{worker_options[:index]}") do |logger|
      logger.info "start.."
      index = 0
      csv.each do |row|

        if index < worker_options[:min_id]
          index += 1
          next
        end
        if index > worker_options[:max_id]
          break
        end

        Customer.create_or_update_by!(store: job_options[:store], remote_id: row["remote_id"]) do |c|
          c.attributes = row.to_h
        end

        index += 1
      end
      logger.info "finish.."
    end
  end
end

最后附上 Attachment,代码随便写了写。起个持久化的作用

class Attachment < ApplicationRecord
  mount_uploader :file, S3FileUploader


  def self.upload_file!(filename, name = nil)
    a = Attachment.new
    file = File.open(filename, "r")
    a.file = file
    a.name = name
    a.save!
    a
  ensure
    file.close if file
  end

  def self.upload_csv!(filename, csv_options, &block)
    CSV.open(filename, csv_options) do |csv|
      block.call(csv)
    end
  ensure
    upload_file!(filename)
  end

  # todo add log and upload to s3
  def self.log(name, &block)
    log_file = open(Rails.root.join("tmp/#{name}.log"), "w")
    log_file.sync = true
    logger = Logger.new(log_file)
    logger.formatter = proc{|severity, time, program, msg|  "#{severity}: #{msg}\n" }

    begin
      block.call(logger)
    rescue Exception => e
      logger.error e.message
      logger.error e.backtrace.join("\n")
      raise e
    ensure
      log_file.close
      Attachment.upload_file!(Rails.root.join("tmp/#{name}.log"), name)
    end
  end
end

这样不修改 Sidekiq 就可以完成导入,导出。也不用担心 Thread 的问题。全程有记录,报错的话 Sidekiq 会重启任务(一般是你自己的逻辑有问题),符合你的要求。

yakjuly 回复

感谢分享,我们会参考一下。

我们在设计的时候会考虑几点:

1. Sidekiq Worker不需要去管理job的状态, 也就是并发的任务本身对job状态是无感知的, 也没有状态的概念
2. 工具是通用的, 不仅仅在导出场景, 任何批量多任务处理, 都可以使用.

在新版本的 Rails 中,有 ActiveJob, 里面的 callback 可以做到这些功能,但是我们并未在上面做扩展,因为

  • 我们使用的 Rails 版本太低了 (<5)
  • 我们希望这个更底层一些,所以选择使用了 Sidekiq 的 Middleware

您说的这个也是一个解决方案,但是个人觉得太偏向于某一种场景了,如果换一种场景的话,可能又得再实现一遍; 建议可以抽离出一个插件,做成一个执行引擎,相信从可读性和实用性而言会更好一些。

👏 👏 👏 不错不错 学习

最近使用了 rundeck,rundeck 对每个 job 的一个 execution 提供了详细的信息。execution 还支持回调

大佬,我这边有一个问题,就是我的 sidekiq 并发 5 个以上的任务运行一段时间就会报错:stream closed 然后杀死当前的 work,不会重启该 work。怎么破

需要 登录 后方可回复, 如果你还没有账号请 注册新账号