Erlang/Elixir Erlang 的 RPC 模块代码分析

davidgao · 2017年07月31日 · 8469 次阅读

RPC 的代码分析

进程创建

RPC 模块本身是一个 gen_server 会随着 kernel 模块启动,也就是说,在 Erlang/OTP 启动后我们就免费获得了一个 RPC 进程。 RPC 进程启动的时候,会在 Erts 中通过 local 注册一个名字 rex 的进程,这样没有经过修改的 Erlang/OTP 都会有这个名字在它的名字列表上。

RPC 调用逻辑

不管是同步调用还是广播调用,在 RPC 模块中的调用都是依赖 gen_server 的相关方和 erlang:send 方法来完成。这样尽最大可能的重用代码,保证了整个 OTP 中对远程调用的表现的一致性。 并且 RPC 模块不单单可以调用远程节点的方法或进程,也可以调用本地节点的方法或进程,这样保证了整个 RPC 的系统位置透明性,并且 RPC 模块针对本地节点作了相关优化。

例如说 call 方法针对本地节点就采用了下面的方法:

local_call(M, F, A) when is_atom(M), is_atom(F), is_list(A) ->
    case catch apply(M, F, A) of
        {'EXIT',_} = V -> {badrpc, V};
        Other -> Other
    end.

call 和 block_call 方法

这两个方法都是同步的调用,但是实现的细节非常不同,对 rex 进程的影响也是不同的。当然使用两个方法在并发执行的情况下,得到的结果是完全不同的。 不管是 call 也好,block_call 也好,都会在执行阶段暂时的将被调用者进程的 console 输出重定向到调用者进程所在节点的 group leader 上。

call 方法

在调用发起者一侧,RPC 模块会立刻建立一个监控下的 Erlang 进程,并在该进程内通过 gen_server:call 方法来调用远程节点。

do_call(Node, Request, infinity) ->
    rpc_check(catch gen_server:call({?NAME,Node}, Request, infinity));
do_call(Node, Request, Timeout) ->
    Tag = make_ref(),
    {Receiver,Mref} =
    erlang:spawn_monitor(
      fun() ->
          process_flag(trap_exit, true),
          Result = gen_server:call({?NAME,Node}, Request, Timeout),
          exit({self(),Tag,Result})
      end),
    receive
    {'DOWN',Mref,_,_,{Receiver,Tag,Result}} ->
        rpc_check(Result);
    {'DOWN',Mref,_,_,Reason} ->
        rpc_check_t({'EXIT',Reason})
    end.

RPC 模块会将超时或对方节点失去连接的情况处理为 bad_rpc,让顶层逻辑发现并非业务本身引起的远程调用问题。

在被调用者一些,RPC 模块也会立刻创建一个监控下的 Erlang 进程,并在该进程内处理调用者的 call 消息,同时会将相关信息保存在 rex 进程的进程上下文中。当新的进程完成了业务处理,就会把处理结果返回给被调用者节点的 rex 进程,然后再将结果返回给调用发起者。 我们可以仔细观察它的代码:

handle_call_call(Mod, Fun, Args, Gleader, To, S) ->
    RpcServer = self(),
    %% Spawn not to block the rpc server.
    {Caller,_} =
    erlang:spawn_monitor(
      fun () ->
          set_group_leader(Gleader),
          Reply = 
              case catch apply(Mod, Fun, Args) of
              {'EXIT', _} = Exit ->
                  {badrpc, Exit};
              Result ->
                  Result
              end,
          RpcServer ! {self(), {reply, Reply}}
      end),
    {noreply, gb_trees:insert(Caller, To, S)}.

block_call 方法

于 call 方法一样,在调用发起者一侧,RPC 模块会立刻建立一个监控下的 Erlang 进程,并在该进程内通过 gen_server:call 方法来调用远程节点。

但是在被调用者一些,RPC 模块会选择使用被调用者所在节点的 rex 直接执行相关代码

handle_call({block_call, Mod, Fun, Args, Gleader}, _To, S) ->
    MyGL = group_leader(),
    set_group_leader(Gleader),
    Reply = 
    case catch apply(Mod,Fun,Args) of
        {'EXIT', _} = Exit ->
        {badrpc, Exit};
        Other ->
        Other
    end,
    group_leader(MyGL, self()), % restore
    {reply, Reply, S};

