Redis 用 Sidekiq 和 Redis 实现不同项目中通信的消息队列

lithium4010 · September 11, 2015 · Last by qq2729877005 replied at December 07, 2018 · 10019 hits

有两个 Rails 项目,爬虫 和 展示,他们共享一个 redis 服务器,爬虫的 sidekiq 配置的默认 redis_db 是 1,展示的 sidekiq 配置的默认 redis_db 是 2。 希望做到当爬虫爬到一个网页后,通知展示项目去跑一个 ReadCrawlerJob 的任务。

有一种做法是在展示中起一个 循环监听,来监听爬虫的 redis 中的一个 list,爬虫每次爬到东西就往 list 里面加。

我希望能把这个过程整合到 sidekiq 里面去,而不用去维护多余的进程,所以想出了下面的办法,测试暂时 OK,不知道会有什么潜在的问题。

1.在 initialize 中 monkey patch 一下 Sidekiq ::Client,参照 sidekiq 在 github 上最新的代码,用 new 取代了 default

class Sidekiq::Client
  class << self
    def default
      new
    end
  end
end

2.配置一个指向 展示 redis_db 的 redis connection_pool

$show_redis_pool = ConnectionPool.new { Redis.new({db: 2}) }

3.在爬虫里面写一个空的 ReadCrawlerJob,类名和队列名要和 展示项目中一致。在 展示 里面实现有功能 ReadCrawlerJob, 并开启 sidekiq。

 爬虫
class ReadCrawlerJob < ActiveJob::Base
  queue_as :read_crawler_job
  def perform(data)
  # 这里啥也不做
  end
end

 展示
class ReadCrawlerJob < ActiveJob::Base
  queue_as :read_crawler_job
  def perform(data)
  # 这里实现功能
  end
end

4.在爬虫抓到一个网页后

old = Thread.current[:sidekiq_via_pool]
Thread.current[:sidekiq_via_pool] = $show_redis_pool
ReadCrawlerJob.perform_later(data)
Thread.current[:sidekiq_via_pool] = old

这样,在爬虫抓到内容后,会向 redis_db 2 的 queue 添加 job,展示项目就可以自动处理了。

直接操作 redis,就能实现共享。写入 redis 的数据,消息队列就可以直接消耗了呢。

给你个例子参考下

1、使用 redis 左入队列 (lpush) 的方式,插入数据

插入队列的名字:zzzz:queue:cpc_for_deals

Redis.current.lpush("zzzz:queue:cpc_for_deals", example_record.to_json)

务必为左入队列!!!

2、具体数据格式如下

example_record = { :retry=>5, :queue=>"cpc_for_deals", :class => "CpcLogsWorker", :args => [deal_id, added_count], :jid => "4f54290c1224e44007a02f49", :enqueued_at => 1427009697.350305 }

其中:

  • retry 表示 重试次数

  • queque 表示 入哪个队列

  • class 表示 sidekiq 处理的 Worker 的名字

  • args 表示 需要处理的数据。第一个参数为 deal_id , 第二个参数为 该 deal 增加的点击数

  • jid 表示 24 位唯一随机数 Ruby 使用 SecureRandom.hex(12) 实现

  • enqueued_at 表示 从 1970.01.01 00:00:00 到现在的毫秒数,注意是带小数点的
    Ruby 中的表示方式为 Time.now.to_f

Redis.current.lpush("zzzz:queue:cpc_for_deals", example_record.to_json)

#2 楼 @ibugs 请教一下 jid 作用什么?如果重复会不会造成什么影响?

Reply to ibugs

sidekiq 是怎么实现这功能的?有具体文档可以看吗?

You need to Sign in before reply, if you don't have an account, please Sign up first.