java - Kafka-Streams 加入 2 个带有 JSON 值的主题 |背压机制?

标签 java json apache-kafka apache-kafka-streams

我正在学习 Kafka Streams 并尝试实现以下目标:

创建了 2 个 Kafka 主题(比如 topic1、topic2),以 null 作为键,JSONString 作为值。来自主题 1(无重复)的数据在主题 2 中有多个匹配条目。 IE。 topic1 有一些主流数据,当与 topic2 连接时,可以生成新的多数据流。

例子:

topic1={"name": "abc", "age":2}, {"name": "xyz", "age":3} and so on.
topic2={"name": "abc", "address"="xxxxxx"}, {"name": "abc", "address"="yyyyyy"}, {"name": "xyz", "address"="jjjjjj"}, {"name": "xyz", "address"="xxxkkkkk"}

预期输出:{"name": "abc", "age":2, "address"="xxxxxx"}, {"name": "abc", "age":2, "address"="yyyyyy"}, {"name": "xyz", "age":3, "address"="jjjjjj"}, {"name": "xyz", "age":3, "address"="xxxkkkkk"}
想保留/保留来自 topic1 的数据流以供将来引用,而来自 topic2 的数据流仅用于实现上述用例,不需要任何持久性/保留。

我有几个问题:
1) 应该保留/存储 topic1 数据流几天(可能?),以便可以加入来自 topic2 的传入数据流。是否可以?
2)我应该用什么来实现这一点,KStream 还是 KTable?
3)这叫背压机制吗?

Kafka Stream 是否支持此用例,还是我应该注意其他事项?拜托,建议。

我已经尝试了一段带有 5 分钟窗口的 KStream 代码,但看起来我无法在流中保存 topic1 数据。

请帮助我做出正确的选择并加入。我正在使用 Confluent 中的 Kafka 和 Docker 实例。
public void run() {
        final StreamsBuilder builder = new StreamsBuilder();
        final Serde<JsonNode> jsonSerde = Serdes.serdeFrom(new JsonSerializer(), new JsonDeserializer());
        final Consumed<String, JsonNode> consumed = Consumed.with(Serdes.String(), jsonSerde);

        // Hold data from this topic to 30 days
        KStream<String, JsonNode> cs = builder.stream("topic1", consumed);
        cs.foreach((k,v) -> {
            System.out.println( k + " --->" + v);
        });

        // Data is involved in one time process.
        KStream<String, JsonNode> css = builder.stream("topic2", consumed);
        css.foreach((k,v) -> {
            System.out.println( k + " --->" + v);
        });

        KStream<String, JsonNode> resultStream = cs.leftJoin(css,
                valueJoiner,
                JoinWindows.of(TimeUnit.MINUTES.toMillis(5)),
                Joined.with(
                        Serdes.String(), /* key */
                        jsonSerde,       /* left value */
                        jsonSerde)       /* right value */
        );

        resultStream.foreach((k, v) -> {
            System.out.println("JOIN STREAM: KEY="+k+ ", VALUE=" + v);
        });

        KafkaStreams streams = new KafkaStreams(builder.build(), properties);
        streams.start();
    }

最佳答案

Kafka 中的联接始终基于键。 (*) 因此,要进行任何连接,您需要在进行实际连接之前将要连接的字段提取到键中(唯一的部分异常(exception)是 KStream-GlobalKTable 连接)。在您的代码示例中,您不会得到任何结果,因为所有记录都有 null键,因此无法加入。

对于连接本身,似乎 KStream-KTable 连接将是您的用例的正确选择。要完成这项工作,您需要:

  • topic1 正确设置连接 key 并将数据写入一个附加主题(我们称之为 topic1Keyed)
  • 阅读 topic1Keyed作为表
  • topic2 正确设置连接 key
  • 加入 topic2KTable

  • 有关连接语义的完整详细信息,请查看此博客文章:https://www.confluent.io/blog/crossing-streams-joins-apache-kafka/

    (*) 更新:

    从 2.4 版本开始,Kafka Streams 也支持外键表-表连接。

    关于java - Kafka-Streams 加入 2 个带有 JSON 值的主题 |背压机制?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50546017/

    相关文章:

    javascript - Google Apps 脚本 Web 应用程序 AJAX、jQuery、JSON?

    c# - 如何在不编写适配器的情况下将 JSON 数据从 MVC 4 传递到 Float

    apache-kafka - Kafka 服务器节点出现 “Too many open files” 错误

    apache-kafka - Kafka Consumer 收到的旧消息很少(不是全部)(之前已经处理过)

    apache-zookeeper - KafkaServerStable 启动期间 kafka 无法连接到 Zookeeper-FATAL fatal error

    java - 如何通过命令行参数覆盖属性文件值?

    java - 为什么这条线没有适当的抗锯齿渲染?

    java - 将 ArrayList 刷新为 ClickListener

    java - 如何从 Hibernate Session 获取 java.sql.Connection 对象

    json - 使用 JSON API 表示无资源的聚合数据