同步调用总结

call 方法可以保证,同一调用者的远程请求按序列执行,但是不保证多个调用者的远程请求按序列执行。 block_call 方法保证,多个调用者的远程请求按序列执行。 不管是 call 还是 block_call 的方法都会给调用者带来大量的进程创建的压力(Erlang 创建进程很快,但不代表没有代价)。 call 方法还会给被调用者节点带来大量的进程创建压力。

cast 方法

RPC 模块的 cast 方法直接依赖于 gen_sever:cast,并没有做更多的事情。

针对本地节点,cast 方法会在调用者节点内创建一个进程来执行相关代码:

cast(Node, Mod, Fun, Args) when Node =:= node() ->
    catch spawn(Mod, Fun, Args),
    true;
cast(Node, Mod, Fun, Args) ->
    gen_server:cast({?NAME,Node}, {cast,Mod,Fun,Args,group_leader()}),
    true.

被调用者接收到消息后会立刻创建进程执行相关代码:

handle_cast({cast, Mod, Fun, Args, Gleader}, S) ->
    spawn(fun() ->
          set_group_leader(Gleader),
          apply(Mod, Fun, Args)
      end),
    {noreply, S};

cast 方法总结

cast 方法是非常简单的。和 call 方法一样,会给被调用者节点带来大量的进程创建压力。 同样不要忘记了,cast 方法也会将新创建的进程的 console 输出重新定向调用者所在节点的 group leader 上。

abcast 和 sbcast

这两个方法都是通过 erlang:send 将调用者的消息发送到被调用者节点上。

abcast

abcast 采用的是纯异步,发出去就不管了,直接将消息不经过 rex 进程直接发送到目标进程上


abcast(Name, Mess) ->
    abcast([node() | nodes()], Name, Mess).

abcast([Node|Tail], Name, Mess) ->
    Dest = {Name,Node},
    %这么做的好处是不会让进程被trap
    %从而保证了异步性
    case catch erlang:send(Dest, Mess, [noconnect]) of
    noconnect -> spawn(erlang, send, [Dest,Mess]), ok;
    _ -> ok
    end,
    abcast(Tail, Name, Mess);
abcast([], _,_) -> abcast.

此处 abcast 完全是异步的,如果发现了目标节点是没有连接的时候,直接创建一个新的进程来进行消息发送,完全不会进入 Trap 状态等待节点连接。

sbcast

sbcast 算是同步的广播方式,发送后会回收广播结果,并且当节点没有完成连接的时候,会进入 Trap 状态等待节点连接完成

sbcast(Name, Mess) ->
    sbcast([node() | nodes()], Name, Mess).

sbcast(Nodes, Name, Mess) ->
    Monitors = send_nodes(Nodes, ?NAME, {sbcast, Name, Mess}, []),
    rec_nodes(?NAME, Monitors).

send_nodes([Node|Tail], Name, Msg, Monitors) when is_atom(Node) ->
    Monitor = start_monitor(Node, Name),
    %% Handle non-existing names in rec_nodes.
    catch {Name, Node} ! {self(), Msg},
    send_nodes(Tail, Name, Msg, [Monitor | Monitors]);
send_nodes([_Node|Tail], Name, Msg, Monitors) ->
    %% Skip non-atom _Node
    send_nodes(Tail, Name, Msg, Monitors);
send_nodes([], _Name,  _Req, Monitors) -> 
    Monitors.

rec_nodes(Name, Nodes) -> 
    rec_nodes(Name, Nodes, [], []).

rec_nodes(_Name, [],  Badnodes, Replies) ->
    {Replies, Badnodes};
rec_nodes(Name, [{N,R} | Tail], Badnodes, Replies) ->
    receive
    {'DOWN', R, _, _, _} ->
        rec_nodes(Name, Tail, [N|Badnodes], Replies);
    {?NAME, N, {nonexisting_name, _}} ->  
        %% used by sbcast()
        erlang:demonitor(R, [flush]),
        rec_nodes(Name, Tail, [N|Badnodes], Replies);
    {Name, N, Reply} ->  %% Name is bound !!!
        erlang:demonitor(R, [flush]),
        rec_nodes(Name, Tail, Badnodes, [Reply|Replies])
    end.

转载自TechTalk

暂无回复。
需要 登录 后方可回复, 如果你还没有账号请 注册新账号