erlang - 使用 Elixir Genstage 的运行时动态计算图

标签 erlang elixir genstage

我希望能够在运行时动态更改计算管道,但似乎 GenStage 需要在编译时通过 subscribe_to: [...] 定义计算图。机制。有没有办法创建动态计算图?例如在下面,我想在运行时在我的管道图中的“减 7”和“减 4”顶点之间切换。
enter image description here
这可以使用 GenStage 吗?我可能会有非常复杂的管道,所以我需要一个能够以复杂方式扩展以更改图形的解决方案,而不是临时解决方案,例如在这种情况下,参数化要减去的整数。我希望能够添加或删除整个子树,在子树之间切换,并将节点添加到图中,包括将它们拼接到包括主树在内的任何子树的中间。
请参阅下面的编辑
这是最初的生产者:

defmodule GenstageTest.Producer do
  use GenStage

  def start_link(initial \\ 1) do
    GenStage.start_link(__MODULE__, initial, name: __MODULE__)
  end

  def init(counter), do: {:producer, counter}

  def handle_demand(demand, state) do
    events = Enum.to_list(state..(state + demand - 1))
    {:noreply, events, state + demand}
  end
end
这是生产者消费者之一:
defmodule GenstageTest.PcTimesFive do
  use GenStage

  def start_link do
    GenStage.start_link(__MODULE__, :state_doesnt_matter, name: __MODULE__)
  end

  def init(state) do
    {:producer_consumer, state, subscribe_to: [GenstageTest.PcAddOne]}
  end

  def handle_events(events, _from, state) do
    numbers =
      events
      |> Enum.map(&(&1 * 5))
    {:noreply, numbers, state}
  end
end
这是最终消费者:
defmodule GenstageTest.Consumer do
  use GenStage

  def start_link do
    GenStage.start_link(__MODULE__, :state_doesnt_matter)
  end

  def init(state) do
    {:consumer, state, subscribe_to: [GenstageTest.PcDivTwo]}
  end

  def handle_events(events, _from, state) do
    for event <- events do
      IO.inspect({self(), event, state})
    end

    # As a consumer we never emit events
    {:noreply, [], state}
  end
end
一世
全部以 Elixir School Genstage tutorial 为模型。 .
所有模块和 mix.exs 都可以是 found on github .
在@AquarHEAD L 部分回答后 3 天后编辑。
我设法让运行时订阅正常工作。下面分别修改一些producer、producer_consumers、consumers:
制片人:
defmodule GenstageTest.Producer do
  use GenStage

  def start_link(initial \\ 1) do
    GenStage.start_link(__MODULE__, initial, name: __MODULE__)
  end

  def init(counter), do: {:producer, counter}

  def handle_demand(demand, state) do
    events = Enum.to_list(state..(state + demand - 1))
    {:noreply, events, state + demand}
  end

  def handle_info({:doprint}, state) do
    IO.puts "yep"
    {:noreply, [], state}
  end

  def handle_info({:cancel, sublink}, state) do
    GenStage.cancel sublink, []
    {:noreply, [], state}
  end

end
生产者_消费者:
defmodule GenstageTest.PcAddOne do
  use GenStage

  def start_link do
    GenStage.start_link(__MODULE__, :state_doesnt_matter, name: __MODULE__)
  end

  def init(state) do
    {:producer_consumer, state}
  end

  def handle_events(events, _from, state) do
    numbers =
      events
      |> Enum.map(&(&1 + 1))
    {:noreply, numbers, state}
  end
end
消费者:
defmodule GenstageTest.Consumer do
  use GenStage

  def start_link do
    GenStage.start_link(__MODULE__, :state_doesnt_matter)
  end

  def init(state) do
    {:consumer, state}
  end

  def handle_events(events, _from, state) do
    for event <- events do
      IO.inspect event
      #File.write("/home/tbrowne/scratch/output.txt", 
      #  Kernel.inspect(event) <> " ", [:append])
      :timer.sleep(100)
    end

    # As a consumer we never emit events
    {:noreply, [], state}
  end
end
现在,一旦这些都在 lib 目录中可用(记得将 {:gen_stage, "~> 0.11"} 添加到您的 mix.exs deps),或者复制并粘贴到 IEX 中,那么以下内容将完美运行:
{:ok, p} = GenstageTest.Producer.start_link(0)
{:ok, a1} = GenstageTest.PcAddOne.start_link()
{:ok, c} = GenstageTest.Consumer.start_link()
{:ok, link1} = GenStage.sync_subscribe(a1, to: p, min_demand: 0, max_demand: 1, cancel: :transient)
{:ok, link2} = GenStage.sync_subscribe(c, to: a1, min_demand: 0, max_demand: 1, cancel: :transient)
现在的问题是,我仍然不知道如何取消订阅。有一个cancel function还有一个stop function . GenStage.stop(c)例如似乎什么也没做,而我在 GenStage.cancel/3 的各种尝试只给出错误。
回顾一下,我现在需要的是能够停止某些阶段并用其他阶段替换它们。取消订阅的语法是什么,从哪里调用它?由于没有具体示例,因此在文档中没有很好地解释。

最佳答案

您绝对可以在运行时更改管道,checkout the first example in GenStage documentation , you can also use the :manual mode to fine control the demand .还有API to cancel subscription .我认为这些足以动态管理 GenStage 管道。

关于erlang - 使用 Elixir Genstage 的运行时动态计算图,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52968929/

相关文章:

date - 在 erlang 函数的保护语句中使用日期

登录 Erlang : Why there is no library to send an email when an error happens?

web-services - Elixir - Simple Plug 示例在每次请求时两次调用调用方法

websocket - 测试客户端 channel 是否收到消息

postgresql - Docker 撰写链接似乎不起作用

elixir - 如何测试 Elixir GenStage Consumer?

erlang - 不在 EUnit 中输出异常堆栈跟踪

erlang shell 模块功能帮助