搜索引擎 在 Rails 项目中管理、自定义配置你的 ElasticSearch indexes

pathbox · 2017年03月03日 · 最后由 pathbox 回复于 2017年03月25日 · 10200 次阅读
本帖已被管理员设置为精华贴

这周对 Rails 项目中的 ElasticSearch 进行了总结并写成了文档,觉得有一些内容值得记录和分享。原文连接:超链接 这篇文章主要是在 Rails 项目中对索引,mapping 的设置、管理操作的总结,不涉及搜索方面的内容。 源数据是存在 MySQL 中,ES 的数据是 MySQL 写操作的时候进行了回调同步到 ES 中,这个应该和很多人的使用同步策略一样。

1.Model#mapping

在 model 文件中,使用 mapping 的声明方式。

class Article < ActiveRecord::Base
  include Elasticsearch::Model
  include Elasticsearch::Model::Callbacks
  index_name self.table_name  # 这里可以自定义Article的ES索引名称

  mapping do
    indexes :subject, type: 'multi_field' do
      indexes :raw, index: :not_analyzed
      indexes :tokenized, analyzer: 'ik'
    end
    indexes :content, type: :string, analyzer: 'ik'
  end
end
indexes :subject, type: 'multi_field' do
  indexes :original, index: :not_analyzed
  indexes :tokenized, analyzer: 'ik'
end

使用了 multi_field 定义了两个 indexes,这样在 ES 中会产生下面的 mapping 结构

"subject": {
   "type": "string",
   "index": "no",
   "fields": {
      "original": {
         "type": "string",
         "index": "not_analyzed"
      },
      "tokenized": {
         "type": "string",
         "analyzer": "ik"
      }
   }
}

对于 MySQL articles 表的 subject 数据,在 ES 的 articles 索引中用了两个 field 来存储 subject.original 和 subject.tokenized。他们存储的数据是一样的,不同的是 subject.original 没有被分词,subject.tokenized 使用了ik进行了分词。这样做的效果是当想要对 subject 进行全文搜索时,就可以使用 subject.tokenized,想要对 subject 进行条件过滤的时候,就可以使用 subject.original 了。

Tip: ik 是一个优秀的中文分析器 github 地址

我们知道 include Elasticsearch::Model::Callbacks 这个模块帮我们做了:

def self.included(base)
  base.class_eval do
    after_commit lambda { __elasticsearch__.index_document  },  on: :create
    after_commit lambda { __elasticsearch__.update_document },  on: :update
    after_commit lambda { __elasticsearch__.delete_document },  on: :destroy
  end
end

就是写操作的模块。

我们也可以这样做:

after_commit :create_es_index, on: :create

def create_es_index
  begin
    __elasticsearch__.index_document
  rescue => e
    hash = {}
    hash['article_id'] = self.id
    hash['time'] = Time.now
    hash['error'] = {}
    hash['error']['message']   = e.message
    hash['error']['backtrace'] = e.app_backtrace
    ErrorESMailer.send_error_email(hash).deliver  # send a error email
  end
end

这样,如果有回调的同步 ES 的操作有异常,则会捕获这个异常并且发送 error 的相关信息邮件给开发人员,开发人员可以及时的处理异常。同理 update 和 destroy 操作。

2. Model.import force:true

Article.import force:true

ES 会根据 Article 中 setting 和 mapping 的配置,在 ES 中 构建 articles 的索引 (清空新建索引 + 导入数据),对应的 type 为 article。

下面的三个操作,同样可以创建索引并导入数据

1. Article.__elasticsearch__.create_index! force: true  # 根据mapping和setting 创建articles索引,该索引没有任何数据
2. Article.__elasticsearch__.refresh_index!  # refresh 操作
3. Article.find_in_batches do |articles|   # 批量同步导入MySQL articles的数据
    Article.__elasticsearch__.client.bulk({
      index: 'articles',
      type: 'article',
      body: articles.map do |article|
        {index: {_id: article.id, data: article.as_indexed_json }}
      end
    })
   end

