我找到了一些关于如何测试生产者的资源,但是我找不到任何可以说明如何测试消费者的资源。
在生产者中,我创建了一个虚拟消费者,一切正常,但是在消费者中,我在测试方面遇到了困难。
defmodule DataProducer do
use GenStage
def start_link([]) do
GenStage.start_link(__MODULE__, 0, name: __MODULE__)
end
# {:queue.new, demand, size}
def init(counter) do
{:producer, counter, dispatcher: GenStage.BroadcastDispatcher}
end
def handle_demand(demand, state) do
events = Enum.to_list(state..state + demand + 1)
# Logger.info "demand is: #{inspect(demand)}, state is #{inspect(state)}"
{:noreply, events, (state + demand)}
end
end
生产者测试:
defmodule DataProducerTest do
use ExUnit.Case
test "check the results" do
{:ok, stage} = DataProducer.start_link([])
{:ok, _cons} = TestConsumer.start_link(stage)
assert_receive {:received, events}
GenStage.stop(stage)
end
end
defmodule TestConsumer do
def start_link(producer) do
GenStage.start_link(__MODULE__, {producer, self()})
end
def init({producer, owner}) do
{:consumer, owner, subscribe_to: [producer]}
end
def handle_events(events, _from, owner) do
send(owner, {:received, events})
{:noreply, [], owner}
end
end
消费者:
defmodule DataConsumer do
use GenStage
def start_link([]) do
GenStage.start_link(__MODULE__, :any_state)
end
def init(state) do
{:consumer, state, subscribe_to: [{DataProducer, selector: fn n -> n > 50 && n < 100 end, max_demand: 10}]}
end
def handle_events(events, _from, state) do
for event <- events do
# :timer.sleep(250)
Logger.info inspect( {self(), event, state} )
end
{:noreply, [], state}
end
end
提前谢谢您。
最佳答案
没有理由在这里使用ex_mock
。如果您让生产者成为您的消费者订阅这样的参数,那就容易多了:
defmodule DataConsumer do
use GenStage
def start_link(producer) do
GenStage.start_link(__MODULE__, producer)
end
def init(producer) do
{:consumer, state, subscribe_to: [{producer, selector: fn n -> n > 50 && n < 100 end, max_demand: 10}]}
end
end
然后你就可以有一个TestProducer
:
defmodule TestProducer
use GenStage
def notify(pid, event) do
GenServer.cast(pid, {:notify, event})
end
def start_link do
GenStage.start_link(__MODULE__, :ok)
end
def init(:ok) do
{:producer, :ok, dispatcher: GenStage.BroadcastDispatcher}
end
def handle_demand(_demand, state) do
{:noreply, [], state}
end
def handle_cast({:notify, event}, state) do
{:noreply, [event], state}
end
end
并在您的测试中订阅它并断言预期结果:
defmodule DataConsumerTest do
use ExUnit.Case
test "consumes events" do
{:ok, pid} = TestProducer.start_link()
DataConsumer.start_link(pid)
TestProducer.notify(%{data: :event_data})
# assert thing you expected to happen happens
end
end
TLDR;如果您在代码库中与许多不同的使用者一起工作,则必须使用手动/测试事件生成器。消费者并不真正关心生产者如何产生事件,只关心它可以订阅和消费它们。因此,您的测试只需要确保消费者能够从任何生产者接收事件,并且您可以向他们发送其在测试中寻找的正确事件。
关于elixir - 如何测试 Elixir GenStage Consumer?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50617217/