erlang - Erlang服务器与端口连接以向Java应用程序发送和接收Json文件

标签 erlang erlang-ports

我试图用Erlang将服务器实现到我的Java应用程序。
似乎我的服务器正在运行,但仍然充满错误和死点。
我需要接收Java应用程序解析为地图的JSON文件,并将其发送回所有客户端,包括上载该文件的客户端。
同时,我需要跟踪谁发出了请求以及发送了消息的哪一部分,以防万一出现问题,客户端应该从这一点而不是从头开始重新启动。除非客户端离开应用程序,否则它应该重新启动。

我的三段代码如下:

app.erl

-module(erlServer_app).

-behaviour(application).

%% Application callbacks
-export([start/2, stop/1]).

%%%===================================================================
%%% Application callbacks
%%%===================================================================

start(_StartType, _StartArgs) ->
    erlServer_sup:start_link().


stop(_State) ->
    ok.

该supervisor.erl:
-module(erlServer_sup).

-behaviour(supervisor).

%% API
-export([start_link/0]).

%% Supervisor callbacks
-export([init/1, start_socket/0, terminate_socket/0, empty_listeners/0]).

-define(SERVER, ?MODULE).

%%--------------------------------------------------------------------
%% @doc
%% Starts the supervisor
%%
%% @end
%%--------------------------------------------------------------------
start_link() ->
  supervisor:start_link({local, ?SERVER}, ?MODULE, []).

%%%===================================================================
%%% Supervisor callbacks
%%%===================================================================

init([]) -> % restart strategy 'one_for_one': if one goes down only that one is restarted
  io:format("starting...~n"),
  spawn_link(fun() -> empty_listeners() end),
  {ok,
    {{one_for_one, 5, 30}, % The flag - 5 restart within 30 seconds
      [{erlServer_server, {erlServer_server, init, []}, permanent, 1000, worker, [erlServer_server]}]}}.


%%%===================================================================
%%% Internal functions
%%%===================================================================

start_socket() ->
  supervisor:start_child(?MODULE, []).

terminate_socket() ->
  supervisor:delete_child(?MODULE, []).

empty_listeners() ->
  [start_socket() || _ <- lists:seq(1,20)],
  ok.

server.erl :(我有很多调试io:format。)
-module(erlServer_server).

%% API
-export([init/0, start_server/0]).

%% Defining the port used.
-define(PORT, 8080).

%%%===================================================================
%%% API
%%%===================================================================

init() ->
  start_server().

%%%===================================================================
%%% Server callbacks
%%%===================================================================

start_server() ->
  io:format("Server started.~n"),
  Pid = spawn_link(fun() ->
    {ok, ServerSocket} = gen_tcp:listen(?PORT, [binary, {packet, 0}, 
      {reuseaddr, true}, {active, true}]),
    io:format("Baba~p", [ServerSocket]),
    server_loop(ServerSocket) end),
  {ok, Pid}.

server_loop(ServerSocket) ->
  io:format("Oba~p", [ServerSocket]),
  {ok, Socket} = gen_tcp:accept(ServerSocket),
  Pid1 = spawn(fun() -> client() end),
  inet:setopts(Socket, [{packet, 0}, binary, 
    {nodelay, true}, {active, true}]),
  gen_tcp:controlling_process(Socket, Pid1), 
  server_loop(ServerSocket).

%%%===================================================================
%%% Internal functions
%%%===================================================================

client() ->
  io:format("Starting client. Enter \'quit\' to exit.~n"),
  Client = self(),
  {ok, Sock} = gen_tcp:connect("localhost", ?PORT, [{active, false}, {packet, 2}]),
  display_prompt(Client),
  client_loop(Client, Sock).

%%%===================================================================
%%% Client callbacks
%%%===================================================================

send(Sock, Packet) ->
  {ok, Sock, Packet} = gen_tcp:send(Sock, Packet),
  io:format("Sent ~n").

recv(Packet) ->
  {recv, ok, Packet} = gen_tcp:recv(Packet),
  io:format("Received ~n").

display_prompt(Client) ->
  spawn(fun () ->
    Packet = io:get_line("> "),
    Client ! {entered, Packet}
        end),
  Client ! {requested},
  ok.


