我们有一种情况,我认为 Kafka Streams 可以提供帮助,但我找不到任何说明如何操作的文档或示例。
我发现了一个类似的问题,但它没有任何实现建议:Kafka Streams wait function with depending objects
我想做什么:
我想将来自 Kafka 主题的相关记录关联到单个对象中,并将该新对象发布到单独的输出主题。例如,可能有五个通过唯一键相互关联的消息记录 - 我想从这些相关对象构建一个新对象,并将其生成到一个新主题。
我希望聚合一小时滑动窗口内的所有相关事件。换句话说,一旦 ID 为“123”的消息 A 到达消费者,应用程序必须至少等待一个小时,以便 ID 为“123”的剩余记录到达。在所有记录到达或一小时过去后,这些记录将过期。
最后,一小时内收集到的所有相关消息都用于创建一个新对象,然后将其发送到另一个 Kafka 主题。
我遇到的问题。
Kafka 中的滑动窗口似乎仅在将两个流连接在一起时才起作用。我们将只有一个流连接到该主题 - 我不知道为什么需要两个流或我们将如何实现这一点。我在网上找不到任何例子。
我在 Kafka 中看到的所有流函数在收集相同键的事件时都只是聚合/减少到一个简单的值。例如,某个键出现的次数或将某个值相加
这是一些伪代码来描述我在说什么。如果功能存在,函数名称/语义将有所不同。
KStream<Key, Object> kstream = kStreamBuilder.stream(TOPIC);
kstream.windowedBy(
// One hour sliding Window
)
.collectAllRelatedKeys(
// Collect all Records related to each key
// map == HashMap<Key, ArrayList<Value>>
map.get(key).add(value);
)
.transformAndProcess(
if(ALL_EVENTS_COLLECTED) {
// Create new Object from all related records
newObject =
createNewObjectFromRelatedRecordsFunction(map.get(key));
producer.send(newObject);
}
)
问题(并感谢您的帮助):
最佳答案
Apache Kafka 2.7 中添加了对聚合的滑动窗口支持。
参见 https://issues.apache.org/jira/browse/KAFKA-5636
关于apache-kafka - 滑动窗口中Kafka KStream相关消息事件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49119379/