搜索引擎 在 Rails 项目中管理、自定义配置你的 ElasticSearch indexes

pathbox · 2017年03月03日 · 最后由 pathbox 回复于 2017年03月25日 · 8691 次阅读
本帖已被设为精华帖!

这周对 Rails 项目中的 ElasticSearch 进行了总结并写成了文档,觉得有一些内容值得记录和分享。原文连接:超链接 这篇文章主要是在 Rails 项目中对索引,mapping 的设置、管理操作的总结,不涉及搜索方面的内容。 源数据是存在 MySQL 中,ES 的数据是 MySQL 写操作的时候进行了回调同步到 ES 中,这个应该和很多人的使用同步策略一样。

1.Model#mapping

在 model 文件中,使用 mapping 的声明方式。

class Article < ActiveRecord::Base
  include Elasticsearch::Model
  include Elasticsearch::Model::Callbacks
  index_name self.table_name  # 这里可以自定义Article的ES索引名称

  mapping do
    indexes :subject, type: 'multi_field' do
      indexes :raw, index: :not_analyzed
      indexes :tokenized, analyzer: 'ik'
    end
    indexes :content, type: :string, analyzer: 'ik'
  end
end
indexes :subject, type: 'multi_field' do
  indexes :original, index: :not_analyzed
  indexes :tokenized, analyzer: 'ik'
end

使用了 multi_field 定义了两个 indexes, 这样在 ES 中会产生下面的 mapping 结构

"subject": {
   "type": "string",
   "index": "no",
   "fields": {
      "original": {
         "type": "string",
         "index": "not_analyzed"
      },
      "tokenized": {
         "type": "string",
         "analyzer": "ik"
      }
   }
}

对于 MySQL articles 表的 subject 数据,在 ES 的 articles 索引中用了两个 field 来存储 subject.original 和 subject.tokenized 。他们存储的数据是一样的,不同的是 subject.original 没有被分词,subject.tokenized 使用了ik进行了分词。这样做的效果是当想要对 subject 进行全文搜索时,就可以使用 subject.tokenized, 想要对 subject 进行条件过滤的时候,就可以使用 subject.original 了。

Tip: ik 是一个优秀的中文分析器 github 地址

我们知道 include Elasticsearch::Model::Callbacks 这个模块帮我们做了:

def self.included(base)
  base.class_eval do
    after_commit lambda { __elasticsearch__.index_document  },  on: :create
    after_commit lambda { __elasticsearch__.update_document },  on: :update
    after_commit lambda { __elasticsearch__.delete_document },  on: :destroy
  end
end

就是写操作的模块。

我们也可以这样做:

after_commit :create_es_index, on: :create

def create_es_index
  begin
    __elasticsearch__.index_document
  rescue => e
    hash = {}
    hash['article_id'] = self.id
    hash['time'] = Time.now
    hash['error'] = {}
    hash['error']['message']   = e.message
    hash['error']['backtrace'] = e.app_backtrace
    ErrorESMailer.send_error_email(hash).deliver  # send a error email
  end
end

这样,如果有回调的同步 ES 的操作有异常,则会捕获这个异常并且发送 error 的相关信息邮件给开发人员,开发人员可以及时的处理异常。同理 update 和 destroy 操作。

2. Model.import force:true

Article.import force:true

ES 会根据 Article 中 setting 和 mapping 的配置,在 ES 中 构建 articles 的索引 (清空新建索引 + 导入数据),对应的 type 为 article。

下面的三个操作,同样可以创建索引并导入数据

1. Article.__elasticsearch__.create_index! force: true  # 根据mapping和setting 创建articles索引,该索引没有任何数据
2. Article.__elasticsearch__.refresh_index!  # refresh 操作
3. Article.find_in_batches do |articles|   # 批量同步导入MySQL articles的数据
    Article.__elasticsearch__.client.bulk({
      index: 'articles',
      type: 'article',
      body: articles.map do |article|
        {index: {_id: article.id, data: article.as_indexed_json }}
      end
    })
   end

