Gem sidekiq 并发控制的问题

kingwkb · 2014年11月04日 · 最后由 vkill 回复于 2015年12月04日 · 6181 次阅读

问题是这样,我们使用 sidekiq 完成一些耗时的任务,在这个任务中访问外部资源,在一个任务结束前不能重复使用外部资源

现在我想假如 sidekiq 并发设置为 5,那么我准备 5 个外部资源

现在问题是如何才能让 sidekiq 的并发同时使用这 5 个外部资源

例子如下:

比如有个 API 接口,参数 token 标识一个用户,这个接口规定了在任务开始时候请求一次,并且任务完成前同一个用户不能再次请求,那么现在我使用 5 个 token 对应 sidekiq 的 5 个并发

如何把 5 个 token 分配给 sidekiq 的并发模型呢?

你可以用 redis 实现跨进程分布式锁来解决这个问题。 关于 redis 分布式锁,详见 http://redis.io/topics/distlock

直接使用 redis 的命令实现原理并不复杂,但是蛮繁琐的,好在有一个 gem redis-objects https://github.com/nateware/redis-objects 可以简化操作。

举个例子,假如对某一个 user 有一个长时间处理的任务,希望避免同时执行,就可以用下面的代码实现。

class User
   include Redis::Objects

   lock :large_job, expiration: 120.seconds

   #...

   def execute_large_job
       large_job_lock.lock do 
          # do something slowly
       end
   end

   # ...
end

把执行 execute_large_job 过程放入 sidekiq 处理,sidekiq 本身有重试机制,当出现同时争取执行这个任务时,只有一个可以执行,另一个会抛异常,然后过一段时间之后重试。

希望对你有帮助。

#1 楼 @vincent 非常感谢您的回复,这种方式略显麻烦,我想既然 sidekiq 的并发数量是可以控制的,那么我准备一个列队,这个列队中的资源数量等于 sidekiq 的并发数量,甚至大于 sidekiq 的并发,这里有个问题就是让 sidekiq 每次从这个列队中读取一个资源,保证顺序读就可以了,这种有什么办法实现呢?

@kingwkb 不客气。其实我对你的问题有一些疑惑,我理解应该是两种问题:

  1. 如何保证多线程(以及多进程)环境下对某个资源的互斥使用。 这个问题通过一个分布式锁处理就可以了,redis 很好实现这个机制。

  2. 如何在多线程下对有限个资源的使用,保证每个资源不会被多个线程同时使用。 例如 sidekiq 中对 database connection 的使用,资源有限,线程数多于资源数,通过 ActiveRecord 自带的 connection pool 实现这种方式的资源使用。有一个 gem https://github.com/mperham/connection_pool 就是用于解决这个问题的,作者也是 sidekiq 的作者。

#2 楼 @kingwkb 我觉得自己做队列的方式本质上和 connection_pool 的机制是一样的,不管线程数和资源数是否数量一致,都需要控制获取资源,释放资源的过程,还不如 connection_pool 来得灵活。

#3 楼 @vincent 是的,这 2 种方式都能解决我的问题,对于我的特定问题,使用 1 有些麻烦,不好理解,如果用你说的 2 的方式理解就比较好了

对于你说的 connection pool 看似是可以解决这个问题,我对这个 gem 不熟悉,容许我再请教个问题

假如我有一个数组

%w(111 222 333 444 5555)

如何按照顺序每次读取到一个值,或者用 connection pool 如何实现呢?

#5 楼 @kingwkb 那我就好人做到底吧,你看看下面的代码:

class MyToken
   @tokens = %w(111 222 333 444 5555)

   def self.acquire
      @tokens.pop
   end
end

$token_pool = ConnectionPool.new(size: 5, timeout: 5) { MyToken.acquire }

$token_pool.with do |token|
    # use token
    puts token
end

把每一个 token 作为 connection pool 里一个资源,通过 MyToken.acquire 生成新的资源,这里有一个要注意的问题是,connection pool 的 size 不能超过 tokens 的长度,否则可能引起 pool 里存在空 token 的情况。 另外,不能保证顺序读到 token,但能保证某一个 token 同一时间只被一个线程使用,保证如果 token 不够使用时,申请使用的线程等待执行。

1.如果单纯并发限制那就把 concurrency 设置为最大并发量 2.如果多个任务存在某种逻辑互斥那就用 redis 的锁(分布式的任务本来就讲求低耦合,最好是之间不要有任何关联的,任务间互相影响的成分太多那就是任务分配的有问题) 3.如果需要按照某种顺序执行,干脆把这个序列设置为一个任务,比拆分的执行成本小

#6 楼 @vincent 再次非常感谢,其实我的问题主要在这里

如果使用 @tokens.pop 的话,那么 tokens 里面的值就少了,导致不能循环使用的问题,这些并不是使用一次,

不过你的代码倒是启发我

class MyToken
   @tokens = %w(111 222 333 444 5555)

   def self.acquire
      token = @tokens.shift
      @tokens.push(token)
      token
   end
end

$token_pool = ConnectionPool.new(size: 5, timeout: 5) { MyToken.acquire }

$token_pool.with do |token|
    # use token
    puts token
end

这个 acquire 不知道并发是否能导致问题,还是这里要加个锁?

MyToken.acquire 是资源生成的过程,生产后就加入到 connection pool 内部的队列里了,所以只会调用 5 次,connection pool 在调用 MyToken.acquire 会加锁的。没有必要再 token 放到 @tokens 中了。

#9 楼 @vincent 这样说的话就解决了我的问题,再次感谢哈,等忙完手上的工作测试下,非常非常感谢

需要 登录 后方可回复, 如果你还没有账号请 注册新账号