问题是这样,我们使用 sidekiq 完成一些耗时的任务,在这个任务中访问外部资源,在一个任务结束前不能重复使用外部资源
现在我想假如 sidekiq 并发设置为 5,那么我准备 5 个外部资源
现在问题是如何才能让 sidekiq 的并发同时使用这 5 个外部资源
例子如下:
比如有个 API 接口,参数 token 标识一个用户,这个接口规定了在任务开始时候请求一次,并且任务完成前同一个用户不能再次请求,那么现在我使用 5 个 token 对应 sidekiq 的 5 个并发
如何把 5 个 token 分配给 sidekiq 的并发模型呢?
你可以用 redis 实现跨进程分布式锁来解决这个问题。 关于 redis 分布式锁,详见 http://redis.io/topics/distlock。
直接使用 redis 的命令实现原理并不复杂,但是蛮繁琐的,好在有一个 gem redis-objects https://github.com/nateware/redis-objects 可以简化操作。
举个例子,假如对某一个 user 有一个长时间处理的任务,希望避免同时执行,就可以用下面的代码实现。
class User
include Redis::Objects
lock :large_job, expiration: 120.seconds
#...
def execute_large_job
large_job_lock.lock do
# do something slowly
end
end
# ...
end
把执行 execute_large_job 过程放入 sidekiq 处理,sidekiq 本身有重试机制,当出现同时争取执行这个任务时,只有一个可以执行,另一个会抛异常,然后过一段时间之后重试。
希望对你有帮助。
@kingwkb 不客气。其实我对你的问题有一些疑惑,我理解应该是两种问题:
如何保证多线程(以及多进程)环境下对某个资源的互斥使用。 这个问题通过一个分布式锁处理就可以了,redis 很好实现这个机制。
如何在多线程下对有限个资源的使用,保证每个资源不会被多个线程同时使用。 例如 sidekiq 中对 database connection 的使用,资源有限,线程数多于资源数,通过 ActiveRecord 自带的 connection pool 实现这种方式的资源使用。有一个 gem https://github.com/mperham/connection_pool 就是用于解决这个问题的,作者也是 sidekiq 的作者。
#5 楼 @kingwkb 那我就好人做到底吧,你看看下面的代码:
class MyToken
@tokens = %w(111 222 333 444 5555)
def self.acquire
@tokens.pop
end
end
$token_pool = ConnectionPool.new(size: 5, timeout: 5) { MyToken.acquire }
$token_pool.with do |token|
# use token
puts token
end
把每一个 token 作为 connection pool 里一个资源,通过 MyToken.acquire 生成新的资源,这里有一个要注意的问题是,connection pool 的 size 不能超过 tokens 的长度,否则可能引起 pool 里存在空 token 的情况。 另外,不能保证顺序读到 token,但能保证某一个 token 同一时间只被一个线程使用,保证如果 token 不够使用时,申请使用的线程等待执行。
1.如果单纯并发限制那就把 concurrency 设置为最大并发量 2.如果多个任务存在某种逻辑互斥那就用 redis 的锁(分布式的任务本来就讲求低耦合,最好是之间不要有任何关联的,任务间互相影响的成分太多那就是任务分配的有问题) 3.如果需要按照某种顺序执行,干脆把这个序列设置为一个任务,比拆分的执行成本小
#6 楼 @vincent 再次非常感谢,其实我的问题主要在这里
如果使用 @tokens.pop 的话,那么 tokens 里面的值就少了,导致不能循环使用的问题,这些并不是使用一次,
不过你的代码倒是启发我
class MyToken
@tokens = %w(111 222 333 444 5555)
def self.acquire
token = @tokens.shift
@tokens.push(token)
token
end
end
$token_pool = ConnectionPool.new(size: 5, timeout: 5) { MyToken.acquire }
$token_pool.with do |token|
# use token
puts token
end
这个 acquire 不知道并发是否能导致问题,还是这里要加个锁?
MyToken.acquire
是资源生成的过程,生产后就加入到 connection pool 内部的队列里了,所以只会调用 5 次,connection pool 在调用 MyToken.acquire 会加锁的。没有必要再 token 放到 @tokens 中了。