Erlang/Elixir [译] 使用 Elixir OTP 实现 MapReduce

gyorou · 2016年11月22日 · 最后由 gyorou 回复于 2016年12月10日 · 3943 次阅读
本帖已被管理员设置为精华贴

最近开始学习 Elixir。顺带推荐一下几本书。

有 Safaribooks 的账号其实这些书都可以随便看。公司没福利的话个人账号一个月其实也没多少钱。

下面正文。


使用 Elixir OTP 实现 MapReduce

原文

Elixir 给我们提供了使用多进程实现 application 的框架 OTP(实际是 Erlang 的东西)。使用 OTP 可以实现进程间的消息交换,当通信出现错误时候的重启,以及进程状态管理。

MapReduce

Apache Lucene

下面介绍一下利用 MapReduce 机制的一些软件。 Lucene 是基于 Java 的文档搜索引擎,在 ElasticSearch 的内部也用到了 MapReduce。 Lucene 使用 MapReduce 的机制生成单词以及出现位置的索引。(据说 Lucene 的作者最早是使用 Lisp 实现的)。 (简单起见,这次我们使用词法分析完毕的文档)

map 函数

下面是 map 的简单的例子。

Enum.map([1,2,3,4,5], fn(elem) -> elem * elem end)
#=> [1, 4, 9, 16, 25]

这次的例子中,我们对于多个文档,使用将其变换为"文档编号,单词,单词索引"的 map 函数。 假设我们已经有了下面这种形式的词法分析完毕的文档。

{:doc1, "ワタシ ハ エリクサー チョット デキル"}

(doc1 作为文档的 ID,各个文档的 ID 都是独一无二的。) 我们将其变换为如下的"文档编号,单词,单词索引"的形式。

[{:doc1, "ワタシ", 0}, {:doc1, "ハ", 1}, {:doc1, "エリクサー", 2}, {:doc1, "チョット", 3}, {:doc1, "デキル", 4}]

reduce 函数

reduce 函数接受多个元素以及一个函数,虽然也是将参数带入函数,不过一开始是针对起始的两个元素,接下来将前面对应的结果以及第三个元素带入函数,然后是其结果和第四个元素带入,然后第五个,这样逐渐带入之后元素层层减少最终返回结果。 reduce 函数的简单的例子如下

Enum.reduce([1,2,3,4,5], 0, fn(elem, acc) -> elem + acc end) # 第二个参数是初始值
#=> 15

这次的例子中,对于刚刚经过 map 函数处理过的文档,我们使用 reduce 函数进行统合。将 map 函数变换过的多个文档

[
[{:doc1, "ワタシ", 0}, {:doc1, "ハ", 1}, {:doc1, "エリクサー", 2}, {:doc1, "チョット", 3}, {:doc1, "デキル", 4}],
[{:doc2, "ワタシ", 0}, {:doc2, "ハ", 1}, {:doc2, "ルビー", 2}, {:doc2, "チョット", 3}, {:doc2, "デキル", 4}]
]

转换成如下形式

%{"エリクサー" => [doc1: 2], "チョット" => [doc1: 3, doc2: 3], "デキル" => [doc1: 4, doc2: 4],
  "ハ" => [doc1: 1, doc2: 1], "ルビー" => [doc2: 2], "ワタシ" => [doc1: 0, doc2: 0]}

到这里我们就获得了文档索引,通过检索单词就可以获得出现该单词的文档名 (文档 ID) 以及相应的出现的位置。

使用 OTP 进行 MapReduce

单线程实现

首先不使用 OTP 来实现。

