将 Learn You Some Erlang 教程从 gen_fsm 转换为 gen_statem

问题描述

我已经阅读了本教程的 Rage Against The Finite State Machine 一章,该章使用了 gen_fsm,而 gen_fsm 已被弃用,取而代之的是 gen_statem。在运行测试时,我总是卡住,因为其中一个客户端在处于 negotiate 状态时收到了 accept_negotiate 事件。(也许还有其他错误,但现在我无法弄清楚为什么会发生这种情况。)

trade_statem.erl

-module(trade_statem).
-behaviour(gen_statem).

% Public API
-export([start/1,start_link/1,trade/2,accept_trade/1,make_offer/2,retract_offer/2,ready/1,cancel/1]).
% gen_statem callbacks
-export([init/1,callback_mode/0,code_change/4,terminate/3]).
% Custom states
-export([idle/3,idle_wait/3,negotiate/3,wait/3,ready/3]).

% Data record
-record(data,{name = "",other,own_items = [],other_items = [],monitor,from}).

% Public API
%% Start an unlinked trade FSM. init/1 receives the argument term [Name].
start(Name) ->
    gen_statem:start(?MODULE, [Name], []).

%% Start a trade FSM linked to the calling process.
%% gen_statem:start_link takes the module, the init argument and an options
%% list; Name is forwarded exactly the way start/1 forwards it.
start_link(Name) ->
    gen_statem:start_link(?MODULE, [Name], []).

%% Synchronously ask the FSM OwnPid to open a negotiation with OtherPid.
%% The call is made with a 30000 ms timeout.
trade(OwnPid, OtherPid) ->
    gen_statem:call(OwnPid, {negotiate, OtherPid}, 30000).

%% Synchronously tell the FSM OwnPid to accept a negotiation request.
accept_trade(OwnPid) ->
    gen_statem:call(OwnPid, accept_negotiate).

%% Asynchronously tell the FSM OwnPid to offer Item.
make_offer(OwnPid, Item) ->
    gen_statem:cast(OwnPid, {make_offer, Item}).

%% Asynchronously tell the FSM OwnPid to withdraw its offer of Item.
retract_offer(OwnPid, Item) ->
    gen_statem:cast(OwnPid, {retract_offer, Item}).

%% Synchronously declare that the user is ready; the call has no timeout.
ready(OwnPid) ->
    gen_statem:call(OwnPid, ready, infinity).

%% Stop the FSM OwnPid.
cancel(OwnPid) ->
    gen_statem:stop(OwnPid).

% Client-To-Client API
%% Ask the other FSM for a trade session; OwnPid identifies the requester.
ask_negotiate(OtherPid, OwnPid) ->
    gen_statem:cast(OtherPid, {ask_negotiate, OwnPid}).

%% Tell the other FSM that its negotiation request was accepted by OwnPid.
accept_negotiate(OtherPid, OwnPid) ->
    gen_statem:cast(OtherPid, {accept_negotiate, OwnPid}).

%% Forward a local offer of Item to the other FSM.
do_offer(OtherPid, Item) ->
    gen_statem:cast(OtherPid, {do_offer, Item}).

%% Forward the cancellation of a local offer of Item to the other FSM.
undo_offer(OtherPid, Item) ->
    gen_statem:cast(OtherPid, {undo_offer, Item}).

%% Ask the other FSM whether it is ready to trade.
are_you_ready(OtherPid) ->
    gen_statem:cast(OtherPid, are_you_ready).

%% Answer that this side is not ready yet.
not_yet(OtherPid) ->
    gen_statem:cast(OtherPid, not_yet).

%% Tell the other FSM that this side is ready (sent as the event `ready`).
am_ready(OtherPid) ->
    gen_statem:cast(OtherPid, ready).

%% Acknowledge the move to the ready state (sent as the cast event `ack`).
ack_trans(OtherPid) ->
    gen_statem:cast(OtherPid, ack).

%% Synchronously ask the other FSM whether it can commit.
ask_commit(OtherPid) ->
    gen_statem:call(OtherPid, ask_commit).

