• 我觉得这一类的数据 在存储方面可以拓展一下。不一定非得传统数据库,S3,DynamoDB,Redshift 都可以考虑。 模糊搜索这个功能在数据量大的情况下,我觉得这个有点难为数据库了。

  • 征集与 Matz 的合影 at 2020年02月13日

    额 这个。。。假如你生日看到一堆人跟你不熟的人的合影照片墙 是啥感觉。这些照片相对来说对 fans 更有意义。

  • 看不懂,你到底是在用 rspec 1 还是 rspec 3, 还有你的问题是什么?

  • 可以用 sidekiq thread,thread 多了,任务满载,cpu 和 memory 容易 100%。 进程也是一样的,不过你可以在不同的服务器上开进程,不用把所有资源都放在一个服务器上,自己控制 threads,把 cpu,memory 控制在合理范围内就好。

  • 如果你的服务器一下子来了很多任务 你都希望尽快完成,那你就需要考虑 auto scaling,我们有个定时任务 定时去 redis 里查 有多少个 key,如果某个 queue 里的 key 增加到一定数量就会触发 增加服务器,启动更多的 sidekiq 进程。Task Count 就是服务器数量, key 少了就减少服务器。自动扩展需要考虑数据库 max connection,设置个服务器上限和下限。

  • @leijun 多起几个进程,每个进程可以有不同的配置,我们每个 docker container 起两个 sidekiq 进程,

    • 一个进程要保证任务成功用了 sidekiq-pro,
    • 另一个进程只读数据库,不需要写,所以可以不用 sidekiq-pro,崩溃了也无所谓,没用 sidekiq-pro 在 redis 效率上应该会更高一点。

    一个进程太多 threads 感觉不是很好,考虑多起几个服务器多几个进程吧。

  • 用户竞拍的时候, 更改 product.current_bidder 用 lock_version,避免多人同时竞拍,每次 after_commit 后 更新 新的竞拍结束时间。 建议 cronjob 查询竞拍结束时间来确认 product 归谁。 因为

    1. Sidekiq API 并不一定是实时的结果,perform_in 也不一定是准时的 2.你忽略了并发的情况。

    cronjob 你可以用一个死循环去检查 product 时间更精准一点,查询到了后要 lock,你可以用 sidekiq scheduler,设置一个 cron job 每过 5 分钟去执行一个检查任务, 检查任务里不停地 while sleep,超过 5 分钟了自动结束。

  • 如何给 Rails 做 health check at 2018年11月13日

    服务是不健康,需要 scale up。但是 container 是在工作的,只是比较忙而已,不应该 kill 掉忙的 container。

  • 如何给 Rails 做 health check at 2018年11月10日

    如果服务器非常忙,你 ping /okcomputer 也可能 timeout

  • 如何给 Rails 做 health check at 2018年11月10日

    我搞混了,scaling 是依赖别的指标,例如 CPU usage,requests count。但是 health check 是根据 ping 返回的状态来决定是不是要 kill container。

  • 一招秒杀 N+1 agg 问题 at 2018年11月02日

    flyerhzm 已经把这个功能做成 gem 了, https://github.com/xinminlabs/eager_group,我用过,很不错。

  • 用 mysql 存储肯定是不靠谱的,应该用 Cassandra,DynamoDB 等适合存储海量信息的分布式数据库,Cassandra 写性能非常好,DynamoDB 易扩展支持 memory cache,对于热数据需要特别处理。

  • bugsnag, 可以设置同一个错误邮件的频率

  • 你的需求不用 hack 也能实现。 需求无非是下面几点

    1. 导入需要高性能
    2. 导出需要高性能
    3. 需要有跟踪报错功能

    我的做法是把任务分拆,利用 sidekiq 多个 worker 同时执行任务,将结果合并即可。

    实现 1 导出可以利用 find_in_batches, find_each 实现 2 导入可以利用 CSV.foreach 实现 3 跟踪报错可以用一个 model 记录 Job 执行过程中产生的数据和日志即可

    # QueryThread用来把一个大量数据的查询拆分成多个小任务
    # a = Concurrent::Array.new
    # QueryThread.split(Subscription.where("status = ?", "canceled").where("id < 1000"), 5) do |subquery, idx, min_id, max_id|
    #   subquery.find_each do |sub|
    #     a << [idx, sub.id, min_id, max_id]
    #   end
    # end
    #
    class QueryThread
      def initialize(query, min_id, max_id, index, &block)
        @thread = Thread.new do
          subquery = query.where("#{query.table_name}.id >= ? and #{query.table_name}.id <= ?", min_id, max_id)
          block.call(subquery, index, min_id, max_id)
        end
      end
    
      def join
        @thread.join
      end
    
      class << self
    
        def split(query, thread_count, &block)
          arguments_groups = split_to_array(query, thread_count)
    
          query_threads = []
          arguments_groups.each do |argument|
            query_threads << QueryThread.new(query, argument[:min_id], argument[:max_id], argument[:index], &block)
          end
          query_threads.each(&:join)
        end
    
       #根据查询语句的最大id和最小id分组
        def split_to_array(query, worker_number)
          total_min_id = query.pluck("min(#{query.table_name}.id)").first.to_i
          total_max_id = query.pluck("max(#{query.table_name}.id)").first.to_i
          split_range(total_min_id, total_max_id, worker_number)
        end
    
        def split_range(total_min_id, total_max_id, worker_number)
          if total_max_id == 0
            return []
          elsif (total_max_id - total_min_id + 1) < worker_number
            # needn't so much worker
            return [{min_id: total_min_id, max_id: total_max_id, index: 0}]
          end
    
          range_in_thread = (total_max_id - total_min_id + 1) / worker_number
          result = []
          (worker_number - 1).times.each do |i|
            min_id = total_min_id + i * range_in_thread
            max_id = total_min_id + (i + 1) * range_in_thread - 1
            result << {min_id: min_id, max_id: max_id, index: i}
          end
    
          # last thread
          min_id = total_min_id + (worker_number - 1) * range_in_thread
          max_id = total_max_id
          result << {min_id: min_id, max_id: max_id, index: worker_number - 1}
          result
        end
    
      end
    end
    

    设计一个 BatchJob 用来做数据导出

    class BatchJob < ApplicationJob
      queue_as :default
    
      class_attribute :query_block, :worker_number, :job_block
      def self.set_query(&block)
        self.query_block = block
      end
    
      def self.set_worker_number(number = 10)
        self.worker_number = number
      end
    
      def self.set_job(&block)
        self.job_block = block
      end
    
      def self.perform_batch_later
        query = query_block.call
        argu_group = QueryThread.split_to_array(query, worker_number || 10)
        argu_group.each do |argus|
          self.perform_later(argus)
        end
      end
    
      def perform(options = {})
        min_id = options.fetch(:min_id)
        max_id = options.fetch(:max_id)
    
        q = self.class.query_block.call
        subquery = q.where("#{q.table_name}.id >= ? and #{q.table_name}.id <= ?", min_id, max_id)
        self.class.job_block.call(subquery, options)
      end
    
    end
    

    如下用法,写个 job 继承 BatchJob

    • set_query 里写查询语句,
    • set_worker_number 设定分几次执行(可以同时执行),
    • set_job 里写具体查询到的结果进行逻辑
    • 执行的时候 ExportDataJob.perform_batch_later 即可,以下例子会把查询拆成 40 个 job,可同时导出,最后再根据 Attachment 的数据合并即可。
    class ExportDataJob < BatchJob
      set_query {
        Customer.where("updated_at > ?", Time.parse("Tue Apr 3 14:08:33 2018 -0500"))
      }
    
      set_worker_number 40
      set_job do |query, thread_hash|
        Attachment.upload_csv!(Rails.root.join("tmp/fix_grandfater_#{thread_hash[:index]}.csv"), "wb") do |csv|
          query.includes(:user).find_each(batch_size: 100) do |customer|
            csv << [customer.id, customer.name, ...] #自己写逻辑
          end
        end
      end
    
    end
    

    根据以上原理,可以写出类似的方法做导入

    class CsvJob < ApplicationJob
      queue_as :default
    
      class_attribute :worker_number, :job_block
    
      def self.set_worker_number(number = 10)
        self.worker_number = number
      end
    
      def self.set_job(&block)
        self.job_block = block
      end
    
      # CsvJob.perform_batch_later(123, csv: {headers: true}, job: {store: "123"})
      def self.perform_batch_later(attachment_id, options = {})
        options[:csv] ||= {}
        csv_data = Attachment.find(attachment_id).file.read
        total_lines = CSV.parse(csv_data, options[:csv]).count
    
        worker_group = QueryThread.split_range(0, total_lines - 1, worker_number || 10)
        worker_group.each do |worker_options|
          self.perform_later(attachment_id, options, worker_options)
        end
      end
    
      def perform(attachment_id, options = {}, worker_options = {})
        temp_filename = Rails.root.join("tmp/temp_csv_#{attachment_id}_#{options[:index]}.csv")
        File.open(temp_filename, "wb") do |f|
          f.write Attachment.find(attachment_id).file.read
        end
    
        csv = CSV.foreach(temp_filename, options[:csv])
    
        self.class.job_block.call csv, worker_options, options[:job]
      end
    end
    

    用法如下,

    • set_worker_number 设置分几次完成
    • set_job 设置逻辑,这里手动和 worker_options[:min_id] [:max_id] 做对比 代码丑陋了点。

    调用时 参数为含有 csv 文件信息的 Attachment 的 id,以及其他自定义参数 #ImportCustomerJob.perform_batch_later(attachment_id, csv: {headers: true}, job: {store: "stage"})

    
    class ImportCustomerJob < CsvJob
    
      set_worker_number 20
    
      set_job do |csv, worker_options, job_options|
        job_options ||= {}
    
        Attachment.log("import_#{job_options[:store]}_#{worker_options[:index]}") do |logger|
          logger.info "start.."
          index = 0
          csv.each do |row|
    
            if index < worker_options[:min_id]
              index += 1
              next
            end
            if index > worker_options[:max_id]
              break
            end
    
            Customer.create_or_update_by!(store: job_options[:store], remote_id: row["remote_id"]) do |c|
              c.attributes = row.to_h
            end
    
            index += 1
          end
          logger.info "finish.."
        end
      end
    end
    

    最后附上 Attachment,代码随便写了写。起个持久化的作用

    class Attachment < ApplicationRecord
      mount_uploader :file, S3FileUploader
    
    
      def self.upload_file!(filename, name = nil)
        a = Attachment.new
        file = File.open(filename, "r")
        a.file = file
        a.name = name
        a.save!
        a
      ensure
        file.close if file
      end
    
      def self.upload_csv!(filename, csv_options, &block)
        CSV.open(filename, csv_options) do |csv|
          block.call(csv)
        end
      ensure
        upload_file!(filename)
      end
    
      # todo add log and upload to s3
      def self.log(name, &block)
        log_file = open(Rails.root.join("tmp/#{name}.log"), "w")
        log_file.sync = true
        logger = Logger.new(log_file)
        logger.formatter = proc{|severity, time, program, msg|  "#{severity}: #{msg}\n" }
    
        begin
          block.call(logger)
        rescue Exception => e
          logger.error e.message
          logger.error e.backtrace.join("\n")
          raise e
        ensure
          log_file.close
          Attachment.upload_file!(Rails.root.join("tmp/#{name}.log"), name)
        end
      end
    end
    

    这样不修改 Sidekiq 就可以完成导入,导出。也不用担心 Thread 的问题。全程有记录,报错的话 Sidekiq 会重启任务(一般是你自己的逻辑有问题),符合你的要求。

  • 查询条件 from = a 下不重复的数据 T.where("from=?", "a").group("to").having("count(*) = 1").pluck("to")

    若 from = a ,to = xx 或者 from = xx , to = a 只算一条的话 T.where("not exist (select * from T t2 where t2.from = t.to and t2.to = t.from) ").group("from, to").having("count(*) = 1").pluck "from, to"

  • @huacnlee 能不能给帖子加一个 “踩” 的功能,有些帖子太辣眼睛。

  • 谨防比特币和区块链骗局 at 2018年01月25日

    👏

  • 多谢!😀

  • 显示器上的灯哪里买的?看起来很不错的样子。

  • <tbody>在外面包一层

  • 第一点:我还没想过 Class.new(User) 可以这么用,很 cool,ActiveType 没看过,看起来可以一试。 第二点,我考虑过使用 context,但是不想把 errors object 搞混淆了,同一个页面需要同时显示 warnings 和 errors 的时候,我不能执行一下 valid?(context),然后再执行一下 valid。

    总体问题就是 ActiveRecord 不能简单的给单个实例 动态添加验证。必须依赖 callbacks 体系, 在 Class 上提前定义好验证。

    其实还有些验证问题和关联表验证有关,我都是用 FormObject 重写验证,这体验挺糟糕的。

    1. 动态设置验证 根据数据自定义 validation,把 validation 设计成数据库存储。 例如:

      company1.configure_validation_on :User, :email, :present
      company2.configure_validation_on :User, :state, :present
      

      那么当一个 User create 的时候,如果 user 关联的是 company1,那么久要验证 email。如果 user 关联的是 company2 那么验证 state. 这些 validation 需要写在数据库里,而不是代码里。那么定义一个 validate: custom_validation 是最简单的,但是无法简单重用 rails 的 ActiveModel::Errors, 用 meta programming 也不合适,因为 validation 被设计成一种 callback。

    2. 除了 Errors 之外多一层 Warnings 老板希望能让用户创建数据时不强制验证,但是创建完后给出 warning,以及在页面上能显示 warning。 那么如果能类似 record.errors 多一层 record.warnings 会很不错.

  • 看 application.js 里面有哪些代码,rails 默认有 turbolink, 可能触发了一次。

  • 估计有 javascript 代码触发了两次请求。 你的代码没法看,找人带一下吧。