假设,你需要往 ES articles 索引中同步一批数据,这批数据已经被导入到了 MySQL 中,如果数据量不大,使用 import: true 的方法快速导入完。如果数据量比较大,导入耗时很多。还使用 import 的方式导入的话,会导致在 ES articles 被清空后,如果线上有用户在对 articles 的数据进行 ES 的搜索,很有可能会导致没有搜索结果。这时候,你可以使用上面的第三个操作,不需要重建整个 articles 索引。而是往 articles 索引中添加索引数据。

3. as_indexed_json

as_indexed_json 是一个很重要的方法。如果你明白了它,你就能知道 ES 索引中存的 documents 数据是怎样的。我们知道 ES 不是关系型数据库,ES 中的 documents 数据和 MongoDB 的 documents 类似,每个 document 是一个 json。所以,知道索引中 documents 数据存了哪些 fields,你才能更好地结合灵活多变的搜索条件和方式,构造多种多样的搜索情况。

下面我们看as_indexed_json 的源码:

# File 'lib/elasticsearch/model/serializing.rb', line 26
def as_indexed_json(options={})
  # TODO: Play with the `MyModel.indexes` method -- reject non-mapped attributes, `:as` options, etc
  self.as_json(options.merge root: false)
end

这里的 self 其实是 model 的一个 instance。我们可以在 model 中 monkey patch 这个方法。

# article.rb
class Article < ActiveRecord::Base
  include Elasticsearch::Model
  include Elasticsearch::Model::Callbacks
  index_name self.table_name  # 这里可以自定义Article的ES索引名称

  has_many :comments
  has_many :followers

  mapping do
    indexes :subject, type: 'multi_field' do
      indexes :raw, index: :not_analyzed
      indexes :tokenized, analyzer: 'ik'
    end
    indexes :content, type: :string, analyzer: 'ik'
    indexes :created_at, type: :date
  end 

  def as_indexed_json(options={})
    hash = as_json(
      except: [:update_at], 
      methods: [:parse_content],
      include: {
        comments: {only: [:id, :content]},
        followers: {only: [:id]}
      }
    )
    hash.merge!(other_hash)
    hash
  end

  def other_hash
    {title: "My title", owner: "My owner"}
  end

  def parse_content
    "Article: "+ self.content
  end
end

我们重写的as_indexed_json 方法构造了一个 hash 然后返回。except 表示排除某个字段,这个字段不会出现在 hash 的 key 中,methods 表示将某个方法作为 hash 的 key,对应的 value 就是该方法的返回值。include 表示会找相关联的表,并取定义的字段作为 hash 的 key,value 则为该字段的值。

这样,最后得到的 hash 大概是这样的, 比如 Article.first.as_indexed_json:

{
  'id'=> 1,
  'subject' => '这是第一篇文章的主题',
  'content' => '这是第一篇文章的内容',
  'title' => 'My title',
  'owner' => 'My owner',
  'created_at' => 'Tue, 02 Aug 2016 20:40:08 CST +08:00'
  'parse_content' => 'Article: 这是第一篇文章的内容',
  'comments' => [{'id'=> 1, 'content'=> '点赞'},{'id'=> 2, 'content'=> 'good'}],
  'followers' => [{'id'=> 1},{'id'=> 2}]
}

通过这个事例,你应该能明白as_indexed_json 中可以如何构造想要的 hash 数据了。

在什么时候用到 as_indexed_json方法呢?

在 index_document 方法的源码中:

# File 'lib/elasticsearch/model/indexing.rb', line 333
def index_document(options={})
  document = self.as_indexed_json # Hi! I'm here!

  client.index(
    { index: index_name,
      type:  document_type,
      id:    self.id,
      body:  document }.merge(options)
  )
end

这样就明白了,当一个 model instance(一条记录) 从 MySQL 同步到 ES 的时候, 将这条记录转为 hash 结构。其实,as_indexed_json 返回的这个 hash 就是 ES 对应的一条 document。这个过程就是:

