算法 A Basic Paxos Algorithm Demo Using Erlang

yfractal · 2019年08月05日 · 5945 次阅读

简单来说,Paxos 表述了一个,在分布式的情景下,假设每个参与的服务,任何时候都可能挂掉的前提下,让所有服务达成统一的一个算法。

而 Erlang 刚好是一门并发语言,非常适合描述这样的算法。

算法描述

我们来看 wiki 上的描述(如果想要理解个算法,可以看这篇博客)。

Phase 2

Phase 2a: Accept

If a Proposer receives enough Promises from a Quorum of Acceptors, it needs to set a valuealo v to its proposal. If any Acceptors had previously accepted any proposal, then they'll have sent their values to the Proposer, who now must set the value of its proposal, v, to the value associated with the highest proposal number reported by the Acceptors, let's call it z. If none of the Acceptors had accepted a proposal up to this point, then the Proposer may choose the value it originally wanted to propose, say x[17].

The Proposer sends an Accept message, (n, v), to a Quorum of Acceptors with the chosen value for its proposal, v, and the proposal number n (which is the same as the number contained in the Prepare message previously sent to the Acceptors). So, the Accept message is either (n, v=z) or, in case none of the Acceptors previously accepted a value, (n, v=x).

Phase 2b: Accepted

If an Acceptor receives an Accept message, (n, v), from a Proposer, it must accept it if and only if it has not already promised (in Phase 1b of the Paxos protocol) to only consider proposals having an identifier greater than n.

If the Acceptor has not already promised (in Phase 1b) to only consider proposals having an identifier greater than n, it should register the value v (of the just received Accept message) as the accepted value (of the Protocol), and send an Accepted message to the Proposer and every Learner (which can typically be the Proposers themselves). Else, it can ignore the Accept message or request.

Erlang 实现

不敢说掌握了这个算法,有问题,还请大佬们指正。

注:这里实现的是一个 demo,只实现了主要部分。比如,没有实现 acceptor 挂掉后再次回到集群的情况。

paxos 中,在 proposal 给 acceptor 发消息的时候,都会发一个 n,这个可以近似认为是 transaction id。在下面我们叫它 sequence number。

Acceptor

Acceptor 的职责主要有,接收 Proposer 的提议,和把自己接受的提议告诉 Proposer。

由于 Acceptor 只是处理一些简单的事情,所用 Erlang 提供的 gen_server(general server 的缩写)来做。

Acceptor 职责有两个,一个是接收准备阶段的消息,一个是接收提议,代码如下。

-module(paxos_acceptor).

%% 声明使用 gen_server 这个 behaviour。
-behaviour(gen_server).

%% 创建 paxos_acceptor process
start_link(Id) ->
    gen_server:start_link(?MODULE, [Id], []).

prepare(Pid, From, SeqNum) ->
    %% acceptor 接收消息,异步返回,所以使用 gen_server:cast
    gen_server:cast(Pid, {prepare, From, SeqNum}).

accept(Pid, From, {SeqNum, NewProposal}) ->
    gen_server:cast(Pid, {accept, From, {SeqNum, NewProposal}}).

我们看到,当调用 paxos_acceptor:accept 的时候,

向对应的 acceptor process 发了一条消息 {prepare, From, SeqNum},对应的 process 要能处理这个消息,代码如下:

handle_cast({prepare, From, SeqNum}, State) ->
    State2 = do_prepare(From, SeqNum, State),
    {noreply, State2};

do_prepare 的实现:

acceptor 需要接收 proposer 发来的 sequence number(wikipedia 里的的 n),

如果这个 SeqNum,比之前收到过的 sequence number 小,则丢弃这个消息。

如果比之前的大,则返回自己接受的 proposal 和 SeqNum 给发消息来的 proposer,

并且保证,不在接收比这个 SeqNum 小的消息。

代码实现:

do_prepare(From, SeqNum,
           #state{highest_seq_num=HighestSeqNum, accepted_proposal=Proposal, id=Id} = State)
  when SeqNum > HighestSeqNum ->
    paxos_proposer:prepare(From, {SeqNum, Proposal, Id}),
    State#state{highest_seq_num=SeqNum};

do_prepare(_, SeqNum, State) ->
    State.

accept 所做的事情也是类似的。如果收到 sequence number 比之前的大,则接收对应的 proposal,代码如下:

do_accept(From, {SeqNum, NewProposal},
          #state{id=Id, highest_seq_num=HighestSeqNum} = State)
  when SeqNum >= HighestSeqNum ->
    paxos_proposer:proposal_accepted(From, {SeqNum, NewProposal}),

    State#state{highest_seq_num=SeqNum, accepted_proposal=NewProposal};
do_accept(_, {SeqNum, NewProposal},
          #state{id=Id, highest_seq_num=HighestSeqNum} = State) ->
    State.

Proposer

proposer 和 acceptor 一样做两件事。

在准备阶段,proposer 发 sequence number 给所有的 acceptor,并等待 acceptor 返回。

在提议阶段,proposer 根据之前的结果,发 sequence number 和 proposal 给所有 acceptor,并等待返回。

第一个阶段的目的是,通过发消息,得到所有 acceptor 的最大 sequence number。以及,最大 sequence number 对应的 proposal。

第二个阶段,proposer 把当前收到的最新的提议,发给所有 acceptor,等待 acceptor 的确认。

所谓 proposer 最新的提议,就是上一个阶段,收到的,最大 sequence number 对应的 proposal。

因为一开始的时候,acceptor 可能没有收到过 proposal,那么,最大 sequence number 的返回就可能会空,这个时候,proposal 可以自由发挥。

我们看到,proposer 有明确的状态流转,所以使用 gen_statem 来实现。

callback 我用 state_functionsstate_enter。简单来说,就是用不同的方法表示所处的状态。每个状态,都会有 enter 方法。

在准备阶段,有三个子状态要处理,进入、接收消息、timeout。

进入 prepare 的时候,向所有的 acceptor 发消息。之后等待消息返回。

接收到多数的返回的时候,则进入下一个状态。如果没有,就会在这个状态下等待。

由于 acceptor 可能什么都不返回,所以一段时间收不到消息,就触发 timeout。并在 timeout 里再次发消息、再次等待。

prepare(enter, _Msg, Data) ->
    timer:sleep(rand:uniform(5)),
    Data2 = send_prepare_to_all_acceptors(Data),
    {keep_state, Data2, [{state_timeout, 20, timeout}]};
prepare(cast, {prepare, SeqNum, Proposal, _AcceptorId},
        #data{received_messag_count=ReceivedMessagCount, current_seq_number=CurrentSeqNumber}=Data) when CurrentSeqNumber =:= SeqNum ->
    Data2 = maybe_update_received_proposal(Data, {SeqNum, Proposal}),

    MajorityCount = majority_count(Data),
    if ReceivedMessagCount + 1 >= MajorityCount ->
            Data3 = Data2#data{received_messag_count=0},
            {next_state, proposal, Data3};
       true ->
            Data3 = Data2#data{received_messag_count = ReceivedMessagCount + 1},
            {keep_state, Data3, [{state_timeout, 20, timeout}]}
    end;
prepare(state_timeout, timeout, Data) ->
    Data2 = send_prepare_to_all_acceptors(Data),
    {keep_state, Data2, [{state_timeout, 20, timeout}]};

prepare(_, _, Data) ->
    {keep_state, Data}.

消息是异步的,也就是说,一个 proposer 发了 sequence number 为 19 的消息后,又发了为 20 的消息。

这个时候可能收到 sequence number 为 19 的返回,只要丢弃就可以了。

prepare(_, _, Data) ->
    {keep_state, Data}.

提议阶段的代码和准备阶段的代码类似

proposal(enter, _Msg, Data) ->
    Data2 = send_proposal_to_all_acceptors(Data),
    {keep_state, Data2, [{state_timeout, 15, timeout}]};
proposal(cast, {proposal_accepted, SeqNum, Proposal},
         #data{received_messag_count=ReceivedMessagCount,
               current_seq_number=SeqNum,
               current_proposal=Proposal} = Data) ->
    MajorityCount = majority_count(Data),
    if ReceivedMessagCount + 1 >= MajorityCount ->
            Data3 = Data#data{current_proposal=Proposal},
            {next_state, consensus, Data3};
       true ->
            Data2 = Data#data{received_messag_count = ReceivedMessagCount + 1},
            {keep_state, Data2, [{state_timeout, 20, timeout}]}
    end;
proposal(state_timeout, timeout, Data) ->
    {next_state, prepare, Data};
proposal(_, _, Data) ->
    {keep_state, Data}.

小结

还有一些其他的小细节,比如如果生成 sequence number,由于和主算法关系不大,就不详细说明了,可以具体看代码。

个人认为,Erlang 的实现,要比描述更容易理解,这可能是因为 Erlang 在语言(process、pattern matching)和框架(gen_xxx)层面,对分布式做了非常好的抽象。

参考

需要 登录 后方可回复, 如果你还没有账号请 注册新账号