Elixir:如何作为客户端使用服务器发送的事件流?

标签 elixir server-sent-events

我的 Phoenix 应用程序需要与第 3 方 API 集成,该 API 通过 Server-Sent Events 以流格式发送数据.我是第一次学习 SSE; SSE 流看起来很像 websocket,除了它们是单向的并且更简单。

我看到很多关于如何从作为服务器的 Elixir 应用发送 SSE 数据的建议(例如 123) ) - 但我没有看到关于 Elixir 应用程序如何消费 SSE 数据流作为客户端的建议。我看到的文章似乎假设客户端是浏览器中的 JS,但我知道 Ruby SSE 客户端存在(12)所以我认为没有强大的技术原因可以解释为什么 Elixir 应用程序不能作为 SSE 客户。

在 Elixir 应用中使用 SSE 数据的最简单方法是什么?如果有任何指示和资源,我将不胜感激。

最佳答案

我找到了一个 couple promising SSE 客户端库,但我被奇怪的连接错误关闭了。然后经过一番挖掘,我发现 HTTPoison (可能还有 HTTPotion)可以单独连接到一个服务器发送的事件流,只是没有在任何地方记录。

这是一个快速的 iex 示例(使用 Elixir v1.8 和 HTTPoison v1.5):

> url = "https://some-domain.com/some-server-sent-event-stream/"
> HTTPoison.get!(url, [], [recv_timeout: :infinity, stream_to: self()])
  => %HTTPoison.AsyncResponse{id: #Reference<0.2736682462.4075814917.25838>}
> Process.sleep(10_000) # wait for a few events to come in
> flush()
  => %HTTPoison.AsyncChunk{
        chunk: "event:poke\ndata:{\"kittens\":3}\n\n",
        id: #Reference<0.2736682462.4075814917.25838>
      }
      %HTTPoison.AsyncChunk{
        chunk: "event:poke\ndata:{\"kittens\":3}\n\n",
        id: #Reference<0.2736682462.4075814917.25838>
      }
      %HTTPoison.AsyncChunk{
        chunk: "event:poke\ndata:{\"kittens\":4}\n\n",
        id: #Reference<0.2736682462.4075814917.25838>
      }
      :ok

但在现实世界中,您可能希望使用 GenServer 来处理每条消息、处理断开连接等——这基本上就是上面两个链接库的目的。一个简单的客户端可能如下所示:

# Usage:
# > SseClient.start("https://some-domain.com/some-server-sent-event-stream/")
#
defmodule SseClient do
  use GenServer

  def start(url) do
    GenServer.start_link(__MODULE__, url: url)
  end

  def init([url: url]) do
    IO.puts "Connecting to stream..."
    HTTPoison.get!(url, [], [recv_timeout: :infinity, stream_to: self()])
    {:ok, nil}
  end

  def handle_info(%HTTPoison.AsyncChunk{chunk: chunk}, _state) do
    # My use case assumes that each message contains two rows (event: and data:)
    case Regex.run(~r/^event:(\w+)\ndata:({.+})\n\n$/, chunk) do
      [_, event, data] ->
        _json = Jason.decode!(data)
        case event do
          "poke" -> IO.puts "Poke received: #{data}"
          "data" -> IO.puts "Data received: #{data}"
        end
      nil ->
        raise "Don't know how to parse received chunk: \"#{chunk}\""
    end

    {:noreply, nil}
  end

  # In addition to message chunks, we also may receive status changes etc.
  def handle_info(%HTTPoison.AsyncStatus{} = status, _state) do
    IO.puts "Connection status: #{inspect status}"
    {:noreply, nil}
  end

  def handle_info(%HTTPoison.AsyncHeaders{} = headers, _state) do
    IO.puts "Connection headers: #{inspect headers}"
    {:noreply, nil}
  end
end

关于Elixir:如何作为客户端使用服务器发送的事件流?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/67739157/

相关文章:

javascript - HTML5 服务器发送的事件 : How to set withCredentials option?

php - 服务器发送的事件 PHP 阻塞流

javascript - 如何只触发一次服务器发送的事件?

elixir - ExUnit-在所有测试之前运行一次数据库设置代码

elixir - 如何避免在 iex 中出现 ecto 日志?

elixir - 如何检查 Elixir 列表或元组中是否存在某个项目?

elixir - 条件和模式与 map 匹配

javascript - 如何查看有关服务器发送事件的错误?

javascript - 服务器端事件 (SSE) 未到达客户端

erlang - 当客户端关闭连接时,Cowboy/Ranch 会终止处理程序进程