defmodule MapReduce do
  # Application入口  
  def entry do
    doc1 = {:doc1, "ワタシ ハ エリクサー チョット デキル"}
    doc2 = {:doc2, "ワタシ ハ ルビー チョット デキル"}

    [doc1, doc2]
    |> Enum.map(fn(doc) -> MapReduce.word_map(doc) end) # 使用word_map函数实现map
    |> Enum.reduce(%{}, fn(items, acc) -> MapReduce.invert_array(items, acc) end) # 使用invert_array实现reduce。
  end

  # 作为map函数的参数的函数。各个单词将变换成{:doc1, "ワタシ", 0}的形式。
  def word_map({doc_id, words}) do
    String.split(words)
    |> Enum.with_index
    |> Enum.map(fn(tup) -> Tuple.insert_at(tup, 0, doc_id) end)
  end

  # 作为reduce函数的参数的函数。将上面word_map的结果进行减缩。
  # 形成%{"ワタシ" => [doc1: 0, doc2: 0], ....}的字典表。
  def invert_array([{doc_id, word, index} | tail], shuffle_map) do
    invert_array(tail, Map.put(shuffle_map, word, [{doc_id, index} | Map.get(shuffle_map, word, [])]))
  end
  def invert_array([], shuffle_map), do: shuffle_map  
end

IO.inspect MapReduce.entry

输出结果

%{"エリクサー" => [doc1: 2], "チョット" => [doc1: 3, doc2: 3], "デキル" => [doc1: 4, doc2: 4],
  "ハ" => [doc1: 1, doc2: 1], "ルビー" => [doc2: 2], "ワタシ" => [doc1: 0, doc2: 0]}

出现错误的时候怎么办?

这次的 sample 相对比较少,出错的话大不了把程序再跑一遍就是了。但是要是要处理的文档很多,那就没这么简单了。

我们想要这么一种机制,将 map 函数生成的"文档编号,单词,单词索引"的形式作为状态保存下来,当某个文档处理出现问题的时候,我们仅仅重启处理那个文档的进程,这样就可以继续处理剩下的文档。

要实现这种机制,我们就需要在文档变换处理结束后将其状态保存,以及实现进程的监视来对应出错时候进程的重启。 为了实现该机制,我们使用 OTP 中的 GenServer 和 Supervisor 来实现。 下面是对这两个东西的简单说明。

名称 概要
GenServer 用来实现能够保持进程状态的 Application 的模块
Supervisor 用来监视进程,提供出错重启的机制

具体请参考 Elixir 的官网上的 GenServer 以及 Supervisor 的说明。

Supervision Tree

接下来我们按照 Supervisor 的原理来构建 MapReduce 中监控各个处理,保存状态,的进程之间的上下关系表。

名称 概要
GenServer 用来实现能够保持进程状态的 Application 的模块
Supervisor 用来监视进程,提供出错重启的机制
MapServer 负责将文档转换成"文档编号,单词,单词索引"的形式的进程。处理结束之后将结果保存在 Stash 中。(因此需要其持有 stash 的进程号),这个进程会启动多个进程
ReduceSupervisor 负责监视 ReduceServer 的进程。设置 ReduceServer 的重启策略
ReduceServer 负责从 Stash 中获取通过 map 变换得到的文档,然后将其变换成倒置矩阵的进程 (因为要从 Stash 获取数据所以持有 Stash 的进程号)

Supervisor

把先启动的 Stash 的进程号传递给 MapSupervisor 和 ReduceSupervisor。

defmodule MapReduce.Supervisor do
  use Supervisor

  def start_link do
    result = {:ok, sup } = Supervisor.start_link(__MODULE__, [])
    {:ok, stash_pid} =
      Supervisor.start_child(sup, worker(MapReduce.Stash, []))
    Supervisor.start_child(sup, supervisor(MapReduce.MapSupervisor, [stash_pid]))
    Supervisor.start_child(sup, supervisor(MapReduce.ReduceServer, [stash_pid]))
    result
  end

  def init(_) do
    supervise [], strategy: :one_for_one
  end
end

MapSupervisor, ReduceSupervisor

将从 Supervisor 获取的 Stash 的进程号传递给各自启动的子进程。

MapSupervisor

defmodule MapReduce.MapSupervisor do
  use Supervisor

  def start_link(stash_pid) do
    {:ok, _pid} = Supervisor.start_link(__MODULE__, stash_pid)
  end

  def init(stash_pid) do
    child_processes = [ worker(MapReduce.MapServer, [stash_pid]) ]
    supervise child_processes, strategy: :one_for_one
  end
end

ReduceSupervisor

