java - Kafka Stream 真的是实时的吗?

标签 java apache-kafka apache-kafka-streams

我正在使用 Kafka Stream API 来测试一些功能。 我有一个像这样的流:

KStream<String, UnifiedData> stream = builder.stream("topic", Consumed.with(Serdes.String(), new JsonSerde<>(Data.class)));

stream.groupBy((key, value) -> value.getMetadata().getId())
                .windowedBy(TimeWindows.of(TimeUnit.SECONDS.toMillis(1000)))
                .count()
                .toStream()
                .map((key, value) -> {
                    System.out.println(value);
                    return KeyValue.pair(key.toString(), value);
                });

我发现两个奇怪的行为会在我的主题中产生一些数据:

  • 首先:我没有得到每个生成数据的输出。例如,如果我没有延迟地生成 20 条消息,我只会得到 20 作为输出,而不是类似 1 2 3....
  • 第二:从我生成消息到 System.out.println(value) 在控制台中打印结果之间大约有 20 秒的延迟

那么,您认为这种行为完全正常吗?或者我的 kafka 有配置问题吗?

我正在使用 Kafka 1.0.1、Kafka Stream 1.0.1、Java 8 和 Spring-Boot

最佳答案

默认情况下,Kafka Streams 使用缓存对聚合中的连续输出进行“重复数据删除”,以减少下游负载。

您可以通过在 KafkaStreams 配置中设置 cache.max.bytes.buffering=0 来禁用全局缓存。作为替代方案,还可以通过将 Materialized 参数传递到聚合运算符中来单独禁用每个存储的缓存。

此外,所有缓存都会在提交时刷新,默认提交间隔为 30 秒。因此,您在 30 秒后看到输出是有意义的。如果禁用缓存,提交间隔将不再对行为产生任何影响。

更多详情请参阅:https://kafka.apache.org/documentation/streams/developer-guide/memory-mgmt.html

关于java - Kafka Stream 真的是实时的吗?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50644837/

相关文章:

java - 尝试使用资源引入无法访问的字节码

java - 无法从 Scanner 类中的 nextLine() 分配字符串值

java - Spring Kafka - 动态创建流

apache-kafka - Kafka Streams "map-side"像字典查找一样加入

java - 将一个数组分配给另一个数组时抛出异常

java - 为什么 1dp 在 Android 上同时表示 3px 和 2px?

java - KafkaAvroDeserializer-NoClassDefFoundError : io/confluent/common/config/ConfigException

java - Flink 没有向 kafka 提交偏移量

java - 将 kafka-streams 与自定义分区器一起使用

java - 卡夫卡流 : Proper way to exit on error