client_loop(Client, Sock) ->
  receive
    {entered, "quit\n"} ->
      gen_tcp:close(Sock);
    {entered, Packet}        ->
      % When a packet is entered we receive it,
      recv(Packet),
      display_prompt(Client),
      client_loop(Client, Sock);
    {requested, Packet}        ->
      % When a packet is requested we send it,
      send(Sock, Packet),
      display_prompt(Client),
      client_loop(Client, Sock);
    {error, timeout} ->
      io:format("Send timeout, closing!~n", []),
      Client ! {self(),{error_sending, timeout}},
      gen_tcp:close(Sock);
    {error, OtherSendError} ->
      io:format("Some other error on socket (~p), closing", [OtherSendError]),
      Client ! {self(),{error_sending, OtherSendError}},
      gen_tcp:close(Sock)
  end.

这是我正在使用的第一台服务器,中间可能迷路了。当我运行时,它似乎正在工作,但是挂起了。有人能帮我吗?我的本地主机从不加载任何会永远加载的东西。

我的Java应用程序如何从同一端口接收它?

我必须使用Erlang,并且必须使用端口连接到Java应用程序。

感谢您的帮助!

最佳答案

让我们稍微修改一下...

首先:命名

我们不在Erlang中使用camelCase。之所以令人困惑,是因为大写的变量名和小写(或单引号)原子含义不同。另外,模块名称必须与文件名称相同,这会在不区分大小写的文件系统上引起问题。

另外,我们确实想要一个比“服务器”更好的名称。服务器在这样的系统中可能意味着很多事情,尽管整个系统可能是用Erlang编写的服务,但这并不一定意味着我们可以将“服务器”中的所有内容称为超级服务器!令人困惑。我现在将您的项目命名为“ES”。因此,您将拥有es_appes_sup等。稍后,当我们要开始定义新模块时,这将派上用场,也许其中一些模块称为“服务器”,而不必到处都写“server_server”。

第二:输入数据

一般来说,我们希望将参数传递给函数,而不是将文字(或更糟的是,宏重写)埋入代码中。如果我们将拥有幻数和常数,那么我们会尽力将它们放入配置文件中,以便我们可以access them in a programmatic way,甚至更好,让我们在初始启动调用中将它们用作下级流程的参数,以便我们可以对系统的行为(一次编写)只能通过弄乱主应用程序模块中的启动调用函数来实现。

-module(es).
-behaviour(application).

-export([listen/1, ignore/0]).
-export([start/0, start/1]).
-export([start/2, stop/1]).

listen(PortNum) ->
    es_client_man:listen(PortNum).

ignore() ->
    es_client_man:ignore().

start() ->
    ok = application:ensure_started(sasl),
    ok = application:start(?MODULE),
    io:format("Starting es...").


start(Port) ->
    ok = start(),
    ok = es_client_man:listen(Port),
    io:format("Startup complete, listening on ~w~n", [Port]).

start(normal, _Args) ->
    es_sup:start_link().

stop(_State) ->
    ok.

我在上面添加了一个start / 1函数以及一个start / 0,一个listen / 1和一个ignore / 0,稍后您将在es_client_man中再次看到它们。这些大多是方便包装器,用于您可以更明确地调用的内容,但可能不想一直输入。

这个应用程序模块通过让应用程序主控为我们启动项目(通过调用application:start/1),然后下一行调用erl_server_server来告诉它开始监听,从而开始一切。在早期的开发中,我发现这种方法比将自动启动埋在各处的每个组件都有用,后来,它为我们提供了一种非常简单的方法来编写可以打开和关闭各种组件的外部接口。

啊,也...我们将以一个真实的Erlang应用程序开始它,因此我们需要ebin/中的应用程序文件(或者如果您使用erlang.mk或类似的app.src文件, src/):

ebin / es.app看起来像这样:
{application,es,
             [{description,"An Erlang Server example project"},
              {vsn,"0.1.0"},
              {applications,[stdlib,kernel,sasl]},
              {modules,[es,
                        es_sup,
                          es_clients,
                            es_client_sup,
                              es_client,
                            es_client_man]},
              {mod,{es,[]}}]}.
modules下的列表实际上反映了监督树的布局,如下所示。

上面的start / 2函数现在断言我们只会在normal模式下启动(可能适当,也可能不合适),而忽略启动args(也可能适当)。

第三名:The Supervision Tree
-module(es_sup).
-behaviour(supervisor).

-export([start_link/0]).
-export([init/1]).

start_link() ->
  supervisor:start_link({local, ?MODULE}, ?MODULE, []).

init([]) ->
    RestartStrategy = {one_for_one, 1, 60},
    Clients   = {es_clients,
                 {es_clients, start_link, []},
                 permanent,
                 5000,
                 supervisor,
                 [es_clients]},
    Children  = [Clients],
    {ok, {RestartStrategy, Children}}.

然后...
-module(es_clients).
-behavior(supervisor).