假设,你需要往 ES articles 索引中同步一批数据,这批数据已经被导入到了 MySQL 中,如果数据量不大,使用 import: true 的方法快速导入完。如果数据量比较大,导入耗时很多。还使用 import 的方式导入的话,会导致在 ES articles 被清空后,如果线上有用户在对 articles 的数据进行 ES 的搜索,很有可能会导致没有搜索结果。这时候,你可以使用上面的第三个操作,不需要重建整个 articles 索引。而是往 articles 索引中添加索引数据。

3. as_indexed_json

as_indexed_json 是一个很重要的方法。如果你明白了它,你就能知道 ES 索引中存的 documents 数据是怎样的。我们知道 ES 不是关系型数据库,ES 中的 documents 数据和 MongoDB 的 documents 类似,每个 document 是一个 json。所以,知道索引中 documents 数据存了哪些 fields,你才能更好地结合灵活多变的搜索条件和方式,构造多种多样的搜索情况。

下面我们看as_indexed_json 的源码:

# File 'lib/elasticsearch/model/serializing.rb', line 26
def as_indexed_json(options={})
  # TODO: Play with the `MyModel.indexes` method -- reject non-mapped attributes, `:as` options, etc
  self.as_json(options.merge root: false)
end

这里的 self 其实是 model 的一个 instance。我们可以在 model 中 monkey patch 这个方法。

# article.rb
class Article < ActiveRecord::Base
  include Elasticsearch::Model
  include Elasticsearch::Model::Callbacks
  index_name self.table_name  # 这里可以自定义Article的ES索引名称

  has_many :comments
  has_many :followers

  mapping do
    indexes :subject, type: 'multi_field' do
      indexes :raw, index: :not_analyzed
      indexes :tokenized, analyzer: 'ik'
    end
    indexes :content, type: :string, analyzer: 'ik'
    indexes :created_at, type: :date
  end 

  def as_indexed_json(options={})
    hash = as_json(
      except: [:update_at], 
      methods: [:parse_content],
      include: {
        comments: {only: [:id, :content]},
        followers: {only: [:id]}
      }
    )
    hash.merge!(other_hash)
    hash
  end

  def other_hash
    {title: "My title", owner: "My owner"}
  end

  def parse_content
    "Article: "+ self.content
  end
end

我们重写的as_indexed_json 方法构造了一个 hash 然后返回。except 表示排除某个字段,这个字段不会出现在 hash 的 key 中,methods 表示将某个方法作为 hash 的 key,对应的 value 就是该方法的返回值。include 表示会找相关联的表,并取定义的字段作为 hash 的 key,value 则为该字段的值。

这样,最后得到的 hash 大概是这样的,比如 Article.first.as_indexed_json:

{
  'id'=> 1,
  'subject' => '这是第一篇文章的主题',
  'content' => '这是第一篇文章的内容',
  'title' => 'My title',
  'owner' => 'My owner',
  'created_at' => 'Tue, 02 Aug 2016 20:40:08 CST +08:00'
  'parse_content' => 'Article: 这是第一篇文章的内容',
  'comments' => [{'id'=> 1, 'content'=> '点赞'},{'id'=> 2, 'content'=> 'good'}],
  'followers' => [{'id'=> 1},{'id'=> 2}]
}

通过这个事例,你应该能明白as_indexed_json 中可以如何构造想要的 hash 数据了。

在什么时候用到 as_indexed_json方法呢?

在 index_document 方法的源码中:

# File 'lib/elasticsearch/model/indexing.rb', line 333
def index_document(options={})
  document = self.as_indexed_json # Hi! I'm here!

  client.index(
    { index: index_name,
      type:  document_type,
      id:    self.id,
      body:  document }.merge(options)
  )
end