MySQL one record => as_indexed_json => ES one document

现在我们明白了as_indexed_json,就可以根据自己的需要,往 ES 索引中存储 documents 数据了。并不是 ES 索引的 mapping field 只能和数据库对应表的字段一样,如果你什么都不做,ES 会用默认的配置帮你做这些事情。如果你想要存储更多的数据字段到 ES 的 document 中进行索引分词,你就需要自己动手做更多的自定义的配置。合理的 documents 数据是 ES 进行高效搜索功能的保证。

4. ES 的 mapping 只能增加

ES 的 mapping 一旦创建,只能增加字段,而不能修改已经 mapping 的字段。

client = Elasticsearch::Model.client
client.indices.put_mapping index: "articles", type: "article", body: {
  article: {
    properties: {
      organization: {
        properties: {
          id:   { type: :integer },
          name: { type: :string, index: :not_analyzed }
        }
      }
    }
  }
}

得到的 mappin 是:

"organization": {
  "properties": {
    "id": {
      "type": "integer"
      },
    "name": {
      "type": "string",
      "analyzer": "not_analyzed"
      }
    }
  }

上面的方法代码可以写成 rails 脚本,使用 rails runner 执行。也可以写成 rake 命令。然后,建议把新增的 mapping field 写在 Article 的 mapping 定义中。让 mapping 定义在代码层面保持显示最完整的定义结构。

5. 设置自定义的 analysis

什么是 analysis、analyzer、tokenizer 和 filter,如果不知道或者有遗忘了,请看ElasticSearch 的官方文档 。官方文档描述的简单清晰,例子也易懂。

在上面的 Article 例子中,我们没有对 analysis 进行设置,这样,ES 会使用默认的 analysis 设置。但是,也许默认的设置并不是我们真正想要的 analysis。我们现在开始自定义 analysis。

