运维 Sidekiq 队列阻塞监控重启脚本

frech · July 04, 2018 · Last by ForrestDouble replied at July 05, 2018 · 8448 hits

问题

码云 Gitee 在使用 Sidekiq 作为队列任务的时候,时不时遇到队列阻塞。这时候队列任务计划数 ( Scheduled ) 为个位数或 0,重试数 ( Retry ) 为个位数或 0,正在执行的任务数 ( Busy ) 为 0,而已经进入了队列的数目 ( Enqueued ) 一旦阻塞不执行,数量一直增加,达到几十 K,几百 K。正常的情况下,已经进入了队列的数目也就是个位数。检测了一下,sidekiq 相关的所有的 http 请求都已经加上了超时时间,阻塞还是会时不时发生。只能使用重启大法。无奈之下,@zoker 提出了根据这个数量来监控重启 Sidekiq 的想法。

解决方案

参考 Sidekiq 的源码,照葫芦画瓢,写了一个监控的 ruby 脚本,放到 crontab 下两分钟执行一次。Enqueued 数量大于 1000 就重启。也可以放到 Zabbix 进行监控报警。

require 'redis'
require 'redis-namespace'

redis_connection = Redis.new
# namespace 根据需要改写
namespaced_redis = Redis::Namespace.new('resque:gitee', redis: redis_connection)
pipe1_res = namespaced_redis.pipelined do
  namespaced_redis.zcard('schedule'.freeze)
  namespaced_redis.zcard('retry'.freeze)
  namespaced_redis.smembers('processes'.freeze)
  namespaced_redis.smembers('queues'.freeze)
end


pipe2_res = namespaced_redis.pipelined do
  pipe1_res[2].each { |key| namespaced_redis.hget(key, 'busy'.freeze) }
  pipe1_res[3].each { |queue| namespaced_redis.llen("queue:#{queue}") }
end

s = pipe1_res[2].size
workers_size = pipe2_res[0...s].map(&:to_i).inject(0, &:+)
enqueued     = pipe2_res[s..-1].map(&:to_i).inject(0, &:+)

stats = {
  scheduled_size:        pipe1_res[0],
  retry_size:            pipe1_res[1],
  busy_size:             workers_size,
  enqueued_size:         enqueued
}

stats.each do |stat, value|
  puts "#{stat}: #{value}"
end

# 根据需要重启
`bundle exec rake sidekiq:restart RAILS_ENV=production` if stats[:enqueued_size] > 1000

执行中的任务会丢吧?

Reply to yfractal

下面三段代码可以回答你的问题。sidekiq 会先设置一段时间让正在执行的任务执行。如果在一定时间内没执行完成的就会强行关掉并重新把任务塞回到 redis 里面,启动的时候执行。

https://github.com/mperham/sidekiq/blob/master/lib/sidekiq/launcher.rb#L37

module Sidekiq
  class Launcher
    # Shuts down the process.  This method does not
    # return until all work is complete and cleaned up.
    # It can take up to the timeout to complete.
    def stop
      deadline = Time.now + @options[:timeout]

      @done = true
      @manager.quiet
      @poller.terminate

      @manager.stop(deadline)

      # Requeue everything in case there was a worker who grabbed work while stopped
      # This call is a no-op in Sidekiq but necessary for Sidekiq Pro.
      strategy = (@options[:fetch] || Sidekiq::BasicFetch)
      strategy.bulk_requeue([], @options)

      clear_heartbeat
    end
  end
end

https://github.com/mperham/sidekiq/blob/master/lib/sidekiq/manager.rb#L62

module Sidekiq
  class Manager
    def stop(deadline)
      quiet
      fire_event(:shutdown, reverse: true)

      # some of the shutdown events can be async,
      # we don't have any way to know when they're done but
      # give them a little time to take effect
      sleep PAUSE_TIME
      return if @workers.empty?

      logger.info { "Pausing to allow workers to finish..." }
      remaining = deadline - Time.now
      while remaining > PAUSE_TIME
        return if @workers.empty?
        sleep PAUSE_TIME
        remaining = deadline - Time.now
      end
      return if @workers.empty?

      hard_shutdown
    end
  end
end

https://github.com/mperham/sidekiq/blob/master/lib/sidekiq/manager.rb#L107

module Sidekiq
  class Manager
    def hard_shutdown
      # We've reached the timeout and we still have busy workers.
      # They must die but their jobs shall live on.
      cleanup = nil
      @plock.synchronize do
        cleanup = @workers.dup
      end

      if cleanup.size > 0
        jobs = cleanup.map {|p| p.job }.compact

        logger.warn { "Terminating #{cleanup.size} busy worker threads" }
        logger.warn { "Work still in progress #{jobs.inspect}" }

        # Re-enqueue unfinished jobs
        # NOTE: You may notice that we may push a job back to redis before
        # the worker thread is terminated. This is ok because Sidekiq's
        # contract says that jobs are run AT LEAST once. Process termination
        # is delayed until we're certain the jobs are back in Redis because
        # it is worse to lose a job than to run it twice.
        strategy = (@options[:fetch] || Sidekiq::BasicFetch)
        strategy.bulk_requeue(jobs, @options)
      end

      cleanup.each do |processor|
        processor.kill
      end
    end

  end
end
Reply to hooopo

之前其实没出现过这个问题。自从用了 sendcloud 作为邮件服务器以后才出现的,所以怀疑是 smtp 超时导致的。但是看代码 net/smtp 这个库在初始化的时候已经设置超时时间了。所以也不知道该咋办了。

# Creates a new Net::SMTP object.
#
# +address+ is the hostname or ip address of your SMTP
# server.  +port+ is the port to connect to; it defaults to
# port 25.
#
# This method does not open the TCP connection.  You can use
# SMTP.start instead of SMTP.new if you want to do everything
# at once.  Otherwise, follow SMTP.new with SMTP#start.
#
def initialize(address, port = nil)
  @address = address
  @port = (port || SMTP.default_port)
  @esmtp = true
  @capabilities = nil
  @socket = nil
  @started = false
  @open_timeout = 30
  @read_timeout = 60
  @error_occurred = false
  @debug_output = nil
  @tls = false
  @starttls = false
  @ssl_context = nil
end

如果 http 请求本身会堵塞,那么重启后,此症状还不是会重现吗?

进程/线程数开大,同时超时时间更短?

都什么年代,为什么不用 rabbitmq,异步 qbs 10000 随随便便,redis 不是让你做队列的。为什么这么多人喜欢用,量一大就炸

You need to Sign in before reply, if you don't have an account, please Sign up first.