这样就明白了,当一个 model instance(一条记录) 从 MySQL 同步到 ES 的时候,将这条记录转为 hash 结构。其实,as_indexed_json 返回的这个 hash 就是 ES 对应的一条 document。这个过程就是:

MySQL one record => as_indexed_json => ES one document

现在我们明白了as_indexed_json,就可以根据自己的需要,往 ES 索引中存储 documents 数据了。并不是 ES 索引的 mapping field 只能和数据库对应表的字段一样,如果你什么都不做,ES 会用默认的配置帮你做这些事情。如果你想要存储更多的数据字段到 ES 的 document 中进行索引分词,你就需要自己动手做更多的自定义的配置。合理的 documents 数据是 ES 进行高效搜索功能的保证。

4. ES 的 mapping 只能增加

ES 的 mapping 一旦创建,只能增加字段,而不能修改已经 mapping 的字段。

client = Elasticsearch::Model.client
client.indices.put_mapping index: "articles", type: "article", body: {
  article: {
    properties: {
      organization: {
        properties: {
          id:   { type: :integer },
          name: { type: :string, index: :not_analyzed }
        }
      }
    }
  }
}

得到的 mappin 是:

"organization": {
  "properties": {
    "id": {
      "type": "integer"
      },
    "name": {
      "type": "string",
      "analyzer": "not_analyzed"
      }
    }
  }

上面的方法代码可以写成 rails 脚本,使用 rails runner 执行。也可以写成 rake 命令。然后,建议把新增的 mapping field 写在 Article 的 mapping 定义中。让 mapping 定义在代码层面保持显示最完整的定义结构。

5. 设置自定义的 analysis

什么是 analysis、analyzer、tokenizer 和 filter,如果不知道或者有遗忘了,请看ElasticSearch 的官方文档 。官方文档描述的简单清晰,例子也易懂。

在上面的 Article 例子中,我们没有对 analysis 进行设置,这样,ES 会使用默认的 analysis 设置。但是,也许默认的设置并不是我们真正想要的 analysis。我们现在开始自定义 analysis。

