apache-kafka - 滑动窗口中Kafka KStream相关消息事件

标签 apache-kafka apache-kafka-streams rocksdb

我们有一种情况,我认为 Kafka Streams 可以提供帮助,但我找不到任何说明如何操作的文档或示例。

我发现了一个类似的问题,但它没有任何实现建议:Kafka Streams wait function with depending objects

我想做什么:

我想将来自 Kafka 主题的相关记录关联到单个对象中,并将该新对象发布到单独的输出主题。例如,可能有五个通过唯一键相互关联的消息记录 - 我想从这些相关对象构建一个新对象,并将其生成到一个新主题。

我希望聚合一小时滑动窗口内的所有相关事件。换句话说,一旦 ID 为“123”的消息 A 到达消费者,应用程序必须至少等待一个小时,以便 ID 为“123”的剩余记录到达。在所有记录到达或一小时过去后,这些记录将过期。

最后,一小时内收集到的所有相关消息都用于创建一个新对象,然后将其发送到另一个 Kafka 主题。

我遇到的问题。

Kafka 中的滑动窗口似乎仅在将两个流连接在一起时才起作用。我们将只有一个流连接到该主题 - 我不知道为什么需要两个流或我们将如何实现这一点。我在网上找不到任何例子。
我在 Kafka 中看到的所有流函数在收集相同键的事件时都只是聚合/减少到一个简单的值。例如,某个键出现的次数或将某个值相加

这是一些伪代码来描述我在说什么。如果功能存在,函数名称/语义将有所不同。

    KStream<Key, Object> kstream = kStreamBuilder.stream(TOPIC);
    kstream.windowedBy(
    // One hour sliding Window
    )
    .collectAllRelatedKeys(
    // Collect all Records related to each key
    // map == HashMap<Key, ArrayList<Value>>
       map.get(key).add(value);
    )
    .transformAndProcess(
        if(ALL_EVENTS_COLLECTED) {
        // Create new Object from all related records
            newObject = 
            createNewObjectFromRelatedRecordsFunction(map.get(key));
            producer.send(newObject);   
        }
    )

问题(并感谢您的帮助):
  • 我怎么能在一个流中使用滑动窗口?
  • 如何自定义 KStream/KTable 函数来收集时间窗口内的所有相关事件并将新对象生成到另一个主题?
  • 确认/偏移管理如何与滑动窗口流一起工作?
  • 这能保证 Exactly Once 交货吗?供引用:https://www.confluent.io/blog/enabling-exactly-kafka-streams/
  • 最佳答案

    Apache Kafka 2.7 中添加了对聚合的滑动窗口支持。
    参见 https://issues.apache.org/jira/browse/KAFKA-5636

    关于apache-kafka - 滑动窗口中Kafka KStream相关消息事件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49119379/

    相关文章:

    apache-kafka - 为什么 kafka 流状态目录在/tmp/kafka-streams 中?

    java - 使用带有java代码的kafka

    scala - 如何使用 ValueMapper 使用 Scala 更改 Kafka Streams 10.2 中的值类型

    rust - 从 Rocks 数据库中匹配模式的键获取值

    java - Kafka Stream 和 KGlobalTable Join 问题

    java - Kafka Streams 中的消息键为 Long

    java - Rocks DB 的 Java API 是否支持 API GetUpdatesSince?

    amazon-web-services - 适用于Kafka的Amazon Managed Streaming-MSK功能和性能

    java - Kafka 生产者配置 metadata.broker.list 和 url

    apache-kafka - 卡夫卡经纪人花了太长时间才出现