apache-kafka - 如何对 Kafka Streams 进行单元测试

标签 apache-kafka apache-kafka-streams

在探索如何对 Kafka Stream 进行单元测试时,我遇到了 ProcessorTopologyTestDriver,不幸的是,这个类似乎在 0.10.1.0 版本中被破坏了 ( KAFKA-4408 )

是否有解决 KTable 问题的方法?

我看到了“Mocked Streams”项目,但首先它使用版本 0.10.2.0,而我使用的是 0.10.1.1,其次它是 Scala,而我的测试是 Java/Groovy。

这里有关如何对流进行单元测试而无需引导 Zookeeper/kafka 的任何帮助都会很棒。

注意:我确实有使用嵌入式服务器的集成测试,这是用于单元测试,又名快速、简单的测试。

编辑

谢谢拉蒙·加西亚

For people arriving here in Google searches, please note that the test driver class is now org.apache.kafka.streams.TopologyTestDriver

This class is in the maven package groupId org.apache.kafka, artifactId kafka-streams-test-utils

最佳答案

我找到了解决这个问题的方法,我不确定这是答案,尤其是在https://stackoverflow.com/users/4953079/matthias-j-sax之后评论。无论如何,分享我迄今为止所拥有的......

我完全复制了ProcessorTopologyTestDriver from the 0.10.1 branch (这是我正在使用的版本)。

发送至地址KAFKA-4408我做了private final MockConsumer<byte[], byte[]> restoreStateConsumer可访问并移动 block task = new StreamTask(...到一个单独的方法,例如bootstrap .

在测试的设置阶段,我执行以下操作

driver = new ProcessorTopologyTestDriver(config, builder)
ArrayList partitionInfos = new ArrayList();
partitionInfos.add(new PartitionInfo('my_ktable', 1, (Node) null, (Node[]) null, (Node[]) null));
driver.restoreStateConsumer.updatePartitions('my_ktable', partitionInfos);
driver.restoreStateConsumer.updateEndOffsets(Collections.singletonMap(new TopicPartition('my_ktable', 1), Long.valueOf(0L)));
driver.bootstrap()

就是这样......

奖金

我还遇到了KAFKA-4461 ,幸运的是,自从我复制了整个类(class)后,我能够“挑选”accepted fix稍作调整。

一如既往地欢迎反馈。虽然显然不是官方测试类,但该驱动程序被证明非常有用!

关于apache-kafka - 如何对 Kafka Streams 进行单元测试,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43220110/

相关文章:

apache-kafka - Kafka Streams 中的内存与持久状态存储?

apache-kafka - 使用 Kafka Streams DSL 多次使用相同的主题作为源

java - 如何将外部源的上下文添加到 Kafka Streams 中的记录的正确方法

java - 在一段时间内使用 kafka-streams 处理和检查事件

apache-kafka - Kafka JSON 控制台生产者

docker - 无法连接到 wurstmeister/kafka

apache-kafka - Kafka 服务器配置 - 监听器与广告监听器

apache-kafka - Apache Kafka 的交互式管理 shell

apache-kafka - 使用选择键和转换在 DSL 拓扑上进行流重新分区

java - KStream-KStream inner join 抛出java.lang.ClassCastException