# article.rb
class Article < ActiveRecord::Base
  include Elasticsearch::Model
  include Elasticsearch::Model::Callbacks
  index_name self.table_name  # 这里可以自定义Article的ES索引名称

  has_many :comments
  has_many :followers

  settings analysis:{
    analyzer: {
      my_custom_analyzer:{ type: 'custom', tokenizer: 'ngram_tokenizer'}
    },
    tokenizer: {
      ngram_tokenizer: { type: 'nGram', min_gram: 2, max_gram: 3, token_chars: ['lettler, 'digit', 'punctuation']}
    }
} do
    mapping do
      indexes :subject, type: 'multi_field' do
        indexes :raw, index: :not_analyzed
        indexes :tokenized, analyzer: 'ik'
      end
      indexes :content, type: :string, analyzer: 'ik'
      indexes :created_at, type: :date
    end 
  end

  def as_indexed_json(options={})
    hash = as_json(
      except: [:update_at], 
      methods: [:parse_content],
      include: {
        comments: {only: [:id, :content]},
        followers: {only: [:id]}
      }
    )
    hash.merge!(other_hash)
    hash
  end

  def other_hash
    {title: "My title", owner: "My owner"}
  end

  def parse_content
    "Article: "+ self.content
  end
end

自定义的主要代码是这部分:

settings analysis:{
    analyzer: {
      my_custom_analyzer:{ type: 'custom', tokenizer: 'ngram_tokenizer'}
    },
    tokenizer: {
      ngram_tokenizer: { type: 'nGram', min_gram: 2, max_gram: 3, token_chars: ['lettler, 'digit', 'punctuation']}
    }
} 

这里你可能需要倒回来看,我们自定义定义了一个 tokenizer,取名为 ngram_tokenizer ,type 表示 使用的 tokenizer。我们使用的是 ES built-in 的 nGram tokenizer ,具体配置参数请看它的文档。ES built-in 了不同的 tokenizer,开发人员可以自由选择使用。

我们还定义了一个 analyzer,取名为 my_custom_analyzer。type custom 表示这个 analyzer 是自定义的。使用了 ngram_tokenizer 这个 tokenizer。这时你应该发现了,我们使用的就是这里我们自定义的 ngram_tokenizer 。这里我们没有对 filter 进行自定义,看过 ES 文档的朋友,应该知道 filter 也是可以自定义的。

官方的示例:

PUT /my_index
{
    "settings": {
        "analysis": {
            "char_filter": { ... custom character filters ... },
            "tokenizer":   { ...    custom tokenizers     ... },
            "filter":      { ...   custom token filters   ... },
            "analyzer":    { ...    custom analyzers      ... }
        }
    }
}

ElasticSearch 确实是一个优秀的全文搜索引擎。了解和实践更多的 ElasticSearch 的设置和搜索,能够体会到 ElasticSearch 更多的功能。即使看过 ElasticSearch 入门教程的朋友,我觉得 ElasticSearch 的官方文档也是非常值得阅读的。

huacnlee 将本帖设为了精华贴。 03月03日 11:02

我记得 Multi fields 的新语法已经不用 type: multi_field 了。

注意 update 回调有坑

我 monkey_patch 了一下 as_index_json, 省得重复写两遍。

class Elasticsearch::Model::Indexing::Mappings
  def to_hash
    { @type.to_sym => @options.merge( properties: @mapping.as_json(except: :as) ) }
  end
end

module Elasticsearch::Model::Serializing::InstanceMethods

  def as_indexed_json(options={})
    build_indexed_json(
      target.class.mappings.instance_variable_get(:@mapping),
      target,
      {id: target.id.to_s}
    ).as_json(options.merge root: false)
  end

private

  def build_indexed_json(mappings, model, json)
    return json unless model.respond_to? :[]

    if model.kind_of? Array
      build_array_json(mappings, model, json)
    else
      build_hash_json(mappings, model, json)
    end

    json
  end

  def build_array_json(mappings, model, json)
    return json unless model.respond_to?(:[]) && json.kind_of?(Array)

    model.each do |elem|
      elem_json = if elem.kind_of? Array then [] else {} end
      json << elem_json
      build_indexed_json(mappings, elem, elem_json)
    end
  end

  def build_hash_json(mappings, model, json)
    return json unless model.respond_to?(:[]) && json.kind_of?(Hash)

    mappings.each_pair do |field, option|

      # Custom transformer
      if option.has_key?(:as) && option[:as].kind_of?(Proc)
        json[field] = target.instance_exec(get_field(model, field), &option[:as])

      # A nested field
      elsif option.has_key?(:properties)
        json[field] = if get_field(model, field).kind_of? Array then [] else {} end
        build_indexed_json(option[:properties], get_field(model, field), json[field])

      # Normal case
      else
        json[field] = get_field(model, field)
      end
    end
  end

  def get_field(model, field_name)
    model.try(:[], field_name) || model.try(field_name)
  end
end
hlcfan 回复

嗯,我用的是 1.4 的 ElasticSearch 和 gem "elasticsearch", '1.0.8'。在 mode 的 mappingl 中也许还可以这样使用 multi_field 的方式定义,如果是更新索引,直接使用 fields 的方法,官方的一个例子。在 rails 中也是构造这样的 hash 结构。连接 谢谢提醒。所以,老版本的 ElasticSearch 想要升级到新版本可能会很麻烦,很多语法可能都改了

lithium4010 回复

是否能指明一下这个“坑”

tassandar 回复

是的。我现在也是这样写

8 楼 已删除
pathbox 回复

我是和 mongoid 一起用的,直接更新无法正确更新字段。你可以去看一下 es rails 的源码,我记得是有问题。

lithium4010 回复

原来如此

pathbox ElasticSearch 搜索中文不准的问题 提及了此话题。 04月14日 19:04
需要 登录 后方可回复, 如果你还没有账号请 注册新账号