-export([start_link/0]).
-export([init/1]).

start_link() ->
  supervisor:start_link({local, ?MODULE}, ?MODULE, none).

init(none) ->
    RestartStrategy = {rest_for_one, 1, 60},
    ClientSup = {es_client_sup,
                 {es_client_sup, start_link, []},
                 permanent,
                 5000,
                 supervisor,
                 [es_client_sup]},
    ClientMan = {es_client_man,
                 {es_client_man, start_link, []},
                 permanent,
                 5000,
                 worker,
                 [es_client_man]},
    Children  = [ClientSup, ClientMan],
    {ok, {RestartStrategy, Children}}.

哇!这里发生了什么?!?嗯,es_sup是主管,而不是只会产生其他一次性事物的一次性事物。 (对supervisors的误解是您核心问题的一部分。)

主管很无聊。主管应该很无聊。他们作为代码阅读者真正所做的只是内部监督树的结构。他们在OTP结构方面为我们所做的工作非常重要,但是他们不需要我们编写任何程序代码,只需声明它应具有的子代即可。我们在这里实现的称为service -> worker结构。因此,我们为您的整个应用程序提供了一个名为es_sup的顶级主管。在此之下,我们暂时拥有一个名为es_clients的服务组件。

es_clients进程也是主管。这样做的原因是为客户端连接部分定义一种明确的方式,以使其不影响以后系统其余部分中可能存在的任何正在进行的状态。仅仅接受来自客户端的连接是没有用的-当然,在其他地方,某些状态很重要,例如与某个Java节点的长期连接等等。那将是一个单独的服务组件,可能称为es_java_nodes,并且该程序的那部分将从其自己的独立管理程序开始。这就是为什么它被称为“监督树”而不是“监督列表”的原因。

回到客户...我们将吸引客户。这就是为什么我们称它们为“客户端”的原因,因为从此Erlang系统的角度来看,连接的事物是客户端,而接受这些连接的过程将客户端抽象化,因此我们可以将每个连接处理程序视为客户端本身,因为确切地代表了什么。如果以后再连接到上游服务,则无论它们是抽象的,我们都想调用它们,以使我们在系统内的语义保持理智。

然后,您可以考虑“es_client向es_java_node发送一条消息以查询[thingy]”,而不是像“server_server要求java_server_client将server_server的service_server一样”保持正直(这实际上是愚蠢的事情,如果您不会从内部系统的角度保持命名原则的正确性。

等等等等等等...

因此,这是es_client_sup:
-module(es_client_sup).
-behaviour(supervisor).

-export([start_acceptor/1]).
-export([start_link/0]).
-export([init/1]).

start_acceptor(ListenSocket) ->
    supervisor:start_child(?MODULE, [ListenSocket]).

start_link() ->
  supervisor:start_link({local, ?MODULE}, ?MODULE, none).

init(none) ->
    RestartStrategy = {simple_one_for_one, 1, 60},
    Client    = {es_client,
                 {es_client, start_link, []},
                 temporary,
                 brutal_kill,
                 worker,
                 [es_client]},
    {ok, {RestartStrategy, [Client]}}.

您是否正在挑选一种模式?当我说“主管应该很无聊...”时,我并不是在开玩笑:-)请注意,这里实际上是在传递参数,并且已经定义了接口函数。因此,如果需要套接字接受器来启动,我们可以在逻辑上调用该位置。

第四:客户服务本身

让我们看一下客户经理:
-module(es_client_man).
-behavior(gen_server).

-export([listen/1, ignore/0]).
-export([start_link/0]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
         code_change/3, terminate/2]).

-record(s, {port_num = none :: none | inet:port_number(),
            listener = none :: none | gen_tcp:socket()}).

listen(PortNum) ->
    gen_server:call(?MODULE, {listen, PortNum}).

ignore() ->
    gen_server:cast(?MODULE, ignore).

start_link() ->
    gen_server:start_link({local, ?MODULE}, ?MODULE, none, []).

init(none) ->
    ok = io:format("Starting.~n"),
    State = #s{},
    {ok, State}.

handle_call({listen, PortNum}, _, State) ->
    {Response, NewState} = do_listen(PortNum, State),
    {reply, Response, NewState};
handle_call(Unexpected, From, State) ->
    ok = io:format("~p Unexpected call from ~tp: ~tp~n", [self(), From, Unexpected]),
    {noreply, State}.

handle_cast(ignore, State) ->
    NewState = do_ignore(State),
    {noreply, NewState};
handle_cast(Unexpected, State) ->
    ok = io:format("~p Unexpected cast: ~tp~n", [self(), Unexpected]),
    {noreply, State}.

