java - 如何在聚合和预处理器中重用状态存储?

标签 java apache-kafka-streams

我有消息想聚合成一个状态,但我也 喜欢在可能更新其他的预处理步骤中使用该状态 状态也是如此。一个示例可能是查看此消息是否更改 状态的某个部分,如果是这样,更新一个跟踪那个的主题 状态的特定部分,以及状态本身。

我能想到的最好的方法是使用一个 Transformer 可以访问状态存储,但在聚合状态之前这样做, 这样我就可以在它被删除之前看到状态的值是什么 更新。 (有问题的 Transformer 只会返回原来的 可能更新额外状态后的消息。)

但是,我遇到了一个先有鸡还是先有蛋的问题:

  1. 如果我添加 Materializedaggregate() 如下,我得到一个 拓扑错误表明我正在尝试访问状态存储 尚未添加到拓扑中。

    KStream<String, Message> stream = streamsBuilder.stream(config.getDefaultSourceTopicName(), Consumed.with(Serdes.String(), new MessageSerde()));
          streamBuilder
              .transformValues(
                  new MessagePreprocessorSupplier(
                      "state_store_topic_name"
                  ),
                  "state_store_topic_name"
              )
              .groupByKey()
              .aggregate(
                  () -> null,
                  new MyAggregator(),
                  Materialized.as("state_store_topic_name")
              );
    

    这引发了:

    org.apache.kafka.streams.errors.TopologyException: Invalid topology: StateStore state_store_topic_name is not added yet.
    at org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.connectProcessorAndStateStore(InternalTopologyBuilder.java:716)
    at org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.connectProcessorAndStateStores(InternalTopologyBuilder.java:615)
    at org.apache.kafka.streams.kstream.internals.KStreamImpl.transformValues(KStreamImpl.java:546)
    at org.apache.kafka.streams.kstream.internals.KStreamImpl.transformValues(KStreamImpl.java:538)
    at myapp.stream_processor.KafkaApplication.configureTopology(KafkaApplication.java:48)
    
  2. 如果我先尝试创建状态存储,然后我可以将它添加到 变压器,但是当我调用 .aggregate() 时出现错误, 表示此时无法添加状态存储,因为 我们之前已经添加了它。

    Materialized<String, MyState, KeyValueStore<Bytes, byte[]>> myStateStoreProvider =
        Materialized.<String, Thermostat, KeyValueStore<Bytes, byte[]>>as("state_store_topic_name")
            .withKeySerde(Serdes.String())
            .withValueSerde(myStateSerde);
    /* really don't think we should need this, but if I don't, the .transformValues
       says it wasn't added to the topology... */
    streamsBuilder.table("state_store_topic_name", myStateStoreProvider);
    
    KStream<String, Message> stream = streamsBuilder.stream(config.getDefaultSourceTopicName(), Consumed.with(Serdes.String(), new MessageSerde()));
          streamBuilder
              .transformValues(
                  new MessagePreprocessorSupplier(
                      "state_store_topic_name"
                  ),
                  "state_store_topic_name"
              )
              .groupByKey()
              .aggregate(
                  () -> null,
                  new MyAggregator(),
                  myStateStoreProvider
              );
    

    这引发了:

    org.apache.kafka.streams.errors.TopologyException: Invalid topology: StateStore state_store_topic_name is already added.
    at org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.addStateStore(InternalTopologyBuilder.java:523)
    at org.apache.kafka.streams.kstream.internals.GroupedStreamAggregateBuilder.build(GroupedStreamAggregateBuilder.java:71)
    at org.apache.kafka.streams.kstream.internals.KGroupedStreamImpl.doAggregate(KGroupedStreamImpl.java:488)
    at org.apache.kafka.streams.kstream.internals.KGroupedStreamImpl.aggregateMaterialized(KGroupedStreamImpl.java:175)
    at org.apache.kafka.streams.kstream.internals.KGroupedStreamImpl.aggregate(KGroupedStreamImpl.java:167)
    at myapp.stream_processor.KafkaApplication.configureTopology(KafkaApplication.java:48)
    

最佳答案

如果您回退到 Processor API 以将商店连接到转换器,它可能会起作用:

  • 添加转换器而不添加状态(因为它尚不存在)
  • 定期添加聚合以创建状态
  • 调用 builder.build() 后,调用 Topology#connectProcessorAndStateStore()
    • 需要传入变压器名称和店铺名称
    • 您可以通过 Topology#describe()
    • 检索转换器的名称

虽然我还没有尝试过...

关于java - 如何在聚合和预处理器中重用状态存储?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50518198/

相关文章:

python - 如何在kafka消费者中读取和处理高优先级消息?

java - org.apache.commons.net.ftp.FTPClient 出现问题,获取 FTP 响应 421 收到错误

java - 如何将自定义转换器与@DataMongoTest 一起使用?

java - 如何从 OrientDB 检索轻量级边缘

apache-kafka - Kafka Streams - 缺少源主题

java - 处理从一个主题到同一主题的流(循环处理)

java - 无法解析方法 'getLayoutInflater()' - Android Java

java - 错误 org.springframework.web.servlet.DispatcherServlet - 上下文初始化失败 org.springframework.beans.factory.BeanCreationException : Error

java - 处理 json 解析错误而不导致 Kafka 流处理器应用程序崩溃

apache-kafka - Kafka Streams如何与包含不完整数据的分区一起使用?