Hi all,
最近尝试把 find_each 方法并行化,因为经常要在数据库里修改错误数据,传统的方式实在效率不高,希望能够并行加快速度。 现在遇到一个问题就是,如果同时有几个 find_each 在跑的话,那必须设法规定每个 find_each 只负责多少范围的数据,这就使得必须有一个可以设置范围的参数。 但是从 find_each/find_in_batches 的实现当中,却只能找到 start 参数,却没有一个与之相对应的 end 参数,(虽然也可以用 batch_size 参数,但是这个参数代表另一个意义,不能乱用)感觉非常奇怪。为什么 Rails 不提供这样的功能呢?难道设置一个 end 参数会导致什么问题嘛? 谢谢
这里贴一下 2.3 的实现:
module ActiveRecord
module Batches # :nodoc:
def self.included(base)
base.extend(ClassMethods)
end
module ClassMethods
def find_each(options = {})
find_in_batches(options) do |records|
records.each { |record| yield record }
end
self
end
def find_in_batches(options = {})
raise "You can't specify an order, it's forced to be #{batch_order}" if options[:order]
raise "You can't specify a limit, it's forced to be the batch_size" if options[:limit]
start = options.delete(:start).to_i
batch_size = options.delete(:batch_size) || 1000
proxy = scoped(options.merge(:order => batch_order, :limit => batch_size))
records = proxy.find(:all, :conditions => [ "#{table_name}.#{primary_key} >= ?", start ])
while records.any?
yield records
break if records.size < batch_size
last_value = records.last.id
raise "You must include the primary key if you define a select" unless last_value.present?
records = proxy.find(:all, :conditions => [ "#{table_name}.#{primary_key} > ?", last_value ])
end
end
private
def batch_order
"#{table_name}.#{primary_key} ASC"
end
end
end
end
3.2 实现也没有什么进步:
require 'active_support/core_ext/object/blank'
module ActiveRecord
module Batches
def find_each(options = {})
find_in_batches(options) do |records|
records.each { |record| yield record }
end
end
def find_in_batches(options = {})
relation = self
unless arel.orders.blank? && arel.taken.blank?
ActiveRecord::Base.logger.warn("Scoped order and limit are ignored, it's forced to be batch order and batch size")
end
if (finder_options = options.except(:start, :batch_size)).present?
raise "You can't specify an order, it's forced to be #{batch_order}" if options[:order].present?
raise "You can't specify a limit, it's forced to be the batch_size" if options[:limit].present?
relation = apply_finder_options(finder_options)
end
start = options.delete(:start).to_i
batch_size = options.delete(:batch_size) || 1000
relation = relation.reorder(batch_order).limit(batch_size)
records = relation.where(table[primary_key].gteq(start)).all
while records.any?
records_size = records.size
primary_key_offset = records.last.id
yield records
break if records_size < batch_size
if primary_key_offset
records = relation.where(table[primary_key].gt(primary_key_offset)).to_a
else
raise "Primary key not included in the custom select clause"
end
end
end
private
def batch_order
"#{quoted_table_name}.#{quoted_primary_key} ASC"
end
end
end