Elixir 作为一门 Erlang 虚拟机上的语言,由于 Erlang 的设计风格和 Erlang 虚拟机的特性,在网络编程中大展身手。在这个大前提下,我们经常需要在网络世界中处理外部链接。 举个例子:一个典型的 Web 应用需要连接一个关系型数据库和一个 kvdb,或者一个嵌入式的系统需要连接其他的 node。
大多数情况下,这些网络连接对程序员来说是无需关心的,因为已经有很多已经封装好的网络驱动 (例如数据库驱动),但是我认为了解这些连接如何手动编写是一件很有趣的事情。如果某些特殊的网络服务没有外部的驱动代码可用,又或者你想了解这些驱动是怎样工作的,这些知识就会很有用。
这篇文章中我们只会讨论 TCP 协议的连接,因为 TCP 协议可能是网络世界中最基础和使用最多的协议了。但是我们所使用的方法和原理,在其他协议面前也是通用的,例如 UDP 协议。
作为这篇文章的目标,我们想编写一个差不多能 work 的 redis 驱动。Redis 服务是能够收发 message 的 TCP 服务。Redis 在 TCP 之上使用了一个自定义的应用层协议,并没有使用通用的 HTTP 协议。而我们并不关心这些,我们只关心怎样处理我们的 Elixir 应用和 Redis Server 之间的 TCP 连接。
一点题外话:显然,社区里已经有很多的 Erlang 和 Elixir 的 Redis 驱动,不过,懒惰的我懒得再去想一个聪明的名字,我们就叫他 Redis 好了。
这就开始吧。
在 Erlang/Elixir 中,tcp 连接是用:gen_tcp
模块来处理的。这篇文章中我们只编写客户端部分来与 Redis 服务交互,实际上:gen_tcp
也可以用来编写 TCP 服务端。
所有的发向 Server 的消息都用:gen_tcp.send/2
函数来发送。而从服务端发送至客户端的消息我们总是倾向于把它们当作 Erlang Message 来处理,因为这样处理起来比较直观。后面我们会看到,我们将通过设置 TCP socket 的:active
option 选项来控制发送至客户端的消息。
我们通过传递 host、port 等参数至:gen_tcp.connect/3
来建立与服务端的连接。默认情况下,调用 connect 函数的进程会被认为是这个 tcp 连接的“controlling process”,意思就是这个进程将会处理所有发到这个 socket 的 tcp 消息。
以上是我们对 tcp 连接所需要了解的知识,我们继续。
我们将使用GenServer
作为我们 TCP 连接的接口。我们需要一个 GenServer 以便于我们在 state 中保持 socket 的状态和在所有消息通信中复用这个 socket。
因为我们使用 GenServer 作为 TCP 连接的接口,所以我们一次只能在 state 的 socket 中维护单个连接的状态,我们希望它总是和 Server 保持连接的状态。最优的策略实在 GenServer 启动的时候来做连接的工作,具体是在 init 的回调函数中实现。init/1
是在GenServer.start_link/2
被调用的时候触发的回调,GenServer 在 init 被调用前不会做多余的工作,所有是我们建立连接的理想场所。
defmodule Redis do
use GenServer
@initial_state %{socket: nil}
def start_link do
GenServer.start_link(__MODULE__, @initial_state)
end
def init(state) do
opts = [:binary, active: false]
{:ok, socket} = :gen_tcp.connect('localhost', 6379, opts)
{:ok, %{state | socket: socket}}
end
end
我们给:gen_tcp.connect/3
设定的参数非常直观。:binary
要求 socket 从 TCP server 中接收的消息以 binary 的格式接收而不是 Erlang 默认的 charlist 格式:在 Elixir 中这可能是我们想要的,而且可能是最高效的选择。active: false
告诉 socket 永远不要把 TCP message 转换成发送给 GenServer 的 Erlang message;我们将用:gen_tcp.recv/2
函数来显式的接收 tcp 消息。我们这样做是为了我们的 GenServer 不被汹涌而来的 tcp 消息淹没:我们只在我们想要的时候去接收并处理它们。
现在我们已经有了一个连接上 Redis 服务的 GenServer 了,现在让我们给 Redis 发送一些指令。
这里需要简单提一下 Redis 的二进制协议,RESP:这是 Redis 用于编解码它的 Requst/Reply 的协议,协议的细节简单明了,如果你想了解更多,我建议你看看。为了这篇文章的中心目标,我们假设我们有了 RESP 的完全实现:它提供了encode/decode
两个函数:
Redis.RESP.encode/1
: 将 list 编码成 redis command,例如:
Redis.RESP.encode(["GET", "mykey"])
#=> <<...>>
resp_to_get_command = <<...>>
Redis.RESP.decode(resp_to_get_command)
#=> 1
:gen_tcp.send/2
我们在文章开头提到过,我们利用:gen_tcp.send/2
来向 tcp 连接发送消息。我们的 Redis 模块将提供单独一个函数来向 Redis Server 发送命令:Redis.command/2
。具体实现也很直观:
defmodule Redis do
# ...as before...
def command(pid, cmd) do
GenServer.call(pid, {:command, cmd})
end
def handle_call({:command, cmd}, from, %{socket: socket} = state) do
:ok = :gen_tcp.send(socket, Redis.RESP.encode(cmd))
# `0` means receive all available bytes on the socket.
{:ok, msg} = :gen_tcp.recv(socket, 0)
{:reply, Redis.RESP.decode(msg), state}
end
end
这段代码没啥瑕疵。
{:ok, pid} = Redis.start_link()
Redis.command(pid, ["SET", "mykey", 1])
Redis.command(pid, ["GET", "mykey"])
#=> 1
... 但这里有个问题。
长话短说::gen_tcp.recv/2
函数是阻塞的。
这段代码能顺利工作的前提是这个 GenServer 只被单个 Elixir 进程调用。当一个进程想发送命令给 Redis Server 的时候会发生如下事件:
command/2
命令,然后进程阻塞的等待结果:gen_tcp.recv/2
上你能看出问题出在哪里了吗?GenServer 在等待 Redis Server 回复的过程中是阻塞的。当然在单个进程的情况下这样是没问题的,但当多个进程同时想通过 GenServer 跟 Redis Server 做交互的时候情况就会变得很糟糕。幸好,我们可以做一个更好的实现。
你可能知道这样一个事实,GenServer 的handle_call/3
函数可以不用立即返回结果,它可以先返回一个{:noreply, state}
作为应答,然后通过GenServer.reply/2
函数返回真实的结果给请求进程。
在客户端请求然后阻塞的等待结果的同时 GenServer 继续工作直到它有了对这个客户端的回复,这样一种方法正式我们所需要的。
为了执行我们这一策略,我们需要摆脱:gen_tcp.recv/2
函数,转而用 Erlang Message 的形式来接收 TCP message。我们可以在连接 Redis 服务的时候将 socket 参数中的active: false
转换成active: true
,当 active 被设置为 true 的时候,所有 tcp socket 接收的消息都会转换成{:tcp, socket, message}
形式的 Erlang Message 发送给 GenServer。
这些事情将会发生:
command/2
,然后阻塞自己等待结果{:noreply, state}
,所以它自身不会被阻塞{:tcp, socket, message}
的形式接收到handle_info/2
函数中处理这条消息,并回应调用的 Elixir 进程不难看出,从 GenServer 发出命令给 Redis Server 到它接收到 Redis Server 的回应这段时间内,GenServer 是非阻塞的,它还能继续发送其他的命令给 GenServer,Nice!
剩下需要解决的问题就是,GenServer 怎样回执给正确的调用进程:当 GenServer 接收到一条{:tcp, ....}
的消息时,它怎么知道GenServer.reply/2
函数该发给谁呢?我们知道 Redis 是严格按照 fifo 的顺序来应答的,我们可以利用一个简单的队列来把请求的进程存储起来。我们将在 GenServer 的 state 中维护一个队列,当进程请求的时候入队,当有应答到来的时候出队。
defmodule Redis do
@initial_state %{socket: nil, queue: :queue.new()}
# ...as before...
def handle_call({:command, cmd}, from, %{queue: queue} = state) do
# We send the command...
:ok = :gen_tcp.send(state.socket, Redis.RESP.encode(cmd))
# ...enqueue the client...
state = %{state | queue: :queue.in(from, queue)}
# ...and we don't reply right away.
{:noreply, state}
end
def handle_info({:tcp, socket, msg}, %{socket: socket} = state) do
# We dequeue the next client:
{{:value, client}, new_queue} = :queue.out(state.queue)
# We can finally reply to the right client.
GenServer.reply(client, Redis.RESP.decode(msg))
{:noreply, %{state | queue: new_queue}}}
end
end
在上面的篇幅中,为了能够以 Erlang Message 的形式接收 TCP 消息,我们从一个active: false
的 socket 转移到了active: true
的 socket。它能正常运行,但在一种情况下会出现问题:当 TCP 服务发送大量数据给 GenServer 的时候,因为 Erlang 本身并没有对消息接收的队列大小做限制,这样很容易造成 GenServer 的消息雪崩;这也是我们最开始选择active: false
的原因。为了解决这个问题,我们可以将active: true
改成更保守的active: once
:这样每次只会有一个 tcp 消息被转换成 Erlang Message,然后 socket 又回到了active: false
的状态。我们可以重新设置active: once
来接收下一条消息,如此循环。我们每次只转换一条 TCP 消息为 Erlang Message,这样可以保证我们能够处理它们。
我们只要记得在接收一条{:tcp, ...}
的消息的时候重新激活 Socket 即可,我们可以利用:inet:setopt/2
函数来实现。
defmodule Redis do
# ...as before...
def handle_info({:tcp, socket, msg}, %{socket: socket} = state) do
# Allow the socket to send us the next message.
:inet.setopts(socket, active: :once)
# exactly as before
end
end
上文描述的模式并不是我想出来的,很意外对吧?我所形容的模式在一票 Erlang 和 Elixir 应用中非常常见。这个模式在任何需要连接 tcp 服务的场合 (或者类似的场合) 都表现的十分良好,它经常被用在数据库驱动,这也是我为啥选 Redis 来做例子的理由。
很多现实世界中的库都使用着我所描述的模式:举个例子,eredis(Erlang 最常用的 Redis 驱动) 就跟我们的例子很类似:看看这部分代码注释,基本上就是这篇文章的总结。另外一个跟我们的模式大致相似的例子就是PostgreSQL和MongoDB的 Elixir 驱动。目前我正在为OrientDB编写 Elixir 驱动,也使用的是这个模式。所以这个肯定是可行的。
上文中我们愉快的忽略了一个令人烦躁的问题 -- 错误处理!
我们将继续愉快的忽略一系列可能发生的错误,例如,消息到来的时候遇到空队列 (它会报一个{{:value, val}, new_queue}
的模式匹配错误),或是接收到不完整的 TCP 消息。但是在 TCP 连接中可能发生的一系列问题例如断线和超时这些我们是可以尝试解决的。
我们可以自己手动的来处理这些异常,幸运的是,Elixir 的核心开发者James Fish已经在他的库connection中做完了大部分工作。这个类库十分年轻,它已经被用在上文提到的MongoDB 驱动和OrientDB 驱动之中了。
这个库协议定义了一个名为connection
的协议:这个协议所规定的 API 是 GenServer 协议的一个超集,所以它易于理解也容易整合进现有的项目。
这篇文档详细的解释了Connection
协议,这个库的主旨是实现一个连接着另一端且能做断线处理的进程。为了实现这一目标,Connection
协议定义了两个附加函数并且修改了部分 GenServer 的返回值。
我们这里只研究部分Connection
的函数,如果你想了解更多细节,请阅读文档。
我们的Redis.init/1
回调函数实现了连接 Redis 服务的行为,阻塞了调用Redis.start_link/0
函数的进程直到回调函数返回。如果我们不希望 GenServer 在连接上 Redis 服务之前做其他事情的话是没太大问题的。但是我们的start_link/0
函数可能是被监控树所调用,或者是被专门来启动 GenServer 的进程所调用:在这种情况下,我们希望start_link/0
函数尽快的返回{:ok, pid}
的结果,然后在后台来完成连接的动作。我们也希望 GenServer 能用队列缓存住建立连接期间的请求。这个协议能够使进程非阻塞的启动 GenServer,但是会阻塞后续的请求直到 GenServer 连接上 Redis。
有了Connection
我们可以完全做到这一点。init/1
回调函数返回{:connect, info, state}
而非{:ok, state}
迫使start_link/0
立即返回{:ok, pid}
,同时调用了connect/2
的 GenServer 回调阻塞 GenServer 接收其他的请求直到连接完成。{:connect, info, state}
中的info
应该包含我们建立连接的所有信息,这些信息我们并不想放在 GenServer 的 state 中保存。
我们把代码做点改进:
defmodule Redis do
use Connection
@initial_state %{socket: nil}
def start_link do
# We need Connection.start_link/2 now,
# not GenServer.start_link/2
Connection.start_link(__MODULE__, @initial_state)
end
def init(state) do
# We use `nil` as we don't need any additional info
# to connect
{:connect, nil, state}
end
def connect(_info, state) do
opts = [:binary, active: :once]
{:ok, socket} = :gen_tcp.connect('localhost', 6379, opts)
{:ok, %{state | socket: socket}}
end
end
这对我们之前的实现来说是个巨大改进,但是Connection
库还可以做的更好。
我们在使用:gen_tcp.connect/3
连接 Redis 服务的地方直接使用{:ok, socket} = ...
模式匹配非常不妥,这个地方有个很大隐患。如果连接意外中断,此处的模式匹配失败,那么整个 GenServer 都会挂掉。最明显的处理方法就是用 case 语句来匹配:gen_tcp.connect/3
函数的返回值:
case :gen_tcp.connect('localhost', 6379, opts) do
{:ok, socket} ->
{:ok, %{state | socket: socket}}
{:error, reason} ->
# now what?
end
现在我们便能够决定在有错误发生的情况下该如何处理。挂起 GenServer 或是返回 error 都很平常,现实世界中,我们通常会做重连的操作。我们可以令connect/2
返回一个{:backoff, timeout, state}
元组,这样connect/2
会在timeout
时间后被再次调用,尝试重连。我们的connect/2
看起来是这样:
def connect(_info, state) do
opts = [:binary, active: :once]
case :gen_tcp.connect('localhost', 6379, opts) do
{:ok, socket} ->
{:ok, %{state | socket: socket}}
{:error, reason} ->
IO.puts("TCP connection error: #{inspect reason}")
# Try again in one second:
{:backoff, 1000, state}
end
end
Connection
的好处在于你可以在几乎任意一个回调函数中返回{:backoff, timeout, state}
,这样断线的错误处理就变得很直观。
当{:backoff, timeout, state}
被返回时,connect/2
被调用且用:backoff
作它的第一个参数:这让我们很容易的区分这是初始连接还是重连的动作,方便我们做区别对待。比如说,我们想实现一个指数重连,即初次 1 秒后重试,第二次 2 秒后重试,第三次 4 秒后重试,如此直到达到最大重试次数。
最后一个小技巧,我们的 GenServer 在poolboy库的帮助下可以更平滑的使用。网上有许许多多关于poolboy
的文档,所以我并不准备去解释它是怎么工作的。我只是展示下一个例子。
首先,我们用:poolboy.start_link/2
函数为 GenServer 创建一个固定大小的池。
poolboy_opts = [worker_module: Redis, size: 50]
redis_opts = []
{:ok, pool} = :poolboy.start_link(poolboy_opts, redis_opts)
然后,我们从池中拿出一个资源(即一个 GenServer),做完 Redis 操作之后再归还至池中。
worker = :poolboy.checkout(pool)
Redis.command(worker, ["SET", "mykey", 1])
:ok = :poolboy.checkin(pool, worker)
没啥比这更舒服了!
我们见识到了如何利用 GenServer 来实现一个 tcp 服务。我们构建了一个非阻塞的,能够在等待返回值的同时并发的发送请求。我们使用了connection库的回退策略来处理 TCP 错误。最后我们简单看了看poolboy库是怎样池化我们多个 GenServer 进程的。
感谢您的阅读!
Written on June 19, 2015