handle_info(Unexpected, State) ->
    ok = io:format("~p Unexpected info: ~tp~n", [self(), Unexpected]),
    {noreply, State}.

code_change(_, State, _) ->
    {ok, State}.

terminate(_, _) ->
    ok.

do_listen(PortNum, State = #s{port_num = none}) ->
    SocketOptions =
        [{active,    once},
         {mode,      binary},
         {keepalive, true},
         {reuseaddr, true}],
    {ok, Listener} = gen_tcp:listen(PortNum, SocketOptions),
    {ok, _} = es_client:start(Listener),
    {ok, State#s{port_num = PortNum, listener = Listener}};
do_listen(_, State = #s{port_num = PortNum}) ->
    ok = io:format("~p Already listening on ~p~n", [self(), PortNum]),
    {{error, {listening, PortNum}}, State}.

do_ignore(State = #s{listener = none}) ->
    State;
do_ignore(State = #s{listener = Listener}) ->
    ok = gen_tcp:close(Listener),
    State#s{listener = none}.

嗯,这是怎么回事?这里的基本思想是,我们有一个客户整个概念的服务监督者(如上所述,es_clients),然后我们拥有simple_one_for_one来处理刚刚发生的任何客户端(es_client_sup),这里我们拥有子系统的管理接口。该管理器的全部工作是跟踪我们正在侦听的端口,并拥有当前正在侦听的套接字(如果当前处于打开状态)。请注意,可以很容易地重写此代码,以允许同时侦听任意数量的端口,或跟踪所有活动的客户端,或任何其他端口。您可能想要做的事实际上没有任何限制。

那么我们如何启动可以接受连接的客户端呢?通过告诉它们生成并监听我们作为参数传入的监听套接字。再来看一下上面的es_client_sup。我们传入一个空列表作为它的第一个参数。当我们调用它的start_link函数时,将会发生的事情是,作为列表传递的其他所有内容都将被添加到整个参数列表中。在这种情况下,我们将传入监听套接字,因此它将以[ListenSocket]参数开始。

每当客户端侦听器接受连接时,下一步便是派生其后继者,并为其提供原始的ListenSocket参数。啊,生命的奇迹。
-module(es_client).

-export([start/1]).
-export([start_link/1, init/2]).
-export([system_continue/3, system_terminate/4,
         system_get_state/1, system_replace_state/2]).

-record(s, {socket = none :: none | gen_tcp:socket()}).

start(ListenSocket) ->
    es_client_sup:start_acceptor(ListenSocket).

start_link(ListenSocket) ->
    proc_lib:start_link(?MODULE, init, [self(), ListenSocket]).

init(Parent, ListenSocket) ->
    ok = io:format("~p Listening.~n", [self()]),
    Debug = sys:debug_options([]),
    ok = proc_lib:init_ack(Parent, {ok, self()}),
    listen(Parent, Debug, ListenSocket).

listen(Parent, Debug, ListenSocket) ->
    case gen_tcp:accept(ListenSocket) of
        {ok, Socket} ->
            {ok, _} = start(ListenSocket),
            {ok, Peer} = inet:peername(Socket),
            ok = io:format("~p Connection accepted from: ~p~n", [self(), Peer]),
            State = #s{socket = Socket},
            loop(Parent, Debug, State);
        {error, closed} ->
            ok = io:format("~p Retiring: Listen socket closed.~n", [self()]),
            exit(normal)
     end.

loop(Parent, Debug, State = #s{socket = Socket}) ->
    ok = inet:setopts(Socket, [{active, once}]),
    receive
        {tcp, Socket, <<"bye\r\n">>} ->
            ok = io:format("~p Client saying goodbye. Bye!~n", [self()]),
            ok = gen_tcp:send(Socket, "Bye!\r\n"),
            ok = gen_tcp:shutdown(Socket, read_write),
            exit(normal);
        {tcp, Socket, Message} ->
            ok = io:format("~p received: ~tp~n", [self(), Message]),
            ok = gen_tcp:send(Socket, ["You sent: ", Message]),
            loop(Parent, Debug, State);
        {tcp_closed, Socket} ->
            ok = io:format("~p Socket closed, retiring.~n", [self()]),
            exit(normal);
        {system, From, Request} ->
            sys:handle_system_msg(Request, From, Parent, ?MODULE, Debug, State);
        Unexpected ->
            ok = io:format("~p Unexpected message: ~tp", [self(), Unexpected]),
            loop(Parent, Debug, State)
    end.

system_continue(Parent, Debug, State) ->
    loop(Parent, Debug, State).

system_terminate(Reason, _Parent, _Debug, _State) ->
    exit(Reason).

system_get_state(Misc) -> {ok, Misc}.

system_replace_state(StateFun, Misc) ->
    {ok, StateFun(Misc), Misc}.

请注意,上面我们已经编写了Pure Erlang进程,该进程以gen_server的方式与OTP集成,但是具有仅处理套接字的更直接的循环。这意味着我们没有gen_server调用/广播机制(可能需要自己实现这些机制,但通常仅使用asynch是更好的套接字处理方法)。这是started through the proc_lib module,专门用于引导任意类型的OTP兼容进程。

如果您要使用主管,那么您真的想一直使用并正确使用OTP。

因此,我们现在上面有一个非常基本的Telnet回显服务。与其在服务器模块中编写一个神奇的客户端进程以使您的大脑陷入困境(Erlanger不喜欢他们的大脑陷入困境),您可以开始执行此操作,告诉它侦听某个端口,然后自己远程登录该端口并查看结果。

我添加了一些脚本来自动启动程序,但基本上取决于codemake模块。您的项目的布局像
es/
   Emakefile
   ebin/es.app
   src/*.erl

Emakefile的内容将使我们更轻松。在这种情况下,只有一行:
enter code here {“src / *”,[debug_info,{i,“include /”},{outdir,“ebin /”}]}。

在主es/目录中,如果我们输入erl shell,我们现在可以执行...
1> code:add_patha("ebin").
true
2> make:all().
up_to_date
3> es:start().

您会在屏幕上看到一堆SASL启动报告。

从那里开始es:listen(5555):
4> es:listen(5555).
<0.94.0> Listening.
ok

凉!因此看来一切正常。让我们尝试远程登录自己:
ceverett@changa:~/vcs/es$ telnet localhost 5555
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
Hello es thingy
You sent: Hello es thingy
Yay! It works!
You sent: Yay! It works!
bye
Bye!
Connection closed by foreign host.

另一面是什么样的?
<0.96.0> Listening.
<0.94.0> Connection accepted from: {{127,0,0,1},60775}
<0.94.0> received: <<"Hello es thingy\r\n">>
<0.94.0> received: <<"Yay! It works!\r\n">>
<0.94.0> Client saying goodbye. Bye!

啊,在这里我们看到了“听”。来自下一个侦听器<0.96.0>的消息,该消息由第一个<0.94.0>产生。

并发连接如何?
<0.97.0> Listening.
<0.96.0> Connection accepted from: {{127,0,0,1},60779}
<0.98.0> Listening.
<0.97.0> Connection accepted from: {{127,0,0,1},60780}
<0.97.0> received: <<"Screen 1\r\n">>
<0.96.0> received: <<"Screen 2\r\n">>
<0.97.0> received: <<"I wonder if I could make screen 1 talk to screen 2...\r\n">>
<0.96.0> received: <<"Time to go\r\n">>
<0.96.0> Client saying goodbye. Bye!
<0.97.0> Client saying goodbye. Bye!

哦,尼阿托并发服务器!

在这里,您可以调整工具并进行基本结构更改,以完成您可能想像的任何事情。

请注意,此代码缺少很多内容。我已经除去了edoc标记和typespecs(由Dialyzer使用,它是大型项目中的重要工具),这对于生产系统来说是一件很糟糕的事情。

有关生产型项目的示例,该项目足够小以至于无法动弹(只有3个模块+完整的文档),请参阅zuuid。尽管它恰好是功能齐全的UUID生成器,但它是专门为用作代码示例而编写的。

原谅您(简短得多)问题的答案如此之多。这不时出现,我想写一个完整的网络套接字服务示例,以后我可以推荐给它,当我读到你的问题时,碰巧渴望这样做。 :-)希望SO纳粹宽恕这种严重的违法行为。

关于erlang - Erlang服务器与端口连接以向Java应用程序发送和接收Json文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46937566/

相关文章:

Erlang 相当于 Haskell 的 Data.List 等?

go - Erlang/Golang 端口示例中的缓冲区大小

Erlang:钩子(Hook)与 gen_event

c++ - 通过端口从 Erlang 调用 C 函数的最快和最简单的方法是什么?

go - 将 Erlang-C 端口示例转换为 Erlang-Golang

erlang - 打开 C++ 共享库作为 Erlang 端口驱动程序时出错

Erlang 机器立即停止(发行版名称冲突?)。由于 OnFail 设置为忽略,因此服务未重新启动

concurrency - 了解 Erlang 中通用服务器实现中消息的工作流程