我正在寻找一种方法来测试 Kafka Streams 应用程序。这样我就可以定义输入事件,测试套件会向我显示输出。
如果没有真正的 Kafka 设置,这可能吗?
最佳答案
更新 Kafka 1.1.0(2018 年 3 月 23 日发布):
KIP-247添加了官方测试工具。根据 Upgrade Guide :
There is a new artifact
kafka-streams-test-utils
providing aTopologyTestDriver
,ConsumerRecordFactory
, andOutputVerifier
class. You can include the new artifact as a regular dependency to your unit tests and use the test driver to test your business logic of your Kafka Streams application. For more details, see KIP-247.
来自documentation :
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams-test-utils</artifactId>
<version>1.1.0</version>
<scope>test</scope>
</dependency>
测试驱动程序模拟库运行时不断从输入主题中获取记录并通过遍历拓扑来处理它们。您可以使用测试驱动程序来验证您指定的处理器拓扑是否使用手动输入的数据记录计算出正确的结果。测试驱动程序捕获结果记录并允许查询其嵌入式状态存储:
// Create your topology
Topology topology = new Topology();
Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
// Run it on the test driver
TopologyTestDriver testDriver = new TopologyTestDriver(topology, config);
// Feed input data
ConsumerRecordFactory<String, Integer> factory = new ConsumerRecordFactory<>("input-topic", new StringSerializer(), new IntegerSerializer());
testDriver.pipe(factory.create("key", 42L));
// Verify output
ProducerRecord<String, Integer> outputRecord = testDriver.readOutput("output-topic", new StringDeserializer(), new LongDeserializer());
参见 the documentation了解详情。
ProcessorTopologyTestDriver
从 0.11.0.0 开始可用。它在 kafka-streams
中可用测试工件(在 Maven 中用 <classifier>test</classifier>
指定):
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>0.11.0.0</version>
<classifier>test</classifier>
<scope>test</scope>
</dependency>
您还需要添加 kafka-clients
测试神器:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.11.0.0</version>
<classifier>test</classifier>
<scope>test</scope>
</dependency>
然后就可以使用测试驱动了。根据 Javadoc,首先创建一个 ProcessorTopologyTestDriver
:
StringSerializer strSerializer = new StringSerializer();
StringDeserializer strDeserializer = new StringDeserializer();
Properties props = new Properties();
props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
props.setProperty(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, CustomTimestampExtractor.class.getName());
props.setProperty(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, strSerializer.getClass().getName());
props.setProperty(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, strDeserializer.getClass().getName());
props.setProperty(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, strSerializer.getClass().getName());
props.setProperty(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, strDeserializer.getClass().getName());
StreamsConfig config = new StreamsConfig(props);
TopologyBuilder builder = ...
ProcessorTopologyTestDriver driver = new ProcessorTopologyTestDriver(config, builder);
您可以将输入馈送到拓扑中,就好像您实际上已经写入了输入主题之一:
driver.process("input-topic", "key1", "value1", strSerializer, strSerializer);
并读取输出主题:
ProducerRecord<String, String> record1 = driver.readOutput("output-topic-1", strDeserializer, strDeserializer);
ProducerRecord<String, String> record2 = driver.readOutput("output-topic-1", strDeserializer, strDeserializer);
ProducerRecord<String, String> record3 = driver.readOutput("output-topic-2", strDeserializer, strDeserializer);
然后您可以断言这些结果。
关于testing - 测试 Kafka Streams 拓扑,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41825753/