我有个 RabbitMQ 用于做图片的同步队列,为了保证图片能够同步成功,我是用了 ack 标志。 本来一切还好,随着数据量的增加,发现同步的速度非常不理想,服务器资源还很富裕,但是单线程跟不上时代的进步,所以打算多线程来跑。开始用的 bunny 这个包,在试验中发现无论不能打开多个连接的,只能打开多个通道,不过到头来还是单线程在做。 经过百般 google,最后采用了 EventMachine 的形式,以为是多线程了,但观察 netstat 连接发现,其实还是单线程的。 还请各位大侠指导下,如何才能实现多线程并发 我把核心代码贴过来
EventMachine.run do
connection = AMQP.connect(:host => '127.0.0.1')
channel = AMQP::Channel.new(connection)
exchange = channel.direct("uploadfile_exchange", :durable => true)
q = channel.queue("file_queue", :durable => true, :auto_delete => false).bind(exchange, :routing_key => "file", :durable => true)
q.subscribe(:block => true, :manual_ack => true) do |delivery_info, payload|
begin
data = JSON.parse payload
method = data['method']
target = data['target']
local_target = @webroot + target
# 把目标文件数据流化
file = File.open(local_target, 'r')
options[:store_path] = target
# 把结构体实例化
uploader = Uploader.new(options)
if method == 'add'
result = upyun.store!(uploader, file)
if result != true
# 追加到队列尾部
pushToMQ(method, target)
end
elsif method == 'delete'
result = upyun.delete!(uploader, file)
if result != true
# 追加到队列尾部upyun
pushToMQ(method, target)
end
end
# 手工发送ACK应答
channel.acknowledge(delivery_info.delivery_tag, false)
file.close
EventMachine.add_timer(1) do
connection.close { EventMachine.stop }
exit
end
# debug 时用来打印日志
# today = Time.new
# File.open('/tmp/consumer.log','a') {|line|
# line.puts today.strftime("%Y-%m-%d %H:%M:%S")
# line.puts target
# }
rescue Exception => e
today = Time.new
File.open('/tmp/consumerException.log','a') {|line|
line.puts today.strftime("%Y-%m-%d %H:%M:%S")
line.puts e.to_s
line.puts target
}
pushToMQ(method, target)
# 异常记录日志后手工发送ACK应答,删除队列中的垃圾数据
channel.acknowledge(delivery_info.delivery_tag, false)
end