在探索如何对 Kafka Stream 进行单元测试时,我遇到了 ProcessorTopologyTestDriver
,不幸的是,这个类似乎在 0.10.1.0
版本中被破坏了 ( KAFKA-4408 )
是否有解决 KTable 问题的方法?
我看到了“Mocked Streams”项目,但首先它使用版本 0.10.2.0
,而我使用的是 0.10.1.1
,其次它是 Scala,而我的测试是 Java/Groovy。
这里有关如何对流进行单元测试而无需引导 Zookeeper/kafka 的任何帮助都会很棒。
注意:我确实有使用嵌入式服务器的集成测试,这是用于单元测试,又名快速、简单的测试。
编辑
谢谢拉蒙·加西亚
For people arriving here in Google searches, please note that the test driver class is now org.apache.kafka.streams.TopologyTestDriver
This class is in the maven package groupId org.apache.kafka, artifactId kafka-streams-test-utils
最佳答案
我找到了解决这个问题的方法,我不确定这是答案,尤其是在https://stackoverflow.com/users/4953079/matthias-j-sax之后评论。无论如何,分享我迄今为止所拥有的......
我完全复制了ProcessorTopologyTestDriver
from the 0.10.1 branch (这是我正在使用的版本)。
发送至地址KAFKA-4408我做了private final MockConsumer<byte[], byte[]> restoreStateConsumer
可访问并移动 block task = new StreamTask(...
到一个单独的方法,例如bootstrap
.
在测试的设置阶段,我执行以下操作
driver = new ProcessorTopologyTestDriver(config, builder)
ArrayList partitionInfos = new ArrayList();
partitionInfos.add(new PartitionInfo('my_ktable', 1, (Node) null, (Node[]) null, (Node[]) null));
driver.restoreStateConsumer.updatePartitions('my_ktable', partitionInfos);
driver.restoreStateConsumer.updateEndOffsets(Collections.singletonMap(new TopicPartition('my_ktable', 1), Long.valueOf(0L)));
driver.bootstrap()
就是这样......
奖金
我还遇到了KAFKA-4461 ,幸运的是,自从我复制了整个类(class)后,我能够“挑选”accepted fix稍作调整。
一如既往地欢迎反馈。虽然显然不是官方测试类,但该驱动程序被证明非常有用!
关于apache-kafka - 如何对 Kafka Streams 进行单元测试,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43220110/