java - 带窗口的 KTable 会产生错误的类型

标签 java apache-kafka apache-kafka-streams

我在 Kafka 中创建带有时间窗口的 KTable 时遇到一些问题。

我想创建一个表来计算流中 ID 的数量,如下所示。

ID (String) |  Count (Long)
    X       |       5
    Y       |       6
    Z       |       7

等等。我希望能够使用 Kafka REST-API 获取表,最好是 .json。

这是我现在的代码:

    StreamsBuilder builder = new StreamsBuilder();

    KStream<String, String> streams = builder.stream(srcTopic);

    KTable<Windowed<String>, Long> numCount = streams
            .flatMapValues(value -> getID(value))
            .groupBy((key, value) -> value)
            .windowedBy(TimeWindows.of(windowSizeMs).advanceBy(advanceMs))
            .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("foo"));

我现在面临的问题是该表没有创建为 <String, Long>但如<String, String>反而。这意味着我无法获得正确的计数,但我收到了正确的 key ,但计数已损坏。我试图将其强制为 Long使用Long.valueOf(value)没有成功。我不知道如何从这里继续。我需要将KTable写入新主题吗?因为我希望可以使用 kafka REST-API 查询该表,所以我认为不需要它,对吗? Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("foo")应该使其可查询为“foo”,对吗?

KTable 创建一个 changelog -topic,这足以使其可查询吗?或者我是否必须创建一个新主题才能写入?

我现在正在使用另一个 KStream 来验证输出。

KStream<String, String> streamOut = builder.stream(srcTopic);

streamOut.foreach((key, value) -> System.out.println(key + " => " + value));

它输出:

 ID    COUNT
2855 => ~
2857 => �
2859 => �
2861 => V(
2863 => �
2874 => �
2877 => J
2880 => �2
2891 => �=

无论哪种方式,我真的不想使用 KStream 来收集输出,我想查询 KTable。但正如前面提到的,我不太明白查询是如何工作的..

更新

设法让它工作

    ReadOnlyWindowStore<String, Long> windowStore =
            kafkaStreams.store("tst", QueryableStoreTypes.windowStore());
        long timeFrom = 0;
        long timeTo = System.currentTimeMillis(); // now (in processing-time)
        WindowStoreIterator<Long> iterator = windowStore.fetch("x", timeFrom, timeTo);
        while (iterator.hasNext()) {
          KeyValue<Long, Long> next = iterator.next();
          long windowTimestamp = next.key;
          System.out.println(windowTimestamp + ":" + next.value);
        }

提前非常感谢,

最佳答案

KTable的输出类型是 <Windowed<String>,String>因为在 Kafka Streams 中并行维护多个窗口以允许处理无序数据。因此,情况并非只有一个窗口实例,而是有许多并行的窗口实例。 (参见https://docs.confluent.io/current/streams/developer-guide/dsl-api.html#hopping-time-windows)

保留“旧”窗口可以在数据延迟到达时更新它们。请注意,Kafka Streams 语义基于事件时间。

您仍然可以查询KTable -- 你只需要知道你要查询哪个窗口即可。

更新

JavaDoc描述了如何查询表:https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedKStream.java#L94-L101

KafkaStreams streams = ... // counting words
Store queryableStoreName = ... // the queryableStoreName should be the name of the store as defined by the Materialized instance
ReadOnlyWindowStore<String,Long> localWindowStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>windowStore());

String key = "some-word";
long fromTime = ...;
long toTime = ...;
WindowStoreIterator<Long> countForWordsForWindows = localWindowStore.fetch(key, timeFrom, timeTo); // key must be local (application state is shared over all running Kafka Streams instances)

关于java - 带窗口的 KTable 会产生错误的类型,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50232732/

相关文章:

apache-kafka-streams - 如何在 Kafka 流中使用 HashMap 作为值创建状态存储?

java - 使用 KStream 过滤掉阈值以外的值

apache-kafka - 共享 Kafka StateStore 最佳实践

java - 求解有限域上的线性方程组 python 或 java

java - 如何在我自己的 JAVA GUI 中查看 jar 文件的源代码?

kubernetes - 在Kafka中,随着分区之间的整体滞后时间减少,您是否应该减少一组消费者的数量?

amazon-web-services - 诊断 Kafka 连接问题

java - 用于故障转移和恢复的最佳 Kafka 生产者选项

java - 包名称为 'com.google.android.gms' 的多个库

Java无法确定应用程序类