testing - 测试 Kafka Streams 拓扑

标签 testing apache-kafka apache-kafka-streams

我正在寻找一种方法来测试 Kafka Streams 应用程序。这样我就可以定义输入事件,测试套件会向我显示输出。

如果没有真正的 Kafka 设置,这可能吗?

最佳答案

更新 Kafka 1.1.0(2018 年 3 月 23 日发布):

KIP-247添加了官方测试工具。根据 Upgrade Guide :

There is a new artifact kafka-streams-test-utils providing a TopologyTestDriver, ConsumerRecordFactory, and OutputVerifier class. You can include the new artifact as a regular dependency to your unit tests and use the test driver to test your business logic of your Kafka Streams application. For more details, see KIP-247.

来自documentation :

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams-test-utils</artifactId>
        <version>1.1.0</version>
        <scope>test</scope>
    </dependency>

测试驱动程序模拟库运行时不断从输入主题中获取记录并通过遍历拓扑来处理它们。您可以使用测试驱动程序来验证您指定的处理器拓扑是否使用手动输入的数据记录计算出正确的结果。测试驱动程序捕获结果记录并允许查询其嵌入式状态存储:

    // Create your topology
    Topology topology = new Topology();
    Properties config = new Properties();
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");

    // Run it on the test driver
    TopologyTestDriver testDriver = new TopologyTestDriver(topology, config);

    // Feed input data
    ConsumerRecordFactory<String, Integer> factory = new ConsumerRecordFactory<>("input-topic", new StringSerializer(), new IntegerSerializer());
    testDriver.pipe(factory.create("key", 42L));

    // Verify output
    ProducerRecord<String, Integer> outputRecord = testDriver.readOutput("output-topic", new StringDeserializer(), new LongDeserializer());

参见 the documentation了解详情。


ProcessorTopologyTestDriver 从 0.11.0.0 开始可用。它在 kafka-streams 中可用测试工件(在 Maven 中用 <classifier>test</classifier> 指定):

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
        <version>0.11.0.0</version>
        <classifier>test</classifier>
        <scope>test</scope>
    </dependency>

您还需要添加 kafka-clients测试神器:

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.11.0.0</version>
        <classifier>test</classifier>
        <scope>test</scope>
    </dependency>

然后就可以使用测试驱动了。根据 Javadoc,首先创建一个 ProcessorTopologyTestDriver :

    StringSerializer strSerializer = new StringSerializer();
    StringDeserializer strDeserializer = new StringDeserializer();
    Properties props = new Properties();
    props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
    props.setProperty(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, CustomTimestampExtractor.class.getName());
    props.setProperty(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, strSerializer.getClass().getName());
    props.setProperty(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, strDeserializer.getClass().getName());
    props.setProperty(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, strSerializer.getClass().getName());
    props.setProperty(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, strDeserializer.getClass().getName());
    StreamsConfig config = new StreamsConfig(props);
    TopologyBuilder builder = ...
    ProcessorTopologyTestDriver driver = new ProcessorTopologyTestDriver(config, builder);

您可以将输入馈送到拓扑中,就好像您实际上已经写入了输入主题之一:

    driver.process("input-topic", "key1", "value1", strSerializer, strSerializer);

并读取输出主题:

    ProducerRecord<String, String> record1 = driver.readOutput("output-topic-1", strDeserializer, strDeserializer);
    ProducerRecord<String, String> record2 = driver.readOutput("output-topic-1", strDeserializer, strDeserializer);
    ProducerRecord<String, String> record3 = driver.readOutput("output-topic-2", strDeserializer, strDeserializer);

然后您可以断言这些结果。

关于testing - 测试 Kafka Streams 拓扑,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41825753/

相关文章:

apache-kafka - KafkaStream createTopic 不尊重 Kafka 服务器的 auto.create.topics.enable 设置

testing - "ordinary people"的工具可以运行网络测试吗?

django - model.save() 在加载 Django fixtures 时没有被调用?

mysql - 如何测试 SQL 查询/报告?

css - 我可以在响应式设计中测试视觉回归吗?

amazon-web-services - AWS MSK 用户/密码认证/授权

apache-kafka - Kafka - 流与主题

java - Kafka 消费者抛出 java.lang.OutOfMemoryError : Direct buffer memory

php - PHP 的 Kafka 客户端

apache-kafka - 如何对 Kafka 中的数据进行反规范化?