defmodule Mr.ReduceSupervisor do
  use Supervisor

  def start_link(stash_pid) do
    {:ok, _pid} = Supervisor.start_link(__MODULE__, stash_pid)
  end

  def init(stash_pid) do
    child_processes = [ worker(MapReduce.ReduceServer, [stash_pid]) ]
    supervise child_processes, strategy: :one_for_one
  end
end

Stash

为了保存 MapServer 变换结果的文档我们使用 GenServer。 我们这里定义了供其进程调用的save_document以及get_document两个函数,对于这两个函数的 callback 我们需要自己进行实现。 只通过实现 callback 函数就能实现进程间的通信了。 这个进程在启动的时候会建立一个空的 list,当其他的进程调用save_document的时候就会往这个 list 中追加 data。

defmodule MapReduce.Stash do
  use GenServer

  def start_link do
    GenServer.start_link(__MODULE__, []) # 启动时候带一个空的list,之后往这里追加文档
  end

  def save_document(pid, indexing_doc) do
    GenServer.cast pid, {:save_document, indexing_doc}
  end

  def get_documents(pid) do
    GenServer.call pid, :get_documents
  end

  #####
  # GenServer implementation

  def handle_cast({:save_document, indexing_doc}, docs) do
    {:noreply, [indexing_doc|docs]} # 这里的docs是启动时候定义的list,将会往这里追加元素
  end

  def handle_call(:get_documents, _from, docs) do
    # 这离的docs同样是启动时候的那个list,这里的tuple的第二个元素是返回给调用者的消息,第三个元素是继续保存在进程中的list(状态)
    {:reply, docs, docs}
  end
end

MapServer

这个进程需要保持启动时候从 Stash 获取的进程号所以使用 Genserver。 实际上对文档进行变换的函数 (word_map) 我们放在别的文件中定义。为了将文档变换的结果保存到 Stash 中,我们调用Stash.save_document

defmodule MapReduce.MapServer do
  use GenServer

  def start_link(stash_pid) do
    GenServer.start_link(__MODULE__, stash_pid, name: __MODULE__)
  end

  def make_indexing_document(words_list) do
    GenServer.cast(__MODULE__, {:make_indexing_document, words_list})
  end

    # 在这里开始进行处理
  def make_indexing_documents(words_list) when is_list(words_list) do
    words_list
    |> Enum.map(&(Task.async(fn -> make_indexing_document(&1) end)))
    |> Enum.map(&Task.await/1)
  end

  def get_documents do
    GenServer.call(__MODULE__, :get_documents)
  end

  def handle_cast({:make_indexing_document, words}, stash_pid) do
    word_map(stash_pid, words)
    {:noreply, stash_pid}
  end

  def handle_call(:get_documents, _from, stash_pid) do
    {:reply, MapReduce.Stash.get_documents(stash_pid), stash_pid}
  end

  # 在这里进行文档的变换,将结果保存到Stash中。
  defp word_map(stash_pid, {doc_id, words}) do
    indexing_doc = MapReduce.WorkerFunction.word_map({doc_id, words})
    :ok = MapReduce.Stash.save_document(stash_pid, indexing_doc)
  end
end

ReduceServer

因为需要从 Stash 中获取经过 MapServer 变换的文档,所以我们需要继续保存 Stash 的进程号。 统合成倒置矩阵的函数我们在别的文件中定义。 在函数处理之前我们调用Stash.get_documents(stash_pid)获取要处理的文档。

defmodule MapReduce.ReduceServer do
  use GenServer

  def start_link(stash_pid) do
    GenServer.start_link(__MODULE__, stash_pid, name: __MODULE__)
  end

  # 从这里开始处理
  def make_inverted_array do
    GenServer.call(__MODULE__, :make_inverted_array)
  end

  # 在这里将从map变换来的文档变换成倒置矩阵。
  # 处理需要从Stash进程中获取文档的list
  def handle_call(:make_inverted_array, _from, stash_pid) do
    indexing_docs = MapReduce.Stash.get_documents(stash_pid)
    {:reply, invert_array(indexing_docs), stash_pid }
  end

  def invert_array(indexing_docs) do
    MapReduce.WorkerFunction.invert_array(indexing_docs)
  end