%% Synchronously order the other FSM to commit.
do_commit(OtherPid) ->
    gen_statem:call(OtherPid, do_commit).

% gen_statem API
%% Initialise the FSM in the `idle` state. Name is stored exactly as it was
%% handed over by start/1 or start_link/1.
init(Name) ->
    {ok, idle, #data{name = Name}}.

%% One callback function per state name.
callback_mode() ->
    state_functions.

%% Code upgrade hook: keep the current state and data unchanged.
%% gen_statem expects {ok, NewState, NewData}, so the state name must be
%% part of the result.
code_change(_OldVsn, StateName, Data, _Extra) ->
    {ok, StateName, Data}.

%% Print a farewell notice on a normal stop from the `ready` state; every
%% other termination is silent. Both clauses have arity 3, matching the
%% exported terminate/3.
terminate(normal, ready, D = #data{}) ->
    notice(D, "FSM leaving.", []);
terminate(_Reason, _StateName, _Data) ->
    ok.

% Custom states
%% `idle` state: no trade in progress.
%% State functions take (EventType, EventContent, Data); the event content
%% is what binds OtherPid in the first two clauses.

%% The other FSM asked us for a trade: monitor it and wait for our user.
idle(cast, {ask_negotiate, OtherPid}, D = #data{}) ->
    Ref = monitor(process, OtherPid),
    notice(D, "~p asked for a trade negotiation", [OtherPid]),
    {next_state, idle_wait, D#data{other = OtherPid, monitor = Ref}};
%% Our user asked for a trade: forward the request and remember the caller
%% in `from` so it can be answered later.
idle({call, From}, {negotiate, OtherPid}, D = #data{}) ->
    ask_negotiate(OtherPid, self()),
    notice(D, "asking user ~p for a trade", [OtherPid]),
    Ref = monitor(process, OtherPid),
    {next_state, idle_wait, D#data{other = OtherPid, monitor = Ref, from = From}};
%% Anything else is reported and ignored.
idle(_, Event, _) ->
    unexpected(Event, idle),
    keep_state_and_data.

%% `idle_wait` state callback (question listing).
%% The first clause replies `ok` to the caller stored in `from`; the second
%% cast clause calls accept_negotiate/2 toward OtherPid.
%% NOTE(review): as shown, these clauses are not valid Erlang (event patterns
%% and parts of the notice/3 calls and return tuples are missing) - the
%% listing looks truncated; confirm against the original post.
%% NOTE(review): the second cast clause itself sends accept_negotiate to the
%% peer; that may be what delivers accept_negotiate to a peer that is already
%% in `negotiate` - TODO confirm.
idle_wait(cast,D = #data{other = OtherPid}) ->
    gen_statem:reply(D#data.from,ok),"starting negotiation",[]),negotiate,D};
idle_wait(cast,D = #data{other = OtherPid}) ->
    accept_negotiate(OtherPid,"accepting negotiation",D};
idle_wait({call,accept_negotiate,D,{reply,From,ok}};
idle_wait(_,idle_wait),keep_state_and_data.

%% `negotiate` state: both sides add and remove items until one is ready.
%% Every clause takes (EventType, EventContent, Data).

%% Own side offers an item: tell the peer and record it locally.
negotiate(cast, {make_offer, Item}, D = #data{own_items = OwnItems}) ->
    do_offer(D#data.other, Item),
    notice(D, "offering ~p", [Item]),
    {keep_state, D#data{own_items = add(Item, OwnItems)}};
%% Own side retracts an item.
negotiate(cast, {retract_offer, Item}, D = #data{own_items = OwnItems}) ->
    undo_offer(D#data.other, Item),
    notice(D, "cancelling offer on ~p", [Item]),
    {keep_state, D#data{own_items = remove(Item, OwnItems)}};
%% Peer offers an item.
negotiate(cast, {do_offer, Item}, D = #data{other_items = OtherItems}) ->
    notice(D, "other player offering ~p", [Item]),
    {keep_state, D#data{other_items = add(Item, OtherItems)}};
%% Peer retracts an item.
negotiate(cast, {undo_offer, Item}, D = #data{other_items = OtherItems}) ->
    notice(D, "Other player cancelling offer on ~p", [Item]),
    {keep_state, D#data{other_items = remove(Item, OtherItems)}};
%% Peer asks whether we are ready while we are still negotiating: decline.
negotiate(cast, are_you_ready, D = #data{other = OtherPid}) ->
    io:format("Other user ready to trade~n"),
    notice(D,
           "Other user ready to transfer goods:~nYou get ~p,the other side gets ~p",
           [D#data.other_items, D#data.own_items]),
    not_yet(OtherPid),
    keep_state_and_data;
%% Our user is ready: ask the peer and park the caller in `from` until the
%% `wait` state can answer it.
negotiate({call, From}, ready, D = #data{other = OtherPid}) ->
    are_you_ready(OtherPid),
    notice(D, "asking if ready,waiting", []),
    {next_state, wait, D#data{from = From}};
%% Anything else is reported and ignored.
negotiate(EventType, Event, Data) ->
    unexpected({EventType, Event, Data}, negotiate),
    keep_state_and_data.

%% `wait` state callback (question listing).
%% The visible clauses call am_ready/1 and ack_trans/1 on the peer and reply
%% to the caller stored in `from`.
%% NOTE(review): as shown, the clauses are not valid Erlang (event patterns,
%% notice/3 calls and return tuples are incomplete) - the listing looks
%% truncated; confirm against the original post.
%% NOTE(review): no clause for the peer's do_offer / undo_offer events is
%% visible in this state - TODO confirm whether that is intended.
wait(cast,D = #data{}) ->
    am_ready(D#data.other),"asked if ready,and I am. Waiting for same reply",keep_state_and_data;
wait(cast,not_yet,D = #data{}) ->
    notice(D,"Other not ready yet",ack_trans(D#data.other),gen_statem:reply(D#data.from,"other side is ready. Moving to ready state",D};
wait(_,wait),keep_state_and_data.

%% `ready` state callback (question listing).
%% On `ack`, the side for which priority/2 returns true calls ask_commit/1
%% and do_commit/1 on the peer, then commit/1, and stops with reason normal;
%% the other side keeps its state and data.
%% NOTE(review): the first clause matches a {call,_} event, but ack_trans/1
%% sends `ack` with gen_statem:cast, so this clause presumably never matches
%% - TODO confirm.
%% NOTE(review): as shown, several clauses are not valid Erlang (missing
%% arguments and truncated notice/3 calls and return tuples) - the listing
%% looks truncated; confirm against the original post.
ready({call,_},ack,D = #data{}) ->
    case priority(self(),D#data.other) of
        true ->
            try
                notice(D,"asking for commit",ready_commit = ask_commit(D#data.other),"ordering commit",ok = do_commit(D#data.other),"committing...",commit(D),% Sus
                {stop,normal,D}
            catch Class:Reason ->
                notice(D,"commit failed",{Class,Reason},D}
            end;
        false ->
            keep_state_and_data
    end;
ready({call,ask_commit,D) ->
    notice(D,"replying to ask commit",% Sus
    {keep_state_and_data,ready_commit}};
ready({call,do_commit,% Sus
    {stop,ok,D};
ready(_,ready),keep_state_and_data.

% Private functions
%% Put Item at the front of the item list.
add(Item, Items) ->
    [Item] ++ Items.

%% Drop the first occurrence of Item from the item list, if there is one.
remove(Item, Items) ->
    Unwanted = [Item],
    Items -- Unwanted.

%% Print a formatted line prefixed with the player name held in the data record.
notice(#data{name = Name}, Str, Args) ->
    Format = "~s: " ++ Str ++ "~n",
    io:format(Format, [Name | Args]).

%% Report an event that the given state does not handle.
unexpected(Msg, State) ->
    Details = [self(), Msg, State],
    io:format("~p received unknown event ~p while in state ~p~n", Details).

%% Decide which of two processes drives the commit: true when OwnPid sorts
%% after OtherPid, false when it sorts before. Terms that compare equal match
%% no clause.
priority(OwnPid, OtherPid) when OwnPid /= OtherPid ->
    OwnPid > OtherPid.

%% Print the final outcome of the trade for this player.
commit(#data{name = Name, own_items = Sent, other_items = Received}) ->
    io:format("Transaction completed for ~s. Items sent are:~n~p,~n received are:~n~p.~nThis operation should have some atomic save in a database.~n",
              [Name, Sent, Received]).

trade_calls.erl

-module(trade_calls).
-export([main_ab/0,main_cd/0,main_ef/0]).

%% test a little bit of everything and also deadlocks on ready state
%% -- leftover messages possible on race conditions on ready state
%% Scenario A/B: spawn client `a`, wait for the pid it sends back, then spawn
%% client `b` with that pid and the pid of client `a`.
main_ab() ->
    Parent = self(),
    ClientA = spawn(fun() -> a(Parent) end),
    FsmA = receive Msg -> Msg end,
    spawn(fun() -> b(FsmA, ClientA) end).

%% Client script for player "Carl": starts a linked trade_statem, sends its
%% pid to Parent and prints it.
%% NOTE(review): with the body collapsed onto one line, everything after
%% `%sys:trace` is inside an Erlang comment, and the fragment `,"sword"),` is
%% not a complete call - the listing looks line-collapsed and truncated;
%% confirm against the original post.
a(Parent) ->
    {ok,Pid} = trade_statem:start_link("Carl"),Parent ! Pid,io:format("Spawned Carl: ~p~n",[Pid]),%sys:trace(Pid,true),timer:sleep(800),trade_statem:accept_trade(Pid),timer:sleep(400),io:format("~p~n",[trade_statem:ready(Pid)]),timer:sleep(1000),trade_statem:make_offer(Pid,"horse"),"sword"),io:format("a synchronizing~n"),sync2(),trade_statem:ready(Pid),timer:sleep(200),timer:sleep(1000).

%% Client script for player "Jim": starts a linked trade_statem, opens a trade
%% with PidA, synchronises with client process PidCliA via sync1/1, then
%% declares itself ready.
%% NOTE(review): several calls are incomplete as shown (e.g. the io:format
%% argument list and the `"boots"),` fragment), and the trailing
%% `%% race condition!` comment swallows whatever followed on that line - the
%% listing looks truncated; confirm against the original post.
b(PidA,PidCliA) ->
    {ok,Pid} = trade_statem:start_link("Jim"),io:format("Spawned Jim: ~p~n",timer:sleep(500),trade_statem:trade(Pid,PidA),"boots"),trade_statem:retract_offer(Pid,"shotgun"),io:format("b synchronizing~n"),sync1(PidCliA),%% race condition!
    trade_statem:ready(Pid),timer:sleep(1000).

%% force a race condition on cd trade negotiation
%% Scenario C/D: spawn client `c`, wait for the pid it sends back, spawn
%% client `d` with this process, that pid and the pid of client `c`, wait for
%% the pid `d` sends back, and forward it to client `c`.
main_cd() ->
    Parent = self(),
    ClientC = spawn(fun() -> c(Parent) end),
    FsmC = receive FirstPid -> FirstPid end,
    spawn(fun() -> d(Parent, FsmC, ClientC) end),
    FsmD = receive SecondPid -> SecondPid end,
    ClientC ! FsmD.
    
%% Client script for player "Marc": starts a linked trade_statem and later
%% cancels it with trade_statem:cancel/1.
%% NOTE(review): PidD is used but never bound in the visible text, and the
%% `"car"),` fragment is not a complete call - the listing looks truncated;
%% confirm against the original post.
c(Parent) ->
    {ok,Pid} = trade_statem:start_link("Marc"),io:format("Spawned Marc: ~p~n",PidD),%% no need to accept_trade thanks to the race condition
    timer:sleep(200),"car"),timer:sleep(600),trade_statem:cancel(Pid),timer:sleep(1000).

%% Client script for player "Pete": starts a linked trade_statem and
%% synchronises with client process PidCliC via sync1/1.
%% NOTE(review): main_cd/0 calls d/3 (S, PidC, PidCliC) but this head shows
%% two parameters, and PidC is used without being bound - the listing looks
%% truncated; confirm against the original post.
d(Parent,PidCliC) ->
    {ok,Pid} = trade_statem:start_link("Pete"),sync1(PidCliC),PidC),"manatee"),timer:sleep(100),timer:sleep(1000).

%% Scenario E/F: spawn client `e`, wait for the pid it sends back, then spawn
%% client `f` with that pid and the pid of client `e`.
main_ef() ->
    Parent = self(),
    ClientE = spawn(fun() -> e(Parent) end),
    FsmE = receive Msg -> Msg end,
    spawn(fun() -> f(FsmE, ClientE) end).

%% Client script spawned by main_ef/0 with the spawning process as Parent.
%% NOTE(review): only `{ok,` and a final timer:sleep(1000) remain of the
%% body - the listing is truncated and cannot be interpreted from here;
%% confirm against the original post.
e(Parent) ->
    {ok,timer:sleep(1000).

%% Client script spawned by main_ef/0; the visible remainder synchronises
%% with client process PidCliE via sync1/1 and then sleeps.
%% NOTE(review): the body is truncated (`{ok,PidE),` is not a complete
%% expression); confirm against the original post.
f(PidE,PidCliE) ->
    {ok,PidE),sync1(PidCliE),timer:sleep(1000).

%%% Utils
%% Send our own pid to Pid, then block until an `ack` message arrives.
sync1(Pid) ->
    Pid ! self(),
    receive
        ack ->
            ok
    end.

%% Block for the next message of any kind and send `ack` to the term received.
sync2() ->
    receive
        Peer ->
            Peer ! ack
    end.

感谢您抽出宝贵时间!

解决方法

代码中存在一些错误,您可以将下面的版本与您自己的版本进行 diff,以查看具体改动。交易结束时您可能会看到一些 OTP 错误报告,但它们完全在意料之中。

trade_statem.erl

-module(trade_statem).
-behaviour(gen_statem).

% Public API
-export([start/1,start_link/1,trade/2,accept_trade/1,make_offer/2,retract_offer/2,ready/1,cancel/1]).
% gen_statem callbacks
-export([init/1,callback_mode/0,code_change/4,terminate/3]).
% Custom states
-export([idle/3,idle_wait/3,negotiate/3,wait/3,ready/3]).

% Data record
-record(data,{name = "",other,own_items = [],other_items = [],monitor,from}).

% Public API
%% Start an unlinked trade FSM. init/1 receives the argument term [Name].
start(Name) ->
    gen_statem:start(?MODULE, [Name], []).

%% Start a trade FSM linked to the calling process.
%% gen_statem:start_link takes the module, the init argument and an options
%% list; Name is forwarded exactly the way start/1 forwards it.
start_link(Name) ->
    gen_statem:start_link(?MODULE, [Name], []).

%% Synchronously ask the FSM OwnPid to open a negotiation with OtherPid.
%% The call is made with a 30000 ms timeout.
trade(OwnPid, OtherPid) ->
    gen_statem:call(OwnPid, {negotiate, OtherPid}, 30000).

%% Synchronously tell the FSM OwnPid to accept a negotiation request.
accept_trade(OwnPid) ->
    gen_statem:call(OwnPid, accept_negotiate).

%% Asynchronously tell the FSM OwnPid to offer Item.
make_offer(OwnPid, Item) ->
    gen_statem:cast(OwnPid, {make_offer, Item}).

%% Asynchronously tell the FSM OwnPid to withdraw its offer of Item.
retract_offer(OwnPid, Item) ->
    gen_statem:cast(OwnPid, {retract_offer, Item}).

%% Synchronously declare that the user is ready; the call has no timeout.
ready(OwnPid) ->
    gen_statem:call(OwnPid, ready, infinity).

%% Stop the FSM OwnPid.
cancel(OwnPid) ->
    gen_statem:stop(OwnPid).

% Client-To-Client API
%% Ask the other FSM for a trade session; OwnPid identifies the requester.
ask_negotiate(OtherPid, OwnPid) ->
    gen_statem:cast(OtherPid, {ask_negotiate, OwnPid}).

%% Tell the other FSM that its negotiation request was accepted by OwnPid.
accept_negotiate(OtherPid, OwnPid) ->
    gen_statem:cast(OtherPid, {accept_negotiate, OwnPid}).

%% Forward a local offer of Item to the other FSM.
do_offer(OtherPid, Item) ->
    gen_statem:cast(OtherPid, {do_offer, Item}).

%% Forward the cancellation of a local offer of Item to the other FSM.
undo_offer(OtherPid, Item) ->
    gen_statem:cast(OtherPid, {undo_offer, Item}).

%% Ask the other FSM whether it is ready to trade.
are_you_ready(OtherPid) ->
    gen_statem:cast(OtherPid, are_you_ready).

%% Answer that this side is not ready yet.
not_yet(OtherPid) ->
    gen_statem:cast(OtherPid, not_yet).

%% Tell the other FSM that this side is ready (sent as the event `ready`).
am_ready(OtherPid) ->
    gen_statem:cast(OtherPid, ready).

%% Acknowledge the move to the ready state (sent as the cast event `ack`).
ack_trans(OtherPid) ->
    gen_statem:cast(OtherPid, ack).

%% Synchronously ask the other FSM whether it can commit.
ask_commit(OtherPid) ->
    gen_statem:call(OtherPid, ask_commit).

%% Synchronously order the other FSM to commit.
do_commit(OtherPid) ->
    gen_statem:call(OtherPid, do_commit).

% gen_statem API
%% Initialise the FSM in the `idle` state. Name is stored exactly as it was
%% handed over by start/1 or start_link/1.
init(Name) ->
    {ok, idle, #data{name = Name}}.

%% One callback function per state name.
callback_mode() ->
    state_functions.

%% Code upgrade hook: keep the current state and data unchanged.
%% gen_statem expects {ok, NewState, NewData}, so the state name must be
%% part of the result.
code_change(_OldVsn, StateName, Data, _Extra) ->
    {ok, StateName, Data}.

%% Print a farewell notice on a normal stop from the `ready` state; every
%% other termination is silent. Both clauses have arity 3, matching the
%% exported terminate/3.
terminate(normal, ready, D = #data{}) ->
    notice(D, "FSM leaving.", []);
terminate(_Reason, _StateName, _Data) ->
    ok.

% Custom states
%% `idle` state: no trade in progress.
%% State functions take (EventType, EventContent, Data); the event content
%% is what binds OtherPid in the first two clauses.

%% The other FSM asked us for a trade: monitor it and wait for our user.
idle(cast, {ask_negotiate, OtherPid}, D = #data{}) ->
    Ref = monitor(process, OtherPid),
    notice(D, "~p asked for a trade negotiation", [OtherPid]),
    {next_state, idle_wait, D#data{other = OtherPid, monitor = Ref}};
%% Our user asked for a trade: forward the request and remember the caller
%% in `from` so it can be answered later.
idle({call, From}, {negotiate, OtherPid}, D = #data{}) ->
    ask_negotiate(OtherPid, self()),
    notice(D, "asking user ~p for a trade", [OtherPid]),
    Ref = monitor(process, OtherPid),
    {next_state, idle_wait, D#data{other = OtherPid, monitor = Ref, from = From}};
%% Anything else is reported and ignored.
idle(_, Event, _) ->
    unexpected(Event, idle),
    keep_state_and_data.

%% `idle_wait` state: a negotiation was requested and is awaiting acceptance.
%% Every clause takes (EventType, EventContent, Data).

%% Both sides asked each other at the same time: treat it as an acceptance
%% and answer the caller parked in `from`.
idle_wait(cast, {ask_negotiate, OtherPid}, D = #data{other = OtherPid}) ->
    gen_statem:reply(D#data.from, ok),
    notice(D, "starting negotiation", []),
    {next_state, negotiate, D};
%% The peer accepted our request: answer the parked caller. Nothing is sent
%% back to the peer here, so a peer already in `negotiate` is not disturbed.
idle_wait(cast, {accept_negotiate, OtherPid}, D = #data{other = OtherPid}) ->
    gen_statem:reply(D#data.from, ok),
    notice(D, "starting negotiation", []),
    {next_state, negotiate, D};
%% Our user accepts the peer's request: notify the peer and reply to the user.
idle_wait({call, From}, accept_negotiate, D = #data{other = OtherPid}) ->
    accept_negotiate(OtherPid, self()),
    notice(D, "accepting negotiation", []),
    {next_state, negotiate, D, {reply, From, ok}};
%% Anything else is reported and ignored.
idle_wait(_, Event, _) ->
    unexpected(Event, idle_wait),
    keep_state_and_data.

%% `negotiate` state: both sides add and remove items until one is ready.
%% Every clause takes (EventType, EventContent, Data).

%% Own side offers an item: tell the peer and record it locally.
negotiate(cast, {make_offer, Item}, D = #data{own_items = OwnItems}) ->
    do_offer(D#data.other, Item),
    notice(D, "offering ~p", [Item]),
    {keep_state, D#data{own_items = add(Item, OwnItems)}};
%% Own side retracts an item.
negotiate(cast, {retract_offer, Item}, D = #data{own_items = OwnItems}) ->
    undo_offer(D#data.other, Item),
    notice(D, "cancelling offer on ~p", [Item]),
    {keep_state, D#data{own_items = remove(Item, OwnItems)}};
%% Peer offers an item.
negotiate(cast, {do_offer, Item}, D = #data{other_items = OtherItems}) ->
    notice(D, "other player offering ~p", [Item]),
    {keep_state, D#data{other_items = add(Item, OtherItems)}};
%% Peer retracts an item.
negotiate(cast, {undo_offer, Item}, D = #data{other_items = OtherItems}) ->
    notice(D, "Other player cancelling offer on ~p", [Item]),
    {keep_state, D#data{other_items = remove(Item, OtherItems)}};
%% Peer asks whether we are ready while we are still negotiating: decline.
negotiate(cast, are_you_ready, D = #data{other = OtherPid}) ->
    io:format("Other user ready to trade~n"),
    notice(D,
           "Other user ready to transfer goods:~nYou get ~p,the other side gets ~p",
           [D#data.other_items, D#data.own_items]),
    not_yet(OtherPid),
    keep_state_and_data;
%% Our user is ready: ask the peer and park the caller in `from` until the
%% `wait` state can answer it.
negotiate({call, From}, ready, D = #data{other = OtherPid}) ->
    are_you_ready(OtherPid),
    notice(D, "asking if ready,waiting", []),
    {next_state, wait, D#data{from = From}};
%% Anything else is reported and ignored.
negotiate(EventType, Event, Data) ->
    unexpected({EventType, Event, Data}, negotiate),
    keep_state_and_data.

%% `wait` state: our user declared readiness and is blocked in ready/1 until
%% the peer answers. Every clause takes (EventType, EventContent, Data).

%% The peer changed its offer: release the blocked caller with
%% `offer_changed` and return to negotiation.
wait(cast, {do_offer, Item}, D = #data{other_items = OtherItems}) ->
    gen_statem:reply(D#data.from, offer_changed),
    notice(D, "Other side offering ~p", [Item]),
    {next_state, negotiate, D#data{other_items = add(Item, OtherItems)}};
wait(cast, {undo_offer, Item}, D = #data{other_items = OtherItems}) ->
    gen_statem:reply(D#data.from, offer_changed),
    notice(D, "other side cancelling offer ~p", [Item]),
    {next_state, negotiate, D#data{other_items = remove(Item, OtherItems)}};
%% The peer asks whether we are ready: we are, so say so and keep waiting.
wait(cast, are_you_ready, D = #data{}) ->
    am_ready(D#data.other),
    notice(D, "asked if ready,and I am. Waiting for same reply", []),
    keep_state_and_data;
%% The peer is not ready yet: keep waiting.
wait(cast, not_yet, D = #data{}) ->
    notice(D, "Other not ready yet", []),
    keep_state_and_data;
%% The peer is ready too: confirm, acknowledge the transition, release the
%% blocked caller and move to `ready`.
wait(cast, ready, D = #data{}) ->
    am_ready(D#data.other),
    ack_trans(D#data.other),
    gen_statem:reply(D#data.from, ok),
    notice(D, "other side is ready. Moving to ready state", []),
    {next_state, ready, D};
%% Anything else is reported and ignored.
wait(_, Event, _) ->
    unexpected(Event, wait),
    keep_state_and_data.

%% `ready` state: both sides agreed; run a two-phase commit.
%% Every clause takes (EventType, EventContent, Data).

%% `ack` arrives as a cast (see ack_trans/1). The side for which priority/2
%% returns true drives the commit; the other side waits for its calls.
ready(cast, ack, D = #data{}) ->
    case priority(self(), D#data.other) of
        true ->
            try
                notice(D, "asking for commit", []),
                ready_commit = ask_commit(D#data.other),
                notice(D, "ordering commit", []),
                ok = do_commit(D#data.other),
                notice(D, "committing...", []),
                commit(D),
                {stop, normal, D}
            catch
                Class:Reason ->
                    %% Either ask_commit or do_commit failed: abort.
                    notice(D, "commit failed", []),
                    {stop, {Class, Reason}, D}
            end;
        false ->
            keep_state_and_data
    end;
%% Phase 1: confirm that we can commit.
ready({call, From}, ask_commit, D) ->
    notice(D, "replying to ask commit", []),
    {keep_state_and_data, {reply, From, ready_commit}};
%% Phase 2: commit, answer the driving side with `ok`, then stop normally.
%% A gen_statem cannot reply from a plain `stop` tuple, so the reply is sent
%% with stop_and_reply.
ready({call, From}, do_commit, D) ->
    notice(D, "committing...", []),
    commit(D),
    {stop_and_reply, normal, {reply, From, ok}, D};
%% Anything else is reported and ignored.
ready(_, Event, _) ->
    unexpected(Event, ready),
    keep_state_and_data.

% Private functions
%% Put Item at the front of the item list.
add(Item, Items) ->
    [Item] ++ Items.

%% Drop the first occurrence of Item from the item list, if there is one.
remove(Item, Items) ->
    Unwanted = [Item],
    Items -- Unwanted.

%% Print a formatted line prefixed with the player name held in the data record.
notice(#data{name = Name}, Str, Args) ->
    Format = "~s: " ++ Str ++ "~n",
    io:format(Format, [Name | Args]).

%% Report an event that the given state does not handle.
unexpected(Msg, State) ->
    Details = [self(), Msg, State],
    io:format("~p received unknown event ~p while in state ~p~n", Details).

%% Decide which of two processes drives the commit: true when OwnPid sorts
%% after OtherPid, false when it sorts before. Terms that compare equal match
%% no clause.
priority(OwnPid, OtherPid) when OwnPid /= OtherPid ->
    OwnPid > OtherPid.

%% Print the final outcome of the trade for this player.
commit(#data{name = Name, own_items = Sent, other_items = Received}) ->
    io:format("Transaction completed for ~s. Items sent are:~n~p,~n received are:~n~p.~nThis operation should have some atomic save in a database.~n",
              [Name, Sent, Received]).

相关问答

错误1:Request method ‘DELETE‘ not supported 错误还原:...
错误1:启动docker镜像时报错:Error response from daemon:...
错误1:private field ‘xxx‘ is never assigned 按Alt...
报错如下,通过源不能下载,最后警告pip需升级版本 Requirem...