RPC 模块本身是一个 gen_server 会随着 kernel 模块启动,也就是说,在 Erlang/OTP 启动后我们就免费获得了一个 RPC 进程。 RPC 进程启动的时候,会在 Erts 中通过 local 注册一个名字 rex 的进程,这样没有经过修改的 Erlang/OTP 都会有这个名字在它的名字列表上。
不管是同步调用还是广播调用,在 RPC 模块中的调用都是依赖 gen_server 的相关方和 erlang:send 方法来完成。这样尽最大可能的重用代码,保证了整个 OTP 中对远程调用的表现的一致性。 并且 RPC 模块不单单可以调用远程节点的方法或进程,也可以调用本地节点的方法或进程,这样保证了整个 RPC 的系统位置透明性,并且 RPC 模块针对本地节点作了相关优化。
例如说 call 方法针对本地节点就采用了下面的方法:
local_call(M, F, A) when is_atom(M), is_atom(F), is_list(A) ->
case catch apply(M, F, A) of
{'EXIT',_} = V -> {badrpc, V};
Other -> Other
end.
这两个方法都是同步的调用,但是实现的细节非常不同,对 rex 进程的影响也是不同的。当然使用两个方法在并发执行的情况下,得到的结果是完全不同的。 不管是 call 也好,block_call 也好,都会在执行阶段暂时的将被调用者进程的 console 输出重定向到调用者进程所在节点的 group leader 上。
在调用发起者一侧,RPC 模块会立刻建立一个监控下的 Erlang 进程,并在该进程内通过 gen_server:call 方法来调用远程节点。
do_call(Node, Request, infinity) ->
rpc_check(catch gen_server:call({?NAME,Node}, Request, infinity));
do_call(Node, Request, Timeout) ->
Tag = make_ref(),
{Receiver,Mref} =
erlang:spawn_monitor(
fun() ->
process_flag(trap_exit, true),
Result = gen_server:call({?NAME,Node}, Request, Timeout),
exit({self(),Tag,Result})
end),
receive
{'DOWN',Mref,_,_,{Receiver,Tag,Result}} ->
rpc_check(Result);
{'DOWN',Mref,_,_,Reason} ->
rpc_check_t({'EXIT',Reason})
end.
RPC 模块会将超时或对方节点失去连接的情况处理为 bad_rpc,让顶层逻辑发现并非业务本身引起的远程调用问题。
在被调用者一些,RPC 模块也会立刻创建一个监控下的 Erlang 进程,并在该进程内处理调用者的 call 消息,同时会将相关信息保存在 rex 进程的进程上下文中。当新的进程完成了业务处理,就会把处理结果返回给被调用者节点的 rex 进程,然后再将结果返回给调用发起者。 我们可以仔细观察它的代码:
handle_call_call(Mod, Fun, Args, Gleader, To, S) ->
RpcServer = self(),
%% Spawn not to block the rpc server.
{Caller,_} =
erlang:spawn_monitor(
fun () ->
set_group_leader(Gleader),
Reply =
case catch apply(Mod, Fun, Args) of
{'EXIT', _} = Exit ->
{badrpc, Exit};
Result ->
Result
end,
RpcServer ! {self(), {reply, Reply}}
end),
{noreply, gb_trees:insert(Caller, To, S)}.
于 call 方法一样,在调用发起者一侧,RPC 模块会立刻建立一个监控下的 Erlang 进程,并在该进程内通过 gen_server:call 方法来调用远程节点。
但是在被调用者一些,RPC 模块会选择使用被调用者所在节点的 rex 直接执行相关代码
handle_call({block_call, Mod, Fun, Args, Gleader}, _To, S) ->
MyGL = group_leader(),
set_group_leader(Gleader),
Reply =
case catch apply(Mod,Fun,Args) of
{'EXIT', _} = Exit ->
{badrpc, Exit};
Other ->
Other
end,
group_leader(MyGL, self()), % restore
{reply, Reply, S};
call 方法可以保证,同一调用者的远程请求按序列执行,但是不保证多个调用者的远程请求按序列执行。 block_call 方法保证,多个调用者的远程请求按序列执行。 不管是 call 还是 block_call 的方法都会给调用者带来大量的进程创建的压力(Erlang 创建进程很快,但不代表没有代价)。 call 方法还会给被调用者节点带来大量的进程创建压力。
RPC 模块的 cast 方法直接依赖于 gen_sever:cast,并没有做更多的事情。
针对本地节点,cast 方法会在调用者节点内创建一个进程来执行相关代码:
cast(Node, Mod, Fun, Args) when Node =:= node() ->
catch spawn(Mod, Fun, Args),
true;
cast(Node, Mod, Fun, Args) ->
gen_server:cast({?NAME,Node}, {cast,Mod,Fun,Args,group_leader()}),
true.
被调用者接收到消息后会立刻创建进程执行相关代码:
handle_cast({cast, Mod, Fun, Args, Gleader}, S) ->
spawn(fun() ->
set_group_leader(Gleader),
apply(Mod, Fun, Args)
end),
{noreply, S};
cast 方法是非常简单的。和 call 方法一样,会给被调用者节点带来大量的进程创建压力。 同样不要忘记了,cast 方法也会将新创建的进程的 console 输出重新定向调用者所在节点的 group leader 上。
这两个方法都是通过 erlang:send 将调用者的消息发送到被调用者节点上。
abcast 采用的是纯异步,发出去就不管了,直接将消息不经过 rex 进程直接发送到目标进程上
abcast(Name, Mess) ->
abcast([node() | nodes()], Name, Mess).
abcast([Node|Tail], Name, Mess) ->
Dest = {Name,Node},
%这么做的好处是不会让进程被trap
%从而保证了异步性
case catch erlang:send(Dest, Mess, [noconnect]) of
noconnect -> spawn(erlang, send, [Dest,Mess]), ok;
_ -> ok
end,
abcast(Tail, Name, Mess);
abcast([], _,_) -> abcast.
此处 abcast 完全是异步的,如果发现了目标节点是没有连接的时候,直接创建一个新的进程来进行消息发送,完全不会进入 Trap 状态等待节点连接。
sbcast 算是同步的广播方式,发送后会回收广播结果,并且当节点没有完成连接的时候,会进入 Trap 状态等待节点连接完成
sbcast(Name, Mess) ->
sbcast([node() | nodes()], Name, Mess).
sbcast(Nodes, Name, Mess) ->
Monitors = send_nodes(Nodes, ?NAME, {sbcast, Name, Mess}, []),
rec_nodes(?NAME, Monitors).
send_nodes([Node|Tail], Name, Msg, Monitors) when is_atom(Node) ->
Monitor = start_monitor(Node, Name),
%% Handle non-existing names in rec_nodes.
catch {Name, Node} ! {self(), Msg},
send_nodes(Tail, Name, Msg, [Monitor | Monitors]);
send_nodes([_Node|Tail], Name, Msg, Monitors) ->
%% Skip non-atom _Node
send_nodes(Tail, Name, Msg, Monitors);
send_nodes([], _Name, _Req, Monitors) ->
Monitors.
rec_nodes(Name, Nodes) ->
rec_nodes(Name, Nodes, [], []).
rec_nodes(_Name, [], Badnodes, Replies) ->
{Replies, Badnodes};
rec_nodes(Name, [{N,R} | Tail], Badnodes, Replies) ->
receive
{'DOWN', R, _, _, _} ->
rec_nodes(Name, Tail, [N|Badnodes], Replies);
{?NAME, N, {nonexisting_name, _}} ->
%% used by sbcast()
erlang:demonitor(R, [flush]),
rec_nodes(Name, Tail, [N|Badnodes], Replies);
{Name, N, Reply} -> %% Name is bound !!!
erlang:demonitor(R, [flush]),
rec_nodes(Name, Tail, Badnodes, [Reply|Replies])
end.
转载自TechTalk