end

word_map, invert_array 的实现

我们在另一个文件中实现了这些函数。和之前单进程的例子几乎一样。

defmodule MapReduce.WorkerFunction do
  def word_map({doc_id, words}) do
    String.split(words)
    |> Enum.with_index
    |> Enum.map(fn(tup) -> Tuple.insert_at(tup, 0, doc_id) end)
  end

  def invert_array(items) do
    items
    |> Enum.reduce(%{}, fn(item, acc) -> invert_array(item, acc) end)
  end

  defp invert_array([{doc_id, word, index} | tail], shuffle_map) do
    invert_array(tail, Map.put(shuffle_map, word, [{doc_id, index} | Map.get(shuffle_map, word, [])]))
  end

  defp invert_array([], shuffle_map), do: shuffle_map
end

执行结果

因为用的例子和单进程实现相同所以其实并没什么意义,但是总之先跑一下看看。 首先在项目根目录启动 REPL(IEx),然后准备好数据。

iex -S mix
Erlang/OTP 18 [erts-7.1] 1 [64-bit] [smp:4:4] [async-threads:10] [hipe] [kernel-poll:false] [dtrace]

Interactive Elixir (1.1.1) - press Ctrl+C to exit (type h() ENTER for help)
iex(1)> doc1 = {:doc1, "ワタシ ハ エリクサー チョット デキル"}
{:doc1, "ワタシ ハ エリクサー チョット デキル"}
iex(2)> doc2 = {:doc2, "ワタシ ハ ルビー チョット デキル"}
{:doc2, "ワタシ ハ ルビー チョット デキル"}
iex(3)> doc3 = {:doc3, "ワタシ ハ リナックス チョット デキル"}
{:doc3, "ワタシ ハ リナックス チョット デキル"}

接下来进行 Map 处理。可以看到追加的数据被存放到了 list 中。

iex(4)> MapReduce.MapServer.make_indexing_documents [doc1, doc2, doc3]
iex(5)> MapReduce.MapServer.get_documents
[[{:doc3, "ワタシ", 0}, {:doc3, "ハ", 1}, {:doc3, "リナックス", 2},
  {:doc3, "チョット", 3}, {:doc3, "デキル", 4}],
 [{:doc2, "ワタシ", 0}, {:doc2, "ハ", 1}, {:doc2, "ルビー", 2},
  {:doc2, "チョット", 3}, {:doc2, "デキル", 4}],
 [{:doc1, "ワタシ", 0}, {:doc1, "ハ", 1}, {:doc1, "エリクサー", 2},
  {:doc1, "チョット", 3}, {:doc1, "デキル", 4}]]

最后进行 Reduce 生成倒置矩阵。

iex(6)> MapReduce.ReduceServer.make_inverted_array
%{"エリクサー" => [doc1: 2], "チョット" => [doc1: 3, doc2: 3, doc3: 3],
  "デキル" => [doc1: 4, doc2: 4, doc3: 4],
  "ハ" => [doc1: 1, doc2: 1, doc3: 1], "リナックス" => [doc3: 2],
  "ルビー" => [doc2: 2], "ワタシ" => [doc1: 0, doc2: 0, doc3: 0]}

总结

使用 OTP 可以非常轻松实现多进程的 application。 如果换成别的语言用线程或者进程来实现重启策略会非常麻烦,想到这里笔者就感觉 OTP 是在是难能可贵。 在官网上也有使用 OTP 实现 kvs 的示例,大家有兴趣的话务必试一下。

居然是日文翻译过来的 +1

gyorou 学习 Elixir 有什么新思路么? 提及了此话题。 11月25日 10:16

赞一个,比讨论 elixir 好不好的帖子好多了

jasl 将本帖设为了精华贴。 12月03日 16:19

楼主在日本工作?工作中用 Elixir 吗?

#6 楼 @xcc7624

我在日本最大的电商 Rakuten 工作。Elixir 是业余自学。

需要 登录 后方可回复, 如果你还没有账号请 注册新账号