elixir - 如何测试 Elixir GenStage Consumer?

标签 elixir producer-consumer consumer ex-unit genstage

我找到了一些关于如何测试生产者的资源,但是我找不到任何可以说明如何测试消费者的资源。

在生产者中,我创建了一个虚拟消费者,一切正常,但是在消费者中,我在测试方面遇到了困难。

defmodule DataProducer do
      use GenStage

      def start_link([]) do
        GenStage.start_link(__MODULE__, 0, name: __MODULE__)
      end

      # {:queue.new, demand, size}
      def init(counter) do
        {:producer, counter, dispatcher: GenStage.BroadcastDispatcher}
      end

      def handle_demand(demand, state) do 
        events = Enum.to_list(state..state + demand + 1)
        # Logger.info "demand is: #{inspect(demand)}, state is #{inspect(state)}"
        {:noreply, events, (state + demand)}
      end
    end

生产者测试:

 defmodule DataProducerTest do
      use ExUnit.Case

      test "check the results" do
        {:ok, stage} = DataProducer.start_link([])
        {:ok, _cons} = TestConsumer.start_link(stage)
        assert_receive {:received, events}
        GenStage.stop(stage)
      end

    end

    defmodule TestConsumer do
      def start_link(producer) do
        GenStage.start_link(__MODULE__, {producer, self()})
      end
      def init({producer, owner}) do
        {:consumer, owner, subscribe_to: [producer]}
      end
      def handle_events(events, _from, owner) do
        send(owner, {:received, events})
        {:noreply, [], owner}
      end
    end

消费者:

defmodule DataConsumer do
  use GenStage
  def start_link([]) do
    GenStage.start_link(__MODULE__, :any_state)
  end
  def init(state) do
    {:consumer, state, subscribe_to: [{DataProducer, selector: fn n -> n > 50 && n < 100 end, max_demand: 10}]}
  end
  def handle_events(events, _from, state) do
    for event <- events do
      # :timer.sleep(250)
      Logger.info inspect( {self(), event, state} )
    end
    {:noreply, [], state}
  end
end

提前谢谢您。

最佳答案

没有理由在这里使用ex_mock。如果您让生产者成为您的消费者订阅这样的参数,那就容易多了:

defmodule DataConsumer do
  use GenStage

  def start_link(producer) do
    GenStage.start_link(__MODULE__, producer)
  end

  def init(producer) do
    {:consumer, state, subscribe_to: [{producer, selector: fn n -> n > 50 && n < 100 end, max_demand: 10}]}
  end
end

然后你就可以有一个TestProducer:

defmodule TestProducer
  use GenStage

  def notify(pid, event) do
    GenServer.cast(pid, {:notify, event})
  end

  def start_link do
    GenStage.start_link(__MODULE__, :ok)
  end

  def init(:ok) do
    {:producer, :ok, dispatcher: GenStage.BroadcastDispatcher}
  end

  def handle_demand(_demand, state) do
    {:noreply, [], state}
  end

  def handle_cast({:notify, event}, state) do
    {:noreply, [event], state}
  end
end

并在您的测试中订阅它并断言预期结果:

defmodule DataConsumerTest do
  use ExUnit.Case

  test "consumes events" do
    {:ok, pid} = TestProducer.start_link()
    DataConsumer.start_link(pid)
    TestProducer.notify(%{data: :event_data})

    # assert thing you expected to happen happens
  end
end

TLDR;如果您在代码库中与许多不同的使用者一起工作,则必须使用手动/测试事件生成器。消费者并不真正关心生产者如何产生事件,只关心它可以订阅和消费它们。因此,您的测试只需要确保消费者能够从任何生产者接收事件,并且您可以向他们发送其在测试中寻找的正确事件。

关于elixir - 如何测试 Elixir GenStage Consumer?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50617217/

相关文章:

postgresql - 在带有 Postgres 的 Elixir 中,我怎样才能让数据库返回未使用的枚举值?

elixir - Ecto.Query.from/2 的第二个参数

c# - 为一个消费者使用 EasyNetQ 多个处理程序不起作用

elixir - 如何使用 AWS SNS 使用ibng ExAws 发送短信

elixir - Ecto 变更集中的空原子

java - 缓冲后台 InputStream 实现

带定时器的 Android(Java) 生产者/消费者

java - Kafka 在第一秒内生成消息的速度很慢

apache-kafka - Kafka Consumer 收到的旧消息很少(不是全部)(之前已经处理过)

java - 可选地获取字段