java - 如何使用 Java lambda 仅窗口化 Kafka Streams 中的输入?

标签 java lambda apache-kafka kafka-consumer-api apache-kafka-streams

我有使用 Kafka Stream 获取的输入数据。我需要实现的只是 5 秒的翻转窗口并将数据输出到 Kafka 主题。但是,我无法使用 lambda 来完成此任务。有人可以帮忙吗?

以下是我写的内容,但出现错误:

    KTable<TimeWindowedKStream<String, String> , String> result = source.
            groupByKey().windowedBy(TimeWindows.of(TimeUnit.DAYS.toMillis(5000)));

    result.to(Serdes.String(), Serdes.Long(), "outputtopic");

    final Topology topology = builder.build();
    final KafkaStreams streams = new KafkaStreams(topology, props);

但是,对于结果变量 eclipse 给我一个错误:“类型不匹配:无法从 TimeWindowedKStream 转换为 KTable,String>”。

另外,在将结果的值写入另一个主题时,Eclipse 给我错误:“KTable,String> 类型中的 (Serde>, Serde, String) 方法不适用于参数 (Serde, Serde, String) )”。

据我了解,如果没有某种聚合,就无法实现窗口化。但是,我只想将每 5 秒窗口的数据输出到另一个输出主题。

最佳答案

"Type mismatch: cannot convert from TimeWindowedKStream to KTable,String>".

您必须在 TimeWindowedKStream 上调用一些聚合函数才能获取表格,例如。 count()aggregate(...)

"The method to(Serde>, Serde, String) in the type KTable,String> is not applicable for the arguments (Serde, Serde, String)"

您无法使用首先调用 KTable::toStream()KTable 写入主题。返回的KStream具有to(...)函数。

关于java - 如何使用 Java lambda 仅窗口化 Kafka Streams 中的输入?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55325262/

相关文章:

apache-kafka - Kafka 消费者再平衡需要很长时间

java - 使用模仿;是否可以模拟一个将 lambda 作为参数并断言 lambda 捕获的变量的方法?

java - 有错误的密码凯撒java程序

java - 当指定自动增量时,GreenDao 生成一个带有 id 的构造函数

尝试模拟时出现 java.lang.ExceptionInInitializerError

c++ - 复制分配相同类型的 C++ lambda

java - 一行检查字符串是否包含禁止的子字符串

ssl - SSL 配置后 Apache Kafka 不启动

apache-kafka - 如何将单个 csv 文件缓存到 Kafka 中的 KTable 中?

java - 使用 Java 客户端库从 Google Calendar API v3 查询事件?