Ruby RabbitMQ 在手工 ack 的情况下如何实现多线程访问

randoll · 2014年11月27日 · 8145 次阅读

我有个 RabbitMQ 用于做图片的同步队列,为了保证图片能够同步成功,我是用了 ack 标志。 本来一切还好,随着数据量的增加,发现同步的速度非常不理想,服务器资源还很富裕,但是单线程跟不上时代的进步,所以打算多线程来跑。开始用的 bunny 这个包,在试验中发现无论不能打开多个连接的,只能打开多个通道,不过到头来还是单线程在做。 经过百般 google,最后采用了 EventMachine 的形式,以为是多线程了,但观察 netstat 连接发现,其实还是单线程的。 还请各位大侠指导下,如何才能实现多线程并发 我把核心代码贴过来

EventMachine.run do
  connection = AMQP.connect(:host => '127.0.0.1')

  channel  = AMQP::Channel.new(connection)
  exchange = channel.direct("uploadfile_exchange", :durable => true)
  q   = channel.queue("file_queue", :durable => true, :auto_delete => false).bind(exchange, :routing_key => "file", :durable => true)

  q.subscribe(:block => true, :manual_ack => true) do |delivery_info, payload|
    begin
      data = JSON.parse payload
      method = data['method']
      target = data['target']
      local_target = @webroot + target
      # 把目标文件数据流化
      file = File.open(local_target, 'r')
      options[:store_path] = target

      # 把结构体实例化
      uploader = Uploader.new(options)

      if method == 'add'
         result = upyun.store!(uploader, file)
        if result != true
          # 追加到队列尾部
          pushToMQ(method, target)
        end
      elsif method == 'delete'
        result = upyun.delete!(uploader, file)
        if result != true
          # 追加到队列尾部upyun
          pushToMQ(method, target)
        end
      end

      # 手工发送ACK应答
      channel.acknowledge(delivery_info.delivery_tag, false)
      file.close

      EventMachine.add_timer(1) do
        connection.close { EventMachine.stop }
        exit
      end


        # debug 时用来打印日志
  #      today = Time.new
  #      File.open('/tmp/consumer.log','a') {|line| 
  #        line.puts today.strftime("%Y-%m-%d %H:%M:%S")
  #        line.puts target
  #      }


    rescue Exception => e
      today = Time.new
      File.open('/tmp/consumerException.log','a') {|line| 
        line.puts today.strftime("%Y-%m-%d %H:%M:%S")
        line.puts e.to_s
        line.puts target
      }
      pushToMQ(method, target)
      # 异常记录日志后手工发送ACK应答,删除队列中的垃圾数据
      channel.acknowledge(delivery_info.delivery_tag, false)
    end

暂无回复。
需要 登录 后方可回复, 如果你还没有账号请 注册新账号