apache-kafka-streams - Kafka 流 DSL : aggregate, 丰富并发送

标签 apache-kafka-streams

我们有以下问题需要用 Kafka Streams 解决:

1-收到一条消息。每条消息都标有 eventId(消息更新事件)和correlationId(每条消息唯一)。

2- 从该消息中聚合一些状态(基于 eventId)并将其附加到本地存储中的现有状态

3- 为该事件的完整聚合状态丰富该消息并将其发送到输出主题

重点是我们不能真正丢失单个消息,并且它必须始终使用最新的聚合状态(我们在消息处理期间实际评估)来丰富传入的消息。

从我目前看到的情况来看,我们不能只使用简单的聚合(类似这样:)

stateMessageStream
  .map((k, v) => new KeyValue[String, StateMessage](k, v))
  .mapValues[StateMessageWithMarkets](sm => {StateMessageWithMarkets(Some(sm), extract(sm))})
  .groupBy((k, _) => k, stringSerde, marketAggregatorSerde)
  .aggregate[StateMessageWithMarkets](() => StateMessageWithMarkets(), (_, v, aggregatedState) => aggregatedState.updateModelMarketsWith(v), marketAggregatorSerde, kafkaStoreName)
  .to(stringSerde, marketAggregatorSerde, kafkaOutTopic)

因为聚合仅在间隔中产生新记录,这意味着对于两条传入消息,我们可能只生成单个聚合输出消息(因此我们丢失了一条消息)

我第二次尝试如何实现这一点基本上是两个流,一个用于聚合,第二个用于普通消息。最后,我们可以使用 join 操作将两个流重新连接在一起,基于correlationId 作为键 - 我们可以将正确的状态与正确的消息匹配:

val aggregatedStream : KStream[String, MarketAggregator] = stateMessageStream
  .map((k, v) => new KeyValue[String, StateMessage](k, v))
  .mapValues[StateMessage](v => {
    log.debug("Received State Message, gameId: " + v.metadata().gtpId() + ", correlationId: " + v.correlationId)
    v})
  .mapValues[MarketAggregator](sm => {MarketAggregator(sm.correlationId, extract(sm))})
  .groupBy((k, v) => k, stringSerde, marketAggregatorSerde)
  .aggregate[MarketAggregator](() => MarketAggregator(), (_, v, aggregatedState) => aggregatedState.updateModelMarketsWith(v), marketAggregatorSerde, kafkaStoreName)
  .toStream((k, v) => v.correlationId)

stateMessageStream
  .selectKey[String]((k, v) => v.correlationId)
  .leftJoin[MarketAggregator, StateMessageWithMarkets](aggregatedStream, (stateMessage : StateMessage, aggregatedState : MarketAggregator) => StateMessageWithMarkets(Some(stateMessage), aggregatedState.modelMarkets, stateMessage.correlationId),
      JoinWindows.of(10000),
      stringSerde, stateMessageSerde, marketAggregatorSerde)
  .mapValues[StateMessageWithMarkets](v => {
        log.debug("Producing aggregated State Message, gameId: " + v.stateMessage.map(_.metadata().gtpId()).getOrElse("unknown") +
          ", correlationId: " + v.stateMessage.map(_.correlationId).getOrElse("unknown"))
          v
        })
  .to(stringSerde, stateMessageWithMarketsSerde, kafkaOutTopic)

但是,这似乎也不起作用 - 对于两条传入消息,我仍然只收到具有输出主题最新聚合状态的单个消息。

有人可以解释为什么以及正确的解决方案是什么吗?

最佳答案

您可以使用方法一并通过禁用缓存来获取每个输入消息的输出消息。在您的 StreamsConfig您只需设置 StreamConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG 的值到零。

更多详情请见http://docs.confluent.io/current/streams/developer-guide.html#memory-management

关于apache-kafka-streams - Kafka 流 DSL : aggregate, 丰富并发送,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42166308/

相关文章:

apache-kafka - 如何将 Kafka 消息从一个主题发送到另一个主题?

apache-kafka - kafka-consumer-groups.sh 何时将 CURRENT_OFFSET 显示为 "-"?

java - 为什么 kafka 流线程会在源主题分区更改时死亡?谁能指出这方面的阅读 Material ?

apache-kafka - 哪个 kafka 属性决定了 KafkaConsumer 的轮询频率?

java - Kafka Stream KTable-KTable 键控连接产生重复事件(检测到乱序 KTable)

apache-kafka - 编写 Kafka Streams 来持久保存到数据库中是个好方法吗?

java - 如何使用java修改一个kafka主题的消息并将其发送到另一个kafka主题?

apache-kafka - 无法锁定任务 0_13 的状态目录

java - 如何在内存中的 Kafka Streams 状态存储上启用缓存

java - 如何使用kafka流加入主题