# article.rb
class Article < ActiveRecord::Base
  include Elasticsearch::Model
  include Elasticsearch::Model::Callbacks
  index_name self.table_name  # 这里可以自定义Article的ES索引名称

  has_many :comments
  has_many :followers

  settings analysis:{
    analyzer: {
      my_custom_analyzer:{ type: 'custom', tokenizer: 'ngram_tokenizer'}
    },
    tokenizer: {
      ngram_tokenizer: { type: 'nGram', min_gram: 2, max_gram: 3, token_chars: ['lettler, 'digit', 'punctuation']}
    }
} do
    mapping do
      indexes :subject, type: 'multi_field' do
        indexes :raw, index: :not_analyzed
        indexes :tokenized, analyzer: 'ik'
      end
      indexes :content, type: :string, analyzer: 'ik'
      indexes :created_at, type: :date
    end 
  end

  def as_indexed_json(options={})
    hash = as_json(
      except: [:update_at], 
      methods: [:parse_content],
      include: {
        comments: {only: [:id, :content]},
        followers: {only: [:id]}
      }
    )
    hash.merge!(other_hash)
    hash
  end

  def other_hash
    {title: "My title", owner: "My owner"}
  end

  def parse_content
    "Article: "+ self.content
  end
end

自定义的主要代码是这部分:

settings analysis:{
    analyzer: {
      my_custom_analyzer:{ type: 'custom', tokenizer: 'ngram_tokenizer'}
    },
    tokenizer: {
      ngram_tokenizer: { type: 'nGram', min_gram: 2, max_gram: 3, token_chars: ['lettler, 'digit', 'punctuation']}
    }
} 

这里你可能需要倒回来看,我们自定义定义了一个 tokenizer, 取名为 ngram_tokenizer ,type 表示 使用的 tokenizer。我们使用的是 ES built-in 的 nGram tokenizer ,具体配置参数请看它的文档。ES built-in 了不同的 tokenizer,开发人员可以自由选择使用。

我们还定义了一个 analyzer,取名为 my_custom_analyzer。 type custom 表示这个 analyzer 是自定义的。使用了 ngram_tokenizer 这个 tokenizer。 这时你应该发现了,我们使用的就是这里我们自定义的 ngram_tokenizer 。这里我们没有对 filter 进行自定义,看过 ES 文档的朋友,应该知道 filter 也是可以自定义的。

官方的示例:

PUT /my_index
{
    "settings": {
        "analysis": {
            "char_filter": { ... custom character filters ... },
            "tokenizer":   { ...    custom tokenizers     ... },
            "filter":      { ...   custom token filters   ... },
            "analyzer":    { ...    custom analyzers      ... }
        }
    }
}

ElasticSearch 确实是一个优秀的全文搜索引擎。了解和实践更多的 ElasticSearch 的设置和搜索,能够体会到 ElasticSearch 更多的功能。即使看过 ElasticSearch 入门教程的朋友,我觉得 ElasticSearch 的官方文档也是非常值得阅读的。

huacnlee 将本帖设为了精华贴 03月03日 11:02

我记得 Multi fields 的新语法已经不用 type: multi_field 了。

注意 update 回调有坑

我 monkey_patch 了一下 as_index_json, 省得重复写两遍。

class Elasticsearch::Model::Indexing::Mappings
  def to_hash
    { @type.to_sym => @options.merge( properties: @mapping.as_json(except: :as) ) }
  end
end

module Elasticsearch::Model::Serializing::InstanceMethods

  def as_indexed_json(options={})
    build_indexed_json(
      target.class.mappings.instance_variable_get(:@mapping),
      target,
      {id: target.id.to_s}
    ).as_json(options.merge root: false)
  end

private

  def build_indexed_json(mappings, model, json)
    return json unless model.respond_to? :[]

    if model.kind_of? Array
      build_array_json(mappings, model, json)
    else
      build_hash_json(mappings, model, json)
    end

    json
  end

  def build_array_json(mappings, model, json)
    return json unless model.respond_to?(:[]) && json.kind_of?(Array)

    model.each do |elem|
      elem_json = if elem.kind_of? Array then [] else {} end
      json << elem_json
      build_indexed_json(mappings, elem, elem_json)
    end
  end

  def build_hash_json(mappings, model, json)
    return json unless model.respond_to?(:[]) && json.kind_of?(Hash)

    mappings.each_pair do |field, option|

      # Custom transformer
      if option.has_key?(:as) && option[:as].kind_of?(Proc)
        json[field] = target.instance_exec(get_field(model, field), &option[:as])

      # A nested field
      elsif option.has_key?(:properties)
        json[field] = if get_field(model, field).kind_of? Array then [] else {} end
        build_indexed_json(option[:properties], get_field(model, field), json[field])

      # Normal case
      else
        json[field] = get_field(model, field)
      end
    end
  end

  def get_field(model, field_name)
    model.try(:[], field_name) || model.try(field_name)
  end
end
hlcfan 回复

嗯,我用的是 1.4 的 ElasticSearch 和 gem "elasticsearch", '1.0.8'。 在 mode 的 mappingl 中也许还可以这样使用 multi_field 的方式定义,如果是更新索引,直接使用 fields 的方法,官方的一个例子。在 rails 中也是构造这样的 hash 结构。连接 谢谢提醒。 所以,老版本的 ElasticSearch 想要升级到新版本可能会很麻烦,很多语法可能都改了

lithium4010 回复

是否能指明一下这个 “坑”

tassandar 回复

是的。我现在也是这样写

8楼 已删除
pathbox 回复

我是和 mongoid 一起用的,直接更新无法正确更新字段。你可以去看一下 es rails 的源码,我记得是有问题。

lithium4010 回复

原来如此

pathbox ElasticSearch 搜索中文不准的问题 中提及了此贴 04月14日 19:04
需要 登录 后方可回复, 如果你还没